tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_acl_plugin.py
index 1ca74d1..5e727d3 100644 (file)
@@ -1,23 +1,30 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 """ACL plugin Test Case HLD:
 """
 
 import unittest
 import random
 
+from config import config
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 from scapy.layers.inet6 import IPv6ExtHdrFragment
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner, tag_fixme_vpp_workers
 from util import Host, ppp
+from ipaddress import IPv4Network, IPv6Network
 
 from vpp_lo_interface import VppLoInterface
+from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
+from vpp_ip import INVALID_INDEX
 
 
+@unittest.skipIf("acl" in config.excluded_plugins, "Exclude ACL plugin tests")
+@tag_fixme_vpp_workers
 class TestACLplugin(VppTestCase):
-    """ ACL plugin Test Case """
+    """ACL plugin Test Case"""
 
     # traffic types
     IP = 0
@@ -34,7 +41,7 @@ class TestACLplugin(VppTestCase):
 
     # supported protocols
     proto = [[6, 17], [1, 58]]
-    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
+    proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
     ICMPv4 = 0
     ICMPv6 = 1
     TCP = 0
@@ -100,11 +107,13 @@ class TestACLplugin(VppTestCase):
 
             # Create BD with MAC learning and unknown unicast flooding disabled
             # and put interfaces to this BD
-            cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
-                                           learn=1)
+            cls.vapi.bridge_domain_add_del_v2(
+                bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
+            )
             for pg_if in cls.pg_interfaces:
                 cls.vapi.sw_interface_set_l2_bridge(
-                    rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
+                    rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
+                )
 
             # Set up all interfaces
             for i in cls.pg_interfaces:
@@ -125,19 +134,21 @@ class TestACLplugin(VppTestCase):
             count = 16
             start = 0
             n_int = len(cls.pg_interfaces)
-            macs_per_if = count / n_int
+            macs_per_if = count // n_int
             i = -1
             for pg_if in cls.pg_interfaces:
                 i += 1
                 start_nr = macs_per_if * i + start
-                end_nr = count + start if i == (n_int - 1) \
-                    else macs_per_if * (i + 1) + start
+                end_nr = (
+                    count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
+                )
                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
-                for j in range(start_nr, end_nr):
+                for j in range(int(start_nr), int(end_nr)):
                     host = Host(
                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
-                        "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
+                        "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
+                    )
                     hosts.append(host)
 
         except Exception:
@@ -171,133 +182,103 @@ class TestACLplugin(VppTestCase):
         self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
         self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
         self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
-        self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
-                                         % self.bd_id))
-
-    def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
-                    s_prefix=0, s_ip='\x00\x00\x00\x00',
-                    d_prefix=0, d_ip='\x00\x00\x00\x00'):
-        if proto == -1:
-            return
-        if ports == self.PORTS_ALL:
-            sport_from = 0
-            dport_from = 0
-            sport_to = 65535 if proto != 1 and proto != 58 else 255
-            dport_to = sport_to
-        elif ports == self.PORTS_RANGE:
-            if proto == 1:
-                sport_from = self.icmp4_type
-                sport_to = self.icmp4_type
-                dport_from = self.icmp4_code
-                dport_to = self.icmp4_code
-            elif proto == 58:
-                sport_from = self.icmp6_type
-                sport_to = self.icmp6_type
-                dport_from = self.icmp6_code
-                dport_to = self.icmp6_code
-            elif proto == self.proto[self.IP][self.TCP]:
-                sport_from = self.tcp_sport_from
-                sport_to = self.tcp_sport_to
-                dport_from = self.tcp_dport_from
-                dport_to = self.tcp_dport_to
-            elif proto == self.proto[self.IP][self.UDP]:
-                sport_from = self.udp_sport_from
-                sport_to = self.udp_sport_to
-                dport_from = self.udp_dport_from
-                dport_to = self.udp_dport_to
-        elif ports == self.PORTS_RANGE_2:
-            if proto == 1:
-                sport_from = self.icmp4_type_2
-                sport_to = self.icmp4_type_2
-                dport_from = self.icmp4_code_from_2
-                dport_to = self.icmp4_code_to_2
-            elif proto == 58:
-                sport_from = self.icmp6_type_2
-                sport_to = self.icmp6_type_2
-                dport_from = self.icmp6_code_from_2
-                dport_to = self.icmp6_code_to_2
-            elif proto == self.proto[self.IP][self.TCP]:
-                sport_from = self.tcp_sport_from_2
-                sport_to = self.tcp_sport_to_2
-                dport_from = self.tcp_dport_from_2
-                dport_to = self.tcp_dport_to_2
-            elif proto == self.proto[self.IP][self.UDP]:
-                sport_from = self.udp_sport_from_2
-                sport_to = self.udp_sport_to_2
-                dport_from = self.udp_dport_from_2
-                dport_to = self.udp_dport_to_2
+        self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id))
+
+    def create_rule(
+        self,
+        ip=0,
+        permit_deny=0,
+        ports=PORTS_ALL,
+        proto=-1,
+        s_prefix=0,
+        s_ip=0,
+        d_prefix=0,
+        d_ip=0,
+    ):
+        if ip:
+            src_prefix = IPv6Network((s_ip, s_prefix))
+            dst_prefix = IPv6Network((d_ip, d_prefix))
         else:
-            sport_from = ports
-            sport_to = ports
-            dport_from = ports
-            dport_to = ports
-
-        rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
-                 'srcport_or_icmptype_first': sport_from,
-                 'srcport_or_icmptype_last': sport_to,
-                 'src_ip_prefix_len': s_prefix,
-                 'src_ip_addr': s_ip,
-                 'dstport_or_icmpcode_first': dport_from,
-                 'dstport_or_icmpcode_last': dport_to,
-                 'dst_ip_prefix_len': d_prefix,
-                 'dst_ip_addr': d_ip})
-        return rule
-
-    def apply_rules(self, rules, tag=b''):
-        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
-                                          tag=tag)
-        self.logger.info("Dumped ACL: " + str(
-            self.vapi.acl_dump(reply.acl_index)))
+            src_prefix = IPv4Network((s_ip, s_prefix))
+            dst_prefix = IPv4Network((d_ip, d_prefix))
+        return AclRule(
+            is_permit=permit_deny,
+            ports=ports,
+            proto=proto,
+            src_prefix=src_prefix,
+            dst_prefix=dst_prefix,
+        )
+
+    def apply_rules(self, rules, tag=None):
+        acl = VppAcl(self, rules, tag=tag)
+        acl.add_vpp_config()
+        self.logger.info("Dumped ACL: " + str(acl.dump()))
         # Apply a ACL on the interface as inbound
         for i in self.pg_interfaces:
