2 # Copyright (c) 2017 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
16 a LISPGPE-encapsulated packet on the other interface and verifies received
23 from scapy.layers.inet import ICMP, IP, UDP
24 from scapy.layers.inet6 import ICMPv6EchoRequest
25 from scapy.layers.inet6 import IPv6
26 from scapy.layers.l2 import Ether
27 from scapy.all import bind_layers, Packet
28 from scapy.fields import FlagsField, BitField, XBitField, IntField
30 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
31 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
34 class LispGPEHeader(Packet):
35 """Scapy header for the Lisp GPE Layer."""
36 name = "Lisp GPE Header"
38 FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]),
39 BitField("version", 0, size=2),
40 BitField("reserved", 0, size=14),
41 XBitField("next_protocol", 0, size=8),
42 IntField("instance_id/locator_status_bits", 0)]
44 def guess_payload_class(self, payload):
47 0x2: LispGPEInnerIPv6,
48 0x3: LispGPEInnerEther,
51 return protocol[self.next_protocol]
54 class LispGPEInnerIP(IP):
55 """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
56 name = "Lisp GPE Inner Layer - IPv4"
59 class LispGPEInnerIPv6(IPv6):
60 """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
61 name = "Lisp GPE Inner Layer - IPv6"
64 class LispGPEInnerEther(Ether):
65 """Scapy inner LISP GPE layer for Lisp-L2."""
66 name = "Lisp GPE Inner Layer - Ethernet"
69 class LispGPEInnerNSH(Packet):
70 """Scapy inner LISP GPE layer for Lisp-NSH.
72 Parsing not implemented.
78 ipaddress.IPv4Address(unicode(ip))
80 except (AttributeError, ipaddress.AddressValueError):
86 ipaddress.IPv6Address(unicode(ip))
88 except (AttributeError, ipaddress.AddressValueError):
93 """Send IP ICMP packet from one traffic generator interface to the other.
95 :raises RuntimeError: If the received packet is not correct."""
97 args = TrafficScriptArg(
98 ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
99 'dut_if2_mac', 'src_rloc', 'dst_rloc'])
101 tx_src_mac = args.get_arg('tg_src_mac')
102 tx_dst_mac = args.get_arg('dut_if1_mac')
103 rx_dst_mac = args.get_arg('tg_dst_mac')
104 rx_src_mac = args.get_arg('dut_if2_mac')
105 src_ip = args.get_arg('src_ip')
106 dst_ip = args.get_arg('dst_ip')
107 src_rloc = args.get_arg("src_rloc")
108 dst_rloc = args.get_arg("dst_rloc")
109 tx_if = args.get_arg('tx_if')
110 rx_if = args.get_arg('rx_if')
115 pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
117 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
118 pkt_raw /= IP(src=src_ip, dst=dst_ip)
121 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
122 pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
123 pkt_raw /= ICMPv6EchoRequest()
126 raise ValueError("IP not in correct format")
128 bind_layers(UDP, LispGPEHeader, dport=4341)
130 sent_packets.append(pkt_raw)
134 ether = rxq.recv(2, ignore=sent_packets)
139 raise RuntimeError("ICMP echo Rx timeout")
141 if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
142 print("MAC addresses match.")
145 "Matching packet unsuccessful: {0}".format(ether.__repr__()))
149 if not isinstance(ip, ip_format):
151 "Not an IP packet received {0}".format(ip.__repr__()))
153 lisp = ether.getlayer(LispGPEHeader).underlayer
155 raise RuntimeError("Lisp layer not present or parsing failed.")
157 # Compare data from packets
158 if src_ip == lisp.src:
159 print("Source IP matches source EID.")
161 raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
162 .format(src_ip, lisp.src))
164 if dst_ip == lisp.dst:
165 print("Destination IP matches destination EID.")
167 raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
168 .format(dst_ip, lisp.dst))
170 if src_rloc == ip.src:
171 print("Source RLOC matches configuration.")
173 raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
174 .format(src_rloc, ip.src))
176 if dst_rloc == ip.dst:
177 print("Destination RLOC matches configuration.")
179 raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
180 .format(dst_rloc, ip.dst))
185 if __name__ == "__main__":