acl-plugin: add whitelisted ethertype mode (VPP-1163)
[vpp.git] / test / test_acl_plugin.py
index 2bbebe8..5fcf09c 100644 (file)
@@ -9,6 +9,7 @@ from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6ExtHdrFragment
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp
 
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp
 
@@ -41,6 +42,7 @@ class TestACLplugin(VppTestCase):
     # port ranges
     PORTS_ALL = -1
     PORTS_RANGE = 0
     # port ranges
     PORTS_ALL = -1
     PORTS_RANGE = 0
+    PORTS_RANGE_2 = 1
     udp_sport_from = 10
     udp_sport_to = udp_sport_from + 5
     udp_dport_from = 20000
     udp_sport_from = 10
     udp_sport_to = udp_sport_from + 5
     udp_dport_from = 20000
@@ -50,11 +52,27 @@ class TestACLplugin(VppTestCase):
     tcp_dport_from = 40000
     tcp_dport_to = tcp_dport_from + 5000
 
     tcp_dport_from = 40000
     tcp_dport_to = tcp_dport_from + 5000
 
+    udp_sport_from_2 = 90
+    udp_sport_to_2 = udp_sport_from_2 + 5
+    udp_dport_from_2 = 30000
+    udp_dport_to_2 = udp_dport_from_2 + 5000
+    tcp_sport_from_2 = 130
+    tcp_sport_to_2 = tcp_sport_from_2 + 5
+    tcp_dport_from_2 = 20000
+    tcp_dport_to_2 = tcp_dport_from_2 + 5000
+
     icmp4_type = 8  # echo request
     icmp4_code = 3
     icmp6_type = 128  # echo request
     icmp6_code = 3
 
     icmp4_type = 8  # echo request
     icmp4_code = 3
     icmp6_type = 128  # echo request
     icmp6_code = 3
 
+    icmp4_type_2 = 8
+    icmp4_code_from_2 = 5
+    icmp4_code_to_2 = 20
+    icmp6_type_2 = 128
+    icmp6_code_from_2 = 8
+    icmp6_code_to_2 = 42
+
     # Test variables
     bd_id = 1
 
     # Test variables
     bd_id = 1
 
@@ -67,8 +85,6 @@ class TestACLplugin(VppTestCase):
         """
         super(TestACLplugin, cls).setUpClass()
 
         """
         super(TestACLplugin, cls).setUpClass()
 
-        random.seed()
-
         try:
             # Create 2 pg interfaces
             cls.create_pg_interfaces(range(2))
         try:
             # Create 2 pg interfaces
             cls.create_pg_interfaces(range(2))
@@ -120,6 +136,9 @@ class TestACLplugin(VppTestCase):
         super(TestACLplugin, self).tearDown()
         if not self.vpp_dead:
             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
         super(TestACLplugin, self).tearDown()
         if not self.vpp_dead:
             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+            self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
+            self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
+            self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
                                              % self.bd_id))
 
             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
                                              % self.bd_id))
 
@@ -178,6 +197,27 @@ class TestACLplugin(VppTestCase):
                 sport_to = self.udp_sport_to
                 dport_from = self.udp_dport_from
                 dport_to = self.udp_dport_to
                 sport_to = self.udp_sport_to
                 dport_from = self.udp_dport_from
                 dport_to = self.udp_dport_to
+        elif ports == self.PORTS_RANGE_2:
+            if proto == 1:
+                sport_from = self.icmp4_type_2
+                sport_to = self.icmp4_type_2
+                dport_from = self.icmp4_code_from_2
+                dport_to = self.icmp4_code_to_2
+            elif proto == 58:
+                sport_from = self.icmp6_type_2
+                sport_to = self.icmp6_type_2
+                dport_from = self.icmp6_code_from_2
+                dport_to = self.icmp6_code_to_2
+            elif proto == self.proto[self.IP][self.TCP]:
+                sport_from = self.tcp_sport_from_2
+                sport_to = self.tcp_sport_to_2
+                dport_from = self.tcp_dport_from_2
+                dport_to = self.tcp_dport_to_2
+            elif proto == self.proto[self.IP][self.UDP]:
+                sport_from = self.udp_sport_from_2
+                sport_to = self.udp_sport_to_2
+                dport_from = self.udp_dport_from_2
+                dport_to = self.udp_dport_to_2
         else:
             sport_from = ports
             sport_to = ports
         else:
             sport_from = ports
             sport_to = ports
@@ -196,16 +236,24 @@ class TestACLplugin(VppTestCase):
         return rule
 
     def apply_rules(self, rules, tag=''):
         return rule
 
     def apply_rules(self, rules, tag=''):
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
-                                         count=len(rules),
-                                         tag=tag)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
+                                          tag=tag)
         self.logger.info("Dumped ACL: " + str(
         self.logger.info("Dumped ACL: " + str(
-            self.api_acl_dump(reply.acl_index)))
+            self.vapi.acl_dump(reply.acl_index)))
         # Apply a ACL on the interface as inbound
         for i in self.pg_interfaces:
         # Apply a ACL on the interface as inbound
         for i in self.pg_interfaces:
