acl-plugin: fix the stateful ICMP handling and add testcases
[vpp.git] / test / test_acl_plugin_l2l3.py
index 66d053f..8e6b9e3 100644 (file)
@@ -40,7 +40,7 @@ from vpp_papi_provider import L2_PORT_TYPE
 import time
 
 
-class TestIpIrb(VppTestCase):
+class TestAclIpIrb(VppTestCase):
     """IRB Test Case"""
 
     @classmethod
@@ -53,7 +53,7 @@ class TestIpIrb(VppTestCase):
         #. Loopback BVI interface has remote hosts, one half of hosts are
            behind pg0 second behind pg1.
         """
-        super(TestIpIrb, cls).setUpClass()
+        super(TestAclIpIrb, cls).setUpClass()
 
         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
         cls.bd_id = 10
@@ -94,6 +94,8 @@ class TestIpIrb(VppTestCase):
 
         cls.WITHOUT_EH = False
         cls.WITH_EH = True
+        cls.STATELESS_ICMP = False
+        cls.STATEFUL_ICMP = True
 
         # Loopback BVI interface has remote hosts, one half of hosts are behind
         # pg0 second behind pg1
@@ -106,24 +108,24 @@ class TestIpIrb(VppTestCase):
         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
         ``show ip arp``.
         """
-        super(TestIpIrb, self).tearDown()
+        super(TestAclIpIrb, self).tearDown()
         if not self.vpp_dead:
             self.logger.info(self.vapi.cli("show l2patch"))
             self.logger.info(self.vapi.cli("show classify tables"))
-            self.logger.info(self.vapi.cli("show vlib graph"))
             self.logger.info(self.vapi.cli("show l2fib verbose"))
             self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
                                            self.bd_id))
             self.logger.info(self.vapi.cli("show ip arp"))
             self.logger.info(self.vapi.cli("show ip6 neighbors"))
-            self.logger.info(self.vapi.cli("show acl-plugin sessions"))
+            cmd = "show acl-plugin sessions verbose 1"
+            self.logger.info(self.vapi.cli(cmd))
             self.logger.info(self.vapi.cli("show acl-plugin acl"))
             self.logger.info(self.vapi.cli("show acl-plugin interface"))
             self.logger.info(self.vapi.cli("show acl-plugin tables"))
 
     def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
                       is_ip6, expect_blocked, expect_established,
-                      add_extension_header):
+                      add_extension_header, icmp_stateful=False):
         pkts = []
         rules = []
         permit_rules = []
@@ -131,7 +133,15 @@ class TestIpIrb(VppTestCase):
         total_packet_count = 8
         for i in range(0, total_packet_count):
             modulo = (i//2) % 2
-            can_reflect_this_packet = (modulo == 0)
+            icmp_type_delta = i % 2
+            icmp_code = i
+            is_udp_packet = (modulo == 0)
+            if is_udp_packet and icmp_stateful:
+                continue
+            is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and
+                                   not is_udp_packet)
+            is_reflected_icmp = is_reflectable_icmp and expect_established
+            can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
             is_permit = i % 2
             remote_dst_index = i % len(dst_ip_if.remote_hosts)
             remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
@@ -167,13 +177,15 @@ class TestIpIrb(VppTestCase):
                 dst_ip4 = remote_dst_host.ip4
                 src_l4 = 1234 + i
                 dst_l4 = 4321 + i
+            if is_reflected_icmp:
+                icmp_type_delta = 1
 
             # default ULP should be something we do not use in tests
             ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
             # potentially a chain of protocols leading to ULP
             ulp = ulp_l4
 
-            if can_reflect_this_packet:
+            if is_udp_packet:
                 if is_ip6:
                     ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                     if add_extension_header:
@@ -200,14 +212,15 @@ class TestIpIrb(VppTestCase):
                          Raw(payload))
             elif modulo == 1:
                 if is_ip6:
-                    ulp_l4 = ICMPv6Unknown(type=128 + (i % 2), code=i % 2)
+                    ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta,
+                                           code=icmp_code)
                     ulp = ulp_l4
                     p = (Ether(dst=dst_mac, src=src_mac) /
                          IPv6(src=src_ip6, dst=dst_ip6) /
                          ulp /
                          Raw(payload))
                 else:
