3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
17 a LISPGPE-encapsulated packet on the other interface and verifies received
24 from scapy.all import bind_layers, Packet
25 from scapy.fields import FlagsField, BitField, XBitField, IntField
26 from scapy.layers.inet import ICMP, IP, UDP
27 from scapy.layers.inet6 import ICMPv6EchoRequest
28 from scapy.layers.inet6 import IPv6
29 from scapy.layers.l2 import Ether
30 from scapy.packet import Raw
32 from ..PacketVerifier import RxQueue, TxQueue
33 from ..TrafficScriptArg import TrafficScriptArg
class LispGPEHeader(Packet):
    """Scapy header for the Lisp GPE Layer.

    Fields, in wire order as declared in ``fields_desc``: 8 flag bits,
    a 2-bit version, 14 reserved bits, an 8-bit next-protocol selector and
    one 32-bit word named ``instance_id/locator_status_bits``.
    """

    name = "Lisp GPE Header"
    fields_desc = [
        FlagsField(
            u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
        ),
        BitField(u"version", 0, size=2),
        BitField(u"reserved", 0, size=14),
        XBitField(u"next_protocol", 0, size=8),
        IntField(u"instance_id/locator_status_bits", 0)
    ]

    def guess_payload_class(self, payload):
        """Select the inner layer class from the ``next_protocol`` field.

        :param payload: Data that follows this header.
        :returns: Inner layer class mapped to ``next_protocol``; scapy's
            default guess when the value is not mapped.
        :rtype: type
        """
        # The map is built inside the method because the inner layer classes
        # are defined after this class in the module.
        # NOTE(review): the 0x1 and 0x4 entries were not visible in the
        # reviewed text; they are restored from the sibling classes and the
        # LISP-GPE next-protocol numbering - confirm against the original.
        protocol = {
            0x1: LispGPEInnerIP,
            0x2: LispGPEInnerIPv6,
            0x3: LispGPEInnerEther,
            0x4: LispGPEInnerNSH
        }
        try:
            return protocol[self.next_protocol]
        except KeyError:
            # Unmapped value: defer to the base class instead of letting a
            # KeyError escape from dissection.
            return Packet.guess_payload_class(self, payload)
class LispGPEInnerIP(IP):
    """IPv4 layer used as the inner packet of a LISP GPE header.

    Subclasses scapy's ``IP`` and only overrides the layer name.
    """

    name = u"Lisp GPE Inner Layer - IPv4"
class LispGPEInnerIPv6(IPv6):
    """IPv6 layer used as the inner packet of a LISP GPE header.

    Subclasses scapy's ``IPv6`` and only overrides the layer name.
    """

    name = u"Lisp GPE Inner Layer - IPv6"
class LispGPEInnerEther(Ether):
    """Ethernet layer used as the inner frame of a LISP GPE header (Lisp-L2).

    Subclasses scapy's ``Ether`` and only overrides the layer name.
    """

    name = u"Lisp GPE Inner Layer - Ethernet"
class LispGPEInnerNSH(Packet):
    """Scapy inner LISP GPE layer for Lisp-NSH.

    Parsing not implemented: the class declares no fields of its own.
    """
def valid_ipv4(ip):
    """Check whether the given value is a valid IPv4 address.

    :param ip: Address to validate, typically a string.
    :returns: True if ``ipaddress.IPv4Address`` accepts the value,
        False otherwise.
    :rtype: bool
    """
    try:
        ipaddress.IPv4Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
def valid_ipv6(ip):
    """Check whether the given value is a valid IPv6 address.

    :param ip: Address to validate, typically a string.
    :returns: True if ``ipaddress.IPv6Address`` accepts the value,
        False otherwise.
    :rtype: bool
    """
    try:
        ipaddress.IPv6Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
102 """Send IP ICMP packet from one traffic generator interface to the other.
104 :raises RuntimeError: If the received packet is not correct."""
106 args = TrafficScriptArg(
108 u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
109 u"dut_if2_mac", u"src_rloc", u"dst_rloc"
114 tx_src_mac = args.get_arg(u"tg_src_mac")
115 tx_dst_mac = args.get_arg(u"dut_if1_mac")
116 rx_dst_mac = args.get_arg(u"tg_dst_mac")
117 rx_src_mac = args.get_arg(u"dut_if2_mac")
118 src_ip = args.get_arg(u"src_ip")
119 dst_ip = args.get_arg(u"dst_ip")
120 src_rloc = args.get_arg(u"src_rloc")
121 dst_rloc = args.get_arg(u"dst_rloc")
122 tx_if = args.get_arg(u"tx_if")
123 rx_if = args.get_arg(u"rx_if")
124 ot_mode = args.get_arg(u"ot_mode")
129 pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
131 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
132 pkt_raw /= IP(src=src_ip, dst=dst_ip)
135 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
136 pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
137 pkt_raw /= ICMPv6EchoRequest()
140 raise ValueError(u"IP not in correct format")
142 bind_layers(UDP, LispGPEHeader, dport=4341)
145 sent_packets = list()
146 sent_packets.append(pkt_raw)
150 ether = rxq.recv(2, ignore=sent_packets)
155 raise RuntimeError(u"ICMP echo Rx timeout")
157 if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
158 print(u"MAC addresses match.")
160 raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
164 if ot_mode == u"6to4":
165 if not isinstance(ip, IP):
166 raise RuntimeError(f"Not an IP packet received {ip!r}")
167 elif ot_mode == u"4to6":
168 if not isinstance(ip, IPv6):
169 raise RuntimeError(f"Not an IP packet received {ip!r}")
170 elif not isinstance(ip, ip_format):
171 raise RuntimeError(f"Not an IP packet received {ip!r}")
173 lisp = ether.getlayer(LispGPEHeader).underlayer
175 raise RuntimeError(u"Lisp layer not present or parsing failed.")
177 # Compare data from packets
178 if src_ip == lisp.src:
179 print(u"Source IP matches source EID.")
182 f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
185 if dst_ip == lisp.dst:
186 print(u"Destination IP matches destination EID.")
189 f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
192 if src_rloc == ip.src:
193 print(u"Source RLOC matches configuration.")
196 f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
199 if dst_rloc == ip.dst:
200 print(u"Destination RLOC matches configuration.")
203 f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
209 if __name__ == u"__main__":