VOM: mroutes
[vpp.git] / test / test_acl_plugin.py
index 3a180d2..5ccfa39 100644 (file)
@@ -13,6 +13,8 @@ from scapy.layers.inet6 import IPv6ExtHdrFragment
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp
 
+from vpp_lo_interface import VppLoInterface
+
 
 class TestACLplugin(VppTestCase):
     """ ACL plugin Test Case """
@@ -85,8 +87,6 @@ class TestACLplugin(VppTestCase):
         """
         super(TestACLplugin, cls).setUpClass()
 
-        random.seed()
-
         try:
             # Create 2 pg interfaces
             cls.create_pg_interfaces(range(2))
@@ -122,6 +122,23 @@ class TestACLplugin(VppTestCase):
 
             # warm-up the mac address tables
             # self.warmup_test()
+            count = 16
+            start = 0
+            n_int = len(cls.pg_interfaces)
+            macs_per_if = count / n_int
+            i = -1
+            for pg_if in cls.pg_interfaces:
+                i += 1
+                start_nr = macs_per_if * i + start
+                end_nr = count + start if i == (n_int - 1) \
+                    else macs_per_if * (i + 1) + start
+                hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
+                for j in range(start_nr, end_nr):
+                    host = Host(
+                        "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
+                        "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
+                        "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
+                    hosts.append(host)
 
         except Exception:
             super(TestACLplugin, cls).tearDownClass()
@@ -137,6 +154,14 @@ class TestACLplugin(VppTestCase):
         """
         super(TestACLplugin, self).tearDown()
         if not self.vpp_dead:
+            cli = "show vlib graph l2-input-feat-arc"
+            self.logger.info(self.vapi.ppcli(cli))
+            cli = "show vlib graph l2-input-feat-arc-end"
+            self.logger.info(self.vapi.ppcli(cli))
+            cli = "show vlib graph l2-output-feat-arc"
+            self.logger.info(self.vapi.ppcli(cli))
+            cli = "show vlib graph l2-output-feat-arc-end"
+            self.logger.info(self.vapi.ppcli(cli))
             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
@@ -144,30 +169,6 @@ class TestACLplugin(VppTestCase):
             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
                                              % self.bd_id))
 
-    def create_hosts(self, count, start=0):
-        """
-        Create required number of host MAC addresses and distribute them among
-        interfaces. Create host IPv4 address for every host MAC address.
-
-        :param int count: Number of hosts to create MAC/IPv4 addresses for.
-        :param int start: Number to start numbering from.
-        """
-        n_int = len(self.pg_interfaces)
-        macs_per_if = count / n_int
-        i = -1
-        for pg_if in self.pg_interfaces:
-            i += 1
-            start_nr = macs_per_if * i + start
-            end_nr = count + start if i == (n_int - 1) \
-                else macs_per_if * (i + 1) + start
-            hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
-            for j in range(start_nr, end_nr):
-                host = Host(
-                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
-                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
-                    "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
-                hosts.append(host)
-
     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                     s_prefix=0, s_ip='\x00\x00\x00\x00',
                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
@@ -238,16 +239,35 @@ class TestACLplugin(VppTestCase):
         return rule
 
     def apply_rules(self, rules, tag=''):
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
-                                         count=len(rules),
-                                         tag=tag)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
+                                          tag=tag)
         self.logger.info("Dumped ACL: " + str(
-            self.api_acl_dump(reply.acl_index)))
+            self.vapi.acl_dump(reply.acl_index)))
         # Apply a ACL on the interface as inbound
         for i in self.pg_interfaces:
-            self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
-                                                count=1, n_input=1,
-                                                acls=[reply.acl_index])
+            self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
+                                                 n_input=1,
+                                                 acls=[reply.acl_index])
+        return
+
+    def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
+                                          tag=tag)
+        self.logger.info("Dumped ACL: " + str(
+            self.vapi.acl_dump(reply.acl_index)))
+        # Apply a ACL on the interface as inbound
+        self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+                                             n_input=1,
+                                             acls=[reply.acl_index])
+        return
+
+    def etype_whitelist(self, whitelist, n_input):
+        # Apply whitelists on all the interfaces
+        for i in self.pg_interfaces:
+            # checkstyle can't read long names. Help them.
+            fun = self.vapi.acl_interface_set_etype_whitelist
+            fun(sw_if_index=i.sw_if_index, n_input=n_input,
+                whitelist=whitelist)
         return
 
     def create_upper_layer(self, packet_index, proto, ports=0):
@@ -271,7 +291,8 @@ class TestACLplugin(VppTestCase):
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
-                      proto=-1, ports=0, fragments=False, pkt_raw=True):
+                      proto=-1, ports=0, fragments=False,
+                      pkt_raw=True, etype=-1):
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
@@ -303,6 +324,10 @@ class TestACLplugin(VppTestCase):
                         pkt_info.proto = proto
                     payload = self.info_to_payload(pkt_info)
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
+                    if etype > 0:
+                        p = Ether(dst=dst_host.mac,
+                                  src=src_host.mac,
+                                  type=etype)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                         if fragments:
@@ -331,7 +356,8 @@ class TestACLplugin(VppTestCase):
                     pkts.append(p)
         return pkts
 
