if 0 == len(checksums):
return
temp = temp.__class__(scapy.compat.raw(temp))
- for layer, cf in checksums:
+ for layer, cf in reversed(checksums):
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
getattr(received[layer], cf),
)
def assert_checksum_valid(
- self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
+ self,
+ received_packet,
+ layer,
+ checksum_field_names=["chksum", "cksum"],
+ ignore_zero_checksum=False,
):
"""Check checksum of received packet on given layer"""
+ layer_copy = received_packet[layer].copy()
+ layer_copy.remove_payload()
+ field_name = None
+ for f in checksum_field_names:
+ if hasattr(layer_copy, f):
+ field_name = f
+ break
+ if field_name is None:
+ raise Exception(
+ f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
+ )
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
self.assert_equal(
received_packet_checksum,
getattr(recalculated[layer], field_name),
- "packet checksum on layer: %s" % layer,
+ f"packet checksum (field: {field_name}) on layer: %s" % layer,
)
def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
def assert_icmpv6_checksum_valid(self, pkt):
if pkt.haslayer(ICMPv6DestUnreach):
- self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
self.assert_embedded_icmp_checksum_valid(pkt)
if pkt.haslayer(ICMPv6EchoRequest):
- self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
if pkt.haslayer(ICMPv6EchoReply):
- self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
def get_counter(self, counter):
if counter.startswith("/"):