-            self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
-                                                 n_input=1,
-                                                 acls=[reply.acl_index])
-        return
-
-    def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
-        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
-                                          tag=tag)
-        self.logger.info("Dumped ACL: " + str(
-            self.vapi.acl_dump(reply.acl_index)))
+            acl_if = VppAclInterface(
+                self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]
+            )
+            acl_if.add_vpp_config()
+        return acl.acl_index
+
+    def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
+        acl = VppAcl(self, rules, tag=tag)
+        acl.add_vpp_config()
+        self.logger.info("Dumped ACL: " + str(acl.dump()))
         # Apply a ACL on the interface as inbound
-        self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
-                                             n_input=1,
-                                             acls=[reply.acl_index])
-        return
+        acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl])
+        return acl.acl_index
 
-    def etype_whitelist(self, whitelist, n_input):
+    def etype_whitelist(self, whitelist, n_input, add=True):
         # Apply whitelists on all the interfaces
-        for i in self.pg_interfaces:
-            # checkstyle can't read long names. Help them.
-            fun = self.vapi.acl_interface_set_etype_whitelist
-            fun(sw_if_index=i.sw_if_index, n_input=n_input,
-                whitelist=whitelist)
-        return
+        if add:
+            self._wl = []
+            for i in self.pg_interfaces:
+                self._wl.append(
+                    VppEtypeWhitelist(
+                        self,
+                        sw_if_index=i.sw_if_index,
+                        whitelist=whitelist,
+                        n_input=n_input,
+                    ).add_vpp_config()
+                )
+        else:
+            if hasattr(self, "_wl"):
+                for wl in self._wl:
+                    wl.remove_vpp_config()
 
     def create_upper_layer(self, packet_index, proto, ports=0):
         p = self.proto_map[proto]
-        if p == 'UDP':
+        if p == "UDP":
             if ports == 0:
-                return UDP(sport=random.randint(self.udp_sport_from,
-                                                self.udp_sport_to),
-                           dport=random.randint(self.udp_dport_from,
-                                                self.udp_dport_to))
+                return UDP(
+                    sport=random.randint(self.udp_sport_from, self.udp_sport_to),
+                    dport=random.randint(self.udp_dport_from, self.udp_dport_to),
+                )
             else:
                 return UDP(sport=ports, dport=ports)
-        elif p == 'TCP':
+        elif p == "TCP":
             if ports == 0:
-                return TCP(sport=random.randint(self.tcp_sport_from,
-                                                self.tcp_sport_to),
-                           dport=random.randint(self.tcp_dport_from,
-                                                self.tcp_dport_to))
+                return TCP(
+                    sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
+                    dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
+                )
             else:
                 return TCP(sport=ports, dport=ports)
-        return ''
-
-    def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
-                      proto=-1, ports=0, fragments=False,
-                      pkt_raw=True, etype=-1):
+        return ""
+
+    def create_stream(
+        self,
+        src_if,
+        packet_sizes,
+        traffic_type=0,
+        ipv6=0,
+        proto=-1,
+        ports=0,
+        fragments=False,
+        pkt_raw=True,
+        etype=-1,
+    ):
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
@@ -314,7 +295,7 @@ class TestACLplugin(VppTestCase):
                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
                 n_int = len(dst_hosts) * len(src_hosts)
                 for i in range(0, n_int):
-                    dst_host = dst_hosts[i / len(src_hosts)]
+                    dst_host = dst_hosts[int(i / len(src_hosts))]
                     src_host = src_hosts[i % len(src_hosts)]
                     pkt_info = self.create_packet_info(src_if, dst_if)
                     if ipv6 == 1:
@@ -330,26 +311,25 @@ class TestACLplugin(VppTestCase):
                     payload = self.info_to_payload(pkt_info)
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
                     if etype > 0:
-                        p = Ether(dst=dst_host.mac,
-                                  src=src_host.mac,
-                                  type=etype)
+                        p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                         if fragments:
                             p /= IPv6ExtHdrFragment(offset=64, m=1)
                     else:
                         if fragments:
-                            p /= IP(src=src_host.ip4, dst=dst_host.ip4,
-                                    flags=1, frag=64)
+                            p /= IP(
+                                src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
+                            )
                         else:
                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
-                            p /= ICMPv6EchoRequest(type=self.icmp6_type,
-                                                   code=self.icmp6_code)
+                            p /= ICMPv6EchoRequest(
+                                type=self.icmp6_type, code=self.icmp6_code
+                            )
                         else:
-                            p /= ICMP(type=self.icmp4_type,
-                                      code=self.icmp4_code)
+                            p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
                     else:
                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
                     if pkt_raw:
@@ -361,8 +341,7 @@ class TestACLplugin(VppTestCase):
                     pkts.append(p)
         return pkts
 
