tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_classify_l2_acl.py
index c5660a5..cc3617f 100644 (file)
@@ -1,24 +1,23 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 """ Classifier-based L2 ACL Test Case HLD:
 """
 
 import unittest
 import random
-import binascii
-import socket
-
 
 from scapy.packet import Raw
+from scapy.data import ETH_P_IP
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 from scapy.layers.inet6 import IPv6ExtHdrFragment
-from framework import VppTestCase, VppTestRunner
+from asfframework import VppTestRunner
 from util import Host, ppp
+from template_classifier import TestClassifier
 
 
-class TestClassifyAcl(VppTestCase):
-    """ Classifier-based L2 input and output ACL Test Case """
+class TestClassifyAcl(TestClassifier):
+    """Classifier-based L2 input and output ACL Test Case"""
 
     # traffic types
     IP = 0
@@ -35,7 +34,7 @@ class TestClassifyAcl(VppTestCase):
 
     # supported protocols
     proto = [[6, 17], [1, 58]]
-    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
+    proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
     ICMPv4 = 0
     ICMPv6 = 1
     TCP = 0
@@ -87,6 +86,7 @@ class TestClassifyAcl(VppTestCase):
         variables and configure VPP.
         """
         super(TestClassifyAcl, cls).setUpClass()
+        cls.af = None
 
         try:
             # Create 2 pg interfaces
@@ -101,11 +101,13 @@ class TestClassifyAcl(VppTestCase):
 
             # Create BD with MAC learning and unknown unicast flooding disabled
             # and put interfaces to this BD
-            cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
-                                           learn=1)
+            cls.vapi.bridge_domain_add_del_v2(
+                bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
+            )
             for pg_if in cls.pg_interfaces:
-                cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
-                                                    bd_id=cls.bd_id)
+                cls.vapi.sw_interface_set_l2_bridge(
+                    rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
+                )
 
             # Set up all interfaces
             for i in cls.pg_interfaces:
@@ -124,155 +126,67 @@ class TestClassifyAcl(VppTestCase):
             # warm-up the mac address tables
             # self.warmup_test()
 
+            # Holder of the active classify table key
+            cls.acl_active_table = ""
+
         except Exception:
             super(TestClassifyAcl, cls).tearDownClass()
             raise
 
+    @classmethod
+    def tearDownClass(cls):
+        super(TestClassifyAcl, cls).tearDownClass()
+
     def setUp(self):
         super(TestClassifyAcl, self).setUp()
-
         self.acl_tbl_idx = {}
-        self.reset_packet_infos()
 
     def tearDown(self):
         """
         Show various debug prints after each test.
         """
-        super(TestClassifyAcl, self).tearDown()
         if not self.vpp_dead:
-            self.logger.info(self.vapi.ppcli("show inacl type l2"))
-            self.logger.info(self.vapi.ppcli("show outacl type l2"))
-            self.logger.info(self.vapi.ppcli("show classify tables verbose"))
-            self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
-                                             % self.bd_id))
-
-    @staticmethod
-    def build_ip_mask(proto='', src_ip='', dst_ip='',
-                      src_port='', dst_port=''):
-        """Build IP ACL mask data with hexstring format
-
-        :param str proto: protocol number <0-ff>
-        :param str src_ip: source ip address <0-ffffffff>
-        :param str dst_ip: destination ip address <0-ffffffff>
-        :param str src_port: source port number <0-ffff>
-        :param str dst_port: destination port number <0-ffff>
-        """
-
-        return ('{:0>20}{:0>12}{:0>8}{:0>12}{:0>4}'.format(
-            proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
-
-    @staticmethod
-    def build_ip_match(proto='', src_ip='', dst_ip='',
-                       src_port='', dst_port=''):
-        """Build IP ACL match data with hexstring format
-
-        :param str proto: protocol number with valid option "<0-ff>"
-        :param str src_ip: source ip address with format of "x.x.x.x"
-        :param str dst_ip: destination ip address with format of "x.x.x.x"
-        :param str src_port: source port number <0-ffff>
-        :param str dst_port: destination port number <0-ffff>
-        """
-        if src_ip:
-            src_ip = socket.inet_aton(src_ip).encode('hex')
-        if dst_ip:
-            dst_ip = socket.inet_aton(dst_ip).encode('hex')
-
-        return ('{:0>20}{:0>12}{:0>8}{:0>12}{:0>4}'.format(
-            proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
-
-    @staticmethod
-    def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
-        """Build MAC ACL mask data with hexstring format
-
-        :param str dst_mac: source MAC address <0-ffffffffffff>
-        :param str src_mac: destination MAC address <0-ffffffffffff>
-        :param str ether_type: ethernet type <0-ffff>
-        """
-
-        return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
-                                              ether_type)).rstrip('0')
-
-    @staticmethod
-    def build_mac_match(dst_mac='', src_mac='', ether_type=''):
-        """Build MAC ACL match data with hexstring format
-
-        :param str dst_mac: source MAC address <x:x:x:x:x:x>
-        :param str src_mac: destination MAC address <x:x:x:x:x:x>
-        :param str ether_type: ethernet type <0-ffff>
-        """
-        if dst_mac:
-            dst_mac = dst_mac.replace(':', '')
-        if src_mac:
-            src_mac = src_mac.replace(':', '')
-
-        return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
-                                              ether_type)).rstrip('0')
+            if self.acl_active_table == "mac_inout":
+                self.output_acl_set_interface(
+                    self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
+                )
+                self.input_acl_set_interface(
+                    self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
+                )
+                self.acl_active_table = ""
+            elif self.acl_active_table == "mac_out":
+                self.output_acl_set_interface(
+                    self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
+                )
+                self.acl_active_table = ""
+            elif self.acl_active_table == "mac_in":
+                self.input_acl_set_interface(
+                    self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
+                )
+                self.acl_active_table = ""
 
-    def create_classify_table(self, key, mask, data_offset=0, is_add=1):
-        """Create Classify Table
+        super(TestClassifyAcl, self).tearDown()
 
