Python3: resources and libraries
[csit.git] / resources / traffic_scripts / send_ip_check_headers.py
index 816c005..f5a5555 100755 (executable)
@@ -1,5 +1,6 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -12,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface
+"""Traffic script that sends an IP IPv4/IPv6 packet from one interface
 to the other. Source and destination IP addresses and source and destination
 MAC addresses are checked in received packet.
 """
 to the other. Source and destination IP addresses and source and destination
 MAC addresses are checked in received packet.
 """
@@ -20,10 +21,12 @@ MAC addresses are checked in received packet.
 import sys
 
 import ipaddress
 import sys
 
 import ipaddress
+
 from robot.api import logger
 from scapy.layers.inet import IP
 from robot.api import logger
 from scapy.layers.inet import IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Raw
 
 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
 
 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -31,7 +34,7 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
 
 def valid_ipv4(ip):
     try:
 
 def valid_ipv4(ip):
     try:
-        ipaddress.IPv4Address(unicode(ip))
+        ipaddress.IPv4Address(ip)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
@@ -39,47 +42,54 @@ def valid_ipv4(ip):
 
 def valid_ipv6(ip):
     try:
 
 def valid_ipv6(ip):
     try:
-        ipaddress.IPv6Address(unicode(ip))
+        ipaddress.IPv6Address(ip)
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
 
 
 def main():
         return True
     except (AttributeError, ipaddress.AddressValueError):
         return False
 
 
 def main():
-    """Send IP ICMP packet from one traffic generator interface to the other."""
+    """Send IP/IPv6 packet from one traffic generator interface to the other."""
     args = TrafficScriptArg(
     args = TrafficScriptArg(
-        ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
-         'dut_if2_mac'],
-        ['encaps_tx', 'vlan_tx', 'vlan_outer_tx',
-         'encaps_rx', 'vlan_rx', 'vlan_outer_rx'])
-
-    tx_src_mac = args.get_arg('tg_src_mac')
-    tx_dst_mac = args.get_arg('dut_if1_mac')
-    rx_dst_mac = args.get_arg('tg_dst_mac')
-    rx_src_mac = args.get_arg('dut_if2_mac')
-    src_ip = args.get_arg('src_ip')
-    dst_ip = args.get_arg('dst_ip')
-    tx_if = args.get_arg('tx_if')
-    rx_if = args.get_arg('rx_if')
-
-    encaps_tx = args.get_arg('encaps_tx')
-    vlan_tx = args.get_arg('vlan_tx')
-    vlan_outer_tx = args.get_arg('vlan_outer_tx')
-    encaps_rx = args.get_arg('encaps_rx')
-    vlan_rx = args.get_arg('vlan_rx')
-    vlan_outer_rx = args.get_arg('vlan_outer_rx')
+        [
+            u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+            u"dut_if2_mac"
+        ],
+        [
+            u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
+            u"vlan_rx", u"vlan_outer_rx"
+        ]
+    )
+
+    tx_src_mac = args.get_arg(u"tg_src_mac")
+    tx_dst_mac = args.get_arg(u"dut_if1_mac")
+    rx_dst_mac = args.get_arg(u"tg_dst_mac")
+    rx_src_mac = args.get_arg(u"dut_if2_mac")
+    src_ip = args.get_arg(u"src_ip")
+    dst_ip = args.get_arg(u"dst_ip")
+    tx_if = args.get_arg(u"tx_if")
+    rx_if = args.get_arg(u"rx_if")
+
+    encaps_tx = args.get_arg(u"encaps_tx")
+    vlan_tx = args.get_arg(u"vlan_tx")
+    vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
+    encaps_rx = args.get_arg(u"encaps_rx")
+    vlan_rx = args.get_arg(u"vlan_rx")
+    vlan_outer_rx = args.get_arg(u"vlan_outer_rx")
 
     rxq = RxQueue(rx_if)
     txq = TxQueue(tx_if)
 
     rxq = RxQueue(rx_if)
     txq = TxQueue(tx_if)
-    sent_packets = []
-    ip_format = ''
+
+    sent_packets =list()
     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
-    if encaps_tx == 'Dot1q':
+
+    if encaps_tx == u"Dot1q":
         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
-    elif encaps_tx == 'Dot1ad':
+    elif encaps_tx == u"Dot1ad":
         pkt_raw.type = 0x88a8
         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
         pkt_raw /= Dot1Q(vlan=vlan_tx)
         pkt_raw.type = 0x88a8
         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
         pkt_raw /= Dot1Q(vlan=vlan_tx)
+
     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
         pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
         ip_format = IP
     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
         pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
         ip_format = IP
@@ -87,8 +97,9 @@ def main():
         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
         ip_format = IPv6
     else:
         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
         ip_format = IPv6
     else:
-        raise ValueError("IP not in correct format")
+        raise ValueError(u"IP not in correct format")
 
 
+    pkt_raw /= Raw()
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
@@ -97,8 +108,9 @@ def main():
             ether = rxq.recv(2, ignore=sent_packets)
         else:
             ether = rxq.recv(2)
             ether = rxq.recv(2, ignore=sent_packets)
         else:
             ether = rxq.recv(2)
+
         if ether is None:
         if ether is None:
-            raise RuntimeError('ICMP echo Rx timeout')
+            raise RuntimeError(u"IP packet Rx timeout")
 
         if ether.haslayer(ICMPv6ND_NS):
             # read another packet in the queue if the current one is ICMPv6ND_NS
 
         if ether.haslayer(ICMPv6ND_NS):
             # read another packet in the queue if the current one is ICMPv6ND_NS
@@ -108,44 +120,45 @@ def main():
             break
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
             break
 
     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
-        logger.trace("MAC matched")
+        logger.trace(u"MAC matched")
     else:
     else:
-        raise RuntimeError("Matching packet unsuccessful: {0}".
-                           format(ether.__repr__()))
+        raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
 
 
-    if encaps_rx == 'Dot1q':
+    if encaps_rx == u"Dot1q":
         if ether[Dot1Q].vlan == int(vlan_rx):
         if ether[Dot1Q].vlan == int(vlan_rx):
-            logger.trace("VLAN matched")
+            logger.trace(u"VLAN matched")
         else:
         else:
-            raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
-                               'received, {}-expected):\n{}'.
-                               format(ether[Dot1Q].vlan, vlan_rx,
-                                      ether.__repr__()))
+            raise RuntimeError(
+                f"Ethernet frame with wrong VLAN tag "
+                f"({ether[Dot1Q].vlan}-received, "
+                f"{vlan_rx}-expected):\n{ether!r}"
+            )
         ip = ether[Dot1Q].payload
         ip = ether[Dot1Q].payload
-    elif encaps_rx == 'Dot1ad':
+    elif encaps_rx == u"Dot1ad":
         raise NotImplementedError()
     else:
         ip = ether.payload
 
     if not isinstance(ip, ip_format):
         raise NotImplementedError()
     else:
         ip = ether.payload
 
     if not isinstance(ip, ip_format):
-        raise RuntimeError("Not an IP packet received {0}".
-                           format(ip.__repr__()))
+        raise RuntimeError(f"Not an IP packet received {ip!r}")
 
     # Compare data from packets
     if src_ip == ip.src:
 
     # Compare data from packets
     if src_ip == ip.src:
-        logger.trace("Src IP matched")
+        logger.trace(u"Src IP matched")
     else:
     else:
-        raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
-                           format(src_ip, ip.src))
+        raise RuntimeError(
+            f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
+        )
 
     if dst_ip == ip.dst:
 
     if dst_ip == ip.dst:
-        logger.trace("Dst IP matched")
+        logger.trace(u"Dst IP matched")
     else:
     else:
-        raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
-                           format(dst_ip, ip.dst))
+        raise RuntimeError(
+            f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
+        )
 
     sys.exit(0)
 
 
 
     sys.exit(0)
 
 
-if __name__ == "__main__":
+if __name__ == u"__main__":
     main()
     main()