-                    ulp_l4 = ICMP(type=8 + (i % 2), code=i % 2)
+                    ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code)
                     ulp = ulp_l4
                     p = (Ether(dst=dst_mac, src=src_mac) /
                          IP(src=src_ip4, dst=dst_ip4) /
@@ -264,7 +277,9 @@ class TestIpIrb(VppTestCase):
                 new_rule_permit_and_reflect['is_permit'] = 2
             else:
                 new_rule_permit_and_reflect['is_permit'] = is_permit
+
             permit_and_reflect_rules.append(new_rule_permit_and_reflect)
+            self.logger.info("create_stream pkt#%d: %s" % (i, payload))
 
         return {'stream': pkts,
                 'rules': rules,
@@ -452,20 +467,22 @@ class TestIpIrb(VppTestCase):
 
     def apply_acl_ip46_both_directions_reflect(self,
                                                primary_is_bridged_to_routed,
-                                               reflect_on_l2, is_ip6, add_eh):
+                                               reflect_on_l2, is_ip6, add_eh,
+                                               stateful_icmp):
         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
         self.reset_packet_infos()
         stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
                                              primary_is_bridged_to_routed,
                                              self.pg_if_packet_sizes, is_ip6,
-                                             False, False, add_eh)
+                                             False, False, add_eh,
+                                             stateful_icmp)
         acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
                                                     reflect_on_l2, True)
 
         stream_dict_rev = self.create_stream(self.pg2, self.loop0,
                                              not primary_is_bridged_to_routed,
                                              self.pg_if_packet_sizes, is_ip6,
-                                             True, True, add_eh)
+                                             True, True, add_eh, stateful_icmp)
         # We want the primary action to be "deny" rather than reflect
         acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
                                                     reflect_on_l2, False)
@@ -517,13 +534,14 @@ class TestIpIrb(VppTestCase):
 
     def run_traffic_ip46_x_to_y(self, bridged_to_routed,
                                 test_l2_deny, is_ip6,
-                                is_reflect, is_established, add_eh):
+                                is_reflect, is_established, add_eh,
+                                stateful_icmp=False):
         self.reset_packet_infos()
         stream_dict = self.create_stream(self.pg2, self.loop0,
                                          bridged_to_routed,
                                          self.pg_if_packet_sizes, is_ip6,
                                          not is_reflect, is_established,
-                                         add_eh)
+                                         add_eh, stateful_icmp)
         stream = stream_dict['stream']
 
         tx_if = self.pg0 if bridged_to_routed else self.pg2
@@ -537,14 +555,18 @@ class TestIpIrb(VppTestCase):
         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
 
     def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
-                                           is_reflect, is_established, add_eh):
+                                           is_reflect, is_established, add_eh,
+                                           stateful_icmp=False):
         self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
-                                     is_reflect, is_established, add_eh)
+                                     is_reflect, is_established, add_eh,
+                                     stateful_icmp)
 
     def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
-                                           is_reflect, is_established, add_eh):
+                                           is_reflect, is_established, add_eh,
+                                           stateful_icmp=False):
         self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
-                                     is_reflect, is_established, add_eh)
+                                     is_reflect, is_established, add_eh,
+                                     stateful_icmp)
 
     def run_test_ip46_routed_to_bridged(self, test_l2_deny,
                                         is_ip6, is_reflect, add_eh):
@@ -561,22 +583,30 @@ class TestIpIrb(VppTestCase):
                                                 is_reflect, False, add_eh)
 
     def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
-                                                 is_ip6, add_eh):
+                                                 is_ip6, add_eh,
+                                                 stateful_icmp=False):
         self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
-                                                    is_ip6, add_eh)
+                                                    is_ip6, add_eh,
+                                                    stateful_icmp)
         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
-                                                True, False, add_eh)
+                                                True, False, add_eh,
+                                                stateful_icmp)
         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
-                                                False, True, add_eh)
+                                                False, True, add_eh,
+                                                stateful_icmp)
 
     def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
-                                                 is_ip6, add_eh):
+                                                 is_ip6, add_eh,
+                                                 stateful_icmp=False):
         self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
-                                                    is_ip6, add_eh)
+                                                    is_ip6, add_eh,
+                                                    stateful_icmp)
         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
-                                                True, False, add_eh)
+                                                True, False, add_eh,
+                                                stateful_icmp)
         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
-                                                False, True, add_eh)
+                                                False, True, add_eh,
+                                                stateful_icmp)
 
     def test_0000_ip6_irb_1(self):
         """ ACL plugin prepare"""
@@ -761,6 +791,56 @@ class TestIpIrb(VppTestCase):
         """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
         self.run_test_ip46_bridged_to_routed_and_back(False, False,
                                                       self.WITH_EH)
+    # Stateful ACL tests with stateful ICMP
+
+    def test_1401_ip6_irb_1(self):
+        """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_routed_to_bridged_and_back(True, True,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1402_ip6_irb_1(self):
+        """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_bridged_to_routed_and_back(True, True,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1403_ip4_irb_1(self):
+        """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_routed_to_bridged_and_back(True, False,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1404_ip4_irb_1(self):
+        """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_bridged_to_routed_and_back(True, False,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1411_ip6_irb_1(self):
+        """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_routed_to_bridged_and_back(False, True,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1412_ip6_irb_1(self):
+        """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_bridged_to_routed_and_back(False, True,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1413_ip4_irb_1(self):
+        """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_routed_to_bridged_and_back(False, False,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1414_ip4_irb_1(self):
+        """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_bridged_to_routed_and_back(False, False,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)