fix(Pylint): Small fixes
[csit.git] / GPL / traffic_scripts / lisp / lisp_check.py
index 1bdafb7..6a5e85c 100644 (file)
@@ -1,11 +1,21 @@
 #!/usr/bin/env python3
 
 #!/usr/bin/env python3
 
-# Copyright (c) 2020 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
+# Copyright (c) 2021 Cisco and/or its affiliates.
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+#
+# Licensed under the Apache License 2.0 or
+# GNU General Public License v2.0 or later;  you may not use this file
+# except in compliance with one of these Licenses. You
+# may obtain a copy of the Licenses at:
 #
 #     http://www.apache.org/licenses/LICENSE-2.0
 #
 #     http://www.apache.org/licenses/LICENSE-2.0
+#     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
+#
+# Note: If this file is linked with Scapy, which is GPLv2+, your use of it
+# must be under GPLv2+.  If at any point in the future it is no longer linked
+# with Scapy (or other GPLv2+ licensed software), you are free to choose
+# Apache 2.
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +28,18 @@ a LISP-encapsulated packet on the other interface and verifies received packet.
 """
 
 import sys
 """
 
 import sys
-import ipaddress
 
 from scapy.all import bind_layers, Packet
 from scapy.fields import FlagsField, BitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
 
 from scapy.all import bind_layers, Packet
 from scapy.fields import FlagsField, BitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
 from ..PacketVerifier import RxQueue, TxQueue
 from ..TrafficScriptArg import TrafficScriptArg
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
 from ..PacketVerifier import RxQueue, TxQueue
 from ..TrafficScriptArg import TrafficScriptArg
+from ..ValidIp import valid_ipv4, valid_ipv6
 
 
 class LispHeader(Packet):
 
 
 class LispHeader(Packet):
@@ -56,26 +66,11 @@ class LispInnerIPv6(IPv6):
     name = u"Lisp Inner Layer - IPv6"
 
 
     name = u"Lisp Inner Layer - IPv6"
 
 
-def valid_ipv4(ip):
-    try:
-        ipaddress.IPv4Address(ip)
-        return True
-    except (AttributeError, ipaddress.AddressValueError):
-        return False
-
-
-def valid_ipv6(ip):
-    try:
-        ipaddress.IPv6Address(ip)
-        return True
-    except (AttributeError, ipaddress.AddressValueError):
-        return False
-
-
 def main():
     """Send IP ICMP packet from one traffic generator interface to the other.
 
 def main():
     """Send IP ICMP packet from one traffic generator interface to the other.
 
-    :raises RuntimeError: If the received packet is not correct."""
+    :raises RuntimeError: If the received packet is not correct.
+    """
 
     args = TrafficScriptArg(
         [
 
     args = TrafficScriptArg(
         [
@@ -123,13 +118,29 @@ def main():
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
-    if tx_if == rx_if:
-        ether = rxq.recv(2, ignore=sent_packets)
-    else:
-        ether = rxq.recv(2)
-
-    if ether is None:
-        raise RuntimeError(u"ICMP echo Rx timeout")
+    while True:
+        if tx_if == rx_if:
+            ether = rxq.recv(2, ignore=sent_packets)
+        else:
+            ether = rxq.recv(2)
+
+        if ether is None:
+            raise RuntimeError(u"ICMP echo Rx timeout")
+
+        if ether.haslayer(ICMPv6ND_NS):
+            # read another packet in the queue if the current one is ICMPv6ND_NS
+            continue
+        elif ether.haslayer(ICMPv6MLReport2):
+            # read another packet in the queue if the current one is
+            # ICMPv6MLReport2
+            continue
+        elif ether.haslayer(ICMPv6ND_RA):
+            # read another packet in the queue if the current one is
+            # ICMPv6ND_RA
+            continue
+
+        # otherwise process the current packet
+        break
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
         print(u"MAC addresses match.")
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
         print(u"MAC addresses match.")
@@ -145,7 +156,7 @@ def main():
         if not isinstance(ip, IPv6):
             raise RuntimeError(f"Not an IP packet received {ip!r}")
     elif not isinstance(ip, ip_format):
         if not isinstance(ip, IPv6):
             raise RuntimeError(f"Not an IP packet received {ip!r}")
     elif not isinstance(ip, ip_format):
-            raise RuntimeError(f"Not an IP packet received {ip!r}")
+        raise RuntimeError(f"Not an IP packet received {ip!r}")
 
     lisp = ether.getlayer(lisp_layer)
     if not lisp:
 
     lisp = ether.getlayer(lisp_layer)
     if not lisp: