X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Fframework.py;h=be8c209f4ea277c2b972f668d87463d8a2f37df6;hb=c8efa29b6f9a91381897b54f1147daf922ed7164;hp=973214503a840303e85e2a0f3059a7d7f5b9a42e;hpb=07a38572caa2c2d2d8658420a7c3df8e7f9d0e74;p=vpp.git diff --git a/test/framework.py b/test/framework.py index 973214503a8..be8c209f4ea 100644 --- a/test/framework.py +++ b/test/framework.py @@ -25,6 +25,10 @@ from vpp_papi_provider import VppPapiProvider from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \ getLogger, colorize from vpp_object import VppObjectRegistry +from util import ppp +from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror +from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest +from scapy.layers.inet6 import ICMPv6EchoReply if os.name == 'posix' and sys.version_info[0] < 3: # using subprocess32 is recommended by python official documentation # @ https://docs.python.org/2/library/subprocess.html @@ -32,6 +36,7 @@ if os.name == 'posix' and sys.version_info[0] < 3: else: import subprocess + debug_framework = False if os.getenv('TEST_DEBUG', "0") == "1": debug_framework = True @@ -729,6 +734,95 @@ class VppTestCase(unittest.TestCase): name, real_value, expected_min, expected_max) self.assertTrue(expected_min <= real_value <= expected_max, msg) + def assert_packet_checksums_valid(self, packet, + ignore_zero_udp_checksums=True): + received = packet.__class__(str(packet)) + self.logger.debug( + ppp("Verifying packet checksums for packet:", received)) + udp_layers = ['UDP', 'UDPerror'] + checksum_fields = ['cksum', 'chksum'] + checksums = [] + counter = 0 + temp = received.__class__(str(received)) + while True: + layer = temp.getlayer(counter) + if layer: + for cf in checksum_fields: + if hasattr(layer, cf): + if ignore_zero_udp_checksums and \ + 0 == getattr(layer, cf) and \ + layer.name in udp_layers: + continue + delattr(layer, cf) + checksums.append((counter, cf)) + else: + break + counter = counter + 1 + if 0 == len(checksums): + return + temp = temp.__class__(str(temp)) + for layer, cf in checksums: + calc_sum = getattr(temp[layer], cf) + self.assert_equal( + getattr(received[layer], cf), calc_sum, + "packet checksum on layer #%d: %s" % (layer, temp[layer].name)) + self.logger.debug( + "Checksum field `%s` on `%s` layer has correct value `%s`" % + (cf, temp[layer].name, calc_sum)) + + def assert_checksum_valid(self, received_packet, layer, + field_name='chksum', + ignore_zero_checksum=False): + """ Check checksum of received packet on given layer """ + received_packet_checksum = getattr(received_packet[layer], field_name) + if ignore_zero_checksum and 0 == received_packet_checksum: + return + recalculated = received_packet.__class__(str(received_packet)) + delattr(recalculated[layer], field_name) + recalculated = recalculated.__class__(str(recalculated)) + self.assert_equal(received_packet_checksum, + getattr(recalculated[layer], field_name), + "packet checksum on layer: %s" % layer) + + def assert_ip_checksum_valid(self, received_packet, + ignore_zero_checksum=False): + self.assert_checksum_valid(received_packet, 'IP', + ignore_zero_checksum=ignore_zero_checksum) + + def assert_tcp_checksum_valid(self, received_packet, + ignore_zero_checksum=False): + self.assert_checksum_valid(received_packet, 'TCP', + ignore_zero_checksum=ignore_zero_checksum) + + def assert_udp_checksum_valid(self, received_packet, + ignore_zero_checksum=True): + self.assert_checksum_valid(received_packet, 'UDP', + ignore_zero_checksum=ignore_zero_checksum) + + def assert_embedded_icmp_checksum_valid(self, received_packet): + if received_packet.haslayer(IPerror): + self.assert_checksum_valid(received_packet, 'IPerror') + if received_packet.haslayer(TCPerror): + self.assert_checksum_valid(received_packet, 'TCPerror') + if received_packet.haslayer(UDPerror): + self.assert_checksum_valid(received_packet, 'UDPerror', + ignore_zero_checksum=True) + if received_packet.haslayer(ICMPerror): + self.assert_checksum_valid(received_packet, 'ICMPerror') + + def assert_icmp_checksum_valid(self, received_packet): + self.assert_checksum_valid(received_packet, 'ICMP') + self.assert_embedded_icmp_checksum_valid(received_packet) + + def assert_icmpv6_checksum_valid(self, pkt): + if pkt.haslayer(ICMPv6DestUnreach): + self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum') + self.assert_embedded_icmp_checksum_valid(pkt) + if pkt.haslayer(ICMPv6EchoRequest): + self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum') + if pkt.haslayer(ICMPv6EchoReply): + self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum') + @classmethod def sleep(cls, timeout, remark=None): if hasattr(cls, 'logger'):