-    def verify_capture(self, pg_if, capture,
-                       traffic_type=0, ip_type=0, etype=-1):
+    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
         """
         Verify captured input packet stream for defined interface.
 
@@ -377,22 +356,23 @@ class TestACLplugin(VppTestCase):
         for packet in capture:
             if etype > 0:
                 if packet[Ether].type != etype:
-                    self.logger.error(ppp("Unexpected ethertype in packet:",
-                                          packet))
+                    self.logger.error(ppp("Unexpected ethertype in packet:", packet))
                 else:
                     continue
             try:
                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                 if traffic_type == self.ICMP and ip_type == self.IPV6:
                     payload_info = self.payload_to_info(
-                        packet[ICMPv6EchoRequest], 'data')
+                        packet[ICMPv6EchoRequest], "data"
+                    )
                     payload = packet[ICMPv6EchoRequest]
                 else:
                     payload_info = self.payload_to_info(packet[Raw])
                     payload = packet[self.proto_map[payload_info.proto]]
             except:
-                self.logger.error(ppp("Unexpected or invalid packet "
-                                      "(outside network):", packet))
+                self.logger.error(
+                    ppp("Unexpected or invalid packet (outside network):", packet)
+                )
                 raise
 
             if ip_type != 0:
@@ -406,8 +386,9 @@ class TestACLplugin(VppTestCase):
                         self.assertEqual(payload.type, self.icmp6_type)
                         self.assertEqual(payload.code, self.icmp6_code)
                 except:
-                    self.logger.error(ppp("Unexpected or invalid packet "
-                                          "(outside network):", packet))
+                    self.logger.error(
+                        ppp("Unexpected or invalid packet (outside network):", packet)
+                    )
                     raise
             else:
                 try:
@@ -417,12 +398,13 @@ class TestACLplugin(VppTestCase):
                     packet_index = payload_info.index
 
                     self.assertEqual(payload_info.dst, dst_sw_if_index)
-                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
-                                      (pg_if.name, payload_info.src,
-                                       packet_index))
+                    self.logger.debug(
+                        "Got packet on port %s: src=%u (id=%u)"
+                        % (pg_if.name, payload_info.src, packet_index)
+                    )
                     next_info = self.get_next_packet_info_for_interface2(
-                        payload_info.src, dst_sw_if_index,
-                        last_info[payload_info.src])
+                        payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+                    )
                     last_info[payload_info.src] = next_info
                     self.assertTrue(next_info is not None)
                     self.assertEqual(packet_index, next_info.index)
@@ -431,29 +413,26 @@ class TestACLplugin(VppTestCase):
                     self.assertEqual(ip.src, saved_packet[ip_version].src)
                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                     p = self.proto_map[payload_info.proto]
-                    if p == 'TCP':
+                    if p == "TCP":
                         tcp = packet[TCP]
-                        self.assertEqual(tcp.sport, saved_packet[
-                            TCP].sport)
-                        self.assertEqual(tcp.dport, saved_packet[
-                            TCP].dport)
-                    elif p == 'UDP':
+                        self.assertEqual(tcp.sport, saved_packet[TCP].sport)
+                        self.assertEqual(tcp.dport, saved_packet[TCP].dport)
+                    elif p == "UDP":
                         udp = packet[UDP]
-                        self.assertEqual(udp.sport, saved_packet[
-                            UDP].sport)
-                        self.assertEqual(udp.dport, saved_packet[
-                            UDP].dport)
+                        self.assertEqual(udp.sport, saved_packet[UDP].sport)
+                        self.assertEqual(udp.dport, saved_packet[UDP].dport)
                 except:
-                    self.logger.error(ppp("Unexpected or invalid packet:",
-                                          packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
         for i in self.pg_interfaces:
             remaining_packet = self.get_next_packet_info_for_interface2(
-                i, dst_sw_if_index, last_info[i.sw_if_index])
+                i, dst_sw_if_index, last_info[i.sw_if_index]
+            )
             self.assertTrue(
                 remaining_packet is None,
-                "Port %u: Packet expected from source %u didn't arrive" %
-                (dst_sw_if_index, i.sw_if_index))
+                "Port %u: Packet expected from source %u didn't arrive"
+                % (dst_sw_if_index, i.sw_if_index),
+            )
 
     def run_traffic_no_check(self):
         # Test
@@ -468,16 +447,32 @@ class TestACLplugin(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
-                        frags=False, pkt_raw=True, etype=-1):
+    def run_verify_test(
+        self,
+        traffic_type=0,
+        ip_type=0,
+        proto=-1,
+        ports=0,
+        frags=False,
+        pkt_raw=True,
+        etype=-1,
+    ):
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
-                pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports,
-                                          frags, pkt_raw, etype)
+                pkts = self.create_stream(
+                    i,
+                    self.pg_if_packet_sizes,
+                    traffic_type,
+                    ip_type,
+                    proto,
+                    ports,
+                    frags,
+                    pkt_raw,
+                    etype,
+                )
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -493,21 +488,28 @@ class TestACLplugin(VppTestCase):
             if self.flows.__contains__(src_if):
                 for dst_if in self.flows[src_if]:
                     capture = dst_if.get_capture(pkts_cnt)
-                    self.logger.info("Verifying capture on interface %s" %
-                                     dst_if.name)
-                    self.verify_capture(dst_if, capture,
-                                        traffic_type, ip_type, etype)
+                    self.logger.info("Verifying capture on interface %s" % dst_if.name)
+                    self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
 
-    def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
-                              ports=0, frags=False, etype=-1):
+    def run_verify_negat_test(
+        self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
+    ):
         # Test
         pkts_cnt = 0
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
-                pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports,
-                                          frags, True, etype)
+                pkts = self.create_stream(
+                    i,
+                    self.pg_if_packet_sizes,
+                    traffic_type,
+                    ip_type,
+                    proto,
+                    ports,
+                    frags,
+                    True,
+                    etype,
+                )
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -522,44 +524,33 @@ class TestACLplugin(VppTestCase):
         for src_if in self.pg_interfaces:
             if self.flows.__contains__(src_if):
                 for dst_if in self.flows[src_if]:
-                    self.logger.info("Verifying capture on interface %s" %
-                                     dst_if.name)
+                    self.logger.info("Verifying capture on interface %s" % dst_if.name)
                     capture = dst_if.get_capture(0)
                     self.assertEqual(len(capture), 0)
 
     def test_0000_warmup_test(self):
-        """ ACL plugin version check; learn MACs
-        """
+        """ACL plugin version check; learn MACs"""
         reply = self.vapi.papi.acl_plugin_get_version()
         self.assertEqual(reply.major, 1)
-        self.logger.info("Working with ACL plugin version: %d.%d" % (
-            reply.major, reply.minor))
+        self.logger.info(
+            "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor)
+        )
         # minor version changes are non breaking
         # self.assertEqual(reply.minor, 0)
 
     def test_0001_acl_create(self):
-        """ ACL create/delete test
-        """
+        """ACL create/delete test"""
 
         self.logger.info("ACLP_TEST_START_0001")
-        # Add an ACL
-        r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
-              'srcport_or_icmptype_first': 1234,
-              'srcport_or_icmptype_last': 1235,
-              'src_ip_prefix_len': 0,
-              'src_ip_addr': b'\x00\x00\x00\x00',
-              'dstport_or_icmpcode_first': 1234,
-              'dstport_or_icmpcode_last': 1234,
-              'dst_ip_addr': b'\x00\x00\x00\x00',
-              'dst_ip_prefix_len': 0}]
+        # Create a permit-1234 ACL
+        r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
         # Test 1: add a new ACL
-        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
-                                          tag=b"permit 1234")
-        self.assertEqual(reply.retval, 0)
+        first_acl = VppAcl(self, rules=r, tag="permit 1234")
+        first_acl.add_vpp_config()
+        self.assertTrue(first_acl.query_vpp_config())
         # The very first ACL gets #0
-        self.assertEqual(reply.acl_index, 0)
-        first_acl = reply.acl_index
-        rr = self.vapi.acl_dump(reply.acl_index)
+        self.assertEqual(first_acl.acl_index, 0)
+        rr = first_acl.dump()
         self.logger.info("Dumped ACL: " + str(rr))
         self.assertEqual(len(rr), 1)
         # We should have the same number of ACL entries as we had asked
@@ -568,168 +559,191 @@ class TestACLplugin(VppTestCase):
         # are different types, we need to iterate over rules and keys to get
         # to basic values.
         for i_rule in range(0, len(r) - 1):
-            for rule_key in r[i_rule]:
-                self.assertEqual(rr[0].r[i_rule][rule_key],
-                                 r[i_rule][rule_key])
-
-        # Add a deny-1234 ACL
-        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
-                   'srcport_or_icmptype_first': 1234,
-                   'srcport_or_icmptype_last': 1235,
-                   'src_ip_prefix_len': 0,
-                   'src_ip_addr': b'\x00\x00\x00\x00',
-                   'dstport_or_icmpcode_first': 1234,
-                   'dstport_or_icmpcode_last': 1234,
-                   'dst_ip_addr': b'\x00\x00\x00\x00',
-                   'dst_ip_prefix_len': 0},
-                  {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
-                   'srcport_or_icmptype_first': 0,
-                   'srcport_or_icmptype_last': 0,
-                   'src_ip_prefix_len': 0,
-                   'src_ip_addr': b'\x00\x00\x00\x00',
-                   'dstport_or_icmpcode_first': 0,
-                   'dstport_or_icmpcode_last': 0,
-                   'dst_ip_addr': b'\x00\x00\x00\x00',
-                   'dst_ip_prefix_len': 0}]
-
-        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
-                                          tag=b"deny 1234;permit all")
-        self.assertEqual(reply.retval, 0)
+            encoded_rule = r[i_rule].encode()
+            for rule_key in encoded_rule:
+                self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key])
+
+        # Create a deny-1234 ACL
+        r_deny = [
+            AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
+            AclRule(is_permit=1, proto=17, ports=0),
+        ]
+        second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
+        second_acl.add_vpp_config()
+        self.assertTrue(second_acl.query_vpp_config())
         # The second ACL gets #1
-        self.assertEqual(reply.acl_index, 1)
-        second_acl = reply.acl_index
+        self.assertEqual(second_acl.acl_index, 1)
 
         # Test 2: try to modify a nonexistent ACL
-        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
-                                          tag=b"FFFF:FFFF", expected_retval=-6)
-        self.assertEqual(reply.retval, -6)
-        # The ACL number should pass through
-        self.assertEqual(reply.acl_index, 432)
+        invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
+        reply = invalid_acl.add_vpp_config(expect_error=True)
+
         # apply an ACL on an interface inbound, try to delete ACL, must fail
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
-                                             n_input=1,
-                                             acls=[first_acl])
-        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
+        acl_if_list = VppAclInterface(
+            self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]
+        )
+        acl_if_list.add_vpp_config()
+        first_acl.remove_vpp_config(expect_error=True)
         # Unapply an ACL and then try to delete it - must be ok
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
-                                             n_input=0,
-                                             acls=[])
-        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
-
-        # apply an ACL on an interface outbound, try to delete ACL, must fail
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
-                                             n_input=0,
-                                             acls=[second_acl])
-        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
-        # Unapply the ACL and then try to delete it - must be ok
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
-                                             n_input=0,
-                                             acls=[])
-        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+        acl_if_list.remove_vpp_config()
+        first_acl.remove_vpp_config()
+
+        # apply an ACL on an interface inbound, try to delete ACL, must fail
+        acl_if_list = VppAclInterface(
+            self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]
+        )
+        acl_if_list.add_vpp_config()
+        second_acl.remove_vpp_config(expect_error=True)
+        # Unapply an ACL and then try to delete it - must be ok
+        acl_if_list.remove_vpp_config()
+        second_acl.remove_vpp_config()
 
         # try to apply a nonexistent ACL - must fail
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
-                                             n_input=1,
-                                             acls=[first_acl],
-                                             expected_retval=-6)
+        acl_if_list = VppAclInterface(
+            self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl]
+        )
+        acl_if_list.add_vpp_config(expect_error=True)
 
         self.logger.info("ACLP_TEST_FINISH_0001")
 
     def test_0002_acl_permit_apply(self):
-        """ permit ACL apply test
-        """
+        """permit ACL apply test"""
         self.logger.info("ACLP_TEST_START_0002")
 
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                     0, self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                     0, self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP])
+        )
+        rules.append(
+            self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP])
+        )
 
         # Apply rules
-        self.apply_rules(rules, b"permit per-flow")
+        acl_idx = self.apply_rules(rules, "permit per-flow")
+
+        # enable counters
+        reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV4, -1)
+
+        matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
+        self.logger.info("stat segment counters: %s" % repr(matches))
+        cli = "show acl-plugin acl"
+        self.logger.info(self.vapi.ppcli(cli))
+        cli = "show acl-plugin tables"
+        self.logger.info(self.vapi.ppcli(cli))
+
+        total_hits = matches[0][0]["packets"] + matches[0][1]["packets"]
+        self.assertEqual(total_hits, 64)
+
+        # disable counters
+        reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
+
         self.logger.info("ACLP_TEST_FINISH_0002")
 
     def test_0003_acl_deny_apply(self):
-        """ deny ACL apply test
-        """
+        """deny ACL apply test"""
         self.logger.info("ACLP_TEST_START_0003")
         # Add a deny-flows ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY,
-                     self.PORTS_ALL, self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP]
+            )
+        )
         # Permit ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny per-flow;permit all")
+        acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
+
+        # enable counters
+        reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
 
         # Traffic should not pass
-        self.run_verify_negat_test(self.IP, self.IPV4,
-                                   self.proto[self.IP][self.UDP])
+        self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
+
+        matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
+        self.logger.info("stat segment counters: %s" % repr(matches))
+        cli = "show acl-plugin acl"
+        self.logger.info(self.vapi.ppcli(cli))
+        cli = "show acl-plugin tables"
+        self.logger.info(self.vapi.ppcli(cli))
+        self.assertEqual(matches[0][0]["packets"], 64)
+        # disable counters
+        reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
         self.logger.info("ACLP_TEST_FINISH_0003")
-        # self.assertEqual(1, 0)
+        # self.assertEqual(, 0)
 
     def test_0004_vpp624_permit_icmpv4(self):
-        """ VPP_624 permit ICMPv4
-        """
+        """VPP_624 permit ICMPv4"""
         self.logger.info("ACLP_TEST_START_0004")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                                      self.proto[self.ICMP][self.ICMPv4]))
+        rules.append(
+            self.create_rule(
+                self.IPV4,
+                self.PERMIT,
+                self.PORTS_RANGE,
+                self.proto[self.ICMP][self.ICMPv4],
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit icmpv4")
+        self.apply_rules(rules, "permit icmpv4")
 
         # Traffic should still pass
-        self.run_verify_test(self.ICMP, self.IPV4,
-                             self.proto[self.ICMP][self.ICMPv4])
+        self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4])
 
         self.logger.info("ACLP_TEST_FINISH_0004")
 
     def test_0005_vpp624_permit_icmpv6(self):
-        """ VPP_624 permit ICMPv6
-        """
+        """VPP_624 permit ICMPv6"""
         self.logger.info("ACLP_TEST_START_0005")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
-                                      self.proto[self.ICMP][self.ICMPv6]))
+        rules.append(
+            self.create_rule(
+                self.IPV6,
+                self.PERMIT,
+                self.PORTS_RANGE,
+                self.proto[self.ICMP][self.ICMPv6],
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit icmpv6")
+        self.apply_rules(rules, "permit icmpv6")
 
         # Traffic should still pass
-        self.run_verify_test(self.ICMP, self.IPV6,
-                             self.proto[self.ICMP][self.ICMPv6])
+        self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6])
 
         self.logger.info("ACLP_TEST_FINISH_0005")
 
     def test_0006_vpp624_deny_icmpv4(self):
-        """ VPP_624 deny ICMPv4
-        """
+        """VPP_624 deny ICMPv4"""
         self.logger.info("ACLP_TEST_START_0006")
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.ICMP][self.ICMPv4]))
+        rules.append(
+            self.create_rule(
+                self.IPV4,
+                self.DENY,
+                self.PORTS_RANGE,
+                self.proto[self.ICMP][self.ICMPv4],
+            )
+        )
         # permit ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny icmpv4")
+        self.apply_rules(rules, "deny icmpv4")
 
         # Traffic should not pass
         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
@@ -737,19 +751,23 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0006")
 
     def test_0007_vpp624_deny_icmpv6(self):
-        """ VPP_624 deny ICMPv6
-        """
+        """VPP_624 deny ICMPv6"""
         self.logger.info("ACLP_TEST_START_0007")
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.ICMP][self.ICMPv6]))
+        rules.append(
+            self.create_rule(
+                self.IPV6,
+                self.DENY,
+                self.PORTS_RANGE,
+                self.proto[self.ICMP][self.ICMPv6],
+            )
+        )
         # deny ip any any in the end
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny icmpv6")
+        self.apply_rules(rules, "deny icmpv6")
 
         # Traffic should not pass
         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
@@ -757,19 +775,21 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0007")
 
     def test_0008_tcp_permit_v4(self):
-        """ permit TCPv4
-        """
+        """permit TCPv4"""
         self.logger.info("ACLP_TEST_START_0008")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                     self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ipv4 tcp")
+        self.apply_rules(rules, "permit ipv4 tcp")
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
@@ -777,19 +797,21 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0008")
 
     def test_0009_tcp_permit_v6(self):
-        """ permit TCPv6
-        """
+        """permit TCPv6"""
         self.logger.info("ACLP_TEST_START_0009")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ip6 tcp")
+        self.apply_rules(rules, "permit ip6 tcp")
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
@@ -797,19 +819,21 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0008")
 
     def test_0010_udp_permit_v4(self):
-        """ permit UDPv4
-        """
+        """permit UDPv4"""
         self.logger.info("ACLP_TEST_START_0010")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ipv udp")
+        self.apply_rules(rules, "permit ipv udp")
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
@@ -817,19 +841,21 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0010")
 
     def test_0011_udp_permit_v6(self):
-        """ permit UDPv6
-        """
+        """permit UDPv6"""
         self.logger.info("ACLP_TEST_START_0011")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ip6 udp")
+        self.apply_rules(rules, "permit ip6 udp")
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
@@ -837,94 +863,102 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0011")
 
     def test_0012_tcp_deny(self):
-        """ deny TCPv4/v6
-        """
+        """deny TCPv4/v6"""
         self.logger.info("ACLP_TEST_START_0012")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # permit ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny ip4/ip6 tcp")
+        self.apply_rules(rules, "deny ip4/ip6 tcp")
 
         # Traffic should not pass
-        self.run_verify_negat_test(self.IP, self.IPRANDOM,
-                                   self.proto[self.IP][self.TCP])
+        self.run_verify_negat_test(
+            self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
+        )
 
         self.logger.info("ACLP_TEST_FINISH_0012")
 
     def test_0013_udp_deny(self):
-        """ deny UDPv4/v6
-        """
+        """deny UDPv4/v6"""
         self.logger.info("ACLP_TEST_START_0013")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+            )
+        )
         # permit ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny ip4/ip6 udp")
+        self.apply_rules(rules, "deny ip4/ip6 udp")
 
         # Traffic should not pass
-        self.run_verify_negat_test(self.IP, self.IPRANDOM,
-                                   self.proto[self.IP][self.UDP])
+        self.run_verify_negat_test(
+            self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
+        )
 
         self.logger.info("ACLP_TEST_FINISH_0013")
 
     def test_0014_acl_dump(self):
-        """ verify add/dump acls
-        """
+        """verify add/dump acls"""
         self.logger.info("ACLP_TEST_START_0014")
 
-        r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
-             [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
-             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
-             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
-             [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
-             [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
-             [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
-             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
-             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
-             [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
-             [self.IPV4, self.DENY, self.PORTS_ALL, 0],
-             [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
-             [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
-             [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
-             [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
-             [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
-             [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
-             [self.IPV6, self.DENY, self.PORTS_ALL, 0]
-             ]
+        r = [
+            [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
+            [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
+            [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
+            [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
+            [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
+            [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
+            [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
+            [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
+            [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
+            [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
+            [self.IPV4, self.DENY, self.PORTS_ALL, 0],
+            [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
+            [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
+            [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
+            [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
+            [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
+            [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
+            [self.IPV6, self.DENY, self.PORTS_ALL, 0],
+        ]
 
         # Add and verify new ACLs
         rules = []
         for i in range(len(r)):
             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
 
-        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
-        result = self.vapi.acl_dump(reply.acl_index)
+        acl = VppAcl(self, rules=rules)
+        acl.add_vpp_config()
+        result = acl.dump()
 
         i = 0
         for drules in result:
             for dr in drules.r:
-                self.assertEqual(dr.is_ipv6, r[i][0])
                 self.assertEqual(dr.is_permit, r[i][1])
                 self.assertEqual(dr.proto, r[i][3])
 
@@ -936,221 +970,241 @@ class TestACLplugin(VppTestCase):
                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
                     else:
                         if dr.proto == self.proto[self.IP][self.TCP]:
-                            self.assertGreater(dr.srcport_or_icmptype_first,
-                                               self.tcp_sport_from-1)
-                            self.assertLess(dr.srcport_or_icmptype_first,
-                                            self.tcp_sport_to+1)
-                            self.assertGreater(dr.dstport_or_icmpcode_last,
-                                               self.tcp_dport_from-1)
-                            self.assertLess(dr.dstport_or_icmpcode_last,
-                                            self.tcp_dport_to+1)
+                            self.assertGreater(
+                                dr.srcport_or_icmptype_first, self.tcp_sport_from - 1
+                            )
+                            self.assertLess(
+                                dr.srcport_or_icmptype_first, self.tcp_sport_to + 1
+                            )
+                            self.assertGreater(
+                                dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1
+                            )
+                            self.assertLess(
+                                dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1
+                            )
                         elif dr.proto == self.proto[self.IP][self.UDP]:
-                            self.assertGreater(dr.srcport_or_icmptype_first,
-                                               self.udp_sport_from-1)
-                            self.assertLess(dr.srcport_or_icmptype_first,
-                                            self.udp_sport_to+1)
-                            self.assertGreater(dr.dstport_or_icmpcode_last,
-                                               self.udp_dport_from-1)
-                            self.assertLess(dr.dstport_or_icmpcode_last,
-                                            self.udp_dport_to+1)
+                            self.assertGreater(
+                                dr.srcport_or_icmptype_first, self.udp_sport_from - 1
+                            )
+                            self.assertLess(
+                                dr.srcport_or_icmptype_first, self.udp_sport_to + 1
+                            )
+                            self.assertGreater(
+                                dr.dstport_or_icmpcode_last, self.udp_dport_from - 1
+                            )
+                            self.assertLess(
+                                dr.dstport_or_icmpcode_last, self.udp_dport_to + 1
+                            )
                 i += 1
 
         self.logger.info("ACLP_TEST_FINISH_0014")
 
     def test_0015_tcp_permit_port_v4(self):
-        """ permit single TCPv4
-        """
+        """permit single TCPv4"""
         self.logger.info("ACLP_TEST_START_0015")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
-                                      self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+        self.apply_rules(rules, "permit ip4 tcp %d" % port)
 
         # Traffic should still pass
-        self.run_verify_test(self.IP, self.IPV4,
-                             self.proto[self.IP][self.TCP], port)
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port)
 
         self.logger.info("ACLP_TEST_FINISH_0015")
 
     def test_0016_udp_permit_port_v4(self):
-        """ permit single UDPv4
-        """
+        """permit single UDPv4"""
         self.logger.info("ACLP_TEST_START_0016")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+        self.apply_rules(rules, "permit ip4 tcp %d" % port)
 
         # Traffic should still pass
-        self.run_verify_test(self.IP, self.IPV4,
-                             self.proto[self.IP][self.UDP], port)
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port)
 
         self.logger.info("ACLP_TEST_FINISH_0016")
 
     def test_0017_tcp_permit_port_v6(self):
-        """ permit single TCPv6
-        """
+        """permit single TCPv6"""
         self.logger.info("ACLP_TEST_START_0017")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
-                                      self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+        self.apply_rules(rules, "permit ip4 tcp %d" % port)
 
         # Traffic should still pass
-        self.run_verify_test(self.IP, self.IPV6,
-                             self.proto[self.IP][self.TCP], port)
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port)
 
         self.logger.info("ACLP_TEST_FINISH_0017")
 
     def test_0018_udp_permit_port_v6(self):
-        """ permit single UPPv6
-        """
+        """permit single UDPv6"""
         self.logger.info("ACLP_TEST_START_0018")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
+            )
+        )
         # deny ip any any in the end
-        rules.append(self.create_rule(self.IPV6, self.DENY,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+        self.apply_rules(rules, "permit ip4 tcp %d" % port)
 
         # Traffic should still pass
-        self.run_verify_test(self.IP, self.IPV6,
-                             self.proto[self.IP][self.UDP], port)
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port)
 
         self.logger.info("ACLP_TEST_FINISH_0018")
 
     def test_0019_udp_deny_port(self):
-        """ deny single TCPv4/v6
-        """
+        """deny single TCPv4/v6"""
         self.logger.info("ACLP_TEST_START_0019")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, port,
-                                      self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV6, self.DENY, port,
-                                      self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP])
+        )
+        rules.append(
+            self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP])
+        )
         # Permit ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+        self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
 
         # Traffic should not pass
-        self.run_verify_negat_test(self.IP, self.IPRANDOM,
-                                   self.proto[self.IP][self.TCP], port)
+        self.run_verify_negat_test(
+            self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port
+        )
 
         self.logger.info("ACLP_TEST_FINISH_0019")
 
     def test_0020_udp_deny_port(self):
-        """ deny single UDPv4/v6
-        """
+        """deny single UDPv4/v6"""
         self.logger.info("ACLP_TEST_START_0020")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, port,
-                                      self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV6, self.DENY, port,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
+        )
+        rules.append(
+            self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
+        )
         # Permit ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+        self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
 
         # Traffic should not pass
-        self.run_verify_negat_test(self.IP, self.IPRANDOM,
-                                   self.proto[self.IP][self.UDP], port)
+        self.run_verify_negat_test(
+            self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port
+        )
 
         self.logger.info("ACLP_TEST_FINISH_0020")
 
     def test_0021_udp_deny_port_verify_fragment_deny(self):
-        """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
+        """deny single UDPv4/v6, permit ip any, verify non-initial fragment
         blocked
         """
         self.logger.info("ACLP_TEST_START_0021")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, port,
