X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin.py;h=53d9621594965ad4576940b7b88be6e7b300af58;hb=1b6c7932a8feb419aae73a00a6784d7c110decdc;hp=5f830ea2b50079218d54dfd34f6ed348edbfc061;hpb=7f4d577d6bc243f53a044d92ca9367b3f1fa170e;p=vpp.git diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 5f830ea2b50..53d96215949 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ACL plugin Test Case HLD: """ @@ -11,9 +11,16 @@ from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner +from framework import tag_fixme_vpp_workers from util import Host, ppp +from ipaddress import IPv4Network, IPv6Network +from vpp_lo_interface import VppLoInterface +from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist +from vpp_ip import INVALID_INDEX + +@tag_fixme_vpp_workers class TestACLplugin(VppTestCase): """ ACL plugin Test Case """ @@ -42,6 +49,7 @@ class TestACLplugin(VppTestCase): # port ranges PORTS_ALL = -1 PORTS_RANGE = 0 + PORTS_RANGE_2 = 1 udp_sport_from = 10 udp_sport_to = udp_sport_from + 5 udp_dport_from = 20000 @@ -51,11 +59,27 @@ class TestACLplugin(VppTestCase): tcp_dport_from = 40000 tcp_dport_to = tcp_dport_from + 5000 + udp_sport_from_2 = 90 + udp_sport_to_2 = udp_sport_from_2 + 5 + udp_dport_from_2 = 30000 + udp_dport_to_2 = udp_dport_from_2 + 5000 + tcp_sport_from_2 = 130 + tcp_sport_to_2 = tcp_sport_from_2 + 5 + tcp_dport_from_2 = 20000 + tcp_dport_to_2 = tcp_dport_from_2 + 5000 + icmp4_type = 8 # echo request icmp4_code = 3 icmp6_type = 128 # echo request icmp6_code = 3 + icmp4_type_2 = 8 + icmp4_code_from_2 = 5 + icmp4_code_to_2 = 20 + icmp6_type_2 = 128 + icmp6_code_from_2 = 8 + icmp6_code_to_2 = 42 + # Test variables bd_id = 1 @@ -68,8 +92,6 @@ class TestACLplugin(VppTestCase): """ super(TestACLplugin, cls).setUpClass() - random.seed() - try: # Create 2 pg interfaces cls.create_pg_interfaces(range(2)) @@ -86,8 +108,8 @@ class TestACLplugin(VppTestCase): cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1) for pg_if in cls.pg_interfaces: - cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index, - bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id) # Set up all interfaces for i in cls.pg_interfaces: @@ -105,11 +127,32 @@ class TestACLplugin(VppTestCase): # warm-up the mac address tables # self.warmup_test() + count = 16 + start = 0 + n_int = len(cls.pg_interfaces) + macs_per_if = count // n_int + i = -1 + for pg_if in cls.pg_interfaces: + i += 1 + start_nr = macs_per_if * i + start + end_nr = count + start if i == (n_int - 1) \ + else macs_per_if * (i + 1) + start + hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index] + for j in range(int(start_nr), int(end_nr)): + host = Host( + "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), + "172.17.1%02x.%u" % (pg_if.sw_if_index, j), + "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) + hosts.append(host) except Exception: super(TestACLplugin, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestACLplugin, cls).tearDownClass() + def setUp(self): super(TestACLplugin, self).setUp() self.reset_packet_infos() @@ -119,98 +162,67 @@ class TestACLplugin(VppTestCase): Show various debug prints after each test. """ super(TestACLplugin, self).tearDown() - if not self.vpp_dead: - self.logger.info(self.vapi.ppcli("show l2fib verbose")) - self.logger.info(self.vapi.ppcli("show acl-plugin acl")) - self.logger.info(self.vapi.ppcli("show acl-plugin interface")) - self.logger.info(self.vapi.ppcli("show acl-plugin tables")) - self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" - % self.bd_id)) - - def create_hosts(self, count, start=0): - """ - Create required number of host MAC addresses and distribute them among - interfaces. Create host IPv4 address for every host MAC address. - :param int count: Number of hosts to create MAC/IPv4 addresses for. - :param int start: Number to start numbering from. - """ - n_int = len(self.pg_interfaces) - macs_per_if = count / n_int - i = -1 - for pg_if in self.pg_interfaces: - i += 1 - start_nr = macs_per_if * i + start - end_nr = count + start if i == (n_int - 1) \ - else macs_per_if * (i + 1) + start - hosts = self.hosts_by_pg_idx[pg_if.sw_if_index] - for j in range(start_nr, end_nr): - host = Host( - "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), - "172.17.1%02x.%u" % (pg_if.sw_if_index, j), - "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) - hosts.append(host) + def show_commands_at_teardown(self): + cli = "show vlib graph l2-input-feat-arc" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-input-feat-arc-end" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-output-feat-arc" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-output-feat-arc-end" + self.logger.info(self.vapi.ppcli(cli)) + self.logger.info(self.vapi.ppcli("show l2fib verbose")) + self.logger.info(self.vapi.ppcli("show acl-plugin acl")) + self.logger.info(self.vapi.ppcli("show acl-plugin interface")) + self.logger.info(self.vapi.ppcli("show acl-plugin tables")) + self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" + % self.bd_id)) def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, - s_prefix=0, s_ip='\x00\x00\x00\x00', - d_prefix=0, d_ip='\x00\x00\x00\x00'): - if proto == -1: - return - if ports == self.PORTS_ALL: - sport_from = 0 - dport_from = 0 - sport_to = 65535 if proto != 1 and proto != 58 else 255 - dport_to = sport_to - elif ports == self.PORTS_RANGE: - if proto == 1: - sport_from = self.icmp4_type - sport_to = self.icmp4_type - dport_from = self.icmp4_code - dport_to = self.icmp4_code - elif proto == 58: - sport_from = self.icmp6_type - sport_to = self.icmp6_type - dport_from = self.icmp6_code - dport_to = self.icmp6_code - elif proto == self.proto[self.IP][self.TCP]: - sport_from = self.tcp_sport_from - sport_to = self.tcp_sport_to - dport_from = self.tcp_dport_from - dport_to = self.tcp_dport_to - elif proto == self.proto[self.IP][self.UDP]: - sport_from = self.udp_sport_from - sport_to = self.udp_sport_to - dport_from = self.udp_dport_from - dport_to = self.udp_dport_to + s_prefix=0, s_ip=0, + d_prefix=0, d_ip=0): + if ip: + src_prefix = IPv6Network((s_ip, s_prefix)) + dst_prefix = IPv6Network((d_ip, d_prefix)) else: - sport_from = ports - sport_to = ports - dport_from = ports - dport_to = ports - - rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto, - 'srcport_or_icmptype_first': sport_from, - 'srcport_or_icmptype_last': sport_to, - 'src_ip_prefix_len': s_prefix, - 'src_ip_addr': s_ip, - 'dstport_or_icmpcode_first': dport_from, - 'dstport_or_icmpcode_last': dport_to, - 'dst_ip_prefix_len': d_prefix, - 'dst_ip_addr': d_ip}) - return rule - - def apply_rules(self, rules, tag=''): - reply = self.api_acl_add_replace(acl_index=4294967295, r=rules, - count=len(rules), - tag=tag) - self.logger.info("Dumped ACL: " + str( - self.api_acl_dump(reply.acl_index))) + src_prefix = IPv4Network((s_ip, s_prefix)) + dst_prefix = IPv4Network((d_ip, d_prefix)) + return AclRule(is_permit=permit_deny, ports=ports, proto=proto, + src_prefix=src_prefix, dst_prefix=dst_prefix) + + def apply_rules(self, rules, tag=None): + acl = VppAcl(self, rules, tag=tag) + acl.add_vpp_config() + self.logger.info("Dumped ACL: " + str(acl.dump())) # Apply a ACL on the interface as inbound for i in self.pg_interfaces: - self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index, - count=1, n_input=1, - acls=[reply.acl_index]) - return + acl_if = VppAclInterface( + self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]) + acl_if.add_vpp_config() + return acl.acl_index + + def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX): + acl = VppAcl(self, rules, tag=tag) + acl.add_vpp_config() + self.logger.info("Dumped ACL: " + str(acl.dump())) + # Apply a ACL on the interface as inbound + acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, + acls=[acl]) + return acl.acl_index + + def etype_whitelist(self, whitelist, n_input, add=True): + # Apply whitelists on all the interfaces + if add: + self._wl = [] + for i in self.pg_interfaces: + self._wl.append(VppEtypeWhitelist( + self, sw_if_index=i.sw_if_index, whitelist=whitelist, + n_input=n_input).add_vpp_config()) + else: + if hasattr(self, "_wl"): + for wl in self._wl: + wl.remove_vpp_config() def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] @@ -233,7 +245,8 @@ class TestACLplugin(VppTestCase): return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0, fragments=False, pkt_raw=True): + proto=-1, ports=0, fragments=False, + pkt_raw=True, etype=-1): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -250,7 +263,7 @@ class TestACLplugin(VppTestCase): dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index] n_int = len(dst_hosts) * len(src_hosts) for i in range(0, n_int): - dst_host = dst_hosts[i / len(src_hosts)] + dst_host = dst_hosts[int(i / len(src_hosts))] src_host = src_hosts[i % len(src_hosts)] pkt_info = self.create_packet_info(src_if, dst_if) if ipv6 == 1: @@ -265,6 +278,10 @@ class TestACLplugin(VppTestCase): pkt_info.proto = proto payload = self.info_to_payload(pkt_info) p = Ether(dst=dst_host.mac, src=src_host.mac) + if etype > 0: + p = Ether(dst=dst_host.mac, + src=src_host.mac, + type=etype) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) if fragments: @@ -293,7 +310,8 @@ class TestACLplugin(VppTestCase): pkts.append(p) return pkts - def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0): + def verify_capture(self, pg_if, capture, + traffic_type=0, ip_type=0, etype=-1): """ Verify captured input packet stream for defined interface. @@ -306,14 +324,20 @@ class TestACLplugin(VppTestCase): last_info[i.sw_if_index] = None dst_sw_if_index = pg_if.sw_if_index for packet in capture: + if etype > 0: + if packet[Ether].type != etype: + self.logger.error(ppp("Unexpected ethertype in packet:", + packet)) + else: + continue try: # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data if traffic_type == self.ICMP and ip_type == self.IPV6: payload_info = self.payload_to_info( - packet[ICMPv6EchoRequest].data) + packet[ICMPv6EchoRequest], 'data') payload = packet[ICMPv6EchoRequest] else: - payload_info = self.payload_to_info(str(packet[Raw])) + payload_info = self.payload_to_info(packet[Raw]) payload = packet[self.proto_map[payload_info.proto]] except: self.logger.error(ppp("Unexpected or invalid packet " @@ -394,7 +418,7 @@ class TestACLplugin(VppTestCase): self.pg_start() def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, - frags=False, pkt_raw=True): + frags=False, pkt_raw=True, etype=-1): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 @@ -402,7 +426,7 @@ class TestACLplugin(VppTestCase): if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, traffic_type, ip_type, proto, ports, - frags, pkt_raw) + frags, pkt_raw, etype) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -410,6 +434,7 @@ class TestACLplugin(VppTestCase): # Enable packet capture and start packet sendingself.IPV self.pg_enable_capture(self.pg_interfaces) self.pg_start() + self.logger.info("sent packets count: %d" % pkts_cnt) # Verify # Verify outgoing packet streams per packet-generator interface @@ -419,23 +444,27 @@ class TestACLplugin(VppTestCase): capture = dst_if.get_capture(pkts_cnt) self.logger.info("Verifying capture on interface %s" % dst_if.name) - self.verify_capture(dst_if, capture, traffic_type, ip_type) + self.verify_capture(dst_if, capture, + traffic_type, ip_type, etype) def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0, frags=False): + ports=0, frags=False, etype=-1): # Test + pkts_cnt = 0 self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, traffic_type, ip_type, proto, ports, - frags) + frags, True, etype) if len(pkts) > 0: i.add_stream(pkts) + pkts_cnt += len(pkts) # Enable packet capture and start packet sending self.pg_enable_capture(self.pg_interfaces) self.pg_start() + self.logger.info("sent packets count: %d" % pkts_cnt) # Verify # Verify outgoing packet streams per packet-generator interface @@ -447,42 +476,9 @@ class TestACLplugin(VppTestCase): capture = dst_if.get_capture(0) self.assertEqual(len(capture), 0) - def api_acl_add_replace(self, acl_index, r, count, tag='', - expected_retval=0): - """Add/replace an ACL - - :param int acl_index: ACL index to replace, - 4294967295 to create new ACL. - :param acl_rule r: ACL rules array. - :param str tag: symbolic tag (description) for this ACL. - :param int count: number of rules. - """ - return self.vapi.api(self.vapi.papi.acl_add_replace, - {'acl_index': acl_index, - 'r': r, - 'count': count, - 'tag': tag}, - expected_retval=expected_retval) - - def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls, - expected_retval=0): - return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list, - {'sw_if_index': sw_if_index, - 'count': count, - 'n_input': n_input, - 'acls': acls}, - expected_retval=expected_retval) - - def api_acl_dump(self, acl_index, expected_retval=0): - return self.vapi.api(self.vapi.papi.acl_dump, - {'acl_index': acl_index}, - expected_retval=expected_retval) - def test_0000_warmup_test(self): """ ACL plugin version check; learn MACs """ - self.create_hosts(16) - self.run_traffic_no_check() reply = self.vapi.papi.acl_plugin_get_version() self.assertEqual(reply.major, 1) self.logger.info("Working with ACL plugin version: %d.%d" % ( @@ -491,27 +487,19 @@ class TestACLplugin(VppTestCase): # self.assertEqual(reply.minor, 0) def test_0001_acl_create(self): - """ ACL create test + """ ACL create/delete test """ self.logger.info("ACLP_TEST_START_0001") - # Add an ACL - r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17, - 'srcport_or_icmptype_first': 1234, - 'srcport_or_icmptype_last': 1235, - 'src_ip_prefix_len': 0, - 'src_ip_addr': '\x00\x00\x00\x00', - 'dstport_or_icmpcode_first': 1234, - 'dstport_or_icmpcode_last': 1234, - 'dst_ip_addr': '\x00\x00\x00\x00', - 'dst_ip_prefix_len': 0}] + # Create a permit-1234 ACL + r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)] # Test 1: add a new ACL - reply = self.api_acl_add_replace(acl_index=4294967295, r=r, - count=len(r), tag="permit 1234") - self.assertEqual(reply.retval, 0) + first_acl = VppAcl(self, rules=r, tag="permit 1234") + first_acl.add_vpp_config() + self.assertTrue(first_acl.query_vpp_config()) # The very first ACL gets #0 - self.assertEqual(reply.acl_index, 0) - rr = self.api_acl_dump(reply.acl_index) + self.assertEqual(first_acl.acl_index, 0) + rr = first_acl.dump() self.logger.info("Dumped ACL: " + str(rr)) self.assertEqual(len(rr), 1) # We should have the same number of ACL entries as we had asked @@ -520,43 +508,49 @@ class TestACLplugin(VppTestCase): # are different types, we need to iterate over rules and keys to get # to basic values. for i_rule in range(0, len(r) - 1): - for rule_key in r[i_rule]: + encoded_rule = r[i_rule].encode() + for rule_key in encoded_rule: self.assertEqual(rr[0].r[i_rule][rule_key], - r[i_rule][rule_key]) - - # Add a deny-1234 ACL - r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17, - 'srcport_or_icmptype_first': 1234, - 'srcport_or_icmptype_last': 1235, - 'src_ip_prefix_len': 0, - 'src_ip_addr': '\x00\x00\x00\x00', - 'dstport_or_icmpcode_first': 1234, - 'dstport_or_icmpcode_last': 1234, - 'dst_ip_addr': '\x00\x00\x00\x00', - 'dst_ip_prefix_len': 0}, - {'is_permit': 1, 'is_ipv6': 0, 'proto': 17, - 'srcport_or_icmptype_first': 0, - 'srcport_or_icmptype_last': 0, - 'src_ip_prefix_len': 0, - 'src_ip_addr': '\x00\x00\x00\x00', - 'dstport_or_icmpcode_first': 0, - 'dstport_or_icmpcode_last': 0, - 'dst_ip_addr': '\x00\x00\x00\x00', - 'dst_ip_prefix_len': 0}) - - reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny, - count=len(r_deny), - tag="deny 1234;permit all") - self.assertEqual(reply.retval, 0) + encoded_rule[rule_key]) + + # Create a deny-1234 ACL + r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235), + AclRule(is_permit=1, proto=17, ports=0)] + second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all") + second_acl.add_vpp_config() + self.assertTrue(second_acl.query_vpp_config()) # The second ACL gets #1 - self.assertEqual(reply.acl_index, 1) + self.assertEqual(second_acl.acl_index, 1) # Test 2: try to modify a nonexistent ACL - reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r), - tag="FFFF:FFFF", expected_retval=-1) - self.assertEqual(reply.retval, -1) - # The ACL number should pass through - self.assertEqual(reply.acl_index, 432) + invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF") + reply = invalid_acl.add_vpp_config(expect_error=True) + + # apply an ACL on an interface inbound, try to delete ACL, must fail + acl_if_list = VppAclInterface( + self, sw_if_index=self.pg0.sw_if_index, n_input=1, + acls=[first_acl]) + acl_if_list.add_vpp_config() + first_acl.remove_vpp_config(expect_error=True) + # Unapply an ACL and then try to delete it - must be ok + acl_if_list.remove_vpp_config() + first_acl.remove_vpp_config() + + # apply an ACL on an interface inbound, try to delete ACL, must fail + acl_if_list = VppAclInterface( + self, sw_if_index=self.pg0.sw_if_index, n_input=0, + acls=[second_acl]) + acl_if_list.add_vpp_config() + second_acl.remove_vpp_config(expect_error=True) + # Unapply an ACL and then try to delete it - must be ok + acl_if_list.remove_vpp_config() + second_acl.remove_vpp_config() + + # try to apply a nonexistent ACL - must fail + acl_if_list = VppAclInterface( + self, sw_if_index=self.pg0.sw_if_index, n_input=0, + acls=[invalid_acl]) + acl_if_list.add_vpp_config(expect_error=True) self.logger.info("ACLP_TEST_FINISH_0001") @@ -567,15 +561,32 @@ class TestACLplugin(VppTestCase): rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.UDP])) + 0, self.proto[self.IP][self.UDP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.TCP])) + 0, self.proto[self.IP][self.TCP])) # Apply rules - self.apply_rules(rules, "permit per-flow") + acl_idx = self.apply_rules(rules, "permit per-flow") + + # enable counters + reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, -1) + + matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + self.logger.info("stat segment counters: %s" % repr(matches)) + cli = "show acl-plugin acl" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show acl-plugin tables" + self.logger.info(self.vapi.ppcli(cli)) + + total_hits = matches[0][0]['packets'] + matches[0][1]['packets'] + self.assertEqual(total_hits, 64) + + # disable counters + reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0) + self.logger.info("ACLP_TEST_FINISH_0002") def test_0003_acl_deny_apply(self): @@ -584,20 +595,34 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_START_0003") # Add a deny-flows ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, - self.PORTS_ALL, self.proto[self.IP][self.UDP])) + rules.append(self.create_rule( + self.IPV4, self.DENY, self.PORTS_ALL, + self.proto[self.IP][self.UDP])) # Permit ip any any in the end rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny per-flow;permit all") + acl_idx = self.apply_rules(rules, "deny per-flow;permit all") + + # enable counters + reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) + + matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + self.logger.info("stat segment counters: %s" % repr(matches)) + cli = "show acl-plugin acl" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show acl-plugin tables" + self.logger.info(self.vapi.ppcli(cli)) + self.assertEqual(matches[0][0]['packets'], 64) + # disable counters + reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0) self.logger.info("ACLP_TEST_FINISH_0003") - # self.assertEqual(1, 0) + # self.assertEqual(, 0) def test_0004_vpp624_permit_icmpv4(self): """ VPP_624 permit ICMPv4 @@ -689,7 +714,7 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -843,14 +868,13 @@ class TestACLplugin(VppTestCase): for i in range(len(r)): rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3])) - reply = self.api_acl_add_replace(acl_index=4294967295, r=rules, - count=len(rules)) - result = self.api_acl_dump(reply.acl_index) + acl = VppAcl(self, rules=rules) + acl.add_vpp_config() + result = acl.dump() i = 0 for drules in result: for dr in drules.r: - self.assertEqual(dr.is_ipv6, r[i][0]) self.assertEqual(dr.is_permit, r[i][1]) self.assertEqual(dr.proto, r[i][3]) @@ -888,7 +912,7 @@ class TestACLplugin(VppTestCase): """ self.logger.info("ACLP_TEST_START_0015") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, port, @@ -897,7 +921,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp "+str(port)) + self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -910,7 +934,7 @@ class TestACLplugin(VppTestCase): """ self.logger.info("ACLP_TEST_START_0016") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, port, @@ -919,7 +943,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp "+str(port)) + self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -932,7 +956,7 @@ class TestACLplugin(VppTestCase): """ self.logger.info("ACLP_TEST_START_0017") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV6, self.PERMIT, port, @@ -941,7 +965,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp "+str(port)) + self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -950,11 +974,11 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0017") def test_0018_udp_permit_port_v6(self): - """ permit single UPPv6 + """ permit single UDPv6 """ self.logger.info("ACLP_TEST_START_0018") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV6, self.PERMIT, port, @@ -964,7 +988,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp "+str(port)) + self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -977,7 +1001,7 @@ class TestACLplugin(VppTestCase): """ self.logger.info("ACLP_TEST_START_0019") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, port, @@ -991,7 +1015,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1004,7 +1028,7 @@ class TestACLplugin(VppTestCase): """ self.logger.info("ACLP_TEST_START_0020") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, port, @@ -1018,7 +1042,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1027,11 +1051,12 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0020") def test_0021_udp_deny_port_verify_fragment_deny(self): - """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked + """ deny single UDPv4/v6, permit ip any, verify non-initial fragment + blocked """ self.logger.info("ACLP_TEST_START_0021") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, port, @@ -1045,7 +1070,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1057,7 +1082,7 @@ class TestACLplugin(VppTestCase): """ VPP-687 zero length udp ipv4 packet""" self.logger.info("ACLP_TEST_START_0022") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, port, @@ -1067,7 +1092,7 @@ class TestACLplugin(VppTestCase): self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit empty udp ip4 " + str(port)) + self.apply_rules(rules, "permit empty udp ip4 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1092,7 +1117,7 @@ class TestACLplugin(VppTestCase): """ VPP-687 zero length udp ipv6 packet""" self.logger.info("ACLP_TEST_START_0023") - port = random.randint(0, 65535) + port = random.randint(16384, 65535) # Add an ACL rules = [] rules.append(self.create_rule(self.IPV6, self.PERMIT, port, @@ -1101,7 +1126,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit empty udp ip6 "+str(port)) + self.apply_rules(rules, "permit empty udp ip6 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1123,5 +1148,291 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0023") + def test_0108_tcp_permit_v4(self): + """ permit TCPv4 + non-match range + """ + self.logger.info("ACLP_TEST_START_0108") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0108") + + def test_0109_tcp_permit_v6(self): + """ permit TCPv6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0109") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ip6 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0109") + + def test_0110_udp_permit_v4(self): + """ permit UDPv4 + non-match range + """ + self.logger.info("ACLP_TEST_START_0110") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 udp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0110") + + def test_0111_udp_permit_v6(self): + """ permit UDPv6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0111") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ip6 udp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0111") + + def test_0112_tcp_deny(self): + """ deny TCPv4/v6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0112") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "deny ip4/ip6 tcp") + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0112") + + def test_0113_udp_deny(self): + """ deny UDPv4/v6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0113") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "deny ip4/ip6 udp") + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0113") + + def test_0300_tcp_permit_v4_etype_aaaa(self): + """ permit TCPv4, send 0xAAAA etype + """ + self.logger.info("ACLP_TEST_START_0300") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # Traffic should still pass also for an odd ethertype + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + self.logger.info("ACLP_TEST_FINISH_0300") + + def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked + """ + self.logger.info("ACLP_TEST_START_0305") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The oddball ethertype should be blocked + self.run_verify_negat_test(self.IP, self.IPV4, + self.proto[self.IP][self.TCP], + 0, False, 0xaaaa) + + # remove the whitelist + self.etype_whitelist([], 0, add=False) + + self.logger.info("ACLP_TEST_FINISH_0305") + + def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass + """ + self.logger.info("ACLP_TEST_START_0306") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The whitelisted traffic, should pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0x0bbb) + + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0, add=False) + + self.logger.info("ACLP_TEST_FINISH_0306") + + def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass + """ + self.logger.info("ACLP_TEST_START_0307") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0, add=False) + + # The whitelisted traffic, should pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + + self.logger.info("ACLP_TEST_FINISH_0306") + + def test_0315_del_intf(self): + """ apply an acl and delete the interface + """ + self.logger.info("ACLP_TEST_START_0315") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # create an interface + intf = [] + intf.append(VppLoInterface(self)) + + # Apply rules + self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index) + + # Remove the interface + intf[0].remove_vpp_config() + + self.logger.info("ACLP_TEST_FINISH_0315") + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)