X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fipv6_nd_proxy_check.py;fp=resources%2Ftraffic_scripts%2Fipv6_nd_proxy_check.py;h=c9213999ec8cb831e1bf6534c394c730d2f713ca;hp=b1028028b0c1c4d5f4da7d7c38870205ddad777c;hb=2a848f49308868dfe6fa3a9cb78bd085f8c16f40;hpb=6928a6be42016a6c5edade6369041670fe544f39 diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py index b1028028b0..c9213999ec 100755 --- a/resources/traffic_scripts/ipv6_nd_proxy_check.py +++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py @@ -15,8 +15,9 @@ """Traffic script that sends DHCPv6 proxy packets.""" from scapy.layers.inet import Ether -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\ - ICMPv6ND_NS +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -44,17 +45,23 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): sent_packets = [] - icmpv6nd_solicit_pkt =\ - Ether(src=src_mac, dst=dst_mac) / \ - IPv6(src=src_ip) / \ - ICMPv6ND_NS(tgt=dst_ip) + icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) / + IPv6(src=src_ip) / + ICMPv6ND_NS(tgt=dst_ip)) sent_packets.append(icmpv6nd_solicit_pkt) txq.send(icmpv6nd_solicit_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3, ignore=sent_packets) + while True: + pkt = rxq.recv(3, ignore=sent_packets) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -63,34 +70,33 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): raise RuntimeError('ICMPv6ND Proxy response timeout.') if ether.src != dst_mac: - raise RuntimeError("Source MAC address error: {} != {}".format( - ether.src, dst_mac)) + raise RuntimeError("Source MAC address error: {} != {}". + format(ether.src, dst_mac)) print "Source MAC address: OK." if ether.dst != src_mac: - raise RuntimeError("Destination MAC address error: {} != {}".format( - ether.dst, src_mac)) + raise RuntimeError("Destination MAC address error: {} != {}". + format(ether.dst, src_mac)) print "Destination MAC address: OK." - if ether['IPv6'].src != dst_ip: - raise RuntimeError("Source IP address error: {} != {}".format( - ether['IPv6'].src, dst_ip)) + if ether[IPv6].src != dst_ip: + raise RuntimeError("Source IP address error: {} != {}". + format(ether[IPv6].src, dst_ip)) print "Source IP address: OK." - if ether['IPv6'].dst != src_ip: - raise RuntimeError("Destination IP address error: {} != {}".format( - ether['IPv6'].dst, src_ip)) + if ether[IPv6].dst != src_ip: + raise RuntimeError("Destination IP address error: {} != {}". + format(ether[IPv6].dst, src_ip)) print "Destination IP address: OK." try: - target_addr = ether['IPv6']\ - ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt + target_addr = ether[IPv6][ICMPv6ND_NA].tgt except (KeyError, AttributeError): raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.") if target_addr != dst_ip: - raise RuntimeError("ICMPv6 field 'Target address' error:" - " {} != {}".format(target_addr, dst_ip)) + raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}". + format(target_addr, dst_ip)) print "Target address field: OK." @@ -119,16 +125,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, rxq = RxQueue(dst_if) txq = TxQueue(src_if) - icmpv6_ping_pkt = \ - Ether(src=src_mac, dst=proxy_to_src_mac) / \ - IPv6(src=src_ip, dst=dst_ip) / \ - ICMPv6EchoRequest() + icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) / + IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest()) txq.send(icmpv6_ping_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3) + while True: + pkt = rxq.recv(3) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -136,7 +148,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, if ether is None: raise RuntimeError('ICMPv6 Echo Request timeout.') try: - ether["IPv6"]["ICMPv6 Echo Request"] + ether[IPv6]["ICMPv6 Echo Request"] except KeyError: raise RuntimeError("Received packet is not an ICMPv6 Echo Request.") print "ICMP Echo: OK." @@ -144,16 +156,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, rxq = RxQueue(src_if) txq = TxQueue(dst_if) - icmpv6_ping_pkt = \ - Ether(src=dst_mac, dst=proxy_to_dst_mac) / \ - IPv6(src=dst_ip, dst=src_ip) / \ - ICMPv6EchoReply() + icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) / + IPv6(src=dst_ip, dst=src_ip) / + ICMPv6EchoReply()) txq.send(icmpv6_ping_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3) + while True: + pkt = rxq.recv(3) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -161,7 +179,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, if ether is None: raise RuntimeError('DHCPv6 SOLICIT timeout') try: - ether["IPv6"]["ICMPv6 Echo Reply"] + ether[IPv6]["ICMPv6 Echo Reply"] except KeyError: raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.") @@ -187,8 +205,9 @@ def main(): imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip) # Verify route (ICMP echo/reply) - ipv6_ping(src_if, dst_if, src_mac, dst_mac, - proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip) + ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac, + proxy_to_dst_mac, src_ip, dst_ip) + if __name__ == "__main__": main()