VPP-1033: Python API support arbitrary sized input parameters.
[vpp.git] / test / test_acl_plugin.py
index 5f830ea..cd375a2 100644 (file)
@@ -42,6 +42,7 @@ class TestACLplugin(VppTestCase):
     # port ranges
     PORTS_ALL = -1
     PORTS_RANGE = 0
+    PORTS_RANGE_2 = 1
     udp_sport_from = 10
     udp_sport_to = udp_sport_from + 5
     udp_dport_from = 20000
@@ -51,11 +52,27 @@ class TestACLplugin(VppTestCase):
     tcp_dport_from = 40000
     tcp_dport_to = tcp_dport_from + 5000
 
+    udp_sport_from_2 = 90
+    udp_sport_to_2 = udp_sport_from_2 + 5
+    udp_dport_from_2 = 30000
+    udp_dport_to_2 = udp_dport_from_2 + 5000
+    tcp_sport_from_2 = 130
+    tcp_sport_to_2 = tcp_sport_from_2 + 5
+    tcp_dport_from_2 = 20000
+    tcp_dport_to_2 = tcp_dport_from_2 + 5000
+
     icmp4_type = 8  # echo request
     icmp4_code = 3
     icmp6_type = 128  # echo request
     icmp6_code = 3
 
+    icmp4_type_2 = 8
+    icmp4_code_from_2 = 5
+    icmp4_code_to_2 = 20
+    icmp6_type_2 = 128
+    icmp6_code_from_2 = 8
+    icmp6_code_to_2 = 42
+
     # Test variables
     bd_id = 1
 
@@ -182,6 +199,27 @@ class TestACLplugin(VppTestCase):
                 sport_to = self.udp_sport_to
                 dport_from = self.udp_dport_from
                 dport_to = self.udp_dport_to
+        elif ports == self.PORTS_RANGE_2:
+            if proto == 1:
+                sport_from = self.icmp4_type_2
+                sport_to = self.icmp4_type_2
+                dport_from = self.icmp4_code_from_2
+                dport_to = self.icmp4_code_to_2
+            elif proto == 58:
+                sport_from = self.icmp6_type_2
+                sport_to = self.icmp6_type_2
+                dport_from = self.icmp6_code_from_2
+                dport_to = self.icmp6_code_to_2
+            elif proto == self.proto[self.IP][self.TCP]:
+                sport_from = self.tcp_sport_from_2
+                sport_to = self.tcp_sport_to_2
+                dport_from = self.tcp_dport_from_2
+                dport_to = self.tcp_dport_to_2
+            elif proto == self.proto[self.IP][self.UDP]:
+                sport_from = self.udp_sport_from_2
+                sport_to = self.udp_sport_to_2
+                dport_from = self.udp_dport_from_2
+                dport_to = self.udp_dport_to_2
         else:
             sport_from = ports
             sport_to = ports
@@ -200,16 +238,15 @@ class TestACLplugin(VppTestCase):
         return rule
 
     def apply_rules(self, rules, tag=''):
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
-                                         count=len(rules),
-                                         tag=tag)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
+                                          tag=tag)
         self.logger.info("Dumped ACL: " + str(
-            self.api_acl_dump(reply.acl_index)))
+            self.vapi.acl_dump(reply.acl_index)))
         # Apply a ACL on the interface as inbound
         for i in self.pg_interfaces:
-            self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
-                                                count=1, n_input=1,
-                                                acls=[reply.acl_index])
+            self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
+                                                 n_input=1,
+                                                 acls=[reply.acl_index])
         return
 
     def create_upper_layer(self, packet_index, proto, ports=0):
@@ -447,37 +484,6 @@ class TestACLplugin(VppTestCase):
                     capture = dst_if.get_capture(0)
                     self.assertEqual(len(capture), 0)
 
-    def api_acl_add_replace(self, acl_index, r, count, tag='',
-                            expected_retval=0):
-        """Add/replace an ACL
-
-        :param int acl_index: ACL index to replace,
-        4294967295 to create new ACL.
-        :param acl_rule r: ACL rules array.
-        :param str tag: symbolic tag (description) for this ACL.
-        :param int count: number of rules.
-        """
-        return self.vapi.api(self.vapi.papi.acl_add_replace,
-                             {'acl_index': acl_index,
-                              'r': r,
-                              'count': count,
-                              'tag': tag},
-                             expected_retval=expected_retval)
-
-    def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
-                                       expected_retval=0):
-        return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
-                             {'sw_if_index': sw_if_index,
-                              'count': count,
-                              'n_input': n_input,
-                              'acls': acls},
-                             expected_retval=expected_retval)
-
-    def api_acl_dump(self, acl_index, expected_retval=0):
-        return self.vapi.api(self.vapi.papi.acl_dump,
-                             {'acl_index': acl_index},
-                             expected_retval=expected_retval)
-
     def test_0000_warmup_test(self):
         """ ACL plugin version check; learn MACs
         """
