import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
-from scapy.layers.inet6 import ICMPv6EchoRequest
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
+
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if ether is None:
raise RuntimeError("ICMP echo Rx timeout")
if not ether.haslayer(ip_format):
- raise RuntimeError("Not an IP packet received {0}"
- .format(ether.__repr__()))
+ raise RuntimeError("Not an IP packet received {0}".
+ format(ether.__repr__()))
- if ether['Ethernet'].src != dut_if2_mac:
+ if ether[Ether].src != dut_if2_mac:
raise RuntimeError("Source MAC address error")
- if ether['Ethernet'].dst == path_1_mac:
+ if ether[Ether].dst == path_1_mac:
path_1_counter += 1
- elif ether['Ethernet'].dst == path_2_mac:
+ elif ether[Ether].dst == path_2_mac:
path_2_counter += 1
else:
raise RuntimeError("Destination MAC address error")
if (path_1_counter + path_2_counter) != 100:
- raise RuntimeError("Packet loss: recevied only {} packets of 100 "
- .format(path_1_counter + path_2_counter))
+ raise RuntimeError("Packet loss: recevied only {} packets of 100 ".
+ format(path_1_counter + path_2_counter))
if path_1_counter == 0:
raise RuntimeError("Path 1 error!")
sys.exit(0)
+
if __name__ == "__main__":
main()