-        :param str key: key for classify table (ex, ACL name).
-        :param str mask: mask value for interested traffic.
-        :param int match_n_vectors:
-        :param int is_add: option to configure classify table.
-            - create(1) or delete(0)
-        """
-        r = self.vapi.classify_add_del_table(
-            is_add,
-            binascii.unhexlify(mask),
-            match_n_vectors=(len(mask) - 1) // 32 + 1,
-            miss_next_index=0,
-            current_data_flag=1,
-            current_data_offset=data_offset)
-        self.assertIsNotNone(r, msg='No response msg for add_del_table')
-        self.acl_tbl_idx[key] = r.new_table_index
-
-    def create_classify_session(self, intf, table_index, match,
-                                hit_next_index=0xffffffff, is_add=1):
+    def create_classify_session(
+        self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1
+    ):
         """Create Classify Session
 
         :param VppInterface intf: Interface to apply classify session.
         :param int table_index: table index to identify classify table.
         :param str match: matched value for interested traffic.
-        :param int pbr_action: enable/disable PBR feature.
-        :param int vrfid: VRF id.
         :param int is_add: option to configure classify session.
             - create(1) or delete(0)
         """
+        mask_match, mask_match_len = self._resolve_mask_match(match)
         r = self.vapi.classify_add_del_session(
-            is_add,
-            table_index,
-            binascii.unhexlify(match),
-            hit_next_index=hit_next_index)
-        self.assertIsNotNone(r, msg='No response msg for add_del_session')
-
-    def input_acl_set_interface(self, intf, table_index, is_add=1):
-        """Configure Input ACL interface
-
-        :param VppInterface intf: Interface to apply Input ACL feature.
-        :param int table_index: table index to identify classify table.
-        :param int is_add: option to configure classify session.
-            - enable(1) or disable(0)
-        """
-        r = self.vapi.input_acl_set_interface(
-            is_add,
-            intf.sw_if_index,
-            l2_table_index=table_index)
-        self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
-
-    def output_acl_set_interface(self, intf, table_index, is_add=1):
-        """Configure Output ACL interface
-
-        :param VppInterface intf: Interface to apply Output ACL feature.
-        :param int table_index: table index to identify classify table.
-        :param int is_add: option to configure classify session.
-            - enable(1) or disable(0)
-        """
-        r = self.vapi.output_acl_set_interface(
-            is_add,
-            intf.sw_if_index,
-            l2_table_index=table_index)
-        self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+            is_add=is_add,
+            table_index=table_index,
+            match=mask_match,
+            match_len=mask_match_len,
+            hit_next_index=hit_next_index,
+        )
+        self.assertIsNotNone(r, "No response msg for add_del_session")
 
     def create_hosts(self, count, start=0):
         """
