import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.all import Ether
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
:type ipformat: dict
:rtype: bool
"""
- #pylint: disable=bare-except
+ # pylint: disable=bare-except
try:
if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \
ipformat['Type']:
return True
else:
return False
- except:
+ except: # pylint: disable=bare-except
return False
:type ipformat: dict
:rtype: bool
"""
- #pylint: disable=bare-except
+ # pylint: disable=bare-except
try:
r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst
r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src
return r_src and r_dst
- except:
+ except: # pylint: disable=bare-except
return False
txq.send(icmp_request)
for _ in range(1000):
- icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
- if icmp_reply is None:
- timeout -= wait_step
- if timeout < 0:
- raise RuntimeError("ICMP echo Rx timeout")
- elif is_icmp_reply(icmp_reply, ip_format):
+ while True:
+ icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
+ if icmp_reply is None:
+ timeout -= wait_step
+ if timeout < 0:
+ raise RuntimeError("ICMP echo Rx timeout")
+
+ elif icmp_reply.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
+ if is_icmp_reply(icmp_reply, ip_format):
if address_check(icmp_request, icmp_reply, ip_format):
break
else:
sys.exit(0)
+
if __name__ == "__main__":
main()