-            self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
-                                                count=1, n_input=1,
-                                                acls=[reply.acl_index])
+            self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
+                                                 n_input=1,
+                                                 acls=[reply.acl_index])
+        return
+
+    def etype_whitelist(self, whitelist, n_input):
+        # Apply whitelists on all the interfaces
+        for i in self.pg_interfaces:
+            # checkstyle can't read long names. Help them.
+            fun = self.vapi.acl_interface_set_etype_whitelist
+            fun(sw_if_index=i.sw_if_index, n_input=n_input,
+                whitelist=whitelist)
         return
 
     def create_upper_layer(self, packet_index, proto, ports=0):
         return
 
     def create_upper_layer(self, packet_index, proto, ports=0):
@@ -229,7 +277,8 @@ class TestACLplugin(VppTestCase):
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
-                      proto=-1, ports=0):
+                      proto=-1, ports=0, fragments=False,
+                      pkt_raw=True, etype=-1):
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
@@ -261,10 +310,20 @@ class TestACLplugin(VppTestCase):
                         pkt_info.proto = proto
                     payload = self.info_to_payload(pkt_info)
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
                         pkt_info.proto = proto
                     payload = self.info_to_payload(pkt_info)
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
+                    if etype > 0:
+                        p = Ether(dst=dst_host.mac,
+                                  src=src_host.mac,
+                                  type=etype)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
+                        if fragments:
+                            p /= IPv6ExtHdrFragment(offset=64, m=1)
                     else:
                     else:
-                        p /= IP(src=src_host.ip4, dst=dst_host.ip4)
+                        if fragments:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4,
+                                    flags=1, frag=64)
+                        else:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4)
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
@@ -274,14 +333,17 @@ class TestACLplugin(VppTestCase):
                                       code=self.icmp4_code)
                     else:
                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
                                       code=self.icmp4_code)
                     else:
                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
-                    p /= Raw(payload)
-                    pkt_info.data = p.copy()
-                    size = random.choice(packet_sizes)
-                    self.extend_packet(p, size)
+                    if pkt_raw:
+                        p /= Raw(payload)
+                        pkt_info.data = p.copy()
+                    if pkt_raw:
+                        size = random.choice(packet_sizes)
+                        self.extend_packet(p, size)
                     pkts.append(p)
         return pkts
 
                     pkts.append(p)
         return pkts
 
-    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
+    def verify_capture(self, pg_if, capture,
+                       traffic_type=0, ip_type=0, etype=-1):
         """
         Verify captured input packet stream for defined interface.
 
         """
         Verify captured input packet stream for defined interface.
 
@@ -294,6 +356,12 @@ class TestACLplugin(VppTestCase):
             last_info[i.sw_if_index] = None
         dst_sw_if_index = pg_if.sw_if_index
         for packet in capture:
             last_info[i.sw_if_index] = None
         dst_sw_if_index = pg_if.sw_if_index
         for packet in capture:
+            if etype > 0:
+                if packet[Ether].type != etype:
+                    self.logger.error(ppp("Unexpected ethertype in packet:",
+                                          packet))
+                else:
+                    continue
             try:
                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                 if traffic_type == self.ICMP and ip_type == self.IPV6:
             try:
                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                 if traffic_type == self.ICMP and ip_type == self.IPV6:
@@ -381,14 +449,16 @@ class TestACLplugin(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
+    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
+                        frags=False, pkt_raw=True, etype=-1):
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports)
+                                          traffic_type, ip_type, proto, ports,
+                                          frags, pkt_raw, etype)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -405,16 +475,18 @@ class TestACLplugin(VppTestCase):
                     capture = dst_if.get_capture(pkts_cnt)
                     self.logger.info("Verifying capture on interface %s" %
                                      dst_if.name)
                     capture = dst_if.get_capture(pkts_cnt)
                     self.logger.info("Verifying capture on interface %s" %
                                      dst_if.name)
-                    self.verify_capture(dst_if, capture, traffic_type, ip_type)
+                    self.verify_capture(dst_if, capture,
+                                        traffic_type, ip_type, etype)
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
-                              ports=0):
+                              ports=0, frags=False, etype=-1):
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports)
+                                          traffic_type, ip_type, proto, ports,
+                                          frags, True, etype)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
@@ -432,37 +504,6 @@ class TestACLplugin(VppTestCase):
                     capture = dst_if.get_capture(0)
                     self.assertEqual(len(capture), 0)
 
                     capture = dst_if.get_capture(0)
                     self.assertEqual(len(capture), 0)
 
