ACL-plugin add "replace" semantics for adding a new MacIP acl
[vpp.git] / test / test_acl_plugin.py
index 5267cd2..3a180d2 100644 (file)
@@ -42,6 +42,7 @@ class TestACLplugin(VppTestCase):
     # port ranges
     PORTS_ALL = -1
     PORTS_RANGE = 0
+    PORTS_RANGE_2 = 1
     udp_sport_from = 10
     udp_sport_to = udp_sport_from + 5
     udp_dport_from = 20000
@@ -51,11 +52,27 @@ class TestACLplugin(VppTestCase):
     tcp_dport_from = 40000
     tcp_dport_to = tcp_dport_from + 5000
 
+    udp_sport_from_2 = 90
+    udp_sport_to_2 = udp_sport_from_2 + 5
+    udp_dport_from_2 = 30000
+    udp_dport_to_2 = udp_dport_from_2 + 5000
+    tcp_sport_from_2 = 130
+    tcp_sport_to_2 = tcp_sport_from_2 + 5
+    tcp_dport_from_2 = 20000
+    tcp_dport_to_2 = tcp_dport_from_2 + 5000
+
     icmp4_type = 8  # echo request
     icmp4_code = 3
     icmp6_type = 128  # echo request
     icmp6_code = 3
 
+    icmp4_type_2 = 8
+    icmp4_code_from_2 = 5
+    icmp4_code_to_2 = 20
+    icmp6_type_2 = 128
+    icmp6_code_from_2 = 8
+    icmp6_code_to_2 = 42
+
     # Test variables
     bd_id = 1
 
@@ -121,6 +138,9 @@ class TestACLplugin(VppTestCase):
         super(TestACLplugin, self).tearDown()
         if not self.vpp_dead:
             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+            self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
+            self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
+            self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
                                              % self.bd_id))
 
@@ -179,6 +199,27 @@ class TestACLplugin(VppTestCase):
                 sport_to = self.udp_sport_to
                 dport_from = self.udp_dport_from
                 dport_to = self.udp_dport_to
+        elif ports == self.PORTS_RANGE_2:
+            if proto == 1:
+                sport_from = self.icmp4_type_2
+                sport_to = self.icmp4_type_2
+                dport_from = self.icmp4_code_from_2
+                dport_to = self.icmp4_code_to_2
+            elif proto == 58:
+                sport_from = self.icmp6_type_2
+                sport_to = self.icmp6_type_2
+                dport_from = self.icmp6_code_from_2
+                dport_to = self.icmp6_code_to_2
+            elif proto == self.proto[self.IP][self.TCP]:
+                sport_from = self.tcp_sport_from_2
+                sport_to = self.tcp_sport_to_2
+                dport_from = self.tcp_dport_from_2
+                dport_to = self.tcp_dport_to_2
+            elif proto == self.proto[self.IP][self.UDP]:
+                sport_from = self.udp_sport_from_2
+                sport_to = self.udp_sport_to_2
+                dport_from = self.udp_dport_from_2
+                dport_to = self.udp_dport_to_2
         else:
             sport_from = ports
             sport_to = ports
@@ -1120,5 +1161,158 @@ class TestACLplugin(VppTestCase):
 
         self.logger.info("ACLP_TEST_FINISH_0023")
 
+    def test_0108_tcp_permit_v4(self):
+        """ permit TCPv4 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0108")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0108")
+
+    def test_0109_tcp_permit_v6(self):
+        """ permit TCPv6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0109")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0109")
+
+    def test_0110_udp_permit_v4(self):
+        """ permit UDPv4 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0110")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0110")
+
+    def test_0111_udp_permit_v6(self):
+        """ permit UDPv6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0111")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0111")
+
+    def test_0112_tcp_deny(self):
+        """ deny TCPv4/v6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0112")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 tcp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0112")
+
+    def test_0113_udp_deny(self):
+        """ deny UDPv4/v6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0113")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0113")
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)