-                                      self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV6, self.DENY, port,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
+        )
+        rules.append(
+            self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
+        )
         # deny ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+        self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
 
         # Traffic should not pass
-        self.run_verify_negat_test(self.IP, self.IPRANDOM,
-                                   self.proto[self.IP][self.UDP], port, True)
+        self.run_verify_negat_test(
+            self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True
+        )
 
         self.logger.info("ACLP_TEST_FINISH_0021")
 
     def test_0022_zero_length_udp_ipv4(self):
-        """ VPP-687 zero length udp ipv4 packet"""
+        """VPP-687 zero length udp ipv4 packet"""
         self.logger.info("ACLP_TEST_START_0022")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
-                                      self.proto[self.IP][self.UDP]))
-        # deny ip any any in the end
         rules.append(
-            self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+            self.create_rule(
+                self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
+            )
+        )
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
+        self.apply_rules(rules, "permit empty udp ip4 %d" % port)
 
         # Traffic should still pass
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
-        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
-                                  self.IP, self.IPV4,
-                                  self.proto[self.IP][self.UDP], port,
-                                  False, False)
+        pkts = self.create_stream(
+            self.pg0,
+            self.pg_if_packet_sizes,
+            self.IP,
+            self.IPV4,
+            self.proto[self.IP][self.UDP],
+            port,
+            False,
+            False,
+        )
         if len(pkts) > 0:
             self.pg0.add_stream(pkts)
             pkts_cnt += len(pkts)