-    def api_acl_add_replace(self, acl_index, r, count, tag='',
-                            expected_retval=0):
-        """Add/replace an ACL
-
-        :param int acl_index: ACL index to replace,
-        4294967295 to create new ACL.
-        :param acl_rule r: ACL rules array.
-        :param str tag: symbolic tag (description) for this ACL.
-        :param int count: number of rules.
-        """
-        return self.vapi.api(self.vapi.papi.acl_add_replace,
-                             {'acl_index': acl_index,
-                              'r': r,
-                              'count': count,
-                              'tag': tag},
-                             expected_retval=expected_retval)
-
-    def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
-                                       expected_retval=0):
-        return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
-                             {'sw_if_index': sw_if_index,
-                              'count': count,
-                              'n_input': n_input,
-                              'acls': acls},
-                             expected_retval=expected_retval)
-
-    def api_acl_dump(self, acl_index, expected_retval=0):
-        return self.vapi.api(self.vapi.papi.acl_dump,
-                             {'acl_index': acl_index},
-                             expected_retval=expected_retval)
-
     def test_0000_warmup_test(self):
         """ ACL plugin version check; learn MACs
         """
     def test_0000_warmup_test(self):
         """ ACL plugin version check; learn MACs
         """
@@ -476,7 +517,7 @@ class TestACLplugin(VppTestCase):
         # self.assertEqual(reply.minor, 0)
 
     def test_0001_acl_create(self):
         # self.assertEqual(reply.minor, 0)
 
     def test_0001_acl_create(self):
-        """ ACL create test
+        """ ACL create/delete test
         """
 
         self.logger.info("ACLP_TEST_START_0001")
         """
 
         self.logger.info("ACLP_TEST_START_0001")
@@ -491,12 +532,13 @@ class TestACLplugin(VppTestCase):
               'dst_ip_addr': '\x00\x00\x00\x00',
               'dst_ip_prefix_len': 0}]
         # Test 1: add a new ACL
               'dst_ip_addr': '\x00\x00\x00\x00',
               'dst_ip_prefix_len': 0}]
         # Test 1: add a new ACL
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
-                                         count=len(r), tag="permit 1234")
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
+                                          tag="permit 1234")
         self.assertEqual(reply.retval, 0)
         # The very first ACL gets #0
         self.assertEqual(reply.acl_index, 0)
         self.assertEqual(reply.retval, 0)
         # The very first ACL gets #0
         self.assertEqual(reply.acl_index, 0)
-        rr = self.api_acl_dump(reply.acl_index)
+        first_acl = reply.acl_index
+        rr = self.vapi.acl_dump(reply.acl_index)
         self.logger.info("Dumped ACL: " + str(rr))
         self.assertEqual(len(rr), 1)
         # We should have the same number of ACL entries as we had asked
         self.logger.info("Dumped ACL: " + str(rr))
         self.assertEqual(len(rr), 1)
         # We should have the same number of ACL entries as we had asked
@@ -510,7 +552,7 @@ class TestACLplugin(VppTestCase):
                                  r[i_rule][rule_key])
 
         # Add a deny-1234 ACL
                                  r[i_rule][rule_key])
 
         # Add a deny-1234 ACL
-        r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
+        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                    'srcport_or_icmptype_first': 1234,
                    'srcport_or_icmptype_last': 1235,
                    'src_ip_prefix_len': 0,
                    'srcport_or_icmptype_first': 1234,
                    'srcport_or_icmptype_last': 1235,
                    'src_ip_prefix_len': 0,
@@ -527,21 +569,48 @@ class TestACLplugin(VppTestCase):
                    'dstport_or_icmpcode_first': 0,
                    'dstport_or_icmpcode_last': 0,
                    'dst_ip_addr': '\x00\x00\x00\x00',
                    'dstport_or_icmpcode_first': 0,
                    'dstport_or_icmpcode_last': 0,
                    'dst_ip_addr': '\x00\x00\x00\x00',
-                   'dst_ip_prefix_len': 0})
+                   'dst_ip_prefix_len': 0}]
 
 
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
-                                         count=len(r_deny),
-                                         tag="deny 1234;permit all")
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
+                                          tag="deny 1234;permit all")
         self.assertEqual(reply.retval, 0)
         # The second ACL gets #1
         self.assertEqual(reply.acl_index, 1)
         self.assertEqual(reply.retval, 0)
         # The second ACL gets #1
         self.assertEqual(reply.acl_index, 1)
+        second_acl = reply.acl_index
 
         # Test 2: try to modify a nonexistent ACL
 
         # Test 2: try to modify a nonexistent ACL
