acl-plugin: make each test in test_acl_plugin runnable separately 64/15164/3
authorAndrew Yourtchenko <ayourtch@gmail.com>
Sat, 6 Oct 2018 07:24:28 +0000 (09:24 +0200)
committerDave Barach <openvpp@barachs.net>
Sun, 14 Oct 2018 10:49:57 +0000 (10:49 +0000)
And improve the robustness of the ethertype whitelist test coverage

Change-Id: I64fe3a25208dbc619ae5cd6404f6122e69394a38
Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
test/test_acl_plugin.py

index f3ab381..fd45b4a 100644 (file)
@@ -122,6 +122,23 @@ class TestACLplugin(VppTestCase):
 
             # warm-up the mac address tables
             # self.warmup_test()
+            count = 16
+            start = 0
+            n_int = len(cls.pg_interfaces)
+            macs_per_if = count / n_int
+            i = -1
+            for pg_if in cls.pg_interfaces:
+                i += 1
+                start_nr = macs_per_if * i + start
+                end_nr = count + start if i == (n_int - 1) \
+                    else macs_per_if * (i + 1) + start
+                hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
+                for j in range(start_nr, end_nr):
+                    host = Host(
+                        "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
+                        "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
+                        "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
+                    hosts.append(host)
 
         except Exception:
             super(TestACLplugin, cls).tearDownClass()
@@ -144,30 +161,6 @@ class TestACLplugin(VppTestCase):
             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
                                              % self.bd_id))
 
-    def create_hosts(self, count, start=0):
-        """
-        Create required number of host MAC addresses and distribute them among
-        interfaces. Create host IPv4 address for every host MAC address.
-
-        :param int count: Number of hosts to create MAC/IPv4 addresses for.
-        :param int start: Number to start numbering from.
-        """
-        n_int = len(self.pg_interfaces)
-        macs_per_if = count / n_int
-        i = -1
-        for pg_if in self.pg_interfaces:
-            i += 1
-            start_nr = macs_per_if * i + start
-            end_nr = count + start if i == (n_int - 1) \
-                else macs_per_if * (i + 1) + start
-            hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
-            for j in range(start_nr, end_nr):
-                host = Host(
-                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
-                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
-                    "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
-                hosts.append(host)
-
     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                     s_prefix=0, s_ip='\x00\x00\x00\x00',
                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
@@ -479,6 +472,7 @@ class TestACLplugin(VppTestCase):
         # Enable packet capture and start packet sendingself.IPV
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
+        self.logger.info("sent packets count: %d" % pkts_cnt)
 
         # Verify
         # Verify outgoing packet streams per packet-generator interface
@@ -494,6 +488,7 @@ class TestACLplugin(VppTestCase):
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                               ports=0, frags=False, etype=-1):
         # Test
+        pkts_cnt = 0
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
@@ -502,10 +497,12 @@ class TestACLplugin(VppTestCase):
                                           frags, True, etype)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
+                    pkts_cnt += len(pkts)
 
         # Enable packet capture and start packet sending
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
+        self.logger.info("sent packets count: %d" % pkts_cnt)
 
         # Verify
         # Verify outgoing packet streams per packet-generator interface
@@ -520,8 +517,6 @@ class TestACLplugin(VppTestCase):
     def test_0000_warmup_test(self):
         """ ACL plugin version check; learn MACs
         """
-        self.create_hosts(16)
-        self.run_traffic_no_check()
         reply = self.vapi.papi.acl_plugin_get_version()
         self.assertEqual(reply.major, 1)
         self.logger.info("Working with ACL plugin version: %d.%d" % (
@@ -1359,17 +1354,13 @@ class TestACLplugin(VppTestCase):
         # Apply rules
         self.apply_rules(rules, "permit ipv4 tcp")
 
-        # Traffic should still pass
-        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
-
         # Traffic should still pass also for an odd ethertype
         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
                              0, False, True, 0xaaaa)
-
         self.logger.info("ACLP_TEST_FINISH_0300")
 
     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
-        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
+        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
         """
         self.logger.info("ACLP_TEST_START_0305")
 
@@ -1388,24 +1379,72 @@ class TestACLplugin(VppTestCase):
         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
         self.etype_whitelist([0xbbb], 1)
 
-        # The IPv4 traffic should still pass
-        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
-
         # The oddball ethertype should be blocked
         self.run_verify_negat_test(self.IP, self.IPV4,
                                    self.proto[self.IP][self.TCP],
                                    0, False, 0xaaaa)
 
-        # The whitelisted traffic, on the other hand, should pass
+        # remove the whitelist
+        self.etype_whitelist([], 0)
+
+        self.logger.info("ACLP_TEST_FINISH_0305")
+
+    def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
+        """
+        self.logger.info("ACLP_TEST_START_0306")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+
+        # The whitelisted traffic, should pass
         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
                              0, False, True, 0x0bbb)
 
         # remove the whitelist, the previously blocked 0xAAAA should pass now
         self.etype_whitelist([], 0)
+
+        self.logger.info("ACLP_TEST_FINISH_0306")
+
+    def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
+        """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
+        """
+        self.logger.info("ACLP_TEST_START_0307")
+
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+                     self.proto[self.IP][self.TCP]))
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+                     self.proto[self.IP][self.TCP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit ipv4 tcp")
+
+        # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+        self.etype_whitelist([0xbbb], 1)
+        # remove the whitelist, the previously blocked 0xAAAA should pass now
+        self.etype_whitelist([], 0)
+
+        # The whitelisted traffic, should pass
         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
                              0, False, True, 0xaaaa)
 
-        self.logger.info("ACLP_TEST_FINISH_0305")
+        self.logger.info("ACLP_TEST_FINISH_0306")
 
     def test_0315_del_intf(self):
         """ apply an acl and delete the interface