X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fsend_icmp_wait_for_reply.py;fp=resources%2Ftraffic_scripts%2Fsend_icmp_wait_for_reply.py;h=7677152801ca051e25c733b6a0c2b2fce6af7389;hp=a77f15efc2e3e5dc583611b25ba4577a5ff38cee;hb=2a848f49308868dfe6fa3a9cb78bd085f8c16f40;hpb=6928a6be42016a6c5edade6369041670fe544f39 diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py index a77f15efc2..7677152801 100755 --- a/resources/traffic_scripts/send_icmp_wait_for_reply.py +++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py @@ -17,9 +17,9 @@ import sys import ipaddress -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.all import Ether +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -35,14 +35,14 @@ def is_icmp_reply(pkt, ipformat): :type ipformat: dict :rtype: bool """ - #pylint: disable=bare-except + # pylint: disable=bare-except try: if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \ ipformat['Type']: return True else: return False - except: + except: # pylint: disable=bare-except return False @@ -58,12 +58,12 @@ def address_check(request, reply, ipformat): :type ipformat: dict :rtype: bool """ - #pylint: disable=bare-except + # pylint: disable=bare-except try: r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src return r_src and r_dst - except: + except: # pylint: disable=bare-except return False @@ -139,12 +139,21 @@ def main(): txq.send(icmp_request) for _ in range(1000): - icmp_reply = rxq.recv(wait_step, ignore=sent_packets) - if icmp_reply is None: - timeout -= wait_step - if timeout < 0: - raise RuntimeError("ICMP echo Rx timeout") - elif is_icmp_reply(icmp_reply, ip_format): + while True: + icmp_reply = rxq.recv(wait_step, ignore=sent_packets) + if icmp_reply is None: + timeout -= wait_step + if timeout < 0: + raise RuntimeError("ICMP echo Rx timeout") + + elif icmp_reply.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break + + if is_icmp_reply(icmp_reply, ip_format): if address_check(icmp_request, icmp_reply, ip_format): break else: @@ -154,5 +163,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main()