@@ -283,44 +197,55 @@ class TestClassifyAcl(VppTestCase):
         :param int start: Number to start numbering from.
         """
         n_int = len(self.pg_interfaces)
-        macs_per_if = count / n_int
+        macs_per_if = count // n_int
         i = -1
         for pg_if in self.pg_interfaces:
             i += 1
             start_nr = macs_per_if * i + start
-            end_nr = count + start if i == (n_int - 1) \
-                else macs_per_if * (i + 1) + start
+            end_nr = (
+                count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
+            )
             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
             for j in range(start_nr, end_nr):
                 host = Host(
                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
-                    "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
+                    "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
+                )
                 hosts.append(host)
 
     def create_upper_layer(self, packet_index, proto, ports=0):
         p = self.proto_map[proto]
-        if p == 'UDP':
+        if p == "UDP":
             if ports == 0:
-                return UDP(sport=random.randint(self.udp_sport_from,
-                                                self.udp_sport_to),
-                           dport=random.randint(self.udp_dport_from,
-                                                self.udp_dport_to))
+                return UDP(
+                    sport=random.randint(self.udp_sport_from, self.udp_sport_to),
+                    dport=random.randint(self.udp_dport_from, self.udp_dport_to),
+                )
             else:
                 return UDP(sport=ports, dport=ports)
-        elif p == 'TCP':
+        elif p == "TCP":
             if ports == 0:
-                return TCP(sport=random.randint(self.tcp_sport_from,
-                                                self.tcp_sport_to),
-                           dport=random.randint(self.tcp_dport_from,
-                                                self.tcp_dport_to))
+                return TCP(
+                    sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
+                    dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
+                )
             else:
                 return TCP(sport=ports, dport=ports)
-        return ''
-
-    def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
-                      proto=-1, ports=0, fragments=False,
-                      pkt_raw=True, etype=-1):
+        return ""
+
+    def create_stream(
+        self,
+        src_if,
+        packet_sizes,
+        traffic_type=0,
+        ipv6=0,
+        proto=-1,
+        ports=0,
+        fragments=False,
+        pkt_raw=True,
+        etype=-1,
+    ):
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
@@ -337,7 +262,7 @@ class TestClassifyAcl(VppTestCase):
                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
                 n_int = len(dst_hosts) * len(src_hosts)
                 for i in range(0, n_int):
-                    dst_host = dst_hosts[i / len(src_hosts)]
+                    dst_host = dst_hosts[i // len(src_hosts)]
                     src_host = src_hosts[i % len(src_hosts)]
                     pkt_info = self.create_packet_info(src_if, dst_if)
                     if ipv6 == 1:
@@ -353,26 +278,25 @@ class TestClassifyAcl(VppTestCase):
                     payload = self.info_to_payload(pkt_info)
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
                     if etype > 0:
-                        p = Ether(dst=dst_host.mac,
-                                  src=src_host.mac,
-                                  type=etype)
+                        p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                         if fragments:
                             p /= IPv6ExtHdrFragment(offset=64, m=1)
                     else:
                         if fragments:
-                            p /= IP(src=src_host.ip4, dst=dst_host.ip4,
-                                    flags=1, frag=64)
+                            p /= IP(
+                                src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
+                            )
                         else:
                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
-                            p /= ICMPv6EchoRequest(type=self.icmp6_type,
-                                                   code=self.icmp6_code)
+                            p /= ICMPv6EchoRequest(
+                                type=self.icmp6_type, code=self.icmp6_code
+                            )
                         else:
-                            p /= ICMP(type=self.icmp4_type,
-                                      code=self.icmp4_code)
+                            p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
                     else:
                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
                     if pkt_raw:
@@ -384,8 +308,7 @@ class TestClassifyAcl(VppTestCase):
                     pkts.append(p)
         return pkts
 
-    def verify_capture(self, pg_if, capture,
-                       traffic_type=0, ip_type=0, etype=-1):
+    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
         """
         Verify captured input packet stream for defined interface.
 