-    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
+    def verify_capture(self, pg_if, capture,
+                       traffic_type=0, ip_type=0, etype=-1):
         """
         Verify captured input packet stream for defined interface.
 
@@ -344,6 +370,12 @@ class TestACLplugin(VppTestCase):
             last_info[i.sw_if_index] = None
         dst_sw_if_index = pg_if.sw_if_index
         for packet in capture:
+            if etype > 0:
+                if packet[Ether].type != etype:
+                    self.logger.error(ppp("Unexpected ethertype in packet:",
+                                          packet))
+                else:
+                    continue
             try:
                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                 if traffic_type == self.ICMP and ip_type == self.IPV6:
@@ -432,7 +464,7 @@ class TestACLplugin(VppTestCase):
         self.pg_start()
 
     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
-                        frags=False, pkt_raw=True):
+                        frags=False, pkt_raw=True, etype=-1):
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
@@ -440,7 +472,7 @@ class TestACLplugin(VppTestCase):
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
                                           traffic_type, ip_type, proto, ports,
-                                          frags, pkt_raw)
+                                          frags, pkt_raw, etype)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -448,6 +480,7 @@ class TestACLplugin(VppTestCase):
         # Enable packet capture and start packet sendingself.IPV
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
+        self.logger.info("sent packets count: %d" % pkts_cnt)
 
         # Verify
         # Verify outgoing packet streams per packet-generator interface
@@ -457,23 +490,27 @@ class TestACLplugin(VppTestCase):
                     capture = dst_if.get_capture(pkts_cnt)
                     self.logger.info("Verifying capture on interface %s" %
                                      dst_if.name)
-                    self.verify_capture(dst_if, capture, traffic_type, ip_type)
+                    self.verify_capture(dst_if, capture,
+                                        traffic_type, ip_type, etype)
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
-                              ports=0, frags=False):
+                              ports=0, frags=False, etype=-1):
         # Test
+        pkts_cnt = 0
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
                                           traffic_type, ip_type, proto, ports,
-                                          frags)
+                                          frags, True, etype)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
+                    pkts_cnt += len(pkts)
 
         # Enable packet capture and start packet sending
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
+        self.logger.info("sent packets count: %d" % pkts_cnt)
 
         # Verify
         # Verify outgoing packet streams per packet-generator interface
@@ -485,42 +522,9 @@ class TestACLplugin(VppTestCase):
                     capture = dst_if.get_capture(0)
                     self.assertEqual(len(capture), 0)
 
-    def api_acl_add_replace(self, acl_index, r, count, tag='',
-                            expected_retval=0):
-        """Add/replace an ACL
-
-        :param int acl_index: ACL index to replace,
-        4294967295 to create new ACL.
-        :param acl_rule r: ACL rules array.
-        :param str tag: symbolic tag (description) for this ACL.
-        :param int count: number of rules.
-        """
-        return self.vapi.api(self.vapi.papi.acl_add_replace,
-                             {'acl_index': acl_index,
-                              'r': r,
-                              'count': count,
-                              'tag': tag},
-                             expected_retval=expected_retval)
-
-    def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
-                                       expected_retval=0):
-        return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
-                             {'sw_if_index': sw_if_index,
-                              'count': count,
-                              'n_input': n_input,
-                              'acls': acls},
-                             expected_retval=expected_retval)
-
-    def api_acl_dump(self, acl_index, expected_retval=0):
-        return self.vapi.api(self.vapi.papi.acl_dump,
-                             {'acl_index': acl_index},
-                             expected_retval=expected_retval)
-
     def test_0000_warmup_test(self):
         """ ACL plugin version check; learn MACs
         """
-        self.create_hosts(16)
-        self.run_traffic_no_check()
         reply = self.vapi.papi.acl_plugin_get_version()
         self.assertEqual(reply.major, 1)
         self.logger.info("Working with ACL plugin version: %d.%d" % (
@@ -529,7 +533,7 @@ class TestACLplugin(VppTestCase):
         # self.assertEqual(reply.minor, 0)
 
     def test_0001_acl_create(self):
-        """ ACL create test
+        """ ACL create/delete test
         """
 
         self.logger.info("ACLP_TEST_START_0001")
@@ -544,12 +548,13 @@ class TestACLplugin(VppTestCase):
               'dst_ip_addr': '\x00\x00\x00\x00',
               'dst_ip_prefix_len': 0}]
         # Test 1: add a new ACL
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
-                                         count=len(r), tag="permit 1234")
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
+                                          tag="permit 1234")
         self.assertEqual(reply.retval, 0)
         # The very first ACL gets #0
         self.assertEqual(reply.acl_index, 0)
