X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Fframework.py;h=a7a3e2fdea7b53dbaa771acff6d3ab72ab676c4f;hb=738cf73b2c5e396a56b8fab19b64a07b171cb14c;hp=2b197326532f3172d879ba68e2602d20d2ceba1e;hpb=cc16e7bad74c18b1a682329af5e760c4cd8381ea;p=vpp.git diff --git a/test/framework.py b/test/framework.py index 2b197326532..a7a3e2fdea7 100644 --- a/test/framework.py +++ b/test/framework.py @@ -1325,7 +1325,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): if 0 == len(checksums): return temp = temp.__class__(scapy.compat.raw(temp)) - for layer, cf in checksums: + for layer, cf in reversed(checksums): calc_sum = getattr(temp[layer], cf) self.assert_equal( getattr(received[layer], cf), @@ -1338,9 +1338,24 @@ class VppTestCase(CPUInterface, unittest.TestCase): ) def assert_checksum_valid( - self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False + self, + received_packet, + layer, + checksum_field_names=["chksum", "cksum"], + ignore_zero_checksum=False, ): """Check checksum of received packet on given layer""" + layer_copy = received_packet[layer].copy() + layer_copy.remove_payload() + field_name = None + for f in checksum_field_names: + if hasattr(layer_copy, f): + field_name = f + break + if field_name is None: + raise Exception( + f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`." + ) received_packet_checksum = getattr(received_packet[layer], field_name) if ignore_zero_checksum and 0 == received_packet_checksum: return @@ -1350,7 +1365,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): self.assert_equal( received_packet_checksum, getattr(recalculated[layer], field_name), - "packet checksum on layer: %s" % layer, + f"packet checksum (field: {field_name}) on layer: %s" % layer, ) def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False): @@ -1386,12 +1401,12 @@ class VppTestCase(CPUInterface, unittest.TestCase): def assert_icmpv6_checksum_valid(self, pkt): if pkt.haslayer(ICMPv6DestUnreach): - self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum") + self.assert_checksum_valid(pkt, "ICMPv6DestUnreach") self.assert_embedded_icmp_checksum_valid(pkt) if pkt.haslayer(ICMPv6EchoRequest): - self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum") + self.assert_checksum_valid(pkt, "ICMPv6EchoRequest") if pkt.haslayer(ICMPv6EchoReply): - self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum") + self.assert_checksum_valid(pkt, "ICMPv6EchoReply") def get_counter(self, counter): if counter.startswith("/"):