@@ -400,22 +323,21 @@ class TestClassifyAcl(VppTestCase):
         for packet in capture:
             if etype > 0:
                 if packet[Ether].type != etype:
-                    self.logger.error(ppp("Unexpected ethertype in packet:",
-                                          packet))
+                    self.logger.error(ppp("Unexpected ethertype in packet:", packet))
                 else:
                     continue
             try:
                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                 if traffic_type == self.ICMP and ip_type == self.IPV6:
-                    payload_info = self.payload_to_info(
-                        packet[ICMPv6EchoRequest].data)
+                    payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data)
                     payload = packet[ICMPv6EchoRequest]
                 else:
-                    payload_info = self.payload_to_info(str(packet[Raw]))
+                    payload_info = self.payload_to_info(packet[Raw])
                     payload = packet[self.proto_map[payload_info.proto]]
             except:
-                self.logger.error(ppp("Unexpected or invalid packet "
-                                      "(outside network):", packet))
+                self.logger.error(
+                    ppp("Unexpected or invalid packet (outside network):", packet)
+                )
                 raise
 
             if ip_type != 0:
@@ -429,8 +351,9 @@ class TestClassifyAcl(VppTestCase):
                         self.assertEqual(payload.type, self.icmp6_type)
                         self.assertEqual(payload.code, self.icmp6_code)
                 except:
-                    self.logger.error(ppp("Unexpected or invalid packet "
-                                          "(outside network):", packet))
+                    self.logger.error(
+                        ppp("Unexpected or invalid packet (outside network):", packet)
+                    )
                     raise
             else:
                 try:
@@ -440,12 +363,13 @@ class TestClassifyAcl(VppTestCase):
                     packet_index = payload_info.index
 
                     self.assertEqual(payload_info.dst, dst_sw_if_index)
-                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
-                                      (pg_if.name, payload_info.src,
-                                       packet_index))
+                    self.logger.debug(
+                        "Got packet on port %s: src=%u (id=%u)"
+                        % (pg_if.name, payload_info.src, packet_index)
+                    )
                     next_info = self.get_next_packet_info_for_interface2(
-                        payload_info.src, dst_sw_if_index,
-                        last_info[payload_info.src])
+                        payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+                    )
                     last_info[payload_info.src] = next_info
                     self.assertTrue(next_info is not None)
                     self.assertEqual(packet_index, next_info.index)
@@ -454,29 +378,26 @@ class TestClassifyAcl(VppTestCase):
                     self.assertEqual(ip.src, saved_packet[ip_version].src)
                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                     p = self.proto_map[payload_info.proto]
-                    if p == 'TCP':
+                    if p == "TCP":
                         tcp = packet[TCP]
-                        self.assertEqual(tcp.sport, saved_packet[
-                            TCP].sport)
-                        self.assertEqual(tcp.dport, saved_packet[
-                            TCP].dport)
-                    elif p == 'UDP':
+                        self.assertEqual(tcp.sport, saved_packet[TCP].sport)
+                        self.assertEqual(tcp.dport, saved_packet[TCP].dport)
+                    elif p == "UDP":
                         udp = packet[UDP]
-                        self.assertEqual(udp.sport, saved_packet[
-                            UDP].sport)
-                        self.assertEqual(udp.dport, saved_packet[
-                            UDP].dport)
+                        self.assertEqual(udp.sport, saved_packet[UDP].sport)
+                        self.assertEqual(udp.dport, saved_packet[UDP].dport)
                 except:
-                    self.logger.error(ppp("Unexpected or invalid packet:",
-                                          packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
         for i in self.pg_interfaces:
             remaining_packet = self.get_next_packet_info_for_interface2(
-                i, dst_sw_if_index, last_info[i.sw_if_index])
+                i, dst_sw_if_index, last_info[i.sw_if_index]
+            )
             self.assertTrue(
                 remaining_packet is None,
-                "Port %u: Packet expected from source %u didn't arrive" %
-                (dst_sw_if_index, i.sw_if_index))
+                "Port %u: Packet expected from source %u didn't arrive"
+                % (dst_sw_if_index, i.sw_if_index),
+            )
 
     def run_traffic_no_check(self):
         # Test
