From 7ff7453af3ecb9819765a672f4b13417e225451a Mon Sep 17 00:00:00 2001 From: Andrew Yourtchenko Date: Sat, 6 Oct 2018 09:24:28 +0200 Subject: [PATCH 1/1] acl-plugin: make each test in test_acl_plugin runnable separately And improve the robustness of the ethertype whitelist test coverage Change-Id: I64fe3a25208dbc619ae5cd6404f6122e69394a38 Signed-off-by: Andrew Yourtchenko --- test/test_acl_plugin.py | 111 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 75 insertions(+), 36 deletions(-) diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index f3ab3812716..fd45b4a27e7 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -122,6 +122,23 @@ class TestACLplugin(VppTestCase): # warm-up the mac address tables # self.warmup_test() + count = 16 + start = 0 + n_int = len(cls.pg_interfaces) + macs_per_if = count / n_int + i = -1 + for pg_if in cls.pg_interfaces: + i += 1 + start_nr = macs_per_if * i + start + end_nr = count + start if i == (n_int - 1) \ + else macs_per_if * (i + 1) + start + hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index] + for j in range(start_nr, end_nr): + host = Host( + "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), + "172.17.1%02x.%u" % (pg_if.sw_if_index, j), + "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) + hosts.append(host) except Exception: super(TestACLplugin, cls).tearDownClass() @@ -144,30 +161,6 @@ class TestACLplugin(VppTestCase): self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id)) - def create_hosts(self, count, start=0): - """ - Create required number of host MAC addresses and distribute them among - interfaces. Create host IPv4 address for every host MAC address. - - :param int count: Number of hosts to create MAC/IPv4 addresses for. - :param int start: Number to start numbering from. - """ - n_int = len(self.pg_interfaces) - macs_per_if = count / n_int - i = -1 - for pg_if in self.pg_interfaces: - i += 1 - start_nr = macs_per_if * i + start - end_nr = count + start if i == (n_int - 1) \ - else macs_per_if * (i + 1) + start - hosts = self.hosts_by_pg_idx[pg_if.sw_if_index] - for j in range(start_nr, end_nr): - host = Host( - "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), - "172.17.1%02x.%u" % (pg_if.sw_if_index, j), - "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) - hosts.append(host) - def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, s_prefix=0, s_ip='\x00\x00\x00\x00', d_prefix=0, d_ip='\x00\x00\x00\x00'): @@ -479,6 +472,7 @@ class TestACLplugin(VppTestCase): # Enable packet capture and start packet sendingself.IPV self.pg_enable_capture(self.pg_interfaces) self.pg_start() + self.logger.info("sent packets count: %d" % pkts_cnt) # Verify # Verify outgoing packet streams per packet-generator interface @@ -494,6 +488,7 @@ class TestACLplugin(VppTestCase): def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1): # Test + pkts_cnt = 0 self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): @@ -502,10 +497,12 @@ class TestACLplugin(VppTestCase): frags, True, etype) if len(pkts) > 0: i.add_stream(pkts) + pkts_cnt += len(pkts) # Enable packet capture and start packet sending self.pg_enable_capture(self.pg_interfaces) self.pg_start() + self.logger.info("sent packets count: %d" % pkts_cnt) # Verify # Verify outgoing packet streams per packet-generator interface @@ -520,8 +517,6 @@ class TestACLplugin(VppTestCase): def test_0000_warmup_test(self): """ ACL plugin version check; learn MACs """ - self.create_hosts(16) - self.run_traffic_no_check() reply = self.vapi.papi.acl_plugin_get_version() self.assertEqual(reply.major, 1) self.logger.info("Working with ACL plugin version: %d.%d" % ( @@ -1359,17 +1354,13 @@ class TestACLplugin(VppTestCase): # Apply rules self.apply_rules(rules, "permit ipv4 tcp") - # Traffic should still pass - self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) - # Traffic should still pass also for an odd ethertype self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xaaaa) - self.logger.info("ACLP_TEST_FINISH_0300") def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self): - """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked """ self.logger.info("ACLP_TEST_START_0305") @@ -1388,24 +1379,72 @@ class TestACLplugin(VppTestCase): # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) - # The IPv4 traffic should still pass - self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) - # The oddball ethertype should be blocked self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xaaaa) - # The whitelisted traffic, on the other hand, should pass + # remove the whitelist + self.etype_whitelist([], 0) + + self.logger.info("ACLP_TEST_FINISH_0305") + + def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass + """ + self.logger.info("ACLP_TEST_START_0306") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The whitelisted traffic, should pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0bbb) # remove the whitelist, the previously blocked 0xAAAA should pass now self.etype_whitelist([], 0) + + self.logger.info("ACLP_TEST_FINISH_0306") + + def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass + """ + self.logger.info("ACLP_TEST_START_0307") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0) + + # The whitelisted traffic, should pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xaaaa) - self.logger.info("ACLP_TEST_FINISH_0305") + self.logger.info("ACLP_TEST_FINISH_0306") def test_0315_del_intf(self): """ apply an acl and delete the interface -- 2.16.6