import ipaddress
from scapy.layers.inet import IP, ICMP, ARP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from scapy.layers.l2 import Ether
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
"""Send a simple L2 or ICMP packet from one TG interface to DUT, then
receive a copy of the packet on the second TG interface, and a copy of
the ICMP reply."""
- args = TrafficScriptArg(
- ['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', 'ptype'])
+ args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
+ 'ptype'])
src_mac = args.get_arg('tg_src_mac')
dst_mac = args.get_arg('dut_if1_mac')
txq.send(pkt_raw)
sent.append(auto_pad(pkt_raw))
- ether = rxq_mirrored.recv(2)
# Receive copy of Rx packet.
- if ether is None:
- raise RuntimeError("Rx timeout of mirrored Rx packet")
+ while True:
+ ether = rxq_mirrored.recv(2)
+ if ether is None:
+ raise RuntimeError("Rx timeout of mirrored Rx packet")
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
pkt = auto_pad(pkt_raw)
if str(ether) != str(pkt):
print("Mirrored Rx packet doesn't match the original Rx packet.")
# Receive reply on TG Tx port.
ether_repl = rxq_tx.recv(2, sent)
+
if ether_repl is None:
raise RuntimeError("Reply not received on TG Tx port.")
- else:
- print("Reply received on TG Tx port.\n")
+
+ print("Reply received on TG Tx port.\n")
# Receive copy of Tx packet.
ether = rxq_mirrored.recv(2)
if ether is None:
raise RuntimeError("Rx timeout of mirrored Tx packet")
+
if str(ether) != str(ether_repl):
print("Mirrored Tx packet doesn't match the received Tx packet.")
if ether.src != ether_repl.src or ether.dst != ether_repl.dst: