X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin.py;h=3a180d21961515fdf14b1f6f706b81985d0b7fff;hb=bb7f0f644ac861cf8f57d2445ce516ed378ced4c;hp=2bbebe85b1b6c1a0ecfaa558105bcfa73e04548e;hpb=59dda065bb92d1588824483ed5e7cf9adb228d3a;p=vpp.git diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 2bbebe85b1b..3a180d21961 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -9,6 +9,7 @@ from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp @@ -41,6 +42,7 @@ class TestACLplugin(VppTestCase): # port ranges PORTS_ALL = -1 PORTS_RANGE = 0 + PORTS_RANGE_2 = 1 udp_sport_from = 10 udp_sport_to = udp_sport_from + 5 udp_dport_from = 20000 @@ -50,11 +52,27 @@ class TestACLplugin(VppTestCase): tcp_dport_from = 40000 tcp_dport_to = tcp_dport_from + 5000 + udp_sport_from_2 = 90 + udp_sport_to_2 = udp_sport_from_2 + 5 + udp_dport_from_2 = 30000 + udp_dport_to_2 = udp_dport_from_2 + 5000 + tcp_sport_from_2 = 130 + tcp_sport_to_2 = tcp_sport_from_2 + 5 + tcp_dport_from_2 = 20000 + tcp_dport_to_2 = tcp_dport_from_2 + 5000 + icmp4_type = 8 # echo request icmp4_code = 3 icmp6_type = 128 # echo request icmp6_code = 3 + icmp4_type_2 = 8 + icmp4_code_from_2 = 5 + icmp4_code_to_2 = 20 + icmp6_type_2 = 128 + icmp6_code_from_2 = 8 + icmp6_code_to_2 = 42 + # Test variables bd_id = 1 @@ -120,6 +138,9 @@ class TestACLplugin(VppTestCase): super(TestACLplugin, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.ppcli("show l2fib verbose")) + self.logger.info(self.vapi.ppcli("show acl-plugin acl")) + self.logger.info(self.vapi.ppcli("show acl-plugin interface")) + self.logger.info(self.vapi.ppcli("show acl-plugin tables")) self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id)) @@ -178,6 +199,27 @@ class TestACLplugin(VppTestCase): sport_to = self.udp_sport_to dport_from = self.udp_dport_from dport_to = self.udp_dport_to + elif ports == self.PORTS_RANGE_2: + if proto == 1: + sport_from = self.icmp4_type_2 + sport_to = self.icmp4_type_2 + dport_from = self.icmp4_code_from_2 + dport_to = self.icmp4_code_to_2 + elif proto == 58: + sport_from = self.icmp6_type_2 + sport_to = self.icmp6_type_2 + dport_from = self.icmp6_code_from_2 + dport_to = self.icmp6_code_to_2 + elif proto == self.proto[self.IP][self.TCP]: + sport_from = self.tcp_sport_from_2 + sport_to = self.tcp_sport_to_2 + dport_from = self.tcp_dport_from_2 + dport_to = self.tcp_dport_to_2 + elif proto == self.proto[self.IP][self.UDP]: + sport_from = self.udp_sport_from_2 + sport_to = self.udp_sport_to_2 + dport_from = self.udp_dport_from_2 + dport_to = self.udp_dport_to_2 else: sport_from = ports sport_to = ports @@ -229,7 +271,7 @@ class TestACLplugin(VppTestCase): return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0): + proto=-1, ports=0, fragments=False, pkt_raw=True): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -263,8 +305,14 @@ class TestACLplugin(VppTestCase): p = Ether(dst=dst_host.mac, src=src_host.mac) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) + if fragments: + p /= IPv6ExtHdrFragment(offset=64, m=1) else: - p /= IP(src=src_host.ip4, dst=dst_host.ip4) + if fragments: + p /= IP(src=src_host.ip4, dst=dst_host.ip4, + flags=1, frag=64) + else: + p /= IP(src=src_host.ip4, dst=dst_host.ip4) if traffic_type == self.ICMP: if pkt_info.ip: p /= ICMPv6EchoRequest(type=self.icmp6_type, @@ -274,10 +322,12 @@ class TestACLplugin(VppTestCase): code=self.icmp4_code) else: p /= self.create_upper_layer(i, pkt_info.proto, ports) - p /= Raw(payload) - pkt_info.data = p.copy() - size = random.choice(packet_sizes) - self.extend_packet(p, size) + if pkt_raw: + p /= Raw(payload) + pkt_info.data = p.copy() + if pkt_raw: + size = random.choice(packet_sizes) + self.extend_packet(p, size) pkts.append(p) return pkts @@ -381,14 +431,16 @@ class TestACLplugin(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0): + def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, + frags=False, pkt_raw=True): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports) + traffic_type, ip_type, proto, ports, + frags, pkt_raw) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -408,13 +460,14 @@ class TestACLplugin(VppTestCase): self.verify_capture(dst_if, capture, traffic_type, ip_type) def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0): + ports=0, frags=False): # Test self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports) + traffic_type, ip_type, proto, ports, + frags) if len(pkts) > 0: i.add_stream(pkts) @@ -1011,5 +1064,255 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0020") + def test_0021_udp_deny_port_verify_fragment_deny(self): + """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked + """ + self.logger.info("ACLP_TEST_START_0021") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, port, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP], port, True) + + self.logger.info("ACLP_TEST_FINISH_0021") + + def test_0022_zero_length_udp_ipv4(self): + """ VPP-687 zero length udp ipv4 packet""" + self.logger.info("ACLP_TEST_START_0022") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append( + self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit empty udp ip4 " + str(port)) + + # Traffic should still pass + # Create incoming packet streams for packet-generator interfaces + pkts_cnt = 0 + pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, + self.IP, self.IPV4, + self.proto[self.IP][self.UDP], port, + False, False) + if len(pkts) > 0: + self.pg0.add_stream(pkts) + pkts_cnt += len(pkts) + + # Enable packet capture and start packet sendingself.IPV + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.pg1.get_capture(pkts_cnt) + + self.logger.info("ACLP_TEST_FINISH_0022") + + def test_0023_zero_length_udp_ipv6(self): + """ VPP-687 zero length udp ipv6 packet""" + self.logger.info("ACLP_TEST_START_0023") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.PERMIT, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit empty udp ip6 "+str(port)) + + # Traffic should still pass + # Create incoming packet streams for packet-generator interfaces + pkts_cnt = 0 + pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, + self.IP, self.IPV6, + self.proto[self.IP][self.UDP], port, + False, False) + if len(pkts) > 0: + self.pg0.add_stream(pkts) + pkts_cnt += len(pkts) + + # Enable packet capture and start packet sendingself.IPV + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # Verify outgoing packet streams per packet-generator interface + self.pg1.get_capture(pkts_cnt) + + self.logger.info("ACLP_TEST_FINISH_0023") + + def test_0108_tcp_permit_v4(self): + """ permit TCPv4 + non-match range + """ + self.logger.info("ACLP_TEST_START_0108") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0108") + + def test_0109_tcp_permit_v6(self): + """ permit TCPv6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0109") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ip6 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0109") + + def test_0110_udp_permit_v4(self): + """ permit UDPv4 + non-match range + """ + self.logger.info("ACLP_TEST_START_0110") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ipv4 udp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0110") + + def test_0111_udp_permit_v6(self): + """ permit UDPv6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0111") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "permit ip6 udp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0111") + + def test_0112_tcp_deny(self): + """ deny TCPv4/v6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0112") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "deny ip4/ip6 tcp") + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0112") + + def test_0113_udp_deny(self): + """ deny UDPv4/v6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0113") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "deny ip4/ip6 udp") + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0113") + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)