@@ -1164,27 +1218,36 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0022")
 
     def test_0023_zero_length_udp_ipv6(self):
-        """ VPP-687 zero length udp ipv6 packet"""
+        """VPP-687 zero length udp ipv6 packet"""
         self.logger.info("ACLP_TEST_START_0023")
 
-        port = random.randint(0, 65535)
+        port = random.randint(16384, 65535)
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
+        self.apply_rules(rules, "permit empty udp ip6 %d" % port)
 
         # Traffic should still pass
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
-        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
-                                  self.IP, self.IPV6,
-                                  self.proto[self.IP][self.UDP], port,
-                                  False, False)
+        pkts = self.create_stream(
+            self.pg0,
+            self.pg_if_packet_sizes,
+            self.IP,
+            self.IPV6,
+            self.proto[self.IP][self.UDP],
+            port,
+            False,
+            False,
+        )
         if len(pkts) > 0:
             self.pg0.add_stream(pkts)
             pkts_cnt += len(pkts)
@@ -1199,21 +1262,26 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0023")
 
     def test_0108_tcp_permit_v4(self):
-        """ permit TCPv4 + non-match range
-        """
+        """permit TCPv4 + non-match range"""
         self.logger.info("ACLP_TEST_START_0108")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
-                     self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                     self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ipv4 tcp")
+        self.apply_rules(rules, "permit ipv4 tcp")
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
@@ -1221,21 +1289,26 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0108")
 
     def test_0109_tcp_permit_v6(self):
