-#!/usr/bin/env python
-# Copyright (c) 2017 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
import sys
import ipaddress
+from scapy.all import bind_layers, Packet
+from scapy.fields import FlagsField, BitField, IntField
from scapy.layers.inet import ICMP, IP, UDP
from scapy.layers.inet6 import ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
-from scapy.all import bind_layers, Packet
-from scapy.fields import FlagsField, BitField, IntField
+from scapy.packet import Raw
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
class LispHeader(Packet):
"""Scapy header for the LISP Layer."""
- name = "Lisp Header"
+
+ name = u"Lisp Header"
fields_desc = [
- FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]),
- BitField("nonce/map_version", 0, size=24),
- IntField("instance_id/locator_status_bits", 0)]
+ FlagsField(
+ u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""]
+ ),
+ BitField(u"nonce/map_version", 0, size=24),
+ IntField(u"instance_id/locator_status_bits", 0)]
class LispInnerIP(IP):
"""Scapy inner LISP layer for IPv4-in-IPv4."""
- name = "Lisp Inner Layer - IPv4"
+
+ name = u"Lisp Inner Layer - IPv4"
class LispInnerIPv6(IPv6):
"""Scapy inner LISP layer for IPv6-in-IPv6."""
- name = "Lisp Inner Layer - IPv6"
+
+ name = u"Lisp Inner Layer - IPv6"
def valid_ipv4(ip):
try:
- ipaddress.IPv4Address(unicode(ip))
+ ipaddress.IPv4Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
def valid_ipv6(ip):
try:
- ipaddress.IPv6Address(unicode(ip))
+ ipaddress.IPv6Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
:raises RuntimeError: If the received packet is not correct."""
args = TrafficScriptArg(
- ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
- 'dut_if2_mac', 'src_rloc', 'dst_rloc'])
-
- tx_src_mac = args.get_arg('tg_src_mac')
- tx_dst_mac = args.get_arg('dut_if1_mac')
- rx_dst_mac = args.get_arg('tg_dst_mac')
- rx_src_mac = args.get_arg('dut_if2_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- src_rloc = args.get_arg("src_rloc")
- dst_rloc = args.get_arg("dst_rloc")
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
+ [
+ u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+ u"dut_if2_mac", u"src_rloc", u"dst_rloc"
+ ],
+ [u"ot_mode"]
+ )
+
+ tx_src_mac = args.get_arg(u"tg_src_mac")
+ tx_dst_mac = args.get_arg(u"dut_if1_mac")
+ rx_dst_mac = args.get_arg(u"tg_dst_mac")
+ rx_src_mac = args.get_arg(u"dut_if2_mac")
+ src_ip = args.get_arg(u"src_ip")
+ dst_ip = args.get_arg(u"dst_ip")
+ src_rloc = args.get_arg(u"src_rloc")
+ dst_rloc = args.get_arg(u"dst_rloc")
+ tx_if = args.get_arg(u"tx_if")
+ rx_if = args.get_arg(u"rx_if")
+ ot_mode = args.get_arg(u"ot_mode")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
- sent_packets = []
+
pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_format = IPv6
lisp_layer = LispInnerIPv6
else:
- raise ValueError("IP not in correct format")
+ raise ValueError(u"IP not in correct format")
bind_layers(UDP, LispHeader, dport=4341)
bind_layers(LispHeader, lisp_layer)
+
+ pkt_raw /= Raw()
+ sent_packets = list()
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
ether = rxq.recv(2)
if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
+ raise RuntimeError(u"ICMP echo Rx timeout")
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
- print("MAC addresses match.")
+ print(u"MAC addresses match.")
else:
- raise RuntimeError(
- "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+ raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
ip = ether.payload
- if not isinstance(ip, ip_format):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ if ot_mode == u"6to4":
+ if not isinstance(ip, IP):
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
+ elif ot_mode == u"4to6":
+ if not isinstance(ip, IPv6):
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
+ elif not isinstance(ip, ip_format):
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
lisp = ether.getlayer(lisp_layer)
if not lisp:
# Compare data from packets
if src_ip == lisp.src:
- print("Source IP matches source EID.")
+ print(u"Source IP matches source EID.")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
- .format(src_ip, lisp.src))
+ raise RuntimeError(
+ f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
+ )
if dst_ip == lisp.dst:
- print("Destination IP matches destination EID.")
+ print(u"Destination IP matches destination EID.")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
- .format(dst_ip, lisp.dst))
+ raise RuntimeError(
+ f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
+ )
if src_rloc == ip.src:
- print("Source RLOC matches configuration.")
+ print(u"Source RLOC matches configuration.")
else:
- raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
- .format(src_rloc, ip.src))
+ raise RuntimeError(
+ f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
+ )
if dst_rloc == ip.dst:
- print("Destination RLOC matches configuration.")
+ print(u"Destination RLOC matches configuration.")
else:
- raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
- .format(dst_rloc, ip.dst))
+ raise RuntimeError(
+ f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
+ )
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()