FIX: Pylint reduce
[csit.git] / GPL / traffic_scripts / lisp / lispgpe_check.py
index 41bc096..f8fa595 100644 (file)
@@ -35,7 +35,7 @@ from scapy.all import bind_layers, Packet
 from scapy.fields import FlagsField, BitField, XBitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
@@ -92,17 +92,33 @@ class LispGPEInnerNSH(Packet):
     """
 
 
-def valid_ipv4(ip):
+def valid_ipv4(ip_address):
+    """Check IPv4 address.
+
+    :param ip_address: IPv4 address to check.
+    :type ip_address: str
+    :returns: True if IP address is correct.
+    :rtype: bool
+    :raises AttributeError, AddressValueError: If IP address is not valid.
+    """
     try:
-        ipaddress.IPv4Address(ip)
+        ipaddress.IPv4Address(ip_address)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
 
 
-def valid_ipv6(ip):
+def valid_ipv6(ip_address):
+    """Check IPv6 address.
+
+    :param ip_address: IPv6 address to check.
+    :type ip_address: str
+    :returns: True if IP address is correct.
+    :rtype: bool
+    :raises AttributeError, AddressValueError: If IP address is not valid.
+    """
     try:
-        ipaddress.IPv6Address(ip)
+        ipaddress.IPv6Address(ip_address)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
@@ -156,13 +172,28 @@ def main():
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
-    if tx_if == rx_if:
-        ether = rxq.recv(2, ignore=sent_packets)
-    else:
-        ether = rxq.recv(2)
-
-    if ether is None:
-        raise RuntimeError(u"ICMP echo Rx timeout")
+    while True:
+        if tx_if == rx_if:
+            ether = rxq.recv(2, ignore=sent_packets)
+        else:
+            ether = rxq.recv(2)
+
+        if ether is None:
+            raise RuntimeError(u"ICMP echo Rx timeout")
+
+        if ether.haslayer(ICMPv6ND_NS):
+            # read another packet in the queue if the current one is ICMPv6ND_NS
+            continue
+        if ether.haslayer(ICMPv6ND_RA):
+            # read another packet in the queue if the current one is ICMPv6ND_RA
+            continue
+        elif ether.haslayer(ICMPv6MLReport2):
+            # read another packet in the queue if the current one is
+            # ICMPv6MLReport2
+            continue
+
+        # otherwise process the current packet
+        break
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
         print(u"MAC addresses match.")