fix(Pylint): Small fixes
[csit.git] / GPL / traffic_scripts / lisp / lisp_check.py
index 6612485..6a5e85c 100644 (file)
@@ -28,18 +28,18 @@ a LISP-encapsulated packet on the other interface and verifies received packet.
 """
 
 import sys
-import ipaddress
 
 from scapy.all import bind_layers, Packet
 from scapy.fields import FlagsField, BitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
 from ..PacketVerifier import RxQueue, TxQueue
 from ..TrafficScriptArg import TrafficScriptArg
+from ..ValidIp import valid_ipv4, valid_ipv6
 
 
 class LispHeader(Packet):
@@ -66,26 +66,11 @@ class LispInnerIPv6(IPv6):
     name = u"Lisp Inner Layer - IPv6"
 
 
-def valid_ipv4(ip):
-    try:
-        ipaddress.IPv4Address(ip)
-        return True
-    except (AttributeError, ipaddress.AddressValueError):
-        return False
-
-
-def valid_ipv6(ip):
-    try:
-        ipaddress.IPv6Address(ip)
-        return True
-    except (AttributeError, ipaddress.AddressValueError):
-        return False
-
-
 def main():
     """Send IP ICMP packet from one traffic generator interface to the other.
 
-    :raises RuntimeError: If the received packet is not correct."""
+    :raises RuntimeError: If the received packet is not correct.
+    """
 
     args = TrafficScriptArg(
         [
@@ -133,13 +118,29 @@ def main():
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
-    if tx_if == rx_if:
-        ether = rxq.recv(2, ignore=sent_packets)
-    else:
-        ether = rxq.recv(2)
-
-    if ether is None:
-        raise RuntimeError(u"ICMP echo Rx timeout")
+    while True:
+        if tx_if == rx_if:
+            ether = rxq.recv(2, ignore=sent_packets)
+        else:
+            ether = rxq.recv(2)
+
+        if ether is None:
+            raise RuntimeError(u"ICMP echo Rx timeout")
+
+        if ether.haslayer(ICMPv6ND_NS):
+            # read another packet in the queue if the current one is ICMPv6ND_NS
+            continue
+        elif ether.haslayer(ICMPv6MLReport2):
+            # read another packet in the queue if the current one is
+            # ICMPv6MLReport2
+            continue
+        elif ether.haslayer(ICMPv6ND_RA):
+            # read another packet in the queue if the current one is
+            # ICMPv6ND_RA
+            continue
+
+        # otherwise process the current packet
+        break
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
         print(u"MAC addresses match.")
@@ -155,7 +156,7 @@ def main():
         if not isinstance(ip, IPv6):
             raise RuntimeError(f"Not an IP packet received {ip!r}")
     elif not isinstance(ip, ip_format):
-            raise RuntimeError(f"Not an IP packet received {ip!r}")
+        raise RuntimeError(f"Not an IP packet received {ip!r}")
 
     lisp = ether.getlayer(lisp_layer)
     if not lisp: