# Copyright (c) 2021 Cisco and/or its affiliates.
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
#
# Licensed under the Apache License 2.0 or
# GNU General Public License v2.0 or later; you may not use this file
# except in compliance with one of these Licenses. You
# may obtain a copy of the Licenses at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
#
# Note: If this file is linked with Scapy, which is GPLv2+, your use of it
# must be under GPLv2+. If at any point in the future it is no longer linked
# with Scapy (or other GPLv2+ licensed software), you are free to choose
# Apache 2.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
a LISPGPE-encapsulated packet on the other interface and verifies the received
packet.
"""
import ipaddress

from scapy.all import bind_layers, Packet
from scapy.fields import FlagsField, BitField, XBitField, IntField
from scapy.layers.inet import ICMP, IP, UDP
from scapy.layers.inet6 import ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.packet import Raw

from ..PacketVerifier import RxQueue, TxQueue
from ..TrafficScriptArg import TrafficScriptArg
class LispGPEHeader(Packet):
    """Scapy header for the Lisp GPE Layer.

    The header consists of an 8-bit flags field, a 2-bit version, 14 reserved
    bits, an 8-bit next-protocol selector and one integer field holding the
    instance ID / locator status bits.
    """

    name = "Lisp GPE Header"
    # NOTE(review): the "fields_desc = [" line, the FlagsField( callee and the
    # closing brackets were missing from the damaged source; FlagsField is the
    # only imported field type matching the (name, default, size, names)
    # arguments - confirm against the pristine file.
    fields_desc = [
        FlagsField(
            u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
        ),
        BitField(u"version", 0, size=2),
        BitField(u"reserved", 0, size=14),
        XBitField(u"next_protocol", 0, size=8),
        IntField(u"instance_id/locator_status_bits", 0)
    ]

    def guess_payload_class(self, payload):
        """Select the inner layer class according to the next_protocol field.

        :param payload: Bytes that follow this header.
        :returns: Scapy layer class to dissect the payload with.
        :raises KeyError: If next_protocol is not one of the mapped values.
        """
        # The mapping is built inside the method because the inner layer
        # classes are defined after this class in the module.
        # NOTE(review): only the 0x2 and 0x3 entries were visible in the
        # damaged source; 0x1 and 0x4 are restored from the LISP-GPE
        # next-protocol numbering (IPv4, IPv6, Ethernet, NSH) - confirm.
        protocol = {
            0x1: LispGPEInnerIP,
            0x2: LispGPEInnerIPv6,
            0x3: LispGPEInnerEther,
            0x4: LispGPEInnerNSH
        }
        return protocol[self.next_protocol]
class LispGPEInnerIP(IP):
    """Scapy inner LISP GPE layer for IPv4-in-IPv4.

    Subclasses the Scapy IP layer without changing its fields; only the layer
    name differs, so the inner packet can be told apart from the outer one.
    """

    name = u"Lisp GPE Inner Layer - IPv4"
class LispGPEInnerIPv6(IPv6):
    """Scapy inner LISP GPE layer for IPv6-in-IPv6.

    Subclasses the Scapy IPv6 layer without changing its fields; only the
    layer name differs, so the inner packet can be told apart from the outer
    one.
    """

    name = u"Lisp GPE Inner Layer - IPv6"
class LispGPEInnerEther(Ether):
    """Scapy inner LISP GPE layer for Lisp-L2.

    Subclasses the Scapy Ether layer without changing its fields; only the
    layer name differs, so the inner frame can be told apart from the outer
    one.
    """

    name = u"Lisp GPE Inner Layer - Ethernet"
class LispGPEInnerNSH(Packet):
    """Scapy inner LISP GPE layer for Lisp-NSH.

    Parsing not implemented.
    """
def valid_ipv4(ip):
    """Check whether the argument is a valid IPv4 address.

    :param ip: Address to check.
    :type ip: str
    :returns: True if ipaddress.IPv4Address accepts the value, else False.
    :rtype: bool
    """
    # NOTE(review): the def/try/return lines were missing from the damaged
    # source; the name comes from the calls in main() - confirm.
    try:
        ipaddress.IPv4Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
def valid_ipv6(ip):
    """Check whether the argument is a valid IPv6 address.

    :param ip: Address to check.
    :type ip: str
    :returns: True if ipaddress.IPv6Address accepts the value, else False.
    :rtype: bool
    """
    # NOTE(review): the def/try/return lines were missing from the damaged
    # source; the name comes from the calls in main() - confirm.
    try:
        ipaddress.IPv6Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
def main():
    """Send IP ICMP packet from one traffic generator interface to the other.

    An ICMP (IPv4) or ICMPv6 echo request is sent out of tx_if. The frame
    received on rx_if has to carry the expected MAC addresses, an outer IP
    header of the family selected by ot_mode (or, without a recognised
    ot_mode, the family of the sent packet), outer addresses equal to the
    configured RLOCs and inner addresses equal to the sent EIDs.

    :raises ValueError: If src_ip and dst_ip are not both valid IPv4 or both
        valid IPv6 addresses.
    :raises RuntimeError: If the received packet is not correct.
    """
    # NOTE(review): the closing part of this call was missing from the damaged
    # source. ot_mode is read below, so it is restored here as an optional
    # argument; tx_if and rx_if are read below without being listed -
    # presumably TrafficScriptArg provides them by default. Confirm both.
    args = TrafficScriptArg(
        [
            u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
            u"dut_if2_mac", u"src_rloc", u"dst_rloc"
        ],
        [u"ot_mode"]
    )

    tx_src_mac = args.get_arg(u"tg_src_mac")
    tx_dst_mac = args.get_arg(u"dut_if1_mac")
    rx_dst_mac = args.get_arg(u"tg_dst_mac")
    rx_src_mac = args.get_arg(u"dut_if2_mac")
    src_ip = args.get_arg(u"src_ip")
    dst_ip = args.get_arg(u"dst_ip")
    src_rloc = args.get_arg(u"src_rloc")
    dst_rloc = args.get_arg(u"dst_rloc")
    tx_if = args.get_arg(u"tx_if")
    rx_if = args.get_arg(u"rx_if")
    ot_mode = args.get_arg(u"ot_mode")

    # NOTE(review): queue creation was missing from the damaged source; it is
    # restored from the rxq.recv() call and the RxQueue/TxQueue imports.
    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)

    pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)

    # ip_format remembers the family of the sent packet; it is the expected
    # outer family when ot_mode does not request a 6to4/4to6 translation.
    # NOTE(review): the ICMP() layer and both ip_format assignments were
    # missing from the damaged source - confirm.
    if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
        pkt_raw /= IP(src=src_ip, dst=dst_ip)
        pkt_raw /= ICMP()
        ip_format = IP
    elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
        pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
        pkt_raw /= ICMPv6EchoRequest()
        ip_format = IPv6
    else:
        raise ValueError(u"IP not in correct format")

    # Teach Scapy to dissect UDP destination port 4341 as LISP-GPE.
    bind_layers(UDP, LispGPEHeader, dport=4341)

    sent_packets = [pkt_raw]
    # NOTE(review): the send call was missing from the damaged source.
    txq.send(pkt_raw)

    # NOTE(review): lines around this call were missing from the damaged
    # source; the original may have passed ignore= only when tx_if == rx_if.
    ether = rxq.recv(2, ignore=sent_packets)
    if ether is None:
        raise RuntimeError(u"ICMP echo Rx timeout")

    if rx_dst_mac != ether[Ether].dst or rx_src_mac != ether[Ether].src:
        raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
    print(u"MAC addresses match.")

    # NOTE(review): the assignment of ip was missing from the damaged source;
    # the outer IP header is taken as the payload of the received frame.
    ip = ether.payload

    if ot_mode == u"6to4":
        outer_format = IP
    elif ot_mode == u"4to6":
        outer_format = IPv6
    else:
        outer_format = ip_format
    if not isinstance(ip, outer_format):
        raise RuntimeError(f"Not an IP packet received {ip!r}")

    # getlayer() returns None when the layer is absent; check that before
    # touching .underlayer so the failure is reported as the RuntimeError
    # below instead of an AttributeError.
    lisp_header = ether.getlayer(LispGPEHeader)
    lisp = None if lisp_header is None else lisp_header.underlayer
    if lisp is None:
        raise RuntimeError(u"Lisp layer not present or parsing failed.")

    # Compare data from packets
    # lisp is the layer below the LISP-GPE header; its src/dst lookups are
    # presumably resolved by Scapy in the payload chain, i.e. in the inner
    # IP layer - confirm.
    checks = (
        (
            src_ip, lisp.src, u"Source IP matches source EID.",
            u"Matching Src IP unsuccessful"
        ),
        (
            dst_ip, lisp.dst, u"Destination IP matches destination EID.",
            u"Matching Dst IP unsuccessful"
        ),
        (
            src_rloc, ip.src, u"Source RLOC matches configuration.",
            u"Matching Src RLOC unsuccessful"
        ),
        (
            dst_rloc, ip.dst, u"Destination RLOC matches configuration.",
            u"Matching dst RLOC unsuccessful"
        ),
    )
    for expected, received, ok_msg, fail_msg in checks:
        if expected != received:
            raise RuntimeError(f"{fail_msg}: {expected} != {received}")
        print(ok_msg)
if __name__ == u"__main__":
    # NOTE(review): the guarded statement was missing from the damaged source;
    # calling main() is the assumed entry point - confirm.
    main()