Python3: resources and libraries
[csit.git] / resources / traffic_scripts / lisp / lispgpe_check.py
index d4de863..50857c4 100755 (executable)
@@ -1,5 +1,6 @@
-#!/usr/bin/env python
-# Copyright (c) 2017 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -20,12 +21,13 @@ packet.
 import sys
 import ipaddress
 
 import sys
 import ipaddress
 
+from scapy.all import bind_layers, Packet
+from scapy.fields import FlagsField, BitField, XBitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
 from scapy.layers.inet6 import IPv6
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
 from scapy.layers.inet6 import IPv6
 from scapy.layers.l2 import Ether
-from scapy.all import bind_layers, Packet
-from scapy.fields import FlagsField, BitField, XBitField, IntField
+from scapy.packet import Raw
 
 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
 
 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -33,13 +35,17 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
 
 class LispGPEHeader(Packet):
     """Scapy header for the Lisp GPE Layer."""
 
 class LispGPEHeader(Packet):
     """Scapy header for the Lisp GPE Layer."""
+
     name = "Lisp GPE Header"
     fields_desc = [
     name = "Lisp GPE Header"
     fields_desc = [
-        FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]),
-        BitField("version", 0, size=2),
-        BitField("reserved", 0, size=14),
-        XBitField("next_protocol", 0, size=8),
-        IntField("instance_id/locator_status_bits", 0)]
+        FlagsField(
+            u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
+        ),
+        BitField(u"version", 0, size=2),
+        BitField(u"reserved", 0, size=14),
+        XBitField(u"next_protocol", 0, size=8),
+        IntField(u"instance_id/locator_status_bits", 0)
+    ]
 
     def guess_payload_class(self, payload):
         protocol = {
 
     def guess_payload_class(self, payload):
         protocol = {
@@ -53,17 +59,20 @@ class LispGPEHeader(Packet):
 
 class LispGPEInnerIP(IP):
     """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
 
 class LispGPEInnerIP(IP):
     """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
-    name = "Lisp GPE Inner Layer - IPv4"
+
+    name = u"Lisp GPE Inner Layer - IPv4"
 
 
 class LispGPEInnerIPv6(IPv6):
     """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
 
 
 class LispGPEInnerIPv6(IPv6):
     """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
-    name = "Lisp GPE Inner Layer - IPv6"
+
+    name = u"Lisp GPE Inner Layer - IPv6"
 
 
 class LispGPEInnerEther(Ether):
     """Scapy inner LISP GPE layer for Lisp-L2."""
 
 
 class LispGPEInnerEther(Ether):
     """Scapy inner LISP GPE layer for Lisp-L2."""
-    name = "Lisp GPE Inner Layer - Ethernet"
+
+    name = u"Lisp GPE Inner Layer - Ethernet"
 
 
 class LispGPEInnerNSH(Packet):
 
 
 class LispGPEInnerNSH(Packet):
@@ -75,7 +84,7 @@ class LispGPEInnerNSH(Packet):
 
 def valid_ipv4(ip):
     try:
 
 def valid_ipv4(ip):
     try:
-        ipaddress.IPv4Address(unicode(ip))
+        ipaddress.IPv4Address(ip)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
@@ -83,7 +92,7 @@ def valid_ipv4(ip):
 
 def valid_ipv6(ip):
     try:
 
 def valid_ipv6(ip):
     try:
-        ipaddress.IPv6Address(unicode(ip))
+        ipaddress.IPv6Address(ip)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
@@ -95,25 +104,28 @@ def main():
     :raises RuntimeError: If the received packet is not correct."""
 
     args = TrafficScriptArg(
     :raises RuntimeError: If the received packet is not correct."""
 
     args = TrafficScriptArg(
-        ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
-         'dut_if2_mac', 'src_rloc', 'dst_rloc'],
-        ['ot_mode'])
-
-    tx_src_mac = args.get_arg('tg_src_mac')
-    tx_dst_mac = args.get_arg('dut_if1_mac')
-    rx_dst_mac = args.get_arg('tg_dst_mac')
-    rx_src_mac = args.get_arg('dut_if2_mac')
-    src_ip = args.get_arg('src_ip')
-    dst_ip = args.get_arg('dst_ip')
-    src_rloc = args.get_arg("src_rloc")
-    dst_rloc = args.get_arg("dst_rloc")
-    tx_if = args.get_arg('tx_if')
-    rx_if = args.get_arg('rx_if')
-    ot_mode = args.get_arg('ot_mode')
+        [
+            u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+            u"dut_if2_mac", u"src_rloc", u"dst_rloc"
+        ],
+        [u"ot_mode"]
+    )
+
+    tx_src_mac = args.get_arg(u"tg_src_mac")
+    tx_dst_mac = args.get_arg(u"dut_if1_mac")
+    rx_dst_mac = args.get_arg(u"tg_dst_mac")
+    rx_src_mac = args.get_arg(u"dut_if2_mac")
+    src_ip = args.get_arg(u"src_ip")
+    dst_ip = args.get_arg(u"dst_ip")
+    src_rloc = args.get_arg(u"src_rloc")
+    dst_rloc = args.get_arg(u"dst_rloc")
+    tx_if = args.get_arg(u"tx_if")
+    rx_if = args.get_arg(u"rx_if")
+    ot_mode = args.get_arg(u"ot_mode")
 
     rxq = RxQueue(rx_if)
     txq = TxQueue(tx_if)
 
     rxq = RxQueue(rx_if)
     txq = TxQueue(tx_if)