@@ -491,16 +412,32 @@ class TestClassifyAcl(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
-                        frags=False, pkt_raw=True, etype=-1):
+    def run_verify_test(
+        self,
+        traffic_type=0,
+        ip_type=0,
+        proto=-1,
+        ports=0,
+        frags=False,
+        pkt_raw=True,
+        etype=-1,
+    ):
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
-                pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports,
-                                          frags, pkt_raw, etype)
+                pkts = self.create_stream(
+                    i,
+                    self.pg_if_packet_sizes,
+                    traffic_type,
+                    ip_type,
+                    proto,
+                    ports,
+                    frags,
+                    pkt_raw,
+                    etype,
+                )
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -515,20 +452,27 @@ class TestClassifyAcl(VppTestCase):
             if self.flows.__contains__(src_if):
                 for dst_if in self.flows[src_if]:
                     capture = dst_if.get_capture(pkts_cnt)
-                    self.logger.info("Verifying capture on interface %s" %
-                                     dst_if.name)
-                    self.verify_capture(dst_if, capture,
-                                        traffic_type, ip_type, etype)
+                    self.logger.info("Verifying capture on interface %s" % dst_if.name)
+                    self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
 
-    def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
-                              ports=0, frags=False, etype=-1):
+    def run_verify_negat_test(
+        self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
+    ):
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
-                pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports,
-                                          frags, True, etype)
+                pkts = self.create_stream(
+                    i,
+                    self.pg_if_packet_sizes,
+                    traffic_type,
+                    ip_type,
+                    proto,
+                    ports,
+                    frags,
+                    True,
+                    etype,
+                )
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
@@ -541,93 +485,166 @@ class TestClassifyAcl(VppTestCase):
         for src_if in self.pg_interfaces:
             if self.flows.__contains__(src_if):
                 for dst_if in self.flows[src_if]:
-                    self.logger.info("Verifying capture on interface %s" %
-                                     dst_if.name)
+                    self.logger.info("Verifying capture on interface %s" % dst_if.name)
                     capture = dst_if.get_capture(0)
                     self.assertEqual(len(capture), 0)
 
-    def build_classify_table(self, hit_next_index=0xffffffff):
-        # Basic ACL testing with source MAC
-        a_mask = self.build_mac_mask(src_mac='ffffffffffff')
-        self.create_classify_table('ip', a_mask)
+    def build_classify_table(
+        self,
+        src_mac="",
+        dst_mac="",
+        ether_type="",
+        etype="",
+        key="mac",
+        hit_next_index=0xFFFFFFFF,
+    ):
+        # Basic ACL testing
+        a_mask = self.build_mac_mask(
+            src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type
+        )
+        self.create_classify_table(key, a_mask)
         for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
-            self.create_classify_session(
-                self.pg0, self.acl_tbl_idx.get('ip'),
-                self.build_mac_match(src_mac=host.mac),
-                hit_next_index=hit_next_index)
+            s_mac = host.mac if src_mac else ""
+            if dst_mac:
+                for dst_if in self.flows[self.pg0]:
+                    for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
+                        self.create_classify_session(
+                            self.pg0,
+                            self.acl_tbl_idx.get(key),
+                            self.build_mac_match(
+                                src_mac=s_mac, dst_mac=dst_host.mac, ether_type=etype
+                            ),
+                            hit_next_index=hit_next_index,
+                        )
+            else:
+                self.create_classify_session(
+                    self.pg0,
+                    self.acl_tbl_idx.get(key),
+                    self.build_mac_match(src_mac=s_mac, dst_mac="", ether_type=etype),
+                    hit_next_index=hit_next_index,
+                )
 
     def test_0000_warmup_test(self):
-        """ Learn the MAC addresses
-        """
+        """Learn the MAC addresses"""
         self.create_hosts(2)
         self.run_traffic_no_check()
 
