Traffic scripts: Move valid_ipv* to a library
[csit.git] / GPL / traffic_scripts / send_ip_check_headers.py
index a384b68..a13b1b7 100644 (file)
@@ -30,33 +30,15 @@ MAC addresses are checked in received packet.
 
 import sys
 
 
 import sys
 
-import ipaddress
-
 from robot.api import logger
 from scapy.layers.inet import IP
 from robot.api import logger
 from scapy.layers.inet import IP
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.packet import Raw
 
 from .PacketVerifier import RxQueue, TxQueue
 from .TrafficScriptArg import TrafficScriptArg
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.packet import Raw
 
 from .PacketVerifier import RxQueue, TxQueue
 from .TrafficScriptArg import TrafficScriptArg
-
-
-def valid_ipv4(ip):
-    try:
-        ipaddress.IPv4Address(ip)
-        return True
-    except (AttributeError, ipaddress.AddressValueError):
-        return False
-
-
-def valid_ipv6(ip):
-    try:
-        ipaddress.IPv6Address(ip)
-        return True
-    except (AttributeError, ipaddress.AddressValueError):
-        return False
-
+from .ValidIp import valid_ipv4, valid_ipv6
 
 def main():
     """Send IP/IPv6 packet from one traffic generator interface to the other."""
 
 def main():
     """Send IP/IPv6 packet from one traffic generator interface to the other."""
@@ -90,7 +72,7 @@ def main():
     rxq = RxQueue(rx_if)
     txq = TxQueue(tx_if)
 
     rxq = RxQueue(rx_if)
     txq = TxQueue(tx_if)
 
-    sent_packets =list()
+    sent_packets = list()
     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
 
     if encaps_tx == u"Dot1q":
     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
 
     if encaps_tx == u"Dot1q":
@@ -125,9 +107,16 @@ def main():
         if ether.haslayer(ICMPv6ND_NS):
             # read another packet in the queue if the current one is ICMPv6ND_NS
             continue
         if ether.haslayer(ICMPv6ND_NS):
             # read another packet in the queue if the current one is ICMPv6ND_NS
             continue
-        else:
-            # otherwise process the current packet
-            break
+        elif ether.haslayer(ICMPv6MLReport2):
+            # read another packet in the queue if the current one is
+            # ICMPv6MLReport2
+            continue
+        elif ether.haslayer(ICMPv6ND_RA):
+            # read another packet in the queue if the current one is
+            # ICMPv6ND_RA
+            continue
+
+        break
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
         logger.trace(u"MAC matched")
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
         logger.trace(u"MAC matched")