X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin.py;h=f3ab38127166273f38c3da2e17ae429253637cd3;hb=34eb5d423d8d5851d0e901114d359c91acf05c4e;hp=361ced14c21aee7d1e086e4a5bb6c5e6c3a55e77;hpb=27cadd23b6c220e73552fa7b3fe61e5874d07cec;p=vpp.git diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 361ced14c21..f3ab3812716 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -13,6 +13,8 @@ from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp +from vpp_lo_interface import VppLoInterface + class TestACLplugin(VppTestCase): """ ACL plugin Test Case """ @@ -247,6 +249,26 @@ class TestACLplugin(VppTestCase): acls=[reply.acl_index]) return + def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF): + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, + tag=tag) + self.logger.info("Dumped ACL: " + str( + self.vapi.acl_dump(reply.acl_index))) + # Apply a ACL on the interface as inbound + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=1, + acls=[reply.acl_index]) + return + + def etype_whitelist(self, whitelist, n_input): + # Apply whitelists on all the interfaces + for i in self.pg_interfaces: + # checkstyle can't read long names. Help them. + fun = self.vapi.acl_interface_set_etype_whitelist + fun(sw_if_index=i.sw_if_index, n_input=n_input, + whitelist=whitelist) + return + def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] if p == 'UDP': @@ -268,7 +290,8 @@ class TestACLplugin(VppTestCase): return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0, fragments=False, pkt_raw=True): + proto=-1, ports=0, fragments=False, + pkt_raw=True, etype=-1): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -300,6 +323,10 @@ class TestACLplugin(VppTestCase): pkt_info.proto = proto payload = self.info_to_payload(pkt_info) p = Ether(dst=dst_host.mac, src=src_host.mac) + if etype > 0: + p = Ether(dst=dst_host.mac, + src=src_host.mac, + type=etype) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) if fragments: @@ -328,7 +355,8 @@ class TestACLplugin(VppTestCase): pkts.append(p) return pkts - def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0): + def verify_capture(self, pg_if, capture, + traffic_type=0, ip_type=0, etype=-1): """ Verify captured input packet stream for defined interface. @@ -341,6 +369,12 @@ class TestACLplugin(VppTestCase): last_info[i.sw_if_index] = None dst_sw_if_index = pg_if.sw_if_index for packet in capture: + if etype > 0: + if packet[Ether].type != etype: + self.logger.error(ppp("Unexpected ethertype in packet:", + packet)) + else: + continue try: # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data if traffic_type == self.ICMP and ip_type == self.IPV6: @@ -429,7 +463,7 @@ class TestACLplugin(VppTestCase): self.pg_start() def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, - frags=False, pkt_raw=True): + frags=False, pkt_raw=True, etype=-1): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 @@ -437,7 +471,7 @@ class TestACLplugin(VppTestCase): if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, traffic_type, ip_type, proto, ports, - frags, pkt_raw) + frags, pkt_raw, etype) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -454,17 +488,18 @@ class TestACLplugin(VppTestCase): capture = dst_if.get_capture(pkts_cnt) self.logger.info("Verifying capture on interface %s" % dst_if.name) - self.verify_capture(dst_if, capture, traffic_type, ip_type) + self.verify_capture(dst_if, capture, + traffic_type, ip_type, etype) def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0, frags=False): + ports=0, frags=False, etype=-1): # Test self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, traffic_type, ip_type, proto, ports, - frags) + frags, True, etype) if len(pkts) > 0: i.add_stream(pkts) @@ -1058,7 +1093,8 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0020") def test_0021_udp_deny_port_verify_fragment_deny(self): - """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked + """ deny single UDPv4/v6, permit ip any, verify non-initial fragment + blocked """ self.logger.info("ACLP_TEST_START_0021") @@ -1306,6 +1342,96 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0113") + def test_0300_tcp_permit_v4_etype_aaaa(self): + """ permit TCPv4, send 0xAAAA etype + """ + self.logger.info("ACLP_TEST_START_0300") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) + + # Traffic should still pass also for an odd ethertype + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + + self.logger.info("ACLP_TEST_FINISH_0300") + + def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB + """ + self.logger.info("ACLP_TEST_START_0305") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The IPv4 traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) + + # The oddball ethertype should be blocked + self.run_verify_negat_test(self.IP, self.IPV4, + self.proto[self.IP][self.TCP], + 0, False, 0xaaaa) + + # The whitelisted traffic, on the other hand, should pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0x0bbb) + + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0) + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + + self.logger.info("ACLP_TEST_FINISH_0305") + + def test_0315_del_intf(self): + """ apply an acl and delete the interface + """ + self.logger.info("ACLP_TEST_START_0315") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # create an interface + intf = [] + intf.append(VppLoInterface(self)) + + # Apply rules + self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index) + + # Remove the interface + intf[0].remove_vpp_config() + + self.logger.info("ACLP_TEST_FINISH_0315") if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)