-    sent_packets = []
+
     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
 
     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
 
     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
@@ -125,10 +137,12 @@ def main():
         pkt_raw /= ICMPv6EchoRequest()
         ip_format = IPv6
     else:
         pkt_raw /= ICMPv6EchoRequest()
         ip_format = IPv6
     else:
-        raise ValueError("IP not in correct format")
+        raise ValueError(u"IP not in correct format")
 
     bind_layers(UDP, LispGPEHeader, dport=4341)
 
 
     bind_layers(UDP, LispGPEHeader, dport=4341)
 
+    pkt_raw /= Raw()
+    sent_packets = list()
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
@@ -138,59 +152,59 @@ def main():
         ether = rxq.recv(2)
 
     if ether is None:
         ether = rxq.recv(2)
 
     if ether is None:
-        raise RuntimeError("ICMP echo Rx timeout")
+        raise RuntimeError(u"ICMP echo Rx timeout")
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
-        print("MAC addresses match.")
+        print(u"MAC addresses match.")
     else:
     else:
-        raise RuntimeError(
-            "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+        raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
 
     ip = ether.payload
 
 
     ip = ether.payload
 
-    if ot_mode == '6to4':
+    if ot_mode == u"6to4":
         if not isinstance(ip, IP):
         if not isinstance(ip, IP):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
-    elif ot_mode == '4to6':
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
+    elif ot_mode == u"4to6":
         if not isinstance(ip, IPv6):
         if not isinstance(ip, IPv6):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
     elif not isinstance(ip, ip_format):
     elif not isinstance(ip, ip_format):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
 
     lisp = ether.getlayer(LispGPEHeader).underlayer
     if not lisp:
 
     lisp = ether.getlayer(LispGPEHeader).underlayer
     if not lisp:
-        raise RuntimeError("Lisp layer not present or parsing failed.")
+        raise RuntimeError(u"Lisp layer not present or parsing failed.")
 
     # Compare data from packets
     if src_ip == lisp.src:
 
     # Compare data from packets
     if src_ip == lisp.src:
-        print("Source IP matches source EID.")
+        print(u"Source IP matches source EID.")
     else:
     else:
-        raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
-                           .format(src_ip, lisp.src))
+        raise RuntimeError(
+            f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
+        )
 
     if dst_ip == lisp.dst:
 
     if dst_ip == lisp.dst:
-        print("Destination IP matches destination EID.")
+        print(u"Destination IP matches destination EID.")
     else:
     else:
-        raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
-                           .format(dst_ip, lisp.dst))
+        raise RuntimeError(
+            f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
+        )
 
     if src_rloc == ip.src:
 
     if src_rloc == ip.src:
-        print("Source RLOC matches configuration.")
+        print(u"Source RLOC matches configuration.")
     else:
     else:
-        raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
-                           .format(src_rloc, ip.src))
+        raise RuntimeError(
+            f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
+        )
 
     if dst_rloc == ip.dst:
 
     if dst_rloc == ip.dst:
-        print("Destination RLOC matches configuration.")
+        print(u"Destination RLOC matches configuration.")
     else:
     else:
-        raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
-                           .format(dst_rloc, ip.dst))
+        raise RuntimeError(
+            f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
+        )
 
     sys.exit(0)
 
 
 
     sys.exit(0)
 
 
-if __name__ == "__main__":
+if __name__ == u"__main__":
     main()
     main()