3 # Copyright (c) 2019 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
17 a LISP-encapsulated packet on the other interface and verifies received packet.
23 from scapy.all import bind_layers, Packet
24 from scapy.fields import FlagsField, BitField, IntField
25 from scapy.layers.inet import ICMP, IP, UDP
26 from scapy.layers.inet6 import ICMPv6EchoRequest
27 from scapy.layers.inet6 import IPv6
28 from scapy.layers.l2 import Ether
29 from scapy.packet import Raw
31 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
32 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
35 class LispHeader(Packet):
36 """Scapy header for the LISP Layer."""
41 u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""]
43 BitField(u"nonce/map_version", 0, size=24),
44 IntField(u"instance_id/locator_status_bits", 0)]
47 class LispInnerIP(IP):
48 """Scapy inner LISP layer for IPv4-in-IPv4."""
50 name = u"Lisp Inner Layer - IPv4"
53 class LispInnerIPv6(IPv6):
54 """Scapy inner LISP layer for IPv6-in-IPv6."""
56 name = u"Lisp Inner Layer - IPv6"
61 ipaddress.IPv4Address(ip)
63 except (AttributeError, ipaddress.AddressValueError):
69 ipaddress.IPv6Address(ip)
71 except (AttributeError, ipaddress.AddressValueError):
76 """Send IP ICMP packet from one traffic generator interface to the other.
78 :raises RuntimeError: If the received packet is not correct."""
80 args = TrafficScriptArg(
82 u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
83 u"dut_if2_mac", u"src_rloc", u"dst_rloc"
88 tx_src_mac = args.get_arg(u"tg_src_mac")
89 tx_dst_mac = args.get_arg(u"dut_if1_mac")
90 rx_dst_mac = args.get_arg(u"tg_dst_mac")
91 rx_src_mac = args.get_arg(u"dut_if2_mac")
92 src_ip = args.get_arg(u"src_ip")
93 dst_ip = args.get_arg(u"dst_ip")
94 src_rloc = args.get_arg(u"src_rloc")
95 dst_rloc = args.get_arg(u"dst_rloc")
96 tx_if = args.get_arg(u"tx_if")
97 rx_if = args.get_arg(u"rx_if")
98 ot_mode = args.get_arg(u"ot_mode")
103 pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
105 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
106 pkt_raw /= IP(src=src_ip, dst=dst_ip)
109 lisp_layer = LispInnerIP
110 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
111 pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
112 pkt_raw /= ICMPv6EchoRequest()
114 lisp_layer = LispInnerIPv6
116 raise ValueError(u"IP not in correct format")
118 bind_layers(UDP, LispHeader, dport=4341)
119 bind_layers(LispHeader, lisp_layer)
122 sent_packets = list()
123 sent_packets.append(pkt_raw)
127 ether = rxq.recv(2, ignore=sent_packets)
132 raise RuntimeError(u"ICMP echo Rx timeout")
134 if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
135 print(u"MAC addresses match.")
137 raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
141 if ot_mode == u"6to4":
142 if not isinstance(ip, IP):
143 raise RuntimeError(f"Not an IP packet received {ip!r}")
144 elif ot_mode == u"4to6":
145 if not isinstance(ip, IPv6):
146 raise RuntimeError(f"Not an IP packet received {ip!r}")
147 elif not isinstance(ip, ip_format):
148 raise RuntimeError(f"Not an IP packet received {ip!r}")
150 lisp = ether.getlayer(lisp_layer)
152 raise RuntimeError("Lisp layer not present or parsing failed.")
154 # Compare data from packets
155 if src_ip == lisp.src:
156 print(u"Source IP matches source EID.")
159 f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
162 if dst_ip == lisp.dst:
163 print(u"Destination IP matches destination EID.")
166 f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
169 if src_rloc == ip.src:
170 print(u"Source RLOC matches configuration.")
173 f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
176 if dst_rloc == ip.dst:
177 print(u"Destination RLOC matches configuration.")
180 f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
186 if __name__ == u"__main__":