-        """ permit TCPv6 + non-match range
-        """
+        """permit TCPv6 + non-match range"""
         self.logger.info("ACLP_TEST_START_0109")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
-                                      self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ip6 tcp")
+        self.apply_rules(rules, "permit ip6 tcp")
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
@@ -1243,21 +1316,26 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0109")
 
     def test_0110_udp_permit_v4(self):
-        """ permit UDPv4 + non-match range
-        """
+        """permit UDPv4 + non-match range"""
         self.logger.info("ACLP_TEST_START_0110")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
-                                      self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ipv4 udp")
+        self.apply_rules(rules, "permit ipv4 udp")
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
@@ -1265,21 +1343,26 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0110")
 
     def test_0111_udp_permit_v6(self):
-        """ permit UDPv6 + non-match range
-        """
+        """permit UDPv6 + non-match range"""
         self.logger.info("ACLP_TEST_START_0111")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
-                                      self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ip6 udp")
+        self.apply_rules(rules, "permit ip6 udp")
 
         # Traffic should still pass
         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
@@ -1287,189 +1370,243 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0111")
 
     def test_0112_tcp_deny(self):
-        """ deny TCPv4/v6 + non-match range
-        """
+        """deny TCPv4/v6 + non-match range"""
         self.logger.info("ACLP_TEST_START_0112")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_RANGE_2,
