ACL-plugin does not match UDP next-header, VPP-687
[vpp.git] / test / test_acl_plugin.py
index 2bbebe8..5267cd2 100644 (file)
@@ -9,6 +9,7 @@ from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6ExtHdrFragment
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp
 
@@ -229,7 +230,7 @@ class TestACLplugin(VppTestCase):
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
-                      proto=-1, ports=0):
+                      proto=-1, ports=0, fragments=False, pkt_raw=True):
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
@@ -263,8 +264,14 @@ class TestACLplugin(VppTestCase):
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
+                        if fragments:
+                            p /= IPv6ExtHdrFragment(offset=64, m=1)
                     else:
-                        p /= IP(src=src_host.ip4, dst=dst_host.ip4)
+                        if fragments:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4,
+                                    flags=1, frag=64)
+                        else:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4)
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
@@ -274,10 +281,12 @@ class TestACLplugin(VppTestCase):
                                       code=self.icmp4_code)
                     else:
                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
-                    p /= Raw(payload)
-                    pkt_info.data = p.copy()
-                    size = random.choice(packet_sizes)
-                    self.extend_packet(p, size)
+                    if pkt_raw:
+                        p /= Raw(payload)
+                        pkt_info.data = p.copy()
+                    if pkt_raw:
+                        size = random.choice(packet_sizes)
+                        self.extend_packet(p, size)
                     pkts.append(p)
         return pkts
 
@@ -381,14 +390,16 @@ class TestACLplugin(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
+    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
+                        frags=False, pkt_raw=True):
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports)
+                                          traffic_type, ip_type, proto, ports,
+                                          frags, pkt_raw)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -408,13 +419,14 @@ class TestACLplugin(VppTestCase):
                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
-                              ports=0):
+                              ports=0, frags=False):
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports)
+                                          traffic_type, ip_type, proto, ports,
+                                          frags)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
@@ -1011,5 +1023,102 @@ class TestACLplugin(VppTestCase):
 
         self.logger.info("ACLP_TEST_FINISH_0020")
 
+    def test_0021_udp_deny_port_verify_fragment_deny(self):
+        """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
+        """
+        self.logger.info("ACLP_TEST_START_0021")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP], port, True)
+
+        self.logger.info("ACLP_TEST_FINISH_0021")
+
+    def test_0022_zero_length_udp_ipv4(self):
+        """ VPP-687 zero length udp ipv4 packet"""
+        self.logger.info("ACLP_TEST_START_0022")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(
+            self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit empty udp ip4 " + str(port))
+
+        # Traffic should still pass
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+                                  self.IP, self.IPV4,
+                                  self.proto[self.IP][self.UDP], port,
+                                  False, False)
+        if len(pkts) > 0:
+            self.pg0.add_stream(pkts)
+            pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        self.pg1.get_capture(pkts_cnt)
+
+        self.logger.info("ACLP_TEST_FINISH_0022")
+
+    def test_0023_zero_length_udp_ipv6(self):
+        """ VPP-687 zero length udp ipv6 packet"""
+        self.logger.info("ACLP_TEST_START_0023")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit empty udp ip6 "+str(port))
+
+        # Traffic should still pass
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+                                  self.IP, self.IPV6,
+                                  self.proto[self.IP][self.UDP], port,
+                                  False, False)
+        if len(pkts) > 0:
+            self.pg0.add_stream(pkts)
+            pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        # Verify outgoing packet streams per packet-generator interface
+        self.pg1.get_capture(pkts_cnt)
+
+        self.logger.info("ACLP_TEST_FINISH_0023")
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)