X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin.py;h=b051d457824f71b9c756a1cc3d20a1696b30f342;hb=d1b05647427c79cfd5322991bbe663fae65f37b5;hp=2bbebe85b1b6c1a0ecfaa558105bcfa73e04548e;hpb=0eb2b16f95c0c43302be79a1c4df8b828ac97e37;p=vpp.git diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 2bbebe85b1b..b051d457824 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -9,6 +9,7 @@ from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp @@ -229,7 +230,7 @@ class TestACLplugin(VppTestCase): return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0): + proto=-1, ports=0, fragments=False): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -263,8 +264,14 @@ class TestACLplugin(VppTestCase): p = Ether(dst=dst_host.mac, src=src_host.mac) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) + if fragments: + p /= IPv6ExtHdrFragment(offset=64, m=1) else: - p /= IP(src=src_host.ip4, dst=dst_host.ip4) + if fragments: + p /= IP(src=src_host.ip4, dst=dst_host.ip4, + flags=1, frag=64) + else: + p /= IP(src=src_host.ip4, dst=dst_host.ip4) if traffic_type == self.ICMP: if pkt_info.ip: p /= ICMPv6EchoRequest(type=self.icmp6_type, @@ -381,14 +388,16 @@ class TestACLplugin(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0): + def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, + frags=False): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports) + traffic_type, ip_type, proto, ports, + frags) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -408,13 +417,14 @@ class TestACLplugin(VppTestCase): self.verify_capture(dst_if, capture, traffic_type, ip_type) def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0): + ports=0, frags=False): # Test self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports) + traffic_type, ip_type, proto, ports, + frags) if len(pkts) > 0: i.add_stream(pkts) @@ -1011,5 +1021,32 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0020") + def test_0021_udp_deny_port_verify_fragment_deny(self): + """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked + """ + self.logger.info("ACLP_TEST_START_0021") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, port, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP], port, True) + + self.logger.info("ACLP_TEST_FINISH_0021") + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)