X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Flisp%2Flispgpe_check.py;h=50857c4fe860dc722c971ba292e4ac87531a0969;hp=d4de8635d70f93127c1b7c83b2c3dde7ca37affe;hb=d68951ac245150eeefa6e0f4156e4c1b5c9e9325;hpb=ed0258a440cfad7023d643f717ab78ac568dc59b diff --git a/resources/traffic_scripts/lisp/lispgpe_check.py b/resources/traffic_scripts/lisp/lispgpe_check.py index d4de8635d7..50857c4fe8 100755 --- a/resources/traffic_scripts/lisp/lispgpe_check.py +++ b/resources/traffic_scripts/lisp/lispgpe_check.py @@ -1,5 +1,6 @@ -#!/usr/bin/env python -# Copyright (c) 2017 Cisco and/or its affiliates. +#!/usr/bin/env python3 + +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -20,12 +21,13 @@ packet. import sys import ipaddress +from scapy.all import bind_layers, Packet +from scapy.fields import FlagsField, BitField, XBitField, IntField from scapy.layers.inet import ICMP, IP, UDP from scapy.layers.inet6 import ICMPv6EchoRequest from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether -from scapy.all import bind_layers, Packet -from scapy.fields import FlagsField, BitField, XBitField, IntField +from scapy.packet import Raw from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -33,13 +35,17 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg class LispGPEHeader(Packet): """Scapy header for the Lisp GPE Layer.""" + name = "Lisp GPE Header" fields_desc = [ - FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]), - BitField("version", 0, size=2), - BitField("reserved", 0, size=14), - XBitField("next_protocol", 0, size=8), - IntField("instance_id/locator_status_bits", 0)] + FlagsField( + u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"] + ), + BitField(u"version", 0, size=2), + BitField(u"reserved", 0, size=14), + XBitField(u"next_protocol", 0, size=8), + IntField(u"instance_id/locator_status_bits", 0) + ] def guess_payload_class(self, payload): protocol = { @@ -53,17 +59,20 @@ class LispGPEHeader(Packet): class LispGPEInnerIP(IP): """Scapy inner LISP GPE layer for IPv4-in-IPv4.""" - name = "Lisp GPE Inner Layer - IPv4" + + name = u"Lisp GPE Inner Layer - IPv4" class LispGPEInnerIPv6(IPv6): """Scapy inner LISP GPE layer for IPv6-in-IPv6.""" - name = "Lisp GPE Inner Layer - IPv6" + + name = u"Lisp GPE Inner Layer - IPv6" class LispGPEInnerEther(Ether): """Scapy inner LISP GPE layer for Lisp-L2.""" - name = "Lisp GPE Inner Layer - Ethernet" + + name = u"Lisp GPE Inner Layer - Ethernet" class LispGPEInnerNSH(Packet): @@ -75,7 +84,7 @@ class LispGPEInnerNSH(Packet): def valid_ipv4(ip): try: - ipaddress.IPv4Address(unicode(ip)) + ipaddress.IPv4Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -83,7 +92,7 @@ def valid_ipv4(ip): def valid_ipv6(ip): try: - ipaddress.IPv6Address(unicode(ip)) + ipaddress.IPv6Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -95,25 +104,28 @@ def main(): :raises RuntimeError: If the received packet is not correct.""" args = TrafficScriptArg( - ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'dut_if2_mac', 'src_rloc', 'dst_rloc'], - ['ot_mode']) - - tx_src_mac = args.get_arg('tg_src_mac') - tx_dst_mac = args.get_arg('dut_if1_mac') - rx_dst_mac = args.get_arg('tg_dst_mac') - rx_src_mac = args.get_arg('dut_if2_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - src_rloc = args.get_arg("src_rloc") - dst_rloc = args.get_arg("dst_rloc") - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - ot_mode = args.get_arg('ot_mode') + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac", u"src_rloc", u"dst_rloc" + ], + [u"ot_mode"] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + src_rloc = args.get_arg(u"src_rloc") + dst_rloc = args.get_arg(u"dst_rloc") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + ot_mode = args.get_arg(u"ot_mode") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) - sent_packets = [] + pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) if valid_ipv4(src_ip) and valid_ipv4(dst_ip): @@ -125,10 +137,12 @@ def main(): pkt_raw /= ICMPv6EchoRequest() ip_format = IPv6 else: - raise ValueError("IP not in correct format") + raise ValueError(u"IP not in correct format") bind_layers(UDP, LispGPEHeader, dport=4341) + pkt_raw /= Raw() + sent_packets = list() sent_packets.append(pkt_raw) txq.send(pkt_raw) @@ -138,59 +152,59 @@ def main(): ether = rxq.recv(2) if ether is None: - raise RuntimeError("ICMP echo Rx timeout") + raise RuntimeError(u"ICMP echo Rx timeout") if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: - print("MAC addresses match.") + print(u"MAC addresses match.") else: - raise RuntimeError( - "Matching packet unsuccessful: {0}".format(ether.__repr__())) + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") ip = ether.payload - if ot_mode == '6to4': + if ot_mode == u"6to4": if not isinstance(ip, IP): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) - elif ot_mode == '4to6': + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif ot_mode == u"4to6": if not isinstance(ip, IPv6): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") elif not isinstance(ip, ip_format): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") lisp = ether.getlayer(LispGPEHeader).underlayer if not lisp: - raise RuntimeError("Lisp layer not present or parsing failed.") + raise RuntimeError(u"Lisp layer not present or parsing failed.") # Compare data from packets if src_ip == lisp.src: - print("Source IP matches source EID.") + print(u"Source IP matches source EID.") else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}" - .format(src_ip, lisp.src)) + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}" + ) if dst_ip == lisp.dst: - print("Destination IP matches destination EID.") + print(u"Destination IP matches destination EID.") else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}" - .format(dst_ip, lisp.dst)) + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}" + ) if src_rloc == ip.src: - print("Source RLOC matches configuration.") + print(u"Source RLOC matches configuration.") else: - raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}" - .format(src_rloc, ip.src)) + raise RuntimeError( + f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}" + ) if dst_rloc == ip.dst: - print("Destination RLOC matches configuration.") + print(u"Destination RLOC matches configuration.") else: - raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}" - .format(dst_rloc, ip.dst)) + raise RuntimeError( + f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}" + ) sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main()