2 # Copyright (c) 2017 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
a LISPGPE-encapsulated packet on the other interface and verifies received
packet.
"""
23 from scapy.layers.inet import ICMP, IP, UDP
24 from scapy.layers.inet6 import ICMPv6EchoRequest
25 from scapy.layers.inet6 import IPv6
26 from scapy.layers.l2 import Ether
27 from scapy.all import bind_layers, Packet
28 from scapy.fields import FlagsField, BitField, XBitField, IntField
30 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
31 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
class LispGPEHeader(Packet):
    """Scapy header for the Lisp GPE Layer."""
    name = "Lisp GPE Header"
    fields_desc = [
        FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]),
        BitField("version", 0, size=2),
        BitField("reserved", 0, size=14),
        XBitField("next_protocol", 0, size=8),
        IntField("instance_id/locator_status_bits", 0)]

    def guess_payload_class(self, payload):
        """Select the inner layer class from the next_protocol field.

        :param payload: Data following this header, as handed over by scapy.
        :returns: Inner layer class mapped to next_protocol, or scapy's
            default guess when next_protocol is not in the table.
        :rtype: type
        """
        # NOTE(review): only the 0x2 and 0x3 entries were visible in the
        # reviewed excerpt; 0x1 and 0x4 are restored from the LISP-GPE Next
        # Protocol assignments and the inner classes of this file - confirm.
        protocol = {
            0x1: LispGPEInnerIP,
            0x2: LispGPEInnerIPv6,
            0x3: LispGPEInnerEther,
            0x4: LispGPEInnerNSH,
        }
        try:
            return protocol[self.next_protocol]
        except KeyError:
            # Unknown next_protocol: defer to scapy's generic guessing rather
            # than aborting dissection with a bare KeyError.
            return Packet.guess_payload_class(self, payload)
class LispGPEInnerIP(IP):
    """Scapy layer for an IPv4 packet carried inside LISP GPE (IPv4-in-IPv4).
    """

    name = "Lisp GPE Inner Layer - IPv4"
class LispGPEInnerIPv6(IPv6):
    """Scapy layer for an IPv6 packet carried inside LISP GPE (IPv6-in-IPv6).
    """

    name = "Lisp GPE Inner Layer - IPv6"
class LispGPEInnerEther(Ether):
    """Scapy layer for an Ethernet frame carried inside LISP GPE (Lisp-L2)."""

    name = "Lisp GPE Inner Layer - Ethernet"
class LispGPEInnerNSH(Packet):
    """Scapy inner LISP GPE layer for Lisp-NSH.

    Parsing not implemented.
    """
def valid_ipv4(ip):
    """Check whether the given value is a valid IPv4 address.

    :param ip: Address to check; it is converted to text before parsing.
    :type ip: str
    :returns: True if the value parses as an IPv4 address, False otherwise.
    :rtype: bool
    """
    try:
        # Keep the original unicode() conversion where that builtin exists.
        text_type = unicode  # noqa: F821 pylint: disable=undefined-variable
    except NameError:
        # No "unicode" builtin (Python 3): str is the text type.
        text_type = str
    try:
        ipaddress.IPv4Address(text_type(ip))
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
def valid_ipv6(ip):
    """Check whether the given value is a valid IPv6 address.

    :param ip: Address to check; it is converted to text before parsing.
    :type ip: str
    :returns: True if the value parses as an IPv6 address, False otherwise.
    :rtype: bool
    """
    try:
        # Keep the original unicode() conversion where that builtin exists.
        text_type = unicode  # noqa: F821 pylint: disable=undefined-variable
    except NameError:
        # No "unicode" builtin (Python 3): str is the text type.
        text_type = str
    try:
        ipaddress.IPv6Address(text_type(ip))
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
def main():
    """Send IP ICMP packet from one traffic generator interface to the other.

    An ICMP (IPv4) or ICMPv6 echo request is built from the script arguments
    and sent out of tx_if. The packet received on rx_if is then checked for
    the expected MAC addresses, outer IP version, inner source/destination
    addresses (EIDs) and outer source/destination addresses (RLOCs).

    :raises ValueError: If src_ip and dst_ip are not both valid IPv4 or both
        valid IPv6 addresses.
    :raises RuntimeError: If the received packet is not correct."""

    # NOTE(review): the second (optional) argument list was not visible in
    # the reviewed excerpt; 'ot_mode' is assumed because it is read below.
    args = TrafficScriptArg(
        ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
         'dut_if2_mac', 'src_rloc', 'dst_rloc'],
        ['ot_mode'])

    tx_src_mac = args.get_arg('tg_src_mac')
    tx_dst_mac = args.get_arg('dut_if1_mac')
    rx_dst_mac = args.get_arg('tg_dst_mac')
    rx_src_mac = args.get_arg('dut_if2_mac')
    src_ip = args.get_arg('src_ip')
    dst_ip = args.get_arg('dst_ip')
    src_rloc = args.get_arg("src_rloc")
    dst_rloc = args.get_arg("dst_rloc")
    tx_if = args.get_arg('tx_if')
    rx_if = args.get_arg('rx_if')
    ot_mode = args.get_arg('ot_mode')

    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)
    sent_packets = []

    pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)

    # ip_format remembers the IP version sent; it is the expected outer
    # header class when no ot_mode override is given.
    if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
        pkt_raw /= IP(src=src_ip, dst=dst_ip)
        pkt_raw /= ICMP()
        ip_format = IP
    elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
        pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
        pkt_raw /= ICMPv6EchoRequest()
        ip_format = IPv6
    else:
        raise ValueError("IP not in correct format")

    # Make scapy dissect UDP payloads with destination port 4341 as LISP GPE.
    bind_layers(UDP, LispGPEHeader, dport=4341)

    sent_packets.append(pkt_raw)
    txq.send(pkt_raw)

    # NOTE(review): the branch condition was not visible in the reviewed
    # excerpt; tx_if == rx_if is assumed (skip our own packet only when
    # sending and receiving on the same interface) - confirm.
    if tx_if == rx_if:
        ether = rxq.recv(2, ignore=sent_packets)
    else:
        ether = rxq.recv(2)

    if ether is None:
        raise RuntimeError("ICMP echo Rx timeout")

    if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
        print("MAC addresses match.")
    else:
        raise RuntimeError(
            "Matching packet unsuccessful: {0}".format(repr(ether)))

    ip = ether.payload

    # The outer header version is forced by ot_mode when it is '6to4' or
    # '4to6'; otherwise it must match the version that was sent.
    if ot_mode == '6to4':
        expected_outer = IP
    elif ot_mode == '4to6':
        expected_outer = IPv6
    else:
        expected_outer = ip_format
    if not isinstance(ip, expected_outer):
        raise RuntimeError(
            "Not an IP packet received {0}".format(repr(ip)))

    # getlayer() yields None when the layer is absent, so test it before
    # touching .underlayer; otherwise an AttributeError would mask the
    # intended RuntimeError.
    lisp_header = ether.getlayer(LispGPEHeader)
    if lisp_header is None or lisp_header.underlayer is None:
        raise RuntimeError("Lisp layer not present or parsing failed.")
    lisp = lisp_header.underlayer

    # Compare data from packets
    if src_ip == lisp.src:
        print("Source IP matches source EID.")
    else:
        raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
                           .format(src_ip, lisp.src))

    if dst_ip == lisp.dst:
        print("Destination IP matches destination EID.")
    else:
        raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
                           .format(dst_ip, lisp.dst))

    if src_rloc == ip.src:
        print("Source RLOC matches configuration.")
    else:
        raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
                           .format(src_rloc, ip.src))

    if dst_rloc == ip.dst:
        print("Destination RLOC matches configuration.")
    else:
        raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
                           .format(dst_rloc, ip.dst))
195 if __name__ == "__main__":