X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin.py;h=5fcf09c5ee75b3b915d558196397f43c81f20c18;hb=c43b3f986476ffb4506b7115898e809a6e34f601;hp=361ced14c21aee7d1e086e4a5bb6c5e6c3a55e77;hpb=be2251b0c5b10a3a556e75c9bfbea96df4799297;p=vpp.git diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 361ced14c21..5fcf09c5ee7 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -247,6 +247,15 @@ class TestACLplugin(VppTestCase): acls=[reply.acl_index]) return + def etype_whitelist(self, whitelist, n_input): + # Apply whitelists on all the interfaces + for i in self.pg_interfaces: + # checkstyle can't read long names. Help them. + fun = self.vapi.acl_interface_set_etype_whitelist + fun(sw_if_index=i.sw_if_index, n_input=n_input, + whitelist=whitelist) + return + def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] if p == 'UDP': @@ -268,7 +277,8 @@ class TestACLplugin(VppTestCase): return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0, fragments=False, pkt_raw=True): + proto=-1, ports=0, fragments=False, + pkt_raw=True, etype=-1): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -300,6 +310,10 @@ class TestACLplugin(VppTestCase): pkt_info.proto = proto payload = self.info_to_payload(pkt_info) p = Ether(dst=dst_host.mac, src=src_host.mac) + if etype > 0: + p = Ether(dst=dst_host.mac, + src=src_host.mac, + type=etype) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) if fragments: @@ -328,7 +342,8 @@ class TestACLplugin(VppTestCase): pkts.append(p) return pkts - def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0): + def verify_capture(self, pg_if, capture, + traffic_type=0, ip_type=0, etype=-1): """ Verify captured input packet stream for defined interface. @@ -341,6 +356,12 @@ class TestACLplugin(VppTestCase): last_info[i.sw_if_index] = None dst_sw_if_index = pg_if.sw_if_index for packet in capture: + if etype > 0: + if packet[Ether].type != etype: + self.logger.error(ppp("Unexpected ethertype in packet:", + packet)) + else: + continue try: # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data if traffic_type == self.ICMP and ip_type == self.IPV6: @@ -429,7 +450,7 @@ class TestACLplugin(VppTestCase): self.pg_start() def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, - frags=False, pkt_raw=True): + frags=False, pkt_raw=True, etype=-1): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 @@ -437,7 +458,7 @@ class TestACLplugin(VppTestCase): if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, traffic_type, ip_type, proto, ports, - frags, pkt_raw) + frags, pkt_raw, etype) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -454,17 +475,18 @@ class TestACLplugin(VppTestCase): capture = dst_if.get_capture(pkts_cnt) self.logger.info("Verifying capture on interface %s" % dst_if.name) - self.verify_capture(dst_if, capture, traffic_type, ip_type) + self.verify_capture(dst_if, capture, + traffic_type, ip_type, etype) def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0, frags=False): + ports=0, frags=False, etype=-1): # Test self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, traffic_type, ip_type, proto, ports, - frags) + frags, True, etype) if len(pkts) > 0: i.add_stream(pkts) @@ -1306,6 +1328,70 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0113") + def test_0300_tcp_permit_v4_etype_aaaa(self): + """ permit TCPv4, send 0xAAAA etype + """ + self.logger.info("ACLP_TEST_START_0300") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) + + # Traffic should still pass also for an odd ethertype + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + + self.logger.info("ACLP_TEST_FINISH_0300") + + def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB + """ + self.logger.info("ACLP_TEST_START_0305") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The IPv4 traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) + + # The oddball ethertype should be blocked + self.run_verify_negat_test(self.IP, self.IPV4, + self.proto[self.IP][self.TCP], + 0, False, 0xaaaa) + + # The whitelisted traffic, on the other hand, should pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0x0bbb) + + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0) + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + + self.logger.info("ACLP_TEST_FINISH_0305") if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)