X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fipv6_ns.py;fp=resources%2Ftraffic_scripts%2Fipv6_ns.py;h=5852e6317f7b08e1b3910afd5734ff0eb262fdaf;hp=70c6ab445a1500b9a70eacc03cd9353c71b8f58c;hb=2a848f49308868dfe6fa3a9cb78bd085f8c16f40;hpb=6928a6be42016a6c5edade6369041670fe544f39 diff --git a/resources/traffic_scripts/ipv6_ns.py b/resources/traffic_scripts/ipv6_ns.py index 70c6ab445a..5852e6317f 100755 --- a/resources/traffic_scripts/ipv6_ns.py +++ b/resources/traffic_scripts/ipv6_ns.py @@ -17,13 +17,18 @@ import sys import logging + +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg + +from scapy.all import Ether from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr -from scapy.all import Ether + +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def main(): @@ -41,57 +46,67 @@ def main(): # send ICMPv6 neighbor solicitation message pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NS(tgt=dst_ip) / - ICMPv6NDOptSrcLLAddr(lladdr=src_mac)) + IPv6(src=src_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NS(tgt=dst_ip) / + ICMPv6NDOptSrcLLAddr(lladdr=src_mac)) sent_packets.append(pkt_send) txq.send(pkt_send) # receive ICMPv6 neighbor advertisement message - ether = rxq.recv(2, sent_packets) + while True: + ether = rxq.recv(2, sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + if ether is None: raise RuntimeError('ICMPv6 echo reply Rx timeout') if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 received {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] if not ipv6.haslayer(ICMPv6ND_NA): - raise RuntimeError( - 'Unexpected packet with no ICMPv6 ND-NA received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received ' + '{0}'.format(ipv6.__repr__())) - icmpv6_na = ipv6['ICMPv6 Neighbor Discovery - Neighbor Advertisement'] + icmpv6_na = ipv6[ICMPv6ND_NA] # verify target address if icmpv6_na.tgt != dst_ip: - raise RuntimeError('Invalid target address {0} should be {1}'.format( - icmpv6_na.tgt, dst_ip)) + raise RuntimeError('Invalid target address {0} should be {1}'. + format(icmpv6_na.tgt, dst_ip)) if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr): - raise RuntimeError( - 'Missing Destination Link-Layer Address option in ICMPv6 ' + - 'Neighbor Advertisement {0}'.format(icmpv6_na.__repr__())) + raise RuntimeError('Missing Destination Link-Layer Address option in ' + 'ICMPv6 Neighbor Advertisement {0}'. + format(icmpv6_na.__repr__())) - option = 'ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address' - dst_ll_addr = icmpv6_na[option] + dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr] # verify destination link-layer address field if dst_ll_addr.lladdr != dst_mac: - raise RuntimeError('Invalid lladdr {0} should be {1}'.format( - dst_ll_addr.lladdr, dst_mac)) + raise RuntimeError('Invalid lladdr {0} should be {1}'. + format(dst_ll_addr.lladdr, dst_mac)) # verify checksum cksum = icmpv6_na.cksum del icmpv6_na.cksum tmp = ICMPv6ND_NA(str(icmpv6_na)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) sys.exit(0) + if __name__ == "__main__": main()