Python3: resources and libraries
[csit.git] / resources / traffic_scripts / lisp / lispgpe_check.py
index d4de863..50857c4 100755 (executable)
@@ -1,5 +1,6 @@
-#!/usr/bin/env python
-# Copyright (c) 2017 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -20,12 +21,13 @@ packet.
 import sys
 import ipaddress
 
+from scapy.all import bind_layers, Packet
+from scapy.fields import FlagsField, BitField, XBitField, IntField
 from scapy.layers.inet import ICMP, IP, UDP
 from scapy.layers.inet6 import ICMPv6EchoRequest
 from scapy.layers.inet6 import IPv6
 from scapy.layers.l2 import Ether
-from scapy.all import bind_layers, Packet
-from scapy.fields import FlagsField, BitField, XBitField, IntField
+from scapy.packet import Raw
 
 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -33,13 +35,17 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
 
 class LispGPEHeader(Packet):
     """Scapy header for the Lisp GPE Layer."""
+
     name = "Lisp GPE Header"
     fields_desc = [
-        FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]),
-        BitField("version", 0, size=2),
-        BitField("reserved", 0, size=14),
-        XBitField("next_protocol", 0, size=8),
-        IntField("instance_id/locator_status_bits", 0)]
+        FlagsField(
+            u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
+        ),
+        BitField(u"version", 0, size=2),
+        BitField(u"reserved", 0, size=14),
+        XBitField(u"next_protocol", 0, size=8),
+        IntField(u"instance_id/locator_status_bits", 0)
+    ]
 
     def guess_payload_class(self, payload):
         protocol = {
@@ -53,17 +59,20 @@ class LispGPEHeader(Packet):
 
 class LispGPEInnerIP(IP):
     """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
-    name = "Lisp GPE Inner Layer - IPv4"
+
+    name = u"Lisp GPE Inner Layer - IPv4"
 
 
 class LispGPEInnerIPv6(IPv6):
     """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
-    name = "Lisp GPE Inner Layer - IPv6"
+
+    name = u"Lisp GPE Inner Layer - IPv6"
 
 
 class LispGPEInnerEther(Ether):
     """Scapy inner LISP GPE layer for Lisp-L2."""
-    name = "Lisp GPE Inner Layer - Ethernet"
+
+    name = u"Lisp GPE Inner Layer - Ethernet"
 
 
 class LispGPEInnerNSH(Packet):
@@ -75,7 +84,7 @@ class LispGPEInnerNSH(Packet):
 
 def valid_ipv4(ip):
     try:
-        ipaddress.IPv4Address(unicode(ip))
+        ipaddress.IPv4Address(ip)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
@@ -83,7 +92,7 @@ def valid_ipv4(ip):
 
 def valid_ipv6(ip):
     try:
-        ipaddress.IPv6Address(unicode(ip))
+        ipaddress.IPv6Address(ip)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
@@ -95,25 +104,28 @@ def main():
     :raises RuntimeError: If the received packet is not correct."""
 
     args = TrafficScriptArg(
-        ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
-         'dut_if2_mac', 'src_rloc', 'dst_rloc'],
-        ['ot_mode'])
-
-    tx_src_mac = args.get_arg('tg_src_mac')
-    tx_dst_mac = args.get_arg('dut_if1_mac')
-    rx_dst_mac = args.get_arg('tg_dst_mac')
-    rx_src_mac = args.get_arg('dut_if2_mac')
-    src_ip = args.get_arg('src_ip')
-    dst_ip = args.get_arg('dst_ip')
-    src_rloc = args.get_arg("src_rloc")
-    dst_rloc = args.get_arg("dst_rloc")
-    tx_if = args.get_arg('tx_if')
-    rx_if = args.get_arg('rx_if')
-    ot_mode = args.get_arg('ot_mode')
+        [
+            u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+            u"dut_if2_mac", u"src_rloc", u"dst_rloc"
+        ],
+        [u"ot_mode"]
+    )
+
+    tx_src_mac = args.get_arg(u"tg_src_mac")
+    tx_dst_mac = args.get_arg(u"dut_if1_mac")
+    rx_dst_mac = args.get_arg(u"tg_dst_mac")
+    rx_src_mac = args.get_arg(u"dut_if2_mac")
+    src_ip = args.get_arg(u"src_ip")
+    dst_ip = args.get_arg(u"dst_ip")
+    src_rloc = args.get_arg(u"src_rloc")
+    dst_rloc = args.get_arg(u"dst_rloc")
+    tx_if = args.get_arg(u"tx_if")
+    rx_if = args.get_arg(u"rx_if")
+    ot_mode = args.get_arg(u"ot_mode")
 
     rxq = RxQueue(rx_if)
     txq = TxQueue(tx_if)
-    sent_packets = []
+
     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
 
     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
@@ -125,10 +137,12 @@ def main():
         pkt_raw /= ICMPv6EchoRequest()
         ip_format = IPv6
     else:
-        raise ValueError("IP not in correct format")
+        raise ValueError(u"IP not in correct format")
 
     bind_layers(UDP, LispGPEHeader, dport=4341)
 
+    pkt_raw /= Raw()
+    sent_packets = list()
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
@@ -138,59 +152,59 @@ def main():
         ether = rxq.recv(2)
 
     if ether is None:
-        raise RuntimeError("ICMP echo Rx timeout")
+        raise RuntimeError(u"ICMP echo Rx timeout")
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
-        print("MAC addresses match.")
+        print(u"MAC addresses match.")
     else:
-        raise RuntimeError(
-            "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+        raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
 
     ip = ether.payload
 
-    if ot_mode == '6to4':
+    if ot_mode == u"6to4":
         if not isinstance(ip, IP):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
-    elif ot_mode == '4to6':
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
+    elif ot_mode == u"4to6":
         if not isinstance(ip, IPv6):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
     elif not isinstance(ip, ip_format):
-            raise RuntimeError(
-                "Not an IP packet received {0}".format(ip.__repr__()))
+            raise RuntimeError(f"Not an IP packet received {ip!r}")
 
     lisp = ether.getlayer(LispGPEHeader).underlayer
     if not lisp:
-        raise RuntimeError("Lisp layer not present or parsing failed.")
+        raise RuntimeError(u"Lisp layer not present or parsing failed.")
 
     # Compare data from packets
     if src_ip == lisp.src:
-        print("Source IP matches source EID.")
+        print(u"Source IP matches source EID.")
     else:
-        raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
-                           .format(src_ip, lisp.src))
+        raise RuntimeError(
+            f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
+        )
 
     if dst_ip == lisp.dst:
-        print("Destination IP matches destination EID.")
+        print(u"Destination IP matches destination EID.")
     else:
-        raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
-                           .format(dst_ip, lisp.dst))
+        raise RuntimeError(
+            f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
+        )
 
     if src_rloc == ip.src:
-        print("Source RLOC matches configuration.")
+        print(u"Source RLOC matches configuration.")
     else:
-        raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
-                           .format(src_rloc, ip.src))
+        raise RuntimeError(
+            f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
+        )
 
     if dst_rloc == ip.dst:
-        print("Destination RLOC matches configuration.")
+        print(u"Destination RLOC matches configuration.")
     else:
-        raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
-                           .format(dst_rloc, ip.dst))
+        raise RuntimeError(
+            f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
+        )
 
     sys.exit(0)
 
 
-if __name__ == "__main__":
+if __name__ == u"__main__":
     main()