@@ -491,7 +497,7 @@ class TestACLplugin(VppTestCase):
         # self.assertEqual(reply.minor, 0)
 
     def test_0001_acl_create(self):
-        """ ACL create test
+        """ ACL create/delete test
         """
 
         self.logger.info("ACLP_TEST_START_0001")
@@ -506,12 +512,13 @@ class TestACLplugin(VppTestCase):
               'dst_ip_addr': '\x00\x00\x00\x00',
               'dst_ip_prefix_len': 0}]
         # Test 1: add a new ACL
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
-                                         count=len(r), tag="permit 1234")
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
+                                          tag="permit 1234")
         self.assertEqual(reply.retval, 0)
         # The very first ACL gets #0
         self.assertEqual(reply.acl_index, 0)
-        rr = self.api_acl_dump(reply.acl_index)
+        first_acl = reply.acl_index
+        rr = self.vapi.acl_dump(reply.acl_index)
         self.logger.info("Dumped ACL: " + str(rr))
         self.assertEqual(len(rr), 1)
         # We should have the same number of ACL entries as we had asked
@@ -525,7 +532,7 @@ class TestACLplugin(VppTestCase):
                                  r[i_rule][rule_key])
 
         # Add a deny-1234 ACL
-        r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
+        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                    'srcport_or_icmptype_first': 1234,
                    'srcport_or_icmptype_last': 1235,
                    'src_ip_prefix_len': 0,
@@ -542,21 +549,48 @@ class TestACLplugin(VppTestCase):
                    'dstport_or_icmpcode_first': 0,
                    'dstport_or_icmpcode_last': 0,
                    'dst_ip_addr': '\x00\x00\x00\x00',
-                   'dst_ip_prefix_len': 0})
+                   'dst_ip_prefix_len': 0}]
 
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
-                                         count=len(r_deny),
-                                         tag="deny 1234;permit all")
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
+                                          tag="deny 1234;permit all")
         self.assertEqual(reply.retval, 0)
         # The second ACL gets #1
         self.assertEqual(reply.acl_index, 1)
+        second_acl = reply.acl_index
 
         # Test 2: try to modify a nonexistent ACL
-        reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
-                                         tag="FFFF:FFFF", expected_retval=-1)
+        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
+                                          tag="FFFF:FFFF", expected_retval=-1)
         self.assertEqual(reply.retval, -1)
         # The ACL number should pass through
         self.assertEqual(reply.acl_index, 432)
+        # apply an ACL on an interface inbound, try to delete ACL, must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=1,
+                                             acls=[first_acl])
+        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-1)
+        # Unapply an ACL and then try to delete it - must be ok
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[])
+        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
+
+        # apply an ACL on an interface outbound, try to delete ACL, must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[second_acl])
+        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-1)
+        # Unapply the ACL and then try to delete it - must be ok
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[])
+        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+
+        # try to apply a nonexistent ACL - must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=1,
+                                             acls=[first_acl],
+                                             expected_retval=-1)
 
         self.logger.info("ACLP_TEST_FINISH_0001")
 
@@ -843,9 +877,8 @@ class TestACLplugin(VppTestCase):
         for i in range(len(r)):
             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
 
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
-                                         count=len(rules))
-        result = self.api_acl_dump(reply.acl_index)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
+        result = self.vapi.acl_dump(reply.acl_index)
 
         i = 0
         for drules in result:
@@ -1123,5 +1156,158 @@ class TestACLplugin(VppTestCase):
 
         self.logger.info("ACLP_TEST_FINISH_0023")
 
+    def test_0108_tcp_permit_v4(self):
+        """ permit TCPv4 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0108")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0108")
+
+    def test_0109_tcp_permit_v6(self):
+        """ permit TCPv6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0109")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 tcp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0109")
+
+    def test_0110_udp_permit_v4(self):
+        """ permit UDPv4 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0110")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0110")
+
+    def test_0111_udp_permit_v6(self):
+        """ permit UDPv6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0111")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ip6 udp")
+
+        # Traffic should still pass
+        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0111")
+
+    def test_0112_tcp_deny(self):
+        """ deny TCPv4/v6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0112")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.TCP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 tcp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.TCP])
+
+        self.logger.info("ACLP_TEST_FINISH_0112")
+
+    def test_0113_udp_deny(self):
+        """ deny UDPv4/v6 + non-match range
+        """
+        self.logger.info("ACLP_TEST_START_0113")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_RANGE_2,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+                                      self.proto[self.IP][self.UDP]))
+        # permit ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp")
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP])
+
+        self.logger.info("ACLP_TEST_FINISH_0113")
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)