acl-plugin: make the IPv4/IPv6 non-first fragment handling in line with ACL (VPP...
[vpp.git] / test / test_acl_plugin.py
index 2bbebe8..b051d45 100644 (file)
@@ -9,6 +9,7 @@ from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6ExtHdrFragment
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp
 
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp
 
@@ -229,7 +230,7 @@ class TestACLplugin(VppTestCase):
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
-                      proto=-1, ports=0):
+                      proto=-1, ports=0, fragments=False):
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
@@ -263,8 +264,14 @@ class TestACLplugin(VppTestCase):
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
+                        if fragments:
+                            p /= IPv6ExtHdrFragment(offset=64, m=1)
                     else:
                     else:
-                        p /= IP(src=src_host.ip4, dst=dst_host.ip4)
+                        if fragments:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4,
+                                    flags=1, frag=64)
+                        else:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4)
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
@@ -381,14 +388,16 @@ class TestACLplugin(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
+    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
+                        frags=False):
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports)
+                                          traffic_type, ip_type, proto, ports,
+                                          frags)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -408,13 +417,14 @@ class TestACLplugin(VppTestCase):
                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
-                              ports=0):
+                              ports=0, frags=False):
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports)
+                                          traffic_type, ip_type, proto, ports,
+                                          frags)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
@@ -1011,5 +1021,32 @@ class TestACLplugin(VppTestCase):
 
         self.logger.info("ACLP_TEST_FINISH_0020")
 
 
         self.logger.info("ACLP_TEST_FINISH_0020")
 
+    def test_0021_udp_deny_port_verify_fragment_deny(self):
+        """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
+        """
+        self.logger.info("ACLP_TEST_START_0021")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP], port, True)
+
+        self.logger.info("ACLP_TEST_FINISH_0021")
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)