X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fcheck_ra_packet.py;h=9717c7db95b021f27cf8db807b866737d84a6c85;hb=c240327b6122bafc12ed102b6da3ac30f48b04c2;hp=d95ef2d2f5daae1044c3e86004094aaf011272e4;hpb=8be64446049878a0fb4a91a3a8b0481e75b20205;p=csit.git diff --git a/resources/traffic_scripts/check_ra_packet.py b/resources/traffic_scripts/check_ra_packet.py index d95ef2d2f5..9717c7db95 100755 --- a/resources/traffic_scripts/check_ra_packet.py +++ b/resources/traffic_scripts/check_ra_packet.py @@ -17,6 +17,8 @@ import sys import ipaddress +from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS + from resources.libraries.python.PacketVerifier import RxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -49,39 +51,53 @@ def main(): part. """ - args = TrafficScriptArg(['src_mac']) + args = TrafficScriptArg(['src_mac', 'interval']) rx_if = args.get_arg('rx_if') src_mac = args.get_arg('src_mac') + interval = int(args.get_arg('interval')) rxq = RxQueue(rx_if) - ether = rxq.recv(8) - - # Check whether received packet contains layer RA and check other values - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') - - if not ether.haslayer('ICMPv6ND_RA'): - raise RuntimeError('Not an RA packet received {0}' - .format(ether.__repr__())) - - address = ipaddress.IPv6Address(unicode(ether['IPv6'].src)) + # receive ICMPv6ND_RA packet + while True: + ether = rxq.recv(max(5, interval)) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + # Check if received packet contains layer RA and check other values + if not ether.haslayer(ICMPv6ND_RA): + raise RuntimeError('Not an RA packet received {0}'. + format(ether.__repr__())) + + src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src)) + dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst)) link_local = ipaddress.IPv6Address(unicode(mac_to_ipv6_linklocal(src_mac))) - - if address != link_local: - raise RuntimeError( - 'Source address ({0}) not matching link local address({1})'.format( - address, link_local)) - - if ether['IPv6'].hlim != 255: - raise RuntimeError('Hop limit not correct: {0}!=255'.format( - ether['IPv6'].hlim)) - - ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code + all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1') + + if src_address != link_local: + raise RuntimeError('Source address ({0}) not matching link local ' + 'address ({1})'.format(src_address, link_local)) + if dst_address != all_nodes_multicast: + raise RuntimeError('Packet destination address ({0}) is not the all' + ' nodes multicast address ({1}).'. + format(dst_address, all_nodes_multicast)) + if ether[IPv6].hlim != 255: + raise RuntimeError('Hop limit not correct: {0}!=255'. + format(ether[IPv6].hlim)) + + ra_code = ether[ICMPv6ND_RA].code if ra_code != 0: raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code)) sys.exit(0) + if __name__ == "__main__": main()