X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Fframework.py;h=f90197b9b6fa203c7ffe00a0033355be32ffd087;hb=e0d2bd6bd7fc59c0c6ac48195d7f825dc99bfd91;hp=973214503a840303e85e2a0f3059a7d7f5b9a42e;hpb=07a38572caa2c2d2d8658420a7c3df8e7f9d0e74;p=vpp.git diff --git a/test/framework.py b/test/framework.py index 973214503a8..f90197b9b6f 100644 --- a/test/framework.py +++ b/test/framework.py @@ -25,6 +25,9 @@ from vpp_papi_provider import VppPapiProvider from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \ getLogger, colorize from vpp_object import VppObjectRegistry +from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror +from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest +from scapy.layers.inet6 import ICMPv6EchoReply if os.name == 'posix' and sys.version_info[0] < 3: # using subprocess32 is recommended by python official documentation # @ https://docs.python.org/2/library/subprocess.html @@ -32,6 +35,7 @@ if os.name == 'posix' and sys.version_info[0] < 3: else: import subprocess + debug_framework = False if os.getenv('TEST_DEBUG', "0") == "1": debug_framework = True @@ -729,6 +733,87 @@ class VppTestCase(unittest.TestCase): name, real_value, expected_min, expected_max) self.assertTrue(expected_min <= real_value <= expected_max, msg) + def assert_packet_checksums_valid(self, packet, + ignore_zero_udp_checksums=True): + udp_layers = ['UDP', 'UDPerror'] + checksum_fields = ['cksum', 'chksum'] + checksums = [] + counter = 0 + temp = packet.__class__(str(packet)) + while True: + layer = temp.getlayer(counter) + if layer: + for cf in checksum_fields: + if hasattr(layer, cf): + if ignore_zero_udp_checksums and \ + 0 == getattr(layer, cf) and \ + layer.name in udp_layers: + continue + delattr(layer, cf) + checksums.append((counter, cf)) + else: + break + counter = counter + 1 + temp = temp.__class__(str(temp)) + for layer, cf in checksums: + self.assert_equal(getattr(packet[layer], cf), + getattr(temp[layer], cf), + "packet checksum on layer #%d: %s" % ( + layer, temp[layer].name)) + + def assert_checksum_valid(self, received_packet, layer, + field_name='chksum', + ignore_zero_checksum=False): + """ Check checksum of received packet on given layer """ + received_packet_checksum = getattr(received_packet[layer], field_name) + if ignore_zero_checksum and 0 == received_packet_checksum: + return + recalculated = received_packet.__class__(str(received_packet)) + delattr(recalculated[layer], field_name) + recalculated = recalculated.__class__(str(recalculated)) + self.assert_equal(received_packet_checksum, + getattr(recalculated[layer], field_name), + "packet checksum on layer: %s" % layer) + + def assert_ip_checksum_valid(self, received_packet, + ignore_zero_checksum=False): + self.assert_checksum_valid(received_packet, 'IP', + ignore_zero_checksum=ignore_zero_checksum) + + def assert_tcp_checksum_valid(self, received_packet, + ignore_zero_checksum=False): + self.assert_checksum_valid(received_packet, 'TCP', + ignore_zero_checksum=ignore_zero_checksum) + + def assert_udp_checksum_valid(self, received_packet, + ignore_zero_checksum=True): + self.assert_checksum_valid(received_packet, 'UDP', + ignore_zero_checksum=ignore_zero_checksum) + + def assert_embedded_icmp_checksum_valid(self, received_packet): + if received_packet.haslayer(IPerror): + self.assert_checksum_valid(received_packet, 'IPerror') + if received_packet.haslayer(TCPerror): + self.assert_checksum_valid(received_packet, 'TCPerror') + if received_packet.haslayer(UDPerror): + self.assert_checksum_valid(received_packet, 'UDPerror', + ignore_zero_checksum=True) + if received_packet.haslayer(ICMPerror): + self.assert_checksum_valid(received_packet, 'ICMPerror') + + def assert_icmp_checksum_valid(self, received_packet): + self.assert_checksum_valid(received_packet, 'ICMP') + self.assert_embedded_icmp_checksum_valid(received_packet) + + def assert_icmpv6_checksum_valid(self, pkt): + if pkt.haslayer(ICMPv6DestUnreach): + self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum') + self.assert_embedded_icmp_checksum_valid(pkt) + if pkt.haslayer(ICMPv6EchoRequest): + self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum') + if pkt.haslayer(ICMPv6EchoReply): + self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum') + @classmethod def sleep(cls, timeout, remark=None): if hasattr(cls, 'logger'):