-                                      self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_RANGE_2,
-                                      self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4,
+                self.PERMIT,
+                self.PORTS_RANGE_2,
+                self.proto[self.IP][self.TCP],
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV6,
+                self.PERMIT,
+                self.PORTS_RANGE_2,
+                self.proto[self.IP][self.TCP],
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # permit ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny ip4/ip6 tcp")
+        self.apply_rules(rules, "deny ip4/ip6 tcp")
 
         # Traffic should not pass
-        self.run_verify_negat_test(self.IP, self.IPRANDOM,
-                                   self.proto[self.IP][self.TCP])
+        self.run_verify_negat_test(
+            self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
+        )
 
         self.logger.info("ACLP_TEST_FINISH_0112")
 
     def test_0113_udp_deny(self):
-        """ deny UDPv4/v6 + non-match range
-        """
+        """deny UDPv4/v6 + non-match range"""
         self.logger.info("ACLP_TEST_START_0113")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_RANGE_2,
-                                      self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_RANGE_2,
-                                      self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.UDP]))
-        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
-                                      self.proto[self.IP][self.UDP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4,
+                self.PERMIT,
+                self.PORTS_RANGE_2,
+                self.proto[self.IP][self.UDP],
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV6,
+                self.PERMIT,
+                self.PORTS_RANGE_2,
+                self.proto[self.IP][self.UDP],
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+            )
+        )
         # permit ip any any in the end
