3 # Copyright (c) 2021 Cisco and/or its affiliates.
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later; you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
12 # http://www.apache.org/licenses/LICENSE-2.0
13 # https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+. If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
26 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
27 a LISP-encapsulated packet on the other interface and verifies received packet.
33 from scapy.all import bind_layers, Packet
34 from scapy.fields import FlagsField, BitField, IntField
35 from scapy.layers.inet import ICMP, IP, UDP
36 from scapy.layers.inet6 import ICMPv6EchoRequest
37 from scapy.layers.inet6 import IPv6
38 from scapy.layers.l2 import Ether
39 from scapy.packet import Raw
41 from ..PacketVerifier import RxQueue, TxQueue
42 from ..TrafficScriptArg import TrafficScriptArg
class LispHeader(Packet):
    """Scapy header for the LISP Layer.

    The header is described by three fields that together span 64 bits:
    an 8-bit flags field, a 24-bit field and a 32-bit field. The two wider
    fields are named with a slash because, per the LISP data header layout
    (RFC 6830), the same bits carry one of two meanings.
    """

    # NOTE(review): no explicit ``name`` attribute is visible for this class
    # in the reviewed text; add one here if a display name other than the
    # class name is wanted.
    fields_desc = [
        # Five named flag bits followed by three unnamed bits.
        FlagsField(
            u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""]
        ),
        BitField(u"nonce/map_version", 0, size=24),
        IntField(u"instance_id/locator_status_bits", 0),
    ]
class LispInnerIP(IP):
    """Scapy inner LISP layer for IPv4-in-IPv4.

    Adds nothing to scapy's IP layer except its own display name. This class,
    rather than IP itself, is what gets bound as the payload of LispHeader
    when the test traffic is IPv4.
    """

    name = u"Lisp Inner Layer - IPv4"
class LispInnerIPv6(IPv6):
    """Scapy inner LISP layer for IPv6-in-IPv6.

    Adds nothing to scapy's IPv6 layer except its own display name. This
    class, rather than IPv6 itself, is what gets bound as the payload of
    LispHeader when the test traffic is IPv6.
    """

    name = u"Lisp Inner Layer - IPv6"
def valid_ipv4(ip):
    """Check whether the given value is a valid IPv4 address.

    :param ip: Value to check; anything ipaddress.IPv4Address can take.
    :returns: True if ipaddress.IPv4Address accepts the value, else False.
    :rtype: bool
    """
    try:
        ipaddress.IPv4Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        # The value could not be interpreted as an IPv4 address.
        return False
    return True
def valid_ipv6(ip):
    """Check whether the given value is a valid IPv6 address.

    :param ip: Value to check; anything ipaddress.IPv6Address can take.
    :returns: True if ipaddress.IPv6Address accepts the value, else False.
    :rtype: bool
    """
    try:
        ipaddress.IPv6Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        # The value could not be interpreted as an IPv6 address.
        return False
    return True