-        rr = self.api_acl_dump(reply.acl_index)
+        first_acl = reply.acl_index
+        rr = self.vapi.acl_dump(reply.acl_index)
         self.logger.info("Dumped ACL: " + str(rr))
         self.assertEqual(len(rr), 1)
         # We should have the same number of ACL entries as we had asked
@@ -563,7 +568,7 @@ class TestACLplugin(VppTestCase):
                                  r[i_rule][rule_key])
 
         # Add a deny-1234 ACL
-        r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
+        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                    'srcport_or_icmptype_first': 1234,
                    'srcport_or_icmptype_last': 1235,
                    'src_ip_prefix_len': 0,
@@ -580,21 +585,48 @@ class TestACLplugin(VppTestCase):
                    'dstport_or_icmpcode_first': 0,
                    'dstport_or_icmpcode_last': 0,
                    'dst_ip_addr': '\x00\x00\x00\x00',
-                   'dst_ip_prefix_len': 0})
+                   'dst_ip_prefix_len': 0}]
 
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
-                                         count=len(r_deny),
-                                         tag="deny 1234;permit all")
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
+                                          tag="deny 1234;permit all")
         self.assertEqual(reply.retval, 0)
         # The second ACL gets #1
         self.assertEqual(reply.acl_index, 1)
+        second_acl = reply.acl_index
 
         # Test 2: try to modify a nonexistent ACL
-        reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
-                                         tag="FFFF:FFFF", expected_retval=-1)
-        self.assertEqual(reply.retval, -1)
+        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
+                                          tag="FFFF:FFFF", expected_retval=-6)
+        self.assertEqual(reply.retval, -6)
         # The ACL number should pass through
         self.assertEqual(reply.acl_index, 432)
+        # apply an ACL on an interface inbound, try to delete ACL, must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=1,
+                                             acls=[first_acl])
+        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
+        # Unapply an ACL and then try to delete it - must be ok
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[])
+        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
+
+        # apply an ACL on an interface outbound, try to delete ACL, must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[second_acl])
+        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
+        # Unapply the ACL and then try to delete it - must be ok
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=0,
+                                             acls=[])
+        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+
+        # try to apply a nonexistent ACL - must fail
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=1,
+                                             acls=[first_acl],
+                                             expected_retval=-6)
 
         self.logger.info("ACLP_TEST_FINISH_0001")
 
@@ -881,9 +913,8 @@ class TestACLplugin(VppTestCase):
         for i in range(len(r)):
             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
 
-        reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
-                                         count=len(rules))
-        result = self.api_acl_dump(reply.acl_index)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
+        result = self.vapi.acl_dump(reply.acl_index)
 
         i = 0
         for drules in result:
@@ -1065,7 +1096,8 @@ class TestACLplugin(VppTestCase):
         self.logger.info("ACLP_TEST_FINISH_0020")
 
     def test_0021_udp_deny_port_verify_fragment_deny(self):
-        """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
+        """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
+        blocked
         """
         self.logger.info("ACLP_TEST_START_0021")
 
@@ -1313,6 +1345,140 @@ class TestACLplugin(VppTestCase):
 
         self.logger.info("ACLP_TEST_FINISH_0113")
 
+    def test_0300_tcp_permit_v4_etype_aaaa(self):
+        """ permit TCPv4, send 0xAAAA etype
+        """
+        self.logger.info("ACLP_TEST_START_0300")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # Traffic should still pass also for an odd ethertype
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0xaaaa)
+        self.logger.info("ACLP_TEST_FINISH_0300")
+
+    def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
+        """
+        self.logger.info("ACLP_TEST_START_0305")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+
+        # The oddball ethertype should be blocked
+        self.run_verify_negat_test(self.IP, self.IPV4,
+                                   self.proto[self.IP][self.TCP],
+                                   0, False, 0xaaaa)
+
+        # remove the whitelist
+        self.etype_whitelist([], 0)
+
+        self.logger.info("ACLP_TEST_FINISH_0305")
+
+    def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
+        """
+        self.logger.info("ACLP_TEST_START_0306")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+
+        # The whitelisted traffic, should pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0x0bbb)
+
+        # remove the whitelist, the previously blocked 0xAAAA should pass now
+        self.etype_whitelist([], 0)
+
+        self.logger.info("ACLP_TEST_FINISH_0306")
+
+    def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
+        """
+        self.logger.info("ACLP_TEST_START_0307")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+        # remove the whitelist, the previously blocked 0xAAAA should pass now
+        self.etype_whitelist([], 0)
+
+        # The whitelisted traffic, should pass
+        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+                             0, False, True, 0xaaaa)
+
+        self.logger.info("ACLP_TEST_FINISH_0306")
+
+    def test_0315_del_intf(self):
+        """ apply an acl and delete the interface
+        """
+        self.logger.info("ACLP_TEST_START_0315")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # create an interface
+        intf = []
+        intf.append(VppLoInterface(self))
+
+        # Apply rules
+        self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
+
+        # Remove the interface
+        intf[0].remove_vpp_config()
+
+        self.logger.info("ACLP_TEST_FINISH_0315")
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)