Python3: resources and libraries
[csit.git] / resources / traffic_scripts / lisp / lisp_check.py
index 9937de8..ebd769f 100755 (executable)
@@ -1,5 +1,6 @@
-#!/usr/bin/env python
-# Copyright (c) 2017 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -19,12 +20,13 @@ a LISP-encapsulated packet on the other interface and verifies received packet.
 import sys
 import ipaddress
 
+from scapy.all import bind_layers, Packet
+from scapy.fields import FlagsField, BitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
 from scapy.layers.inet6 import IPv6
 from scapy.layers.l2 import Ether
-from scapy.all import bind_layers, Packet
-from scapy.fields import FlagsField, BitField, IntField
+from scapy.packet import Raw
 
 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -32,26 +34,31 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
 
 class LispHeader(Packet):
     """Scapy header for the LISP Layer."""
-    name = "Lisp Header"
+
+    name = u"Lisp Header"
     fields_desc = [
-        FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]),
-        BitField("nonce/map_version", 0, size=24),
-        IntField("instance_id/locator_status_bits", 0)]
+        FlagsField(
+            u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""]
+        ),
+        BitField(u"nonce/map_version", 0, size=24),
+        IntField(u"instance_id/locator_status_bits", 0)]
 
 
 class LispInnerIP(IP):
     """Scapy inner LISP layer for IPv4-in-IPv4."""
-    name = "Lisp Inner Layer - IPv4"
+
+    name = u"Lisp Inner Layer - IPv4"
 
 
 class LispInnerIPv6(IPv6):
     """Scapy inner LISP layer for IPv6-in-IPv6."""
-    name = "Lisp Inner Layer - IPv6"
+
+    name = u"Lisp Inner Layer - IPv6"
 
 
 def valid_ipv4(ip):
     try:
-        ipaddress.IPv4Address(unicode(ip))
+        ipaddress.IPv4Address(ip)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
@@ -59,7 +66,7 @@ def valid_ipv4(ip):
 
 def valid_ipv6(ip):
     try:
-        ipaddress.IPv6Address(unicode(ip))
+        ipaddress.IPv6Address(ip)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
@@ -71,25 +78,28 @@ def main():
     :raises RuntimeError: If the received packet is not correct."""
 
     args = TrafficScriptArg(
-        ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
-         'dut_if2_mac', 'src_rloc', 'dst_rloc'],
-        ['ot_mode'])
-
-    tx_src_mac = args.get_arg('tg_src_mac')
-    tx_dst_mac = args.get_arg('dut_if1_mac')
-    rx_dst_mac = args.get_arg('tg_dst_mac')
-    rx_src_mac = args.get_arg('dut_if2_mac')
-    src_ip = args.get_arg('src_ip')
-    dst_ip = args.get_arg('dst_ip')
-    src_rloc = args.get_arg("src_rloc")
-    dst_rloc = args.get_arg("dst_rloc")
-    tx_if = args.get_arg('tx_if')
-    rx_if = args.get_arg('rx_if')
-    ot_mode = args.get_arg('ot_mode')
+        [
+            u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+            u"dut_if2_mac", u"src_rloc", u"dst_rloc"
+        ],
+        [u"ot_mode"]
+    )
+
+    tx_src_mac = args.get_arg(u"tg_src_mac")
+    tx_dst_mac = args.get_arg(u"dut_if1_mac")
+    rx_dst_mac = args.get_arg(u"tg_dst_mac")
+    rx_src_mac = args.get_arg(u"dut_if2_mac")
+    src_ip = args.get_arg(u"src_ip")
+    dst_ip = args.get_arg(u"dst_ip")
+    src_rloc = args.get_arg(u"src_rloc")
+    dst_rloc = args.get_arg(u"dst_rloc")
+    tx_if = args.get_arg(u"tx_if")
+    rx_if = args.get_arg(u"rx_if")
+    ot_mode = args.get_arg(u"ot_mode")
 
     rxq = RxQueue(rx_if)
     txq = TxQueue(tx_if)
-    sent_packets = []
+
     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
 
     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
@@ -103,10 +113,13 @@ def main():
         ip_format = IPv6
         lisp_layer = LispInnerIPv6
     else:
-        raise ValueError("IP not in correct format")
+        raise ValueError(u"IP not in correct format")
 
     bind_layers(UDP, LispHeader, dport=4341)
     bind_layers(LispHeader, lisp_layer)
+
+    pkt_raw /= Raw()
+    sent_packets = list()
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
@@ -116,27 +129,23 @@ def main():
         ether = rxq.recv(2)
 
     if ether is None:
-        raise RuntimeError("ICMP echo Rx timeout")
+        raise RuntimeError(u"ICMP echo Rx timeout")
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
-        print("MAC addresses match.")
+        print(u"MAC addresses match.")
     else:
-        raise RuntimeError(
-            "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+        raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
 
     ip = ether.payload
 
-    if ot_mode == '6to4':
+    if ot_mode == u"6to4":
         if not isinstance(ip, IP):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
-    elif ot_mode == '4to6':
-        if not isinstance(ip, IP6):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
+    elif ot_mode == u"4to6":
+        if not isinstance(ip, IPv6):
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
     elif not isinstance(ip, ip_format):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
 
     lisp = ether.getlayer(lisp_layer)
     if not lisp:
@@ -144,31 +153,35 @@ def main():
 
     # Compare data from packets
     if src_ip == lisp.src:
-        print("Source IP matches source EID.")
+        print(u"Source IP matches source EID.")
     else:
-        raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
-                           .format(src_ip, lisp.src))
+        raise RuntimeError(
+            f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
+        )
 
     if dst_ip == lisp.dst:
-        print("Destination IP matches destination EID.")
+        print(u"Destination IP matches destination EID.")
     else:
-        raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
-                           .format(dst_ip, lisp.dst))
+        raise RuntimeError(
+            f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
+        )
 
     if src_rloc == ip.src:
-        print("Source RLOC matches configuration.")
+        print(u"Source RLOC matches configuration.")
     else:
-        raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
-                           .format(src_rloc, ip.src))
+        raise RuntimeError(
+            f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
+        )
 
     if dst_rloc == ip.dst:
-        print("Destination RLOC matches configuration.")
+        print(u"Destination RLOC matches configuration.")
     else:
-        raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
-                           .format(dst_rloc, ip.dst))
+        raise RuntimeError(
+            f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
+        )
 
     sys.exit(0)
 
 
-if __name__ == "__main__":
+if __name__ == u"__main__":
     main()