tests: improve packet checksum functions
[vpp.git] / test / framework.py
index d130855..a7a3e2f 100644 (file)
@@ -274,6 +274,8 @@ class TestCaseTag(Enum):
     FIXME_UBUNTU2204 = 4
     # marks suites broken on Debian-11
     FIXME_DEBIAN11 = 5
+    # marks suites broken on debug vpp image
+    FIXME_VPP_DEBUG = 6
 
 
 def create_tag_decorator(e):
@@ -292,6 +294,7 @@ tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
 tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
 tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
+tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
 
 
 class DummyVpp:
@@ -379,6 +382,10 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
             cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
 
+    @classmethod
+    def skip_fixme_vpp_debug(cls):
+        cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
+
     @classmethod
     def instance(cls):
         """Return the instance of this testcase"""
@@ -1318,7 +1325,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         if 0 == len(checksums):
             return
         temp = temp.__class__(scapy.compat.raw(temp))
-        for layer, cf in checksums:
+        for layer, cf in reversed(checksums):
             calc_sum = getattr(temp[layer], cf)
             self.assert_equal(
                 getattr(received[layer], cf),
@@ -1331,9 +1338,24 @@ class VppTestCase(CPUInterface, unittest.TestCase):
             )
 
     def assert_checksum_valid(
-        self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
+        self,
+        received_packet,
+        layer,
+        checksum_field_names=["chksum", "cksum"],
+        ignore_zero_checksum=False,
     ):
         """Check checksum of received packet on given layer"""
+        layer_copy = received_packet[layer].copy()
+        layer_copy.remove_payload()
+        field_name = None
+        for f in checksum_field_names:
+            if hasattr(layer_copy, f):
+                field_name = f
+                break
+        if field_name is None:
+            raise Exception(
+                f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
+            )
         received_packet_checksum = getattr(received_packet[layer], field_name)
         if ignore_zero_checksum and 0 == received_packet_checksum:
             return
@@ -1343,7 +1365,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         self.assert_equal(
             received_packet_checksum,
             getattr(recalculated[layer], field_name),
-            "packet checksum on layer: %s" % layer,
+            f"packet checksum (field: {field_name}) on layer: %s" % layer,
         )
 
     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
@@ -1379,12 +1401,12 @@ class VppTestCase(CPUInterface, unittest.TestCase):
 
     def assert_icmpv6_checksum_valid(self, pkt):
         if pkt.haslayer(ICMPv6DestUnreach):
-            self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
             self.assert_embedded_icmp_checksum_valid(pkt)
         if pkt.haslayer(ICMPv6EchoRequest):
-            self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
         if pkt.haslayer(ICMPv6EchoReply):
-            self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
 
     def get_counter(self, counter):
         if counter.startswith("/"):
@@ -1842,6 +1864,10 @@ class VppTestResult(unittest.TestResult):
                 test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
                 test.skip_fixme_debian11()
 
+            if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
+                test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
+                test.skip_fixme_vpp_debug()
+
             if hasattr(test, "vpp_worker_count"):
                 if test.vpp_worker_count == 0:
                     test_title += " [main thread only]"