-    def test_0010_inacl_permit(self):
-        """ Input  L2 ACL test - permit
+    def test_0010_inacl_permit_src_mac(self):
+        """Input  L2 ACL test - permit source MAC
 
         Test scenario for basic IP ACL with source IP
             - Create IPv4 stream for pg0 -> pg1 interface.
             - Create ACL with source MAC address.
             - Send and verify received packets on pg1 interface.
         """
-        self.build_classify_table()
-        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'))
+        key = "mac_in"
+        self.build_classify_table(src_mac="ffffffffffff", key=key)
+        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+        self.acl_active_table = key
+        self.run_verify_test(self.IP, self.IPV4, -1)
+
+    def test_0011_inacl_permit_dst_mac(self):
+        """Input  L2 ACL test - permit destination MAC
+
+        Test scenario for basic IP ACL with source IP
+            - Create IPv4 stream for pg0 -> pg1 interface.
+            - Create ACL with destination MAC address.
+            - Send and verify received packets on pg1 interface.
+        """
+        key = "mac_in"
+        self.build_classify_table(dst_mac="ffffffffffff", key=key)
+        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+        self.acl_active_table = key
+        self.run_verify_test(self.IP, self.IPV4, -1)
+
+    def test_0012_inacl_permit_src_dst_mac(self):
+        """Input  L2 ACL test - permit source and destination MAC
+
+        Test scenario for basic IP ACL with source IP
+            - Create IPv4 stream for pg0 -> pg1 interface.
+            - Create ACL with source and destination MAC addresses.
+            - Send and verify received packets on pg1 interface.
+        """
+        key = "mac_in"
+        self.build_classify_table(
+            src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key
+        )
+        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+        self.acl_active_table = key
+        self.run_verify_test(self.IP, self.IPV4, -1)
+
+    def test_0013_inacl_permit_ether_type(self):
+        """Input  L2 ACL test - permit ether_type
+
+        Test scenario for basic IP ACL with source IP
+            - Create IPv4 stream for pg0 -> pg1 interface.
+            - Create ACL with destination MAC address.
+            - Send and verify received packets on pg1 interface.
+        """
+        key = "mac_in"
+        self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key)
+        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+        self.acl_active_table = key
         self.run_verify_test(self.IP, self.IPV4, -1)
-        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'), 0)
 
     def test_0015_inacl_deny(self):
-        """ Input  L2 ACL test - deny
+        """Input  L2 ACL test - deny
 
         Test scenario for basic IP ACL with source IP
             - Create IPv4 stream for pg0 -> pg1 interface.
+
             - Create ACL with source MAC address.
             - Send and verify no received packets on pg1 interface.
         """
-        self.build_classify_table(hit_next_index=0)
-        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'))
+        key = "mac_in"
+        self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
+        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+        self.acl_active_table = key
         self.run_verify_negat_test(self.IP, self.IPV4, -1)
-        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'), 0)
 
     def test_0020_outacl_permit(self):
-        """ Output L2 ACL test - permit
+        """Output L2 ACL test - permit
 
         Test scenario for basic IP ACL with source IP
             - Create IPv4 stream for pg0 -> pg1 interface.
             - Create ACL with source MAC address.
             - Send and verify received packets on pg1 interface.
         """
-        self.build_classify_table()
-        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'))
+        key = "mac_out"
+        self.build_classify_table(src_mac="ffffffffffff", key=key)
+        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
+        self.acl_active_table = key
         self.run_verify_test(self.IP, self.IPV4, -1)
-        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'), 0)
 
     def test_0025_outacl_deny(self):
-        """ Output L2 ACL test - deny
+        """Output L2 ACL test - deny
 
         Test scenario for basic IP ACL with source IP
             - Create IPv4 stream for pg0 -> pg1 interface.
             - Create ACL with source MAC address.
             - Send and verify no received packets on pg1 interface.
         """
-        self.build_classify_table(hit_next_index=0)
-        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'))
+        key = "mac_out"
+        self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
+        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
+        self.acl_active_table = key
         self.run_verify_negat_test(self.IP, self.IPV4, -1)
-        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'), 0)
 
     def test_0030_inoutacl_permit(self):
-        """ Input+Output L2 ACL test - permit
+        """Input+Output L2 ACL test - permit
 
         Test scenario for basic IP ACL with source IP
             - Create IPv4 stream for pg0 -> pg1 interface.
             - Create ACLs with source MAC address.
             - Send and verify received packets on pg1 interface.
         """
-        self.build_classify_table()
-        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'))
-        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'))
+        key = "mac_inout"
+        self.build_classify_table(src_mac="ffffffffffff", key=key)
+        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
+        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+        self.acl_active_table = key
         self.run_verify_test(self.IP, self.IPV4, -1)
-        self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'), 0)
-        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'), 0)
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)