tests: improve packet checksum functions
[vpp.git] / test / framework.py
index 2b19732..a7a3e2f 100644 (file)
@@ -1325,7 +1325,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         if 0 == len(checksums):
             return
         temp = temp.__class__(scapy.compat.raw(temp))
-        for layer, cf in checksums:
+        for layer, cf in reversed(checksums):
             calc_sum = getattr(temp[layer], cf)
             self.assert_equal(
                 getattr(received[layer], cf),
@@ -1338,9 +1338,24 @@ class VppTestCase(CPUInterface, unittest.TestCase):
             )
 
     def assert_checksum_valid(
-        self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
+        self,
+        received_packet,
+        layer,
+        checksum_field_names=["chksum", "cksum"],
+        ignore_zero_checksum=False,
     ):
         """Check checksum of received packet on given layer"""
+        layer_copy = received_packet[layer].copy()
+        layer_copy.remove_payload()
+        field_name = None
+        for f in checksum_field_names:
+            if hasattr(layer_copy, f):
+                field_name = f
+                break
+        if field_name is None:
+            raise Exception(
+                f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
+            )
         received_packet_checksum = getattr(received_packet[layer], field_name)
         if ignore_zero_checksum and 0 == received_packet_checksum:
             return
@@ -1350,7 +1365,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         self.assert_equal(
             received_packet_checksum,
             getattr(recalculated[layer], field_name),
-            "packet checksum on layer: %s" % layer,
+            f"packet checksum (field: {field_name}) on layer: %s" % layer,
         )
 
     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
@@ -1386,12 +1401,12 @@ class VppTestCase(CPUInterface, unittest.TestCase):
 
     def assert_icmpv6_checksum_valid(self, pkt):
         if pkt.haslayer(ICMPv6DestUnreach):
-            self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
             self.assert_embedded_icmp_checksum_valid(pkt)
         if pkt.haslayer(ICMPv6EchoRequest):
-            self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
         if pkt.haslayer(ICMPv6EchoReply):
-            self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
 
     def get_counter(self, counter):
         if counter.startswith("/"):