acl-plugin: add whitelisted ethertype mode (VPP-1163)
[vpp.git] / test / test_acl_plugin.py
index 361ced1..5fcf09c 100644 (file)
@@ -247,6 +247,15 @@ class TestACLplugin(VppTestCase):
                                                  acls=[reply.acl_index])
         return
 
+    def etype_whitelist(self, whitelist, n_input):
+        # Apply whitelists on all the interfaces
+        for i in self.pg_interfaces:
+            # checkstyle can't read long names. Help them.
+            fun = self.vapi.acl_interface_set_etype_whitelist
+            fun(sw_if_index=i.sw_if_index, n_input=n_input,
+                whitelist=whitelist)
+        return
+
     def create_upper_layer(self, packet_index, proto, ports=0):
         p = self.proto_map[proto]
         if p == 'UDP':
@@ -268,7 +277,8 @@ class TestACLplugin(VppTestCase):
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
-                      proto=-1, ports=0, fragments=False, pkt_raw=True):
+                      proto=-1, ports=0, fragments=False,
+                      pkt_raw=True, etype=-1):
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
@@ -300,6 +310,10 @@ class TestACLplugin(VppTestCase):
                         pkt_info.proto = proto
                     payload = self.info_to_payload(pkt_info)
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
+                    if etype > 0:
+                        p = Ether(dst=dst_host.mac,
+                                  src=src_host.mac,
+                                  type=etype)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                         if fragments:
@@ -328,7 +342,8 @@ class TestACLplugin(VppTestCase):
                     pkts.append(p)
         return pkts
 
-    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
+    def verify_capture(self, pg_if, capture,
+                       traffic_type=0, ip_type=0, etype=-1):
         """
         Verify captured input packet stream for defined interface.
 
@@ -341,6 +356,12 @@ class TestACLplugin(VppTestCase):
             last_info[i.sw_if_index] = None
         dst_sw_if_index = pg_if.sw_if_index
         for packet in capture:
+            if etype > 0:
+                if packet[Ether].type != etype:
+                    self.logger.error(ppp("Unexpected ethertype in packet:",
+                                          packet))
+                else:
+                    continue
             try:
                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                 if traffic_type == self.ICMP and ip_type == self.IPV6:
@@ -429,7 +450,7 @@ class TestACLplugin(VppTestCase):
         self.pg_start()
 
     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
-                        frags=False, pkt_raw=True):
+                        frags=False, pkt_raw=True, etype=-1):
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
@@ -437,7 +458,7 @@ class TestACLplugin(VppTestCase):
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
                                           traffic_type, ip_type, proto, ports,
-                                          frags, pkt_raw)
+                                          frags, pkt_raw, etype)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -454,17 +475,18 @@ class TestACLplugin(VppTestCase):
                     capture = dst_if.get_capture(pkts_cnt)
                     self.logger.info("Verifying capture on interface %s" %
                                      dst_if.name)
-                    self.verify_capture(dst_if, capture, traffic_type, ip_type)
+                    self.verify_capture(dst_if, capture,
+                                        traffic_type, ip_type, etype)
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
-                              ports=0, frags=False):
+                              ports=0, frags=False, etype=-1):
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
                                           traffic_type, ip_type, proto, ports,
-                                          frags)
+                                          frags, True, etype)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
@@ -1306,6 +1328,70 @@ class TestACLplugin(VppTestCase):
 
         self.logger.info("ACLP_TEST_FINISH_0113")
 
+    def test_0300_tcp_permit_v4_etype_aaaa(self):
+        """ permit TCPv4, send 0xAAAA etype
+        """
+        self.logger.info("ACLP_TEST_START_0300")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        # Traffic should still pass also for an odd ethertype
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0xaaaa)
+
+        self.logger.info("ACLP_TEST_FINISH_0300")
+
+    def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
+        """
+        self.logger.info("ACLP_TEST_START_0305")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+
+        # The IPv4 traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        # The oddball ethertype should be blocked
+        self.run_verify_negat_test(self.IP, self.IPV4,
+                                   self.proto[self.IP][self.TCP],
+                                   0, False, 0xaaaa)
+
+        # The whitelisted traffic, on the other hand, should pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0x0bbb)
+
+        # remove the whitelist, the previously blocked 0xAAAA should pass now
+        self.etype_whitelist([], 0)
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0xaaaa)
+
+        self.logger.info("ACLP_TEST_FINISH_0305")
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)