Ignore unexpected ICMPv6 Neighbor Discovery - Neighbor Solicitation packets
[csit.git] / resources / traffic_scripts / ipv6_nd_proxy_check.py
index b102802..c921399 100755 (executable)
@@ -15,8 +15,9 @@
 """Traffic script that sends DHCPv6 proxy packets."""
 
 from scapy.layers.inet import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
-    ICMPv6ND_NS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
 
 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -44,17 +45,23 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
 
     sent_packets = []
 
-    icmpv6nd_solicit_pkt =\
-        Ether(src=src_mac, dst=dst_mac) / \
-        IPv6(src=src_ip) / \
-        ICMPv6ND_NS(tgt=dst_ip)
+    icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
+                            IPv6(src=src_ip) /
+                            ICMPv6ND_NS(tgt=dst_ip))
 
     sent_packets.append(icmpv6nd_solicit_pkt)
     txq.send(icmpv6nd_solicit_pkt)
 
     ether = None
     for _ in range(5):
-        pkt = rxq.recv(3, ignore=sent_packets)
+        while True:
+            pkt = rxq.recv(3, ignore=sent_packets)
+            if ether.haslayer(ICMPv6ND_NS):
+                # read another packet in the queue in case of ICMPv6ND_NS packet
+                continue
+            else:
+                # otherwise process the current packet
+                break
         if pkt is not None:
             ether = pkt
             break
@@ -63,34 +70,33 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
         raise RuntimeError('ICMPv6ND Proxy response timeout.')
 
     if ether.src != dst_mac:
-        raise RuntimeError("Source MAC address error: {} != {}".format(
-            ether.src, dst_mac))
+        raise RuntimeError("Source MAC address error: {} != {}".
+                           format(ether.src, dst_mac))
     print "Source MAC address: OK."
 
     if ether.dst != src_mac:
-        raise RuntimeError("Destination MAC address error: {} != {}".format(
-            ether.dst, src_mac))
+        raise RuntimeError("Destination MAC address error: {} != {}".
+                           format(ether.dst, src_mac))
     print "Destination MAC address: OK."
 
-    if ether['IPv6'].src != dst_ip:
-        raise RuntimeError("Source IP address error: {} != {}".format(
-            ether['IPv6'].src, dst_ip))
+    if ether[IPv6].src != dst_ip:
+        raise RuntimeError("Source IP address error: {} != {}".
+                           format(ether[IPv6].src, dst_ip))
     print "Source IP address: OK."
 
-    if ether['IPv6'].dst != src_ip:
-        raise RuntimeError("Destination IP address error: {} != {}".format(
-            ether['IPv6'].dst, src_ip))
+    if ether[IPv6].dst != src_ip:
+        raise RuntimeError("Destination IP address error: {} != {}".
+                           format(ether[IPv6].dst, src_ip))
     print "Destination IP address: OK."
 
     try:
-        target_addr = ether['IPv6']\
-            ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
+        target_addr = ether[IPv6][ICMPv6ND_NA].tgt
     except (KeyError, AttributeError):
         raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
 
     if target_addr != dst_ip:
-        raise RuntimeError("ICMPv6 field 'Target address' error:"
-                           " {} != {}".format(target_addr, dst_ip))
+        raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}".
+                           format(target_addr, dst_ip))
     print "Target address field: OK."
 
 
@@ -119,16 +125,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
     rxq = RxQueue(dst_if)
     txq = TxQueue(src_if)
 
-    icmpv6_ping_pkt = \
-        Ether(src=src_mac, dst=proxy_to_src_mac) / \
-        IPv6(src=src_ip, dst=dst_ip) / \
-        ICMPv6EchoRequest()
+    icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
+                       IPv6(src=src_ip, dst=dst_ip) /
+                       ICMPv6EchoRequest())
 
     txq.send(icmpv6_ping_pkt)
 
     ether = None
     for _ in range(5):
-        pkt = rxq.recv(3)
+        while True:
+            pkt = rxq.recv(3)
+            if ether.haslayer(ICMPv6ND_NS):
+                # read another packet in the queue in case of ICMPv6ND_NS packet
+                continue
+            else:
+                # otherwise process the current packet
+                break
         if pkt is not None:
             ether = pkt
             break
@@ -136,7 +148,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
     if ether is None:
         raise RuntimeError('ICMPv6 Echo Request timeout.')
     try:
-        ether["IPv6"]["ICMPv6 Echo Request"]
+        ether[IPv6]["ICMPv6 Echo Request"]
     except KeyError:
         raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
     print "ICMP Echo: OK."
@@ -144,16 +156,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
     rxq = RxQueue(src_if)
     txq = TxQueue(dst_if)
 
-    icmpv6_ping_pkt = \
-        Ether(src=dst_mac, dst=proxy_to_dst_mac) / \
-        IPv6(src=dst_ip, dst=src_ip) / \
-        ICMPv6EchoReply()
+    icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
+                       IPv6(src=dst_ip, dst=src_ip) /
+                       ICMPv6EchoReply())
 
     txq.send(icmpv6_ping_pkt)
 
     ether = None
     for _ in range(5):
-        pkt = rxq.recv(3)
+        while True:
+            pkt = rxq.recv(3)
+            if ether.haslayer(ICMPv6ND_NS):
+                # read another packet in the queue in case of ICMPv6ND_NS packet
+                continue
+            else:
+                # otherwise process the current packet
+                break
         if pkt is not None:
             ether = pkt
             break
@@ -161,7 +179,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
     if ether is None:
         raise RuntimeError('DHCPv6 SOLICIT timeout')
     try:
-        ether["IPv6"]["ICMPv6 Echo Reply"]
+        ether[IPv6]["ICMPv6 Echo Reply"]
     except KeyError:
         raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
 
@@ -187,8 +205,9 @@ def main():
     imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
 
     # Verify route (ICMP echo/reply)
-    ipv6_ping(src_if, dst_if, src_mac, dst_mac,
-              proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)
+    ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac,
+              proxy_to_dst_mac, src_ip, dst_ip)
+
 
 if __name__ == "__main__":
     main()