-        reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
-                                         tag="FFFF:FFFF", expected_retval=-1)
-        self.assertEqual(reply.retval, -1)
+        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
+                                          tag="FFFF:FFFF", expected_retval=-6)
+        self.assertEqual(reply.retval, -6)
         # The ACL number should pass through
         self.assertEqual(reply.acl_index, 432)
         # The ACL number should pass through
         self.assertEqual(reply.acl_index, 432)
+        # apply an ACL on an interface inbound, try to delete ACL, must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=1,
+                                             acls=[first_acl])
+        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
+        # Unapply an ACL and then try to delete it - must be ok
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[])
+        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
+
+        # apply an ACL on an interface outbound, try to delete ACL, must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[second_acl])
+        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
+        # Unapply the ACL and then try to delete it - must be ok
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[])
+        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+
+        # try to apply a nonexistent ACL - must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=1,
+                                             acls=[first_acl],
+                                             expected_retval=-6)
 
         self.logger.info("ACLP_TEST_FINISH_0001")
 
 
         self.logger.info("ACLP_TEST_FINISH_0001")
 
@@ -828,9 +897,8 @@ class TestACLplugin(VppTestCase):
         for i in range(len(r)):
             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
 
         for i in range(len(r)):
             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
 
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
-                                         count=len(rules))
-        result = self.api_acl_dump(reply.acl_index)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
+        result = self.vapi.acl_dump(reply.acl_index)
 
         i = 0
         for drules in result:
 
         i = 0
         for drules in result:
@@ -1011,5 +1079,319 @@ class TestACLplugin(VppTestCase):
 
         self.logger.info("ACLP_TEST_FINISH_0020")
 
 
         self.logger.info("ACLP_TEST_FINISH_0020")
 
+    def test_0021_udp_deny_port_verify_fragment_deny(self):
+        """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
+        """
+        self.logger.info("ACLP_TEST_START_0021")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP], port, True)
+
+        self.logger.info("ACLP_TEST_FINISH_0021")
+
+    def test_0022_zero_length_udp_ipv4(self):
+        """ VPP-687 zero length udp ipv4 packet"""
+        self.logger.info("ACLP_TEST_START_0022")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(
+            self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit empty udp ip4 " + str(port))
+
+        # Traffic should still pass
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+                                  self.IP, self.IPV4,
+                                  self.proto[self.IP][self.UDP], port,
+                                  False, False)
+        if len(pkts) > 0:
+            self.pg0.add_stream(pkts)
+            pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        self.pg1.get_capture(pkts_cnt)
+
+        self.logger.info("ACLP_TEST_FINISH_0022")
+
+    def test_0023_zero_length_udp_ipv6(self):
+        """ VPP-687 zero length udp ipv6 packet"""
+        self.logger.info("ACLP_TEST_START_0023")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit empty udp ip6 "+str(port))
+
+        # Traffic should still pass
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+                                  self.IP, self.IPV6,
+                                  self.proto[self.IP][self.UDP], port,
+                                  False, False)
+        if len(pkts) > 0:
+            self.pg0.add_stream(pkts)
+            pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        # Verify outgoing packet streams per packet-generator interface
+        self.pg1.get_capture(pkts_cnt)
+
+        self.logger.info("ACLP_TEST_FINISH_0023")
+
+    def test_0108_tcp_permit_v4(self):
+        """ permit TCPv4 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0108")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0108")
+
+    def test_0109_tcp_permit_v6(self):
+        """ permit TCPv6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0109")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0109")
+
+    def test_0110_udp_permit_v4(self):
+        """ permit UDPv4 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0110")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0110")
+
+    def test_0111_udp_permit_v6(self):
+        """ permit UDPv6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0111")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0111")
+
+    def test_0112_tcp_deny(self):
+        """ deny TCPv4/v6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0112")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 tcp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0112")
+
+    def test_0113_udp_deny(self):
+        """ deny UDPv4/v6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0113")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0113")
+
+    def test_0300_tcp_permit_v4_etype_aaaa(self):
+        """ permit TCPv4, send 0xAAAA etype
+        """
+        self.logger.info("ACLP_TEST_START_0300")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        # Traffic should still pass also for an odd ethertype
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0xaaaa)
+
+        self.logger.info("ACLP_TEST_FINISH_0300")
+
+    def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
+        """
+        self.logger.info("ACLP_TEST_START_0305")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+
+        # The IPv4 traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        # The oddball ethertype should be blocked
+        self.run_verify_negat_test(self.IP, self.IPV4,
+                                   self.proto[self.IP][self.TCP],
+                                   0, False, 0xaaaa)
+
+        # The whitelisted traffic, on the other hand, should pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0x0bbb)
+
+        # remove the whitelist, the previously blocked 0xAAAA should pass now
+        self.etype_whitelist([], 0)
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0xaaaa)
+
+        self.logger.info("ACLP_TEST_FINISH_0305")
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)