-        rules.append(self.create_rule(self.IPV4, self.PERMIT,
-                                      self.PORTS_ALL, 0))
-        rules.append(self.create_rule(self.IPV6, self.PERMIT,
-                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"deny ip4/ip6 udp")
+        self.apply_rules(rules, "deny ip4/ip6 udp")
 
         # Traffic should not pass
-        self.run_verify_negat_test(self.IP, self.IPRANDOM,
-                                   self.proto[self.IP][self.UDP])
+        self.run_verify_negat_test(
+            self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
+        )
 
         self.logger.info("ACLP_TEST_FINISH_0113")
 
     def test_0300_tcp_permit_v4_etype_aaaa(self):
-        """ permit TCPv4, send 0xAAAA etype
-        """
+        """permit TCPv4, send 0xAAAA etype"""
         self.logger.info("ACLP_TEST_START_0300")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
-                     self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                     self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ipv4 tcp")
+        self.apply_rules(rules, "permit ipv4 tcp")
 
         # Traffic should still pass also for an odd ethertype
-        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
-                             0, False, True, 0xaaaa)
+        self.run_verify_test(
+            self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
+        )
         self.logger.info("ACLP_TEST_FINISH_0300")
 
     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
-        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
-        """
+        """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked"""
         self.logger.info("ACLP_TEST_START_0305")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
-                     self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                     self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ipv4 tcp")
-
+        self.apply_rules(rules, "permit ipv4 tcp")
         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
-        self.etype_whitelist([0xbbb], 1)
+        self.etype_whitelist([0xBBB], 1)
 
         # The oddball ethertype should be blocked
-        self.run_verify_negat_test(self.IP, self.IPV4,
-                                   self.proto[self.IP][self.TCP],
-                                   0, False, 0xaaaa)
+        self.run_verify_negat_test(
+            self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA
+        )
 
         # remove the whitelist
-        self.etype_whitelist([], 0)
+        self.etype_whitelist([], 0, add=False)
 
         self.logger.info("ACLP_TEST_FINISH_0305")
 
     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
-        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
-        """
+        """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass"""
         self.logger.info("ACLP_TEST_START_0306")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
-                     self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                     self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ipv4 tcp")
-
+        self.apply_rules(rules, "permit ipv4 tcp")
         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
-        self.etype_whitelist([0xbbb], 1)
+        self.etype_whitelist([0xBBB], 1)
 
         # The whitelisted traffic, should pass
-        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
-                             0, False, True, 0x0bbb)
+        self.run_verify_test(
+            self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB
+        )
 
         # remove the whitelist, the previously blocked 0xAAAA should pass now
-        self.etype_whitelist([], 0)
+        self.etype_whitelist([], 0, add=False)
 
         self.logger.info("ACLP_TEST_FINISH_0306")
 
     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
-        """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
-        """
+        """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass"""
         self.logger.info("ACLP_TEST_START_0307")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
-                     self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                     self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
         # Apply rules
-        self.apply_rules(rules, b"permit ipv4 tcp")
+        self.apply_rules(rules, "permit ipv4 tcp")
 
         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
-        self.etype_whitelist([0xbbb], 1)
+        self.etype_whitelist([0xBBB], 1)
         # remove the whitelist, the previously blocked 0xAAAA should pass now
-        self.etype_whitelist([], 0)
+        self.etype_whitelist([], 0, add=False)
 
         # The whitelisted traffic, should pass
-        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
-                             0, False, True, 0xaaaa)
+        self.run_verify_test(
+            self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
+        )
 
         self.logger.info("ACLP_TEST_FINISH_0306")
 
     def test_0315_del_intf(self):
-        """ apply an acl and delete the interface
-        """
+        """apply an acl and delete the interface"""
         self.logger.info("ACLP_TEST_START_0315")
 
         # Add an ACL
         rules = []
-        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
-                     self.proto[self.IP][self.TCP]))
-        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
-                     self.proto[self.IP][self.TCP]))
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+            )
+        )
+        rules.append(
+            self.create_rule(
+                self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+            )
+        )
         # deny ip any any in the end
         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
 
@@ -1478,12 +1615,13 @@ class TestACLplugin(VppTestCase):
         intf.append(VppLoInterface(self))
 
         # Apply rules
-        self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
+        self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
 
         # Remove the interface
         intf[0].remove_vpp_config()
 
         self.logger.info("ACLP_TEST_FINISH_0315")
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)