3 # Copyright (c) 2020 Cisco and/or its affiliates.
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later; you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
12 # http://www.apache.org/licenses/LICENSE-2.0
13 # https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+. If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
25 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
26 a LISPGPE-encapsulated packet on the other interface and verifies received
33 from scapy.all import bind_layers, Packet
34 from scapy.fields import FlagsField, BitField, XBitField, IntField
35 from scapy.layers.inet import ICMP, IP, UDP
36 from scapy.layers.inet6 import ICMPv6EchoRequest
37 from scapy.layers.inet6 import IPv6
38 from scapy.layers.l2 import Ether
39 from scapy.packet import Raw
41 from ..PacketVerifier import RxQueue, TxQueue
42 from ..TrafficScriptArg import TrafficScriptArg
45 class LispGPEHeader(Packet):
46 """Scapy header for the Lisp GPE Layer."""
48 name = "Lisp GPE Header"
51 u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
53 BitField(u"version", 0, size=2),
54 BitField(u"reserved", 0, size=14),
55 XBitField(u"next_protocol", 0, size=8),
56 IntField(u"instance_id/locator_status_bits", 0)
59 def guess_payload_class(self, payload):
62 0x2: LispGPEInnerIPv6,
63 0x3: LispGPEInnerEther,
66 return protocol[self.next_protocol]
69 class LispGPEInnerIP(IP):
70 """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
72 name = u"Lisp GPE Inner Layer - IPv4"
75 class LispGPEInnerIPv6(IPv6):
76 """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
78 name = u"Lisp GPE Inner Layer - IPv6"
81 class LispGPEInnerEther(Ether):
82 """Scapy inner LISP GPE layer for Lisp-L2."""
84 name = u"Lisp GPE Inner Layer - Ethernet"
87 class LispGPEInnerNSH(Packet):
88 """Scapy inner LISP GPE layer for Lisp-NSH.
90 Parsing not implemented.
96 ipaddress.IPv4Address(ip)
98 except (AttributeError, ipaddress.AddressValueError):
104 ipaddress.IPv6Address(ip)
106 except (AttributeError, ipaddress.AddressValueError):
111 """Send IP ICMP packet from one traffic generator interface to the other.
113 :raises RuntimeError: If the received packet is not correct."""
115 args = TrafficScriptArg(
117 u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
118 u"dut_if2_mac", u"src_rloc", u"dst_rloc"
123 tx_src_mac = args.get_arg(u"tg_src_mac")
124 tx_dst_mac = args.get_arg(u"dut_if1_mac")
125 rx_dst_mac = args.get_arg(u"tg_dst_mac")
126 rx_src_mac = args.get_arg(u"dut_if2_mac")
127 src_ip = args.get_arg(u"src_ip")
128 dst_ip = args.get_arg(u"dst_ip")
129 src_rloc = args.get_arg(u"src_rloc")
130 dst_rloc = args.get_arg(u"dst_rloc")
131 tx_if = args.get_arg(u"tx_if")
132 rx_if = args.get_arg(u"rx_if")
133 ot_mode = args.get_arg(u"ot_mode")
138 pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
140 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
141 pkt_raw /= IP(src=src_ip, dst=dst_ip)
144 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
145 pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
146 pkt_raw /= ICMPv6EchoRequest()
149 raise ValueError(u"IP not in correct format")
151 bind_layers(UDP, LispGPEHeader, dport=4341)
154 sent_packets = list()
155 sent_packets.append(pkt_raw)
159 ether = rxq.recv(2, ignore=sent_packets)
164 raise RuntimeError(u"ICMP echo Rx timeout")
166 if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
167 print(u"MAC addresses match.")
169 raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
173 if ot_mode == u"6to4":
174 if not isinstance(ip, IP):
175 raise RuntimeError(f"Not an IP packet received {ip!r}")
176 elif ot_mode == u"4to6":
177 if not isinstance(ip, IPv6):
178 raise RuntimeError(f"Not an IP packet received {ip!r}")
179 elif not isinstance(ip, ip_format):
180 raise RuntimeError(f"Not an IP packet received {ip!r}")
182 lisp = ether.getlayer(LispGPEHeader).underlayer
184 raise RuntimeError(u"Lisp layer not present or parsing failed.")
186 # Compare data from packets
187 if src_ip == lisp.src:
188 print(u"Source IP matches source EID.")
191 f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
194 if dst_ip == lisp.dst:
195 print(u"Destination IP matches destination EID.")
198 f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
201 if src_rloc == ip.src:
202 print(u"Source RLOC matches configuration.")
205 f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
208 if dst_rloc == ip.dst:
209 print(u"Destination RLOC matches configuration.")
212 f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
218 if __name__ == u"__main__":