acl: API cleanup
[vpp.git] / src / plugins / acl / test / test_acl_plugin.py
index 7fc72d6..d5e195f 100644 (file)
-from __future__ import print_function
-import unittest, sys, time, threading, struct, logging, os
-import vpp_papi
-# import vpp_papi_plugins.acl
-from ipaddress import *
-papi_event = threading.Event()
-print(vpp_papi.vpe.VL_API_SW_INTERFACE_SET_FLAGS)
-def papi_event_handler(result):
-    if result.vl_msg_id == vpp_papi.vpe.VL_API_SW_INTERFACE_SET_FLAGS:
-        return
-    if result.vl_msg_id == vpp_papi.vpe.VL_API_VNET_INTERFACE_COUNTERS:
-        print('Interface counters', result)
-        return
-    if result.vl_msg_id == vpp_papi.vpe.VL_API_VNET_IP6_FIB_COUNTERS:
-        print('IPv6 FIB counters', result)
-        papi_event.set()
-        return
-
-    print('Unknown message id:', result.vl_msg_id)
-
-import glob, subprocess
-class TestAclPlugin(unittest.TestCase):
+#!/usr/bin/env python3
+"""ACL plugin Test Case HLD:
+"""
+
+import unittest
+import random
+
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, TCP, UDP, ICMP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6ExtHdrFragment
+from framework import VppTestCase, VppTestRunner
+from util import Host, ppp
+from ipaddress import IPv4Network, IPv6Network
+
+from vpp_lo_interface import VppLoInterface
+from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
+from vpp_ip import INVALID_INDEX
+
+
+class TestACLplugin(VppTestCase):
+    """ ACL plugin Test Case """
+
+    # traffic types
+    IP = 0
+    ICMP = 1
+
+    # IP version
+    IPRANDOM = -1
+    IPV4 = 0
+    IPV6 = 1
+
+    # rule types
+    DENY = 0
+    PERMIT = 1
+
+    # supported protocols
+    proto = [[6, 17], [1, 58]]
+    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
+    ICMPv4 = 0
+    ICMPv6 = 1
+    TCP = 0
+    UDP = 1
+    PROTO_ALL = 0
+
+    # port ranges
+    PORTS_ALL = -1
+    PORTS_RANGE = 0
+    PORTS_RANGE_2 = 1
+    udp_sport_from = 10
+    udp_sport_to = udp_sport_from + 5
+    udp_dport_from = 20000
+    udp_dport_to = udp_dport_from + 5000
+    tcp_sport_from = 30
+    tcp_sport_to = tcp_sport_from + 5
+    tcp_dport_from = 40000
+    tcp_dport_to = tcp_dport_from + 5000
+
+    udp_sport_from_2 = 90
+    udp_sport_to_2 = udp_sport_from_2 + 5
+    udp_dport_from_2 = 30000
+    udp_dport_to_2 = udp_dport_from_2 + 5000
+    tcp_sport_from_2 = 130
+    tcp_sport_to_2 = tcp_sport_from_2 + 5
+    tcp_dport_from_2 = 20000
+    tcp_dport_to_2 = tcp_dport_from_2 + 5000
+
+    icmp4_type = 8  # echo request
+    icmp4_code = 3
+    icmp6_type = 128  # echo request
+    icmp6_code = 3
+
+    icmp4_type_2 = 8
+    icmp4_code_from_2 = 5
+    icmp4_code_to_2 = 20
+    icmp6_type_2 = 128
+    icmp6_code_from_2 = 8
+    icmp6_code_to_2 = 42
+
+    # Test variables
+    bd_id = 1
+
     @classmethod
     def setUpClass(cls):
