tests: improve packet checksum functions 54/37654/2
authorKlement Sekera <klement.sekera@gmail.com>
Mon, 14 Nov 2022 10:26:18 +0000 (11:26 +0100)
committerDave Wallace <dwallacelf@gmail.com>
Wed, 18 Jan 2023 00:39:57 +0000 (00:39 +0000)
Fool-proof assert_checksum_valid so that one does not verify checksum on
wrong layer (because of how scapy internally works).

Make assert_packet_checksums_valid start checksum checking at inner
layers and outwards to make it more obvious where the error is. With old
behaviour, if one received an ICMP packet carrying a truncated TCP
packet, an error would be raised for ICMP checksum, as that one would be
the first to be wrong after recalculating all packet checksums, while
the real issue is TCP header being truncated and thus unsuitable for use
with this function.

Type: improvement
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Change-Id: I39a2b50ec5610f969cfde9796416ee3a50ae0ba3

test/framework.py

index 2b19732..a7a3e2f 100644 (file)
@@ -1325,7 +1325,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         if 0 == len(checksums):
             return
         temp = temp.__class__(scapy.compat.raw(temp))
-        for layer, cf in checksums:
+        for layer, cf in reversed(checksums):
             calc_sum = getattr(temp[layer], cf)
             self.assert_equal(
                 getattr(received[layer], cf),
@@ -1338,9 +1338,24 @@ class VppTestCase(CPUInterface, unittest.TestCase):
             )
 
     def assert_checksum_valid(
-        self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
+        self,
+        received_packet,
+        layer,
+        checksum_field_names=["chksum", "cksum"],
+        ignore_zero_checksum=False,
     ):
         """Check checksum of received packet on given layer"""
+        layer_copy = received_packet[layer].copy()
+        layer_copy.remove_payload()
+        field_name = None
+        for f in checksum_field_names:
+            if hasattr(layer_copy, f):
+                field_name = f
+                break
+        if field_name is None:
+            raise Exception(
+                f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
+            )
         received_packet_checksum = getattr(received_packet[layer], field_name)
         if ignore_zero_checksum and 0 == received_packet_checksum:
             return
@@ -1350,7 +1365,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
         self.assert_equal(
             received_packet_checksum,
             getattr(recalculated[layer], field_name),
-            "packet checksum on layer: %s" % layer,
+            f"packet checksum (field: {field_name}) on layer: %s" % layer,
         )
 
     def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
@@ -1386,12 +1401,12 @@ class VppTestCase(CPUInterface, unittest.TestCase):
 
     def assert_icmpv6_checksum_valid(self, pkt):
         if pkt.haslayer(ICMPv6DestUnreach):
-            self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
             self.assert_embedded_icmp_checksum_valid(pkt)
         if pkt.haslayer(ICMPv6EchoRequest):
-            self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
         if pkt.haslayer(ICMPv6EchoReply):
-            self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
+            self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
 
     def get_counter(self, counter):
         if counter.startswith("/"):