# NOTE(review): the function header that owns this docstring is not present
# in the reviewed text. Besides sending the packet, the statements below also
# check the received frame's MACs, outer IP version, inner addresses (EIDs)
# and outer addresses (RLOCs).
86 """Send IP ICMP packet from one traffic generator interface to the other.
88 :raises RuntimeError: If the received packet is not correct."""
# Collect the script arguments.
# NOTE(review): the remainder of this call is not visible, so how tx_if, rx_if
# and ot_mode (all read below) are declared cannot be confirmed from here.
90 args = TrafficScriptArg(
92 u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
93 u"dut_if2_mac", u"src_rloc", u"dst_rloc"
# tx_* MACs go into the Ethernet header of the packet built below;
# rx_* MACs are the values the received frame is compared against.
98 tx_src_mac = args.get_arg(u"tg_src_mac")
99 tx_dst_mac = args.get_arg(u"dut_if1_mac")
100 rx_dst_mac = args.get_arg(u"tg_dst_mac")
101 rx_src_mac = args.get_arg(u"dut_if2_mac")
# src_ip/dst_ip are later compared with the inner (LISP payload) header,
# src_rloc/dst_rloc with the outer IP header of the received packet.
102 src_ip = args.get_arg(u"src_ip")
103 dst_ip = args.get_arg(u"dst_ip")
104 src_rloc = args.get_arg(u"src_rloc")
105 dst_rloc = args.get_arg(u"dst_rloc")
106 tx_if = args.get_arg(u"tx_if")
107 rx_if = args.get_arg(u"rx_if")
108 ot_mode = args.get_arg(u"ot_mode")
# Build the packet to transmit, starting with the Ethernet header.
113 pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
# Choose the L3 header and the matching inner LISP class from the address
# family of src_ip/dst_ip; both addresses must belong to the same family.
115 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
116 pkt_raw /= IP(src=src_ip, dst=dst_ip)
119 lisp_layer = LispInnerIP
120 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
121 pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
122 pkt_raw /= ICMPv6EchoRequest()
124 lisp_layer = LispInnerIPv6
# Intended for addresses that are neither both IPv4 nor both IPv6.
# NOTE(review): the branch keyword guarding this raise is not visible here.
126 raise ValueError(u"IP not in correct format")
# Make scapy dissect UDP datagrams with destination port 4341 as LispHeader,
# and the LispHeader payload as the inner IP class selected above.
128 bind_layers(UDP, LispHeader, dport=4341)
129 bind_layers(LispHeader, lisp_layer)
# The built packet is handed to rxq.recv() through ignore=; presumably so a
# copy of it seen on the receive side is not taken for the reply, and the 2 is
# presumably a timeout -- TODO confirm both against RxQueue.recv.
# NOTE(review): the creation of rxq and the statement that actually transmits
# pkt_raw are not visible in the reviewed text.
132 sent_packets = list()
133 sent_packets.append(pkt_raw)
137 ether = rxq.recv(2, ignore=sent_packets)
# NOTE(review): the condition guarding this raise is not visible here.
142 raise RuntimeError(u"ICMP echo Rx timeout")
# The received frame must carry the expected destination and source MACs.
144 if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
145 print(u"MAC addresses match.")
147 raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
# ot_mode selects the IP version required of `ip`: "6to4" requires IP,
# "4to6" requires IPv6, anything else requires ip_format.
# NOTE(review): the assignments of `ip` and `ip_format` are not visible here.
# NOTE(review): all three branches raise the same message, including the
# branch that expects IPv6.
151 if ot_mode == u"6to4":
152 if not isinstance(ip, IP):
153 raise RuntimeError(f"Not an IP packet received {ip!r}")
154 elif ot_mode == u"4to6":
155 if not isinstance(ip, IPv6):
156 raise RuntimeError(f"Not an IP packet received {ip!r}")
157 elif not isinstance(ip, ip_format):
158 raise RuntimeError(f"Not an IP packet received {ip!r}")
# Look up the inner header class in the received packet.
# NOTE(review): the condition guarding the raise below is not visible here.
160 lisp = ether.getlayer(lisp_layer)
162 raise RuntimeError("Lisp layer not present or parsing failed.")
# The inner header (lisp) must carry the original src_ip/dst_ip, and `ip`
# must carry the configured RLOCs.
# NOTE(review): these are plain equality checks on the address values;
# presumably the arguments use the same textual form scapy reports -- confirm
# for IPv6. The raise statements that own the f-string messages below are not
# visible in the reviewed text.
164 # Compare data from packets
165 if src_ip == lisp.src:
166 print(u"Source IP matches source EID.")
169 f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
172 if dst_ip == lisp.dst:
173 print(u"Destination IP matches destination EID.")
176 f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
179 if src_rloc == ip.src:
180 print(u"Source RLOC matches configuration.")
183 f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
186 if dst_rloc == ip.dst:
187 print(u"Destination RLOC matches configuration.")
190 f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
196 if __name__ == u"__main__":