fix(Pylint): Small fixes
[csit.git] / GPL / traffic_scripts / lisp / lisp_check.py
index 6612485..6a5e85c 100644 (file)
@@ -28,18 +28,18 @@ a LISP-encapsulated packet on the other interface and verifies received packet.
 """
 
 import sys
 """
 
 import sys
-import ipaddress
 
 from scapy.all import bind_layers, Packet
 from scapy.fields import FlagsField, BitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
 
 from scapy.all import bind_layers, Packet
 from scapy.fields import FlagsField, BitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
 from ..PacketVerifier import RxQueue, TxQueue
 from ..TrafficScriptArg import TrafficScriptArg
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
 from ..PacketVerifier import RxQueue, TxQueue
 from ..TrafficScriptArg import TrafficScriptArg
+from ..ValidIp import valid_ipv4, valid_ipv6
 
 
 class LispHeader(Packet):
 
 
 class LispHeader(Packet):
@@ -66,26 +66,11 @@ class LispInnerIPv6(IPv6):
     name = u"Lisp Inner Layer - IPv6"
 
 
     name = u"Lisp Inner Layer - IPv6"
 
 
-def valid_ipv4(ip):
-    try:
-        ipaddress.IPv4Address(ip)
-        return True
-    except (AttributeError, ipaddress.AddressValueError):
-        return False
-
-
-def valid_ipv6(ip):
-    try:
-        ipaddress.IPv6Address(ip)
-        return True
-    except (AttributeError, ipaddress.AddressValueError):
-        return False
-
-
 def main():
     """Send IP ICMP packet from one traffic generator interface to the other.
 
 def main():
     """Send IP ICMP packet from one traffic generator interface to the other.
 
-    :raises RuntimeError: If the received packet is not correct."""
+    :raises RuntimeError: If the received packet is not correct.
+    """
 
     args = TrafficScriptArg(
         [
 
     args = TrafficScriptArg(
         [
@@ -133,13 +118,29 @@ def main():
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
-    if tx_if == rx_if:
-        ether = rxq.recv(2, ignore=sent_packets)
-    else:
-        ether = rxq.recv(2)
-
-    if ether is None:
-        raise RuntimeError(u"ICMP echo Rx timeout")
+    while True:
+        if tx_if == rx_if:
+            ether = rxq.recv(2, ignore=sent_packets)
+        else:
+            ether = rxq.recv(2)
+
+        if ether is None:
+            raise RuntimeError(u"ICMP echo Rx timeout")
+
+        if ether.haslayer(ICMPv6ND_NS):
+            # read another packet in the queue if the current one is ICMPv6ND_NS
+            continue
+        elif ether.haslayer(ICMPv6MLReport2):
+            # read another packet in the queue if the current one is
+            # ICMPv6MLReport2
+            continue
+        elif ether.haslayer(ICMPv6ND_RA):
+            # read another packet in the queue if the current one is
+            # ICMPv6ND_RA
+            continue
+
+        # otherwise process the current packet
+        break
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
         print(u"MAC addresses match.")
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
         print(u"MAC addresses match.")
@@ -155,7 +156,7 @@ def main():
         if not isinstance(ip, IPv6):
             raise RuntimeError(f"Not an IP packet received {ip!r}")
     elif not isinstance(ip, ip_format):
         if not isinstance(ip, IPv6):
             raise RuntimeError(f"Not an IP packet received {ip!r}")
     elif not isinstance(ip, ip_format):
-            raise RuntimeError(f"Not an IP packet received {ip!r}")
+        raise RuntimeError(f"Not an IP packet received {ip!r}")
 
     lisp = ether.getlayer(lisp_layer)
     if not lisp:
 
     lisp = ether.getlayer(lisp_layer)
     if not lisp: