ACL-plugin does not match UDP next-header, VPP-687
[vpp.git] / test / test_acl_plugin.py
index 2bbebe8..5267cd2 100644 (file)
@@ -9,6 +9,7 @@ from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6ExtHdrFragment
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp
 
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp
 
@@ -229,7 +230,7 @@ class TestACLplugin(VppTestCase):
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
         return ''
 
     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
-                      proto=-1, ports=0):
+                      proto=-1, ports=0, fragments=False, pkt_raw=True):
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
         """
         Create input packet stream for defined interface using hosts or
         deleted_hosts list.
@@ -263,8 +264,14 @@ class TestACLplugin(VppTestCase):
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                     p = Ether(dst=dst_host.mac, src=src_host.mac)
                     if pkt_info.ip:
                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
+                        if fragments:
+                            p /= IPv6ExtHdrFragment(offset=64, m=1)
                     else:
                     else:
-                        p /= IP(src=src_host.ip4, dst=dst_host.ip4)
+                        if fragments:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4,
+                                    flags=1, frag=64)
+                        else:
+                            p /= IP(src=src_host.ip4, dst=dst_host.ip4)
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
                     if traffic_type == self.ICMP:
                         if pkt_info.ip:
                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
@@ -274,10 +281,12 @@ class TestACLplugin(VppTestCase):
                                       code=self.icmp4_code)
                     else:
                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
                                       code=self.icmp4_code)
                     else:
                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
-                    p /= Raw(payload)
-                    pkt_info.data = p.copy()
-                    size = random.choice(packet_sizes)
-                    self.extend_packet(p, size)
+                    if pkt_raw:
+                        p /= Raw(payload)
+                        pkt_info.data = p.copy()
+                    if pkt_raw:
+                        size = random.choice(packet_sizes)
+                        self.extend_packet(p, size)
                     pkts.append(p)
         return pkts
 
                     pkts.append(p)
         return pkts
 
@@ -381,14 +390,16 @@ class TestACLplugin(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
+    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
+                        frags=False, pkt_raw=True):
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
         # Test
         # Create incoming packet streams for packet-generator interfaces
         pkts_cnt = 0
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports)
+                                          traffic_type, ip_type, proto, ports,
+                                          frags, pkt_raw)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
                     pkts_cnt += len(pkts)
@@ -408,13 +419,14 @@ class TestACLplugin(VppTestCase):
                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
 
     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
-                              ports=0):
+                              ports=0, frags=False):
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
         # Test
         self.reset_packet_infos()
         for i in self.pg_interfaces:
             if self.flows.__contains__(i):
                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
-                                          traffic_type, ip_type, proto, ports)
+                                          traffic_type, ip_type, proto, ports,
+                                          frags)
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
                 if len(pkts) > 0:
                     i.add_stream(pkts)
 
@@ -1011,5 +1023,102 @@ class TestACLplugin(VppTestCase):
 
         self.logger.info("ACLP_TEST_FINISH_0020")
 
 
         self.logger.info("ACLP_TEST_FINISH_0020")
 
+    def test_0021_udp_deny_port_verify_fragment_deny(self):
+        """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
+        """
+        self.logger.info("ACLP_TEST_START_0021")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        rules.append(self.create_rule(self.IPV6, self.DENY, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV4, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+        rules.append(self.create_rule(self.IPV6, self.PERMIT,
+                                      self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+
+        # Traffic should not pass
+        self.run_verify_negat_test(self.IP, self.IPRANDOM,
+                                   self.proto[self.IP][self.UDP], port, True)
+
+        self.logger.info("ACLP_TEST_FINISH_0021")
+
+    def test_0022_zero_length_udp_ipv4(self):
+        """ VPP-687 zero length udp ipv4 packet"""
+        self.logger.info("ACLP_TEST_START_0022")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(
+            self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit empty udp ip4 " + str(port))
+
+        # Traffic should still pass
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+                                  self.IP, self.IPV4,
+                                  self.proto[self.IP][self.UDP], port,
+                                  False, False)
+        if len(pkts) > 0:
+            self.pg0.add_stream(pkts)
+            pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        self.pg1.get_capture(pkts_cnt)
+
+        self.logger.info("ACLP_TEST_FINISH_0022")
+
+    def test_0023_zero_length_udp_ipv6(self):
+        """ VPP-687 zero length udp ipv6 packet"""
+        self.logger.info("ACLP_TEST_START_0023")
+
+        port = random.randint(0, 65535)
+        # Add an ACL
+        rules = []
+        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
+                                      self.proto[self.IP][self.UDP]))
+        # deny ip any any in the end
+        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+        # Apply rules
+        self.apply_rules(rules, "permit empty udp ip6 "+str(port))
+
+        # Traffic should still pass
+        # Create incoming packet streams for packet-generator interfaces
+        pkts_cnt = 0
+        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+                                  self.IP, self.IPV6,
+                                  self.proto[self.IP][self.UDP], port,
+                                  False, False)
+        if len(pkts) > 0:
+            self.pg0.add_stream(pkts)
+            pkts_cnt += len(pkts)
+
+        # Enable packet capture and start packet sendingself.IPV
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        # Verify outgoing packet streams per packet-generator interface
+        self.pg1.get_capture(pkts_cnt)
+
+        self.logger.info("ACLP_TEST_FINISH_0023")
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)