-        print("Setup")
+        """
+        Perform standard class setup (defined by class method setUpClass in
+        class VppTestCase) before running the test case, set test case related
+        variables and configure VPP.
+        """
+        super(TestACLplugin, cls).setUpClass()
+
+        try:
+            # Create 2 pg interfaces
+            cls.create_pg_interfaces(range(2))
+
+            # Packet flows mapping pg0 -> pg1, pg2 etc.
+            cls.flows = dict()
+            cls.flows[cls.pg0] = [cls.pg1]
+
+            # Packet sizes
+            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
+
+            # Create BD with MAC learning and unknown unicast flooding disabled
+            # and put interfaces to this BD
+            cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
+                                           learn=1)
+            for pg_if in cls.pg_interfaces:
+                cls.vapi.sw_interface_set_l2_bridge(
+                    rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
+
+            # Set up all interfaces
+            for i in cls.pg_interfaces:
+                i.admin_up()
+
+            # Mapping between packet-generator index and lists of test hosts
+            cls.hosts_by_pg_idx = dict()
+            for pg_if in cls.pg_interfaces:
+                cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
+
+            # Create list of deleted hosts
+            cls.deleted_hosts_by_pg_idx = dict()
+            for pg_if in cls.pg_interfaces:
+                cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
+
+            # warm-up the mac address tables
+            # self.warmup_test()
+            count = 16
+            start = 0
+            n_int = len(cls.pg_interfaces)
+            macs_per_if = count // n_int
+            i = -1
+            for pg_if in cls.pg_interfaces:
+                i += 1
+                start_nr = macs_per_if * i + start
+                end_nr = count + start if i == (n_int - 1) \
+                    else macs_per_if * (i + 1) + start
+                hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
+                for j in range(int(start_nr), int(end_nr)):
+                    host = Host(
+                        "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
+                        "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
+                        "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
+                    hosts.append(host)
+
+        except Exception:
+            super(TestACLplugin, cls).tearDownClass()
+            raise
+
     @classmethod
     def tearDownClass(cls):
-        print("Teardown")
+        super(TestACLplugin, cls).tearDownClass()
 
     def setUp(self):
-        print("Connecting API")
-        r = vpp_papi.connect("test_papi")
-        self.assertEqual(r, 0)
+        super(TestACLplugin, self).setUp()
+        self.reset_packet_infos()
 
     def tearDown(self):
-        r = vpp_papi.disconnect()
-        self.assertEqual(r, 0)
-
-    #
-    # The tests themselves
-    #
-
-    #
-    # Basic request / reply
-    #
-    def test_show_version(self):
-        t = vpp_papi.show_version()
-        print('T', t);
-        program = t.program.decode().rstrip('\x00')
-        self.assertEqual('vpe', program)
-
-    def x_test_acl_add(self):
-        print("Test ACL add")
-        self.assertEqual(1, 1)
-    
-    #
-    # Details / Dump
-    #
-    def x_test_details_dump(self):
-        t = vpp_papi.sw_interface_dump(0, b'')
-        print('Dump/details T', t)
-
-    #
-    # Arrays
-    #
-    def x_test_arrays(self):
-        t = vpp_papi.vnet_get_summary_stats()
-        print('Summary stats', t)
-        print('Packets:', t.total_pkts[0])
-        print('Packets:', t.total_pkts[1])
-    #
-    # Variable sized arrays and counters
-    #
-    #@unittest.skip("stats")
-    def x_test_want_stats(self):
-        pid = 123
-        vpp_papi.register_event_callback(papi_event_handler)
-        papi_event.clear()
-
-        # Need to configure IPv6 to get som IPv6 FIB stats
-        t = vpp_papi.create_loopback('')
-        print(t)
-        self.assertEqual(t.retval, 0)
-
-        ifindex = t.sw_if_index
-        addr = str(IPv6Address(u'1::1').packed)
-        t = vpp_papi.sw_interface_add_del_address(ifindex, 1, 1, 0, 16, addr)
-        print(t)
-        self.assertEqual(t.retval, 0)
-
-        # Check if interface is up
-        # XXX: Add new API to query interface state based on ifindex, instead of dump all.
-        t = vpp_papi.sw_interface_set_flags(ifindex, 1, 1, 0)
-        self.assertEqual(t.retval, 0)
-
-        t = vpp_papi.want_stats(True, pid)
-
-        print (t)
-
-        #
-        # Wait for some stats
-        #
-        self.assertEqual(papi_event.wait(15), True)
-        t = vpp_papi.want_stats(False, pid)
-        print (t)
-
-
-    #
-    # Plugins?
-    #
-
-if __name__ == '__main__' or __name__ == '__builtin__':
-    print("This is main")
-    suite = unittest.TestLoader().loadTestsFromTestCase(TestAclPlugin)
-    unittest.TextTestRunner(verbosity=2).run(suite)
-    #logging.basicConfig(level=logging.DEBUG)
-    # unittest.main()
+        """
+        Show various debug prints after each test.
+        """
+        super(TestACLplugin, self).tearDown()
+
+    def show_commands_at_teardown(self):
+        cli = "show vlib graph l2-input-feat-arc"
+        self.logger.info(self.vapi.ppcli(cli))
+        cli = "show vlib graph l2-input-feat-arc-end"
+        self.logger.info(self.vapi.ppcli(cli))
+        cli = "show vlib graph l2-output-feat-arc"
+        self.logger.info(self.vapi.ppcli(cli))
+        cli = "show vlib graph l2-output-feat-arc-end"
+        self.logger.info(self.vapi.ppcli(cli))
+        self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+        self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
+        self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
+        self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
+        self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
+                                         % self.bd_id))
+
+    def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
+                    s_prefix=0, s_ip=0,
+                    d_prefix=0, d_ip=0):
+        if ip:
+            src_prefix = IPv6Network((s_ip, s_prefix))
+            dst_prefix = IPv6Network((d_ip, d_prefix))
+        else:
+            src_prefix = IPv4Network((s_ip, s_prefix))
+            dst_prefix = IPv4Network((d_ip, d_prefix))
+        return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
+                       src_prefix=src_prefix, dst_prefix=dst_prefix)
+
+    def apply_rules(self, rules, tag=None):
+        acl = VppAcl(self, rules, tag=tag)
+        acl.add_vpp_config()
+        self.logger.info("Dumped ACL: " + str(acl.dump()))
+        # Apply a ACL on the interface as inbound
+        for i in self.pg_interfaces:
+            acl_if = VppAclInterface(
+                self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl])
+            acl_if.add_vpp_config()
+        return acl.acl_index
+
+    def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
+        acl = VppAcl(self, rules, tag=tag)
+        acl.add_vpp_config()
+        self.logger.info("Dumped ACL: " + str(acl.dump()))
+        # Apply a ACL on the interface as inbound
+        acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
+                                 acls=[acl])
+        return acl.acl_index
+
+    def etype_whitelist(self, whitelist, n_input, add=True):
+        # Apply whitelists on all the interfaces
+        if add:
+            self._wl = []
+            for i in self.pg_interfaces:
+                self._wl.append(VppEtypeWhitelist(
+                    self, sw_if_index=i.sw_if_index, whitelist=whitelist,
+                    n_input=n_input).add_vpp_config())
+        else:
+            if hasattr(self, "_wl"):
+                for wl in self._wl:
+                    wl.remove_vpp_config()
+
+    def create_upper_layer(self, packet_index, proto, ports=0):
+        p = self.proto_map[proto]
+        if p == 'UDP':
+            if ports == 0:
+                return UDP(sport=random.randint(self.udp_sport_from,
+                                                self.udp_sport_to),
+                           dport=random.randint(self.udp_dport_from,
+                                                self.udp_dport_to))
+            else:
+                return UDP(sport=ports, dport=ports)
+        elif p == 'TCP':
+            if ports == 0:
+                return TCP(sport=random.randint(self.tcp_sport_from,
+                                                self.tcp_sport_to),
+                           dport=random.randint(self.tcp_dport_from,
+                                                self.tcp_dport_to))
+            else:
+                return TCP(sport=ports, dport=ports)
+        return ''
+
+    def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
+                      proto=-1, ports=0, fragments=False,
+                      pkt_raw=True, etype=-1):
+        """
+        Create input packet stream for defined interface using hosts or
+        deleted_hosts list.
+
+        :param object src_if: Interface to create packet stream for.
+        :param list packet_sizes: List of required packet sizes.
+        :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
+        :return: Stream of packets.
+        """
+        pkts = []
+        if self.flows.__contains__(src_if):
+            src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
+            for dst_if in self.flows[src_if]:
+                dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
+                n_int = len(dst_hosts) * len(src_hosts)
+                for i in range(0, n_int):
+                    dst_host = dst_hosts[int(i / len(src_hosts))]
+                    src_host = src_hosts[i % len(src_hosts)]
+                    pkt_info = self.create_packet_info(src_if, dst_if)
+                    if ipv6 == 1:
+                        pkt_info.ip = 1
+                    elif ipv6 == 0:
+                        pkt_info.ip = 0
+                    else:
+                        pkt_info.ip = random.choice([0, 1])
+                    if proto == -1:
+                        pkt_info.proto = random.choice(self.proto[self.IP])
+                    else:
+                        pkt_info.proto = proto
+                    payload = self.info_to_payload(pkt_info)
+                    p = Ether(dst=dst_host.mac, src=src_host.mac)
+                    if etype > 0:
+                        p = Ether(dst=dst_host.mac,
+                                  src=src_host.mac,
+                                  type=etype)
+                    if pkt_info.ip:
+                        p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
+                        if fragments:
+                            p /= IPv6ExtHdrFragment(offset=64, m=1)
+                    else:
+                        if fragments:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4,
+                                    flags=1, frag=64)
+                        else:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4)
+                    if traffic_type == self.ICMP:
+                        if pkt_info.ip:
+                            p /= ICMPv6EchoRequest(type=self.icmp6_type,
+                                                   code=self.icmp6_code)
+                        else:
+                            p /= ICMP(type=self.icmp4_type,
+                                      code=self.icmp4_code)
+                    else:
+                        p /= self.create_upper_layer(i, pkt_info.proto, ports)
+                    if pkt_raw:
+                        p /= Raw(payload)
+                        pkt_info.data = p.copy()
+                    if pkt_raw:
+                        size = random.choice(packet_sizes)
+                        self.extend_packet(p, size)
+                    pkts.append(p)
+        return pkts
+
+    def verify_capture(self, pg_if, capture,
+                       traffic_type=0, ip_type=0, etype=-1):
+        """
+        Verify captured input packet stream for defined interface.
+
+        :param object pg_if: Interface to verify captured packet stream for.
+        :param list capture: Captured packet stream.
+        :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
+        """
+        last_info = dict()
+        for i in self.pg_interfaces:
+            last_info[i.sw_if_index] = None
+        dst_sw_if_index = pg_if.sw_if_index
+        for packet in capture:
+            if etype > 0:
+                if packet[Ether].type != etype:
+                    self.logger.error(ppp("Unexpected ethertype in packet:",
+                                          packet))
+                else:
+                    continue
+            try:
+                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
+                if traffic_type == self.ICMP and ip_type == self.IPV6:
+                    payload_info = self.payload_to_info(
+                        packet[ICMPv6EchoRequest], 'data')
+                    payload = packet[ICMPv6EchoRequest]
+                else:
+                    payload_info = self.payload_to_info(packet[Raw])
+                    payload = packet[self.proto_map[payload_info.proto]]
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet "
+                                      "(outside network):", packet))
+                raise
+
+            if ip_type != 0:
+                self.assertEqual(payload_info.ip, ip_type)
+            if traffic_type == self.ICMP:
+                try:
+                    if payload_info.ip == 0:
+                        self.assertEqual(payload.type, self.icmp4_type)
+                        self.assertEqual(payload.code, self.icmp4_code)
+                    else:
+                        self.assertEqual(payload.type, self.icmp6_type)
+                        self.assertEqual(payload.code, self.icmp6_code)
+                except:
+                    self.logger.error(ppp("Unexpected or invalid packet "
+                                          "(outside network):", packet))
+                    raise
+            else:
+                try:
+                    ip_version = IPv6 if payload_info.ip == 1 else IP
+
+                    ip = packet[ip_version]
+                    packet_index = payload_info.index
+
+                    self.assertEqual(payload_info.dst, dst_sw_if_index)
+                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
+                                      (pg_if.name, payload_info.src,
+                                       packet_index))
+                    next_info = self.get_next_packet_info_for_interface2(
+                        payload_info.src, dst_sw_if_index,
+                        last_info[payload_info.src])
+                    last_info[payload_info.src] = next_info
+                    self.assertTrue(next_info is not None)
+                    self.assertEqual(packet_index, next_info.index)
+                    saved_packet = next_info.data
+                    # Check standard fields
+                    self.assertEqual(ip.src, saved_packet[ip_version].src)
+                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
+                    p = self.proto_map[payload_info.proto]
+                    if p == 'TCP':
+                        tcp = packet[TCP]
+                        self.assertEqual(tcp.sport, saved_packet[
+                            TCP].sport)
+                        self.assertEqual(tcp.dport, saved_packet[
+                            TCP].dport)
+                    elif p == 'UDP':
+                        udp = packet[UDP]
+                        self.assertEqual(udp.sport, saved_packet[
+                            UDP].sport)
+                        self.assertEqual(udp.dport, saved_packet[
+                            UDP].dport)
+                except:
+                    self.logger.error(ppp("Unexpected or invalid packet:",
+                                          packet))
+                    raise
+        for i in self.pg_interfaces:
+            remaining_packet = self.get_next_packet_info_for_interface2(
+                i, dst_sw_if_index, last_info[i.sw_if_index])
+            self.assertTrue(
+                remaining_packet is None,
+                "Port %u: Packet expected from source %u didn't arrive" %
+                (dst_sw_if_index, i.sw_if_index))
+
+    def run_traffic_no_check(self):
+        # Test
+        # Create incoming packet streams for packet-generator interfaces
+        for i in self.pg_interfaces:
+            if self.flows.__contains__(i):
+                pkts = self.create_stream(i, self.pg_if_packet_sizes)
+                if len(pkts) > 0:
+                    i.add_stream(pkts)
+
+        # Enable packet capture and start packet sending
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
+                        frags=False, pkt_raw=True, etype=-1):
+        # Test
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        for i in self.pg_interfaces:
+            if self.flows.__contains__(i):
+                pkts = self.create_stream(i, self.pg_if_packet_sizes,
+                                          traffic_type, ip_type, proto, ports,
+                                          frags, pkt_raw, etype)
+                if len(pkts) > 0:
+                    i.add_stream(pkts)
+                    pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        self.logger.info("sent packets count: %d" % pkts_cnt)
+
+        # Verify
+        # Verify outgoing packet streams per packet-generator interface
+        for src_if in self.pg_interfaces:
+            if self.flows.__contains__(src_if):
+                for dst_if in self.flows[src_if]:
+                    capture = dst_if.get_capture(pkts_cnt)
+                    self.logger.info("Verifying capture on interface %s" %
+                                     dst_if.name)
+                    self.verify_capture(dst_if, capture,
+                                        traffic_type, ip_type, etype)
+
+    def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
+                              ports=0, frags=False, etype=-1):
+        # Test
+        pkts_cnt = 0
+        self.reset_packet_infos()
+        for i in self.pg_interfaces:
+            if self.flows.__contains__(i):
+                pkts = self.create_stream(i, self.pg_if_packet_sizes,
+                                          traffic_type, ip_type, proto, ports,
+                                          frags, True, etype)
+                if len(pkts) > 0:
+                    i.add_stream(pkts)
+                    pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sending
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        self.logger.info("sent packets count: %d" % pkts_cnt)
+
+        # Verify
+        # Verify outgoing packet streams per packet-generator interface
+        for src_if in self.pg_interfaces:
+            if self.flows.__contains__(src_if):
+                for dst_if in self.flows[src_if]:
+                    self.logger.info("Verifying capture on interface %s" %
+                                     dst_if.name)
+                    capture = dst_if.get_capture(0)
+                    self.assertEqual(len(capture), 0)
+
+    def test_0000_warmup_test(self):
+        """ ACL plugin version check; learn MACs
+        """
+        reply = self.vapi.papi.acl_plugin_get_version()
+        self.assertEqual(reply.major, 1)
+        self.logger.info("Working with ACL plugin version: %d.%d" % (
+            reply.major, reply.minor))
+        # minor version changes are non breaking
+        # self.assertEqual(reply.minor, 0)
+
+    def test_0001_acl_create(self):
+        """ ACL create/delete test
+        """
+
+        self.logger.info("ACLP_TEST_START_0001")
+        # Create a permit-1234 ACL
+        r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
+        # Test 1: add a new ACL
+        first_acl = VppAcl(self, rules=r, tag="permit 1234")
+        first_acl.add_vpp_config()
+        self.assertTrue(first_acl.query_vpp_config())
+        # The very first ACL gets #0
+        self.assertEqual(first_acl.acl_index, 0)
+        rr = first_acl.dump()
+        self.logger.info("Dumped ACL: " + str(rr))
+        self.assertEqual(len(rr), 1)
+        # We should have the same number of ACL entries as we had asked
+        self.assertEqual(len(rr[0].r), len(r))
+        # The rules should be the same. But because the submitted and returned
+        # are different types, we need to iterate over rules and keys to get
+        # to basic values.
+        for i_rule in range(0, len(r) - 1):
+            encoded_rule = r[i_rule].encode()
+            for rule_key in encoded_rule:
+                self.assertEqual(rr[0].r[i_rule][rule_key],
+                                 encoded_rule[rule_key])
+
+        # Create a deny-1234 ACL
+        r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
+                  AclRule(is_permit=1, proto=17, ports=0)]
+        second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
+        second_acl.add_vpp_config()
+        self.assertTrue(second_acl.query_vpp_config())
+        # The second ACL gets #1
+        self.assertEqual(second_acl.acl_index, 1)
+
+        # Test 2: try to modify a nonexistent ACL
+        invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
+        reply = invalid_acl.add_vpp_config(expect_error=True)
+
+        # apply an ACL on an interface inbound, try to delete ACL, must fail
+        acl_if_list = VppAclInterface(
+            self, sw_if_index=self.pg0.sw_if_index, n_input=1,
+            acls=[first_acl])
+        acl_if_list.add_vpp_config()
+        first_acl.remove_vpp_config(expect_error=True)
+        # Unapply an ACL and then try to delete it - must be ok
+        acl_if_list.remove_vpp_config()
+        first_acl.remove_vpp_config()
+
+        # apply an ACL on an interface inbound, try to delete ACL, must fail
+        acl_if_list = VppAclInterface(
+            self, sw_if_index=self.pg0.sw_if_index, n_input=0,
+            acls=[second_acl])
+        acl_if_list.add_vpp_config()
+        second_acl.remove_vpp_config(expect_error=True)
+        # Unapply an ACL and then try to delete it - must be ok
+        acl_if_list.remove_vpp_config()
+        second_acl.remove_vpp_config()
+
+        # try to apply a nonexistent ACL - must fail
+        acl_if_list = VppAclInterface(
+            self, sw_if_index=self.pg0.sw_if_index, n_input=0,
+            acls=[invalid_acl])
+        acl_if_list.add_vpp_config(expect_error=True)
+
+        self.logger.info("ACLP_TEST_FINISH_0001")
+
+    def test_0002_acl_permit_apply(self):
+        """ permit ACL apply test
+        """
+        self.logger.info("ACLP_TEST_START_0002")
+
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      0, self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      0, self.proto[self.IP][self.TCP]))
+
+        # Apply rules
+        acl_idx = self.apply_rules(rules, "permit per-flow")
+
+        # enable counters
+        reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, -1)
+
+        matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
+        self.logger.info("stat segment counters: %s" % repr(matches))
+        cli = "show acl-plugin acl"
+        self.logger.info(self.vapi.ppcli(cli))
+        cli = "show acl-plugin tables"
+        self.logger.info(self.vapi.ppcli(cli))
+
+        total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
+        self.assertEqual(total_hits, 64)
+
+        # disable counters
+        reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
+
+        self.logger.info("ACLP_TEST_FINISH_0002")
+
+    def test_0003_acl_deny_apply(self):
+        """ deny ACL apply test
+        """
+        self.logger.info("ACLP_TEST_START_0003")
+        # Add a deny-flows ACL
+        rules = []
+        rules.append(self.create_rule(
+            self.IPV4, self.DENY, self.PORTS_ALL,
+            self.proto[self.IP][self.UDP]))
+        # Permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
+
+        # enable counters
+        reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPV4,
+                                   self.proto[self.IP][self.UDP])
+
+        matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
+        self.logger.info("stat segment counters: %s" % repr(matches))
+        cli = "show acl-plugin acl"
+        self.logger.info(self.vapi.ppcli(cli))
+        cli = "show acl-plugin tables"
+        self.logger.info(self.vapi.ppcli(cli))
+        self.assertEqual(matches[0][0]['packets'], 64)
+        # disable counters
+        reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
+        self.logger.info("ACLP_TEST_FINISH_0003")
+        # self.assertEqual(, 0)
+
+    def test_0004_vpp624_permit_icmpv4(self):
+        """ VPP_624 permit ICMPv4
+        """
+        self.logger.info("ACLP_TEST_START_0004")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.ICMP][self.ICMPv4]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit icmpv4")
+
+        # Traffic should still pass
+        self.run_verify_test(self.ICMP, self.IPV4,
+                             self.proto[self.ICMP][self.ICMPv4])
+
+        self.logger.info("ACLP_TEST_FINISH_0004")
+
+    def test_0005_vpp624_permit_icmpv6(self):
+        """ VPP_624 permit ICMPv6
+        """
+        self.logger.info("ACLP_TEST_START_0005")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.ICMP][self.ICMPv6]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit icmpv6")
+
+        # Traffic should still pass
+        self.run_verify_test(self.ICMP, self.IPV6,
+                             self.proto[self.ICMP][self.ICMPv6])
+
+        self.logger.info("ACLP_TEST_FINISH_0005")
+
+    def test_0006_vpp624_deny_icmpv4(self):
+        """ VPP_624 deny ICMPv4
+        """
+        self.logger.info("ACLP_TEST_START_0006")
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.ICMP][self.ICMPv4]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny icmpv4")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
+
+        self.logger.info("ACLP_TEST_FINISH_0006")
+
+    def test_0007_vpp624_deny_icmpv6(self):
+        """ VPP_624 deny ICMPv6
+        """
+        self.logger.info("ACLP_TEST_START_0007")
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.ICMP][self.ICMPv6]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny icmpv6")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
+
+        self.logger.info("ACLP_TEST_FINISH_0007")
+
+    def test_0008_tcp_permit_v4(self):
+        """ permit TCPv4
+        """
+        self.logger.info("ACLP_TEST_START_0008")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0008")
+
+    def test_0009_tcp_permit_v6(self):
+        """ permit TCPv6
+        """
+        self.logger.info("ACLP_TEST_START_0009")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0008")
+
+    def test_0010_udp_permit_v4(self):
+        """ permit UDPv4
+        """
+        self.logger.info("ACLP_TEST_START_0010")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0010")
+
+    def test_0011_udp_permit_v6(self):
+        """ permit UDPv6
+        """
+        self.logger.info("ACLP_TEST_START_0011")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0011")
+
+    def test_0012_tcp_deny(self):
+        """ deny TCPv4/v6
+        """
+        self.logger.info("ACLP_TEST_START_0012")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 tcp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0012")
+
+    def test_0013_udp_deny(self):
+        """ deny UDPv4/v6
+        """
+        self.logger.info("ACLP_TEST_START_0013")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0013")
+
+    def test_0014_acl_dump(self):
+        """ verify add/dump acls
+        """
+        self.logger.info("ACLP_TEST_START_0014")
+
+        r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
+             [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
+             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
+             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
+             [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
+             [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
+             [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
+             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
+             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
+             [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
+             [self.IPV4, self.DENY, self.PORTS_ALL, 0],
+             [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
+             [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
+             [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
+             [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
+             [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
+             [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
+             [self.IPV6, self.DENY, self.PORTS_ALL, 0]
+             ]
+
+        # Add and verify new ACLs
+        rules = []
+        for i in range(len(r)):
+            rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
+
+        acl = VppAcl(self, rules=rules)
+        acl.add_vpp_config()
+        result = acl.dump()
+
+        i = 0
+        for drules in result:
+            for dr in drules.r:
+                self.assertEqual(dr.is_permit, r[i][1])
+                self.assertEqual(dr.proto, r[i][3])
+
+                if r[i][2] > 0:
+                    self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
+                else:
+                    if r[i][2] < 0:
+                        self.assertEqual(dr.srcport_or_icmptype_first, 0)
+                        self.assertEqual(dr.srcport_or_icmptype_last, 65535)
+                    else:
+                        if dr.proto == self.proto[self.IP][self.TCP]:
+                            self.assertGreater(dr.srcport_or_icmptype_first,
+                                               self.tcp_sport_from-1)
+                            self.assertLess(dr.srcport_or_icmptype_first,
+                                            self.tcp_sport_to+1)
+                            self.assertGreater(dr.dstport_or_icmpcode_last,
+                                               self.tcp_dport_from-1)
+                            self.assertLess(dr.dstport_or_icmpcode_last,
+                                            self.tcp_dport_to+1)
+                        elif dr.proto == self.proto[self.IP][self.UDP]:
+                            self.assertGreater(dr.srcport_or_icmptype_first,
+                                               self.udp_sport_from-1)
+                            self.assertLess(dr.srcport_or_icmptype_first,
+                                            self.udp_sport_to+1)
+                            self.assertGreater(dr.dstport_or_icmpcode_last,
+                                               self.udp_dport_from-1)
+                            self.assertLess(dr.dstport_or_icmpcode_last,
+                                            self.udp_dport_to+1)
+                i += 1
+
+        self.logger.info("ACLP_TEST_FINISH_0014")
+
+    def test_0015_tcp_permit_port_v4(self):
+        """ permit single TCPv4
+        """
+        self.logger.info("ACLP_TEST_START_0015")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip4 tcp %d" % port)
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4,
+                             self.proto[self.IP][self.TCP], port)
+
+        self.logger.info("ACLP_TEST_FINISH_0015")
+
+    def test_0016_udp_permit_port_v4(self):
+        """ permit single UDPv4
+        """
+        self.logger.info("ACLP_TEST_START_0016")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip4 tcp %d" % port)
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4,
+                             self.proto[self.IP][self.UDP], port)
+
+        self.logger.info("ACLP_TEST_FINISH_0016")
+
+    def test_0017_tcp_permit_port_v6(self):
+        """ permit single TCPv6
+        """
+        self.logger.info("ACLP_TEST_START_0017")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip4 tcp %d" % port)
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6,
+                             self.proto[self.IP][self.TCP], port)
+
+        self.logger.info("ACLP_TEST_FINISH_0017")
+
+    def test_0018_udp_permit_port_v6(self):
+        """ permit single UDPv6
+        """
+        self.logger.info("ACLP_TEST_START_0018")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip4 tcp %d" % port)
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6,
+                             self.proto[self.IP][self.UDP], port)
+
+        self.logger.info("ACLP_TEST_FINISH_0018")
+
+    def test_0019_udp_deny_port(self):
+        """ deny single TCPv4/v6
+        """
+        self.logger.info("ACLP_TEST_START_0019")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, port,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, port,
+                                      self.proto[self.IP][self.TCP]))
+        # Permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.TCP], port)
+
+        self.logger.info("ACLP_TEST_FINISH_0019")
+
+    def test_0020_udp_deny_port(self):
+        """ deny single UDPv4/v6
+        """
+        self.logger.info("ACLP_TEST_START_0020")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        # Permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP], port)
+
+        self.logger.info("ACLP_TEST_FINISH_0020")
+
+    def test_0021_udp_deny_port_verify_fragment_deny(self):
+        """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
+        blocked
+        """
+        self.logger.info("ACLP_TEST_START_0021")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP], port, True)
+
+        self.logger.info("ACLP_TEST_FINISH_0021")
+
+    def test_0022_zero_length_udp_ipv4(self):
+        """ VPP-687 zero length udp ipv4 packet"""
+        self.logger.info("ACLP_TEST_START_0022")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(
+            self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit empty udp ip4 %d" % port)
+
+        # Traffic should still pass
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+                                  self.IP, self.IPV4,
+                                  self.proto[self.IP][self.UDP], port,
+                                  False, False)
+        if len(pkts) > 0:
+            self.pg0.add_stream(pkts)
+            pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        self.pg1.get_capture(pkts_cnt)
+
+        self.logger.info("ACLP_TEST_FINISH_0022")
+
+    def test_0023_zero_length_udp_ipv6(self):
+        """ VPP-687 zero length udp ipv6 packet"""
+        self.logger.info("ACLP_TEST_START_0023")
+
+        port = random.randint(16384, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit empty udp ip6 %d" % port)
+
+        # Traffic should still pass
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+                                  self.IP, self.IPV6,
+                                  self.proto[self.IP][self.UDP], port,
+                                  False, False)
+        if len(pkts) > 0:
+            self.pg0.add_stream(pkts)
+            pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        # Verify outgoing packet streams per packet-generator interface
+        self.pg1.get_capture(pkts_cnt)
+
+        self.logger.info("ACLP_TEST_FINISH_0023")
+
+    def test_0108_tcp_permit_v4(self):
+        """ permit TCPv4 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0108")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0108")
+
+    def test_0109_tcp_permit_v6(self):
+        """ permit TCPv6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0109")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0109")
+
+    def test_0110_udp_permit_v4(self):
+        """ permit UDPv4 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0110")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0110")
+
+    def test_0111_udp_permit_v6(self):
+        """ permit UDPv6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0111")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0111")
+
+    def test_0112_tcp_deny(self):
+        """ deny TCPv4/v6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0112")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 tcp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0112")
+
+    def test_0113_udp_deny(self):
+        """ deny UDPv4/v6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0113")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0113")
+
+    def test_0300_tcp_permit_v4_etype_aaaa(self):
+        """ permit TCPv4, send 0xAAAA etype
+        """
+        self.logger.info("ACLP_TEST_START_0300")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass also for an odd ethertype
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0xaaaa)
+        self.logger.info("ACLP_TEST_FINISH_0300")
+
+    def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
+        """
+        self.logger.info("ACLP_TEST_START_0305")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+
+        # The oddball ethertype should be blocked
+        self.run_verify_negat_test(self.IP, self.IPV4,
+                                   self.proto[self.IP][self.TCP],
+                                   0, False, 0xaaaa)
+
+        # remove the whitelist
+        self.etype_whitelist([], 0, add=False)
+
+        self.logger.info("ACLP_TEST_FINISH_0305")
+
+    def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
+        """
+        self.logger.info("ACLP_TEST_START_0306")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+
+        # The whitelisted traffic, should pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0x0bbb)
+
+        # remove the whitelist, the previously blocked 0xAAAA should pass now
+        self.etype_whitelist([], 0, add=False)
+
+        self.logger.info("ACLP_TEST_FINISH_0306")
+
+    def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
+        """
+        self.logger.info("ACLP_TEST_START_0307")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+        # remove the whitelist, the previously blocked 0xAAAA should pass now
+        self.etype_whitelist([], 0, add=False)
+
+        # The whitelisted traffic, should pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0xaaaa)
+
+        self.logger.info("ACLP_TEST_FINISH_0306")
+
+    def test_0315_del_intf(self):
+        """ apply an acl and delete the interface
+        """
+        self.logger.info("ACLP_TEST_START_0315")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # create an interface
+        intf = []
+        intf.append(VppLoInterface(self))
+
+        # Apply rules
+        self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
+
+        # Remove the interface
+        intf[0].remove_vpp_config()
+
+        self.logger.info("ACLP_TEST_FINISH_0315")
+
 
+if __name__ == '__main__':
+    unittest.main(testRunner=VppTestRunner)