Tests: Refactor tearDown show command logging, add lifecycle markers.
[vpp.git] / test / test_acl_plugin_l2l3.py
index 66d053f..6b4ea47 100644 (file)
@@ -28,6 +28,7 @@ from socket import inet_pton, AF_INET, AF_INET6
 from random import choice, shuffle
 from pprint import pprint
 
+import scapy.compat
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP, ICMP, TCP
@@ -36,12 +37,12 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
 from scapy.layers.inet6 import IPv6ExtHdrFragment
 
 from framework import VppTestCase, VppTestRunner
-from vpp_papi_provider import L2_PORT_TYPE
+from vpp_l2 import L2_PORT_TYPE
 import time
 
 
-class TestIpIrb(VppTestCase):
-    """IRB Test Case"""
+class TestACLpluginL2L3(VppTestCase):
+    """TestACLpluginL2L3 Test Case"""
 
     @classmethod
     def setUpClass(cls):
@@ -53,7 +54,7 @@ class TestIpIrb(VppTestCase):
         #. Loopback BVI interface has remote hosts, one half of hosts are
            behind pg0 second behind pg1.
         """
-        super(TestIpIrb, cls).setUpClass()
+        super(TestACLpluginL2L3, cls).setUpClass()
 
         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
         cls.bd_id = 10
@@ -71,12 +72,12 @@ class TestIpIrb(VppTestCase):
 
         # Create BD with MAC learning enabled and put interfaces to this BD
         cls.vapi.sw_interface_set_l2_bridge(
-            cls.loop0.sw_if_index, bd_id=cls.bd_id,
+            rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
             port_type=L2_PORT_TYPE.BVI)
-        cls.vapi.sw_interface_set_l2_bridge(
-            cls.pg0.sw_if_index, bd_id=cls.bd_id)
-        cls.vapi.sw_interface_set_l2_bridge(
-            cls.pg1.sw_if_index, bd_id=cls.bd_id)
+        cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
+                                            bd_id=cls.bd_id)
+        cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
+                                            bd_id=cls.bd_id)
 
         # Configure IPv4 addresses on loopback interface and routed interface
         cls.loop0.config_ip4()
@@ -94,6 +95,8 @@ class TestIpIrb(VppTestCase):
 
         cls.WITHOUT_EH = False
         cls.WITH_EH = True
+        cls.STATELESS_ICMP = False
+        cls.STATEFUL_ICMP = True
 
         # Loopback BVI interface has remote hosts, one half of hosts are behind
         # pg0 second behind pg1
@@ -101,29 +104,34 @@ class TestIpIrb(VppTestCase):
         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
 
+    @classmethod
+    def tearDownClass(cls):
+        super(TestACLpluginL2L3, cls).tearDownClass()
+
     def tearDown(self):
         """Run standard test teardown and log ``show l2patch``,
         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
         ``show ip arp``.
         """
-        super(TestIpIrb, self).tearDown()
-        if not self.vpp_dead:
-            self.logger.info(self.vapi.cli("show l2patch"))
-            self.logger.info(self.vapi.cli("show classify tables"))
-            self.logger.info(self.vapi.cli("show vlib graph"))
-            self.logger.info(self.vapi.cli("show l2fib verbose"))
-            self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
-                                           self.bd_id))
-            self.logger.info(self.vapi.cli("show ip arp"))
-            self.logger.info(self.vapi.cli("show ip6 neighbors"))
-            self.logger.info(self.vapi.cli("show acl-plugin sessions"))
-            self.logger.info(self.vapi.cli("show acl-plugin acl"))
-            self.logger.info(self.vapi.cli("show acl-plugin interface"))
-            self.logger.info(self.vapi.cli("show acl-plugin tables"))
+        super(TestACLpluginL2L3, self).tearDown()
+
+    def show_commands_at_teardown(self):
+        self.logger.info(self.vapi.cli("show l2patch"))
+        self.logger.info(self.vapi.cli("show classify tables"))
+        self.logger.info(self.vapi.cli("show l2fib verbose"))
+        self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
+                                       self.bd_id))
+        self.logger.info(self.vapi.cli("show ip arp"))
+        self.logger.info(self.vapi.cli("show ip6 neighbors"))
+        cmd = "show acl-plugin sessions verbose 1"
+        self.logger.info(self.vapi.cli(cmd))
+        self.logger.info(self.vapi.cli("show acl-plugin acl"))
+        self.logger.info(self.vapi.cli("show acl-plugin interface"))
+        self.logger.info(self.vapi.cli("show acl-plugin tables"))
 
     def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
                       is_ip6, expect_blocked, expect_established,
-                      add_extension_header):
+                      add_extension_header, icmp_stateful=False):
         pkts = []
         rules = []
         permit_rules = []
@@ -131,7 +139,15 @@ class TestIpIrb(VppTestCase):
         total_packet_count = 8
         for i in range(0, total_packet_count):
             modulo = (i//2) % 2
-            can_reflect_this_packet = (modulo == 0)
+            icmp_type_delta = i % 2
+            icmp_code = i
+            is_udp_packet = (modulo == 0)
+            if is_udp_packet and icmp_stateful:
+                continue
+            is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and
+                                   not is_udp_packet)
+            is_reflected_icmp = is_reflectable_icmp and expect_established
+            can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
             is_permit = i % 2
             remote_dst_index = i % len(dst_ip_if.remote_hosts)
             remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
@@ -167,13 +183,15 @@ class TestIpIrb(VppTestCase):
                 dst_ip4 = remote_dst_host.ip4
                 src_l4 = 1234 + i
                 dst_l4 = 4321 + i
+            if is_reflected_icmp:
+                icmp_type_delta = 1
 
             # default ULP should be something we do not use in tests
             ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
             # potentially a chain of protocols leading to ULP
             ulp = ulp_l4
 
-            if can_reflect_this_packet:
+            if is_udp_packet:
                 if is_ip6:
                     ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                     if add_extension_header:
@@ -200,14 +218,15 @@ class TestIpIrb(VppTestCase):
                          Raw(payload))
             elif modulo == 1:
                 if is_ip6:
-                    ulp_l4 = ICMPv6Unknown(type=128 + (i % 2), code=i % 2)
+                    ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta,
+                                           code=icmp_code)
                     ulp = ulp_l4
                     p = (Ether(dst=dst_mac, src=src_mac) /
                          IPv6(src=src_ip6, dst=dst_ip6) /
                          ulp /
                          Raw(payload))
                 else:
-                    ulp_l4 = ICMP(type=8 + (i % 2), code=i % 2)
+                    ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code)
                     ulp = ulp_l4
                     p = (Ether(dst=dst_mac, src=src_mac) /
                          IP(src=src_ip4, dst=dst_ip4) /
@@ -264,7 +283,9 @@ class TestIpIrb(VppTestCase):
                 new_rule_permit_and_reflect['is_permit'] = 2
             else:
                 new_rule_permit_and_reflect['is_permit'] = is_permit
+
             permit_and_reflect_rules.append(new_rule_permit_and_reflect)
+            self.logger.info("create_stream pkt#%d: %s" % (i, payload))
 
         return {'stream': pkts,
                 'rules': rules,
@@ -277,7 +298,6 @@ class TestIpIrb(VppTestCase):
             last_info[i.sw_if_index] = None
 
         dst_ip_sw_if_index = dst_ip_if.sw_if_index
-        return
 
         for packet in capture:
             l3 = IP if packet.haslayer(IP) else IPv6
@@ -293,14 +313,18 @@ class TestIpIrb(VppTestCase):
             # Scapy IPv6 stuff is too smart for its own good.
             # So we do this and coerce the ICMP into unknown type
             if packet.haslayer(UDP):
-                data = str(packet[UDP][Raw])
+                data = scapy.compat.raw(packet[UDP][Raw])
             else:
                 if l3 == IP:
-                    data = str(ICMP(str(packet[l3].payload))[Raw])
+                    data = scapy.compat.raw(ICMP(
+                        scapy.compat.raw(packet[l3].payload))[Raw])
                 else:
-                    data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody)
+                    data = scapy.compat.raw(ICMPv6Unknown(
+                        scapy.compat.raw(packet[l3].payload)).msgbody)
             udp_or_icmp = packet[l3].payload
-            payload_info = self.payload_to_info(data)
+            data_obj = Raw(data)
+            # FIXME: make framework believe we are on object
+            payload_info = self.payload_to_info(data_obj)
             packet_index = payload_info.index
 
             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
@@ -327,8 +351,6 @@ class TestIpIrb(VppTestCase):
                 if l4 == UDP:
                     self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
                     self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
-            else:
-                print("Saved packet is none")
             # self.assertEqual(ip.dst, host.ip4)
 
             # UDP:
@@ -354,17 +376,17 @@ class TestIpIrb(VppTestCase):
         shuffle(all_rules)
         reply = self.vapi.acl_add_replace(acl_index=4294967295,
                                           r=all_rules[::2],
-                                          tag="shuffle 1. acl")
+                                          tag=b"shuffle 1. acl")
         shuffle_acl_1 = reply.acl_index
         shuffle(all_rules)
         reply = self.vapi.acl_add_replace(acl_index=4294967295,
                                           r=all_rules[::3],
-                                          tag="shuffle 2. acl")
+                                          tag=b"shuffle 2. acl")
         shuffle_acl_2 = reply.acl_index
         shuffle(all_rules)
         reply = self.vapi.acl_add_replace(acl_index=4294967295,
                                           r=all_rules[::2],
-                                          tag="shuffle 3. acl")
+                                          tag=b"shuffle 3. acl")
         shuffle_acl_3 = reply.acl_index
 
         # apply the shuffle ACLs in front
@@ -389,15 +411,15 @@ class TestIpIrb(VppTestCase):
             shuffle(all_rules)
             reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
                                               r=all_rules[::1+(i % 2)],
-                                              tag="shuffle 1. acl")
+                                              tag=b"shuffle 1. acl")
             shuffle(all_rules)
             reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
                                               r=all_rules[::1+(i % 3)],
-                                              tag="shuffle 2. acl")
+                                              tag=b"shuffle 2. acl")
             shuffle(all_rules)
             reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
                                               r=all_rules[::1+(i % 5)],
-                                              tag="shuffle 3. acl")
+                                              tag=b"shuffle 3. acl")
 
         # restore to how it was before and clean up
         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
@@ -414,10 +436,10 @@ class TestIpIrb(VppTestCase):
         r_permit_reflect = stream_dict['permit_and_reflect_rules']
         r_action = r_permit_reflect if is_reflect else r
         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
-                                          tag="act. acl")
+                                          tag=b"act. acl")
         action_acl_index = reply.acl_index
         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
-                                          tag="perm. acl")
+                                          tag=b"perm. acl")
         permit_acl_index = reply.acl_index
         return {'L2': action_acl_index if test_l2_action else permit_acl_index,
                 'L3': permit_acl_index if test_l2_action else action_acl_index,
@@ -452,20 +474,22 @@ class TestIpIrb(VppTestCase):
 
     def apply_acl_ip46_both_directions_reflect(self,
                                                primary_is_bridged_to_routed,
-                                               reflect_on_l2, is_ip6, add_eh):
+                                               reflect_on_l2, is_ip6, add_eh,
+                                               stateful_icmp):
         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
         self.reset_packet_infos()
         stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
                                              primary_is_bridged_to_routed,
                                              self.pg_if_packet_sizes, is_ip6,
-                                             False, False, add_eh)
+                                             False, False, add_eh,
+                                             stateful_icmp)
         acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
                                                     reflect_on_l2, True)
 
         stream_dict_rev = self.create_stream(self.pg2, self.loop0,
                                              not primary_is_bridged_to_routed,
                                              self.pg_if_packet_sizes, is_ip6,
-                                             True, True, add_eh)
+                                             True, True, add_eh, stateful_icmp)
         # We want the primary action to be "deny" rather than reflect
         acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
                                                     reflect_on_l2, False)
@@ -517,13 +541,14 @@ class TestIpIrb(VppTestCase):
 
     def run_traffic_ip46_x_to_y(self, bridged_to_routed,
                                 test_l2_deny, is_ip6,
-                                is_reflect, is_established, add_eh):
+                                is_reflect, is_established, add_eh,
+                                stateful_icmp=False):
         self.reset_packet_infos()
         stream_dict = self.create_stream(self.pg2, self.loop0,
                                          bridged_to_routed,
                                          self.pg_if_packet_sizes, is_ip6,
                                          not is_reflect, is_established,
-                                         add_eh)
+                                         add_eh, stateful_icmp)
         stream = stream_dict['stream']
 
         tx_if = self.pg0 if bridged_to_routed else self.pg2
@@ -537,14 +562,18 @@ class TestIpIrb(VppTestCase):
         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
 
     def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
-                                           is_reflect, is_established, add_eh):
+                                           is_reflect, is_established, add_eh,
+                                           stateful_icmp=False):
         self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
-                                     is_reflect, is_established, add_eh)
+                                     is_reflect, is_established, add_eh,
+                                     stateful_icmp)
 
     def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
-                                           is_reflect, is_established, add_eh):
+                                           is_reflect, is_established, add_eh,
+                                           stateful_icmp=False):
         self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
-                                     is_reflect, is_established, add_eh)
+                                     is_reflect, is_established, add_eh,
+                                     stateful_icmp)
 
     def run_test_ip46_routed_to_bridged(self, test_l2_deny,
                                         is_ip6, is_reflect, add_eh):
@@ -561,22 +590,30 @@ class TestIpIrb(VppTestCase):
                                                 is_reflect, False, add_eh)
 
     def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
-                                                 is_ip6, add_eh):
+                                                 is_ip6, add_eh,
+                                                 stateful_icmp=False):
         self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
-                                                    is_ip6, add_eh)
+                                                    is_ip6, add_eh,
+                                                    stateful_icmp)
         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
-                                                True, False, add_eh)
+                                                True, False, add_eh,
+                                                stateful_icmp)
         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
-                                                False, True, add_eh)
+                                                False, True, add_eh,
+                                                stateful_icmp)
 
     def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
-                                                 is_ip6, add_eh):
+                                                 is_ip6, add_eh,
+                                                 stateful_icmp=False):
         self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
-                                                    is_ip6, add_eh)
+                                                    is_ip6, add_eh,
+                                                    stateful_icmp)
         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
-                                                True, False, add_eh)
+                                                True, False, add_eh,
+                                                stateful_icmp)
         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
-                                                False, True, add_eh)
+                                                False, True, add_eh,
+                                                stateful_icmp)
 
     def test_0000_ip6_irb_1(self):
         """ ACL plugin prepare"""
@@ -761,6 +798,56 @@ class TestIpIrb(VppTestCase):
         """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
         self.run_test_ip46_bridged_to_routed_and_back(False, False,
                                                       self.WITH_EH)
+    # Stateful ACL tests with stateful ICMP
+
+    def test_1401_ip6_irb_1(self):
+        """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_routed_to_bridged_and_back(True, True,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1402_ip6_irb_1(self):
+        """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_bridged_to_routed_and_back(True, True,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1403_ip4_irb_1(self):
+        """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_routed_to_bridged_and_back(True, False,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1404_ip4_irb_1(self):
+        """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_bridged_to_routed_and_back(True, False,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1411_ip6_irb_1(self):
+        """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_routed_to_bridged_and_back(False, True,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1412_ip6_irb_1(self):
+        """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_bridged_to_routed_and_back(False, True,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1413_ip4_irb_1(self):
+        """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_routed_to_bridged_and_back(False, False,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
+    def test_1414_ip4_irb_1(self):
+        """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
+        self.run_test_ip46_bridged_to_routed_and_back(False, False,
+                                                      self.WITHOUT_EH,
+                                                      self.STATEFUL_ICMP)
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)