2 # Copyright (c) 2017 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
16 a LISP-encapsulated packet on the other interface and verifies received packet.
22 from scapy.layers.inet import ICMP, IP, UDP
23 from scapy.layers.inet6 import ICMPv6EchoRequest
24 from scapy.layers.inet6 import IPv6
25 from scapy.layers.l2 import Ether
26 from scapy.all import bind_layers, Packet
27 from scapy.fields import FlagsField, BitField, IntField
29 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
30 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
33 class LispHeader(Packet):
34 """Scapy header for the LISP Layer."""
37 FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]),
38 BitField("nonce/map_version", 0, size=24),
39 IntField("instance_id/locator_status_bits", 0)]
42 class LispInnerIP(IP):
43 """Scapy inner LISP layer for IPv4-in-IPv4."""
44 name = "Lisp Inner Layer - IPv4"
47 class LispInnerIPv6(IPv6):
48 """Scapy inner LISP layer for IPv6-in-IPv6."""
49 name = "Lisp Inner Layer - IPv6"
54 ipaddress.IPv4Address(unicode(ip))
56 except (AttributeError, ipaddress.AddressValueError):
62 ipaddress.IPv6Address(unicode(ip))
64 except (AttributeError, ipaddress.AddressValueError):
69 """Send IP ICMP packet from one traffic generator interface to the other.
71 :raises RuntimeError: If the received packet is not correct."""
73 args = TrafficScriptArg(
74 ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
75 'dut_if2_mac', 'src_rloc', 'dst_rloc'],
78 tx_src_mac = args.get_arg('tg_src_mac')
79 tx_dst_mac = args.get_arg('dut_if1_mac')
80 rx_dst_mac = args.get_arg('tg_dst_mac')
81 rx_src_mac = args.get_arg('dut_if2_mac')
82 src_ip = args.get_arg('src_ip')
83 dst_ip = args.get_arg('dst_ip')
84 src_rloc = args.get_arg("src_rloc")
85 dst_rloc = args.get_arg("dst_rloc")
86 tx_if = args.get_arg('tx_if')
87 rx_if = args.get_arg('rx_if')
88 ot_mode = args.get_arg('ot_mode')
93 pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
95 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
96 pkt_raw /= IP(src=src_ip, dst=dst_ip)
99 lisp_layer = LispInnerIP
100 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
101 pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
102 pkt_raw /= ICMPv6EchoRequest()
104 lisp_layer = LispInnerIPv6
106 raise ValueError("IP not in correct format")
108 bind_layers(UDP, LispHeader, dport=4341)
109 bind_layers(LispHeader, lisp_layer)
110 sent_packets.append(pkt_raw)
114 ether = rxq.recv(2, ignore=sent_packets)
119 raise RuntimeError("ICMP echo Rx timeout")
121 if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
122 print("MAC addresses match.")
125 "Matching packet unsuccessful: {0}".format(ether.__repr__()))
129 if ot_mode == '6to4':
130 if not isinstance(ip, IP):
132 "Not an IP packet received {0}".format(ip.__repr__()))
133 elif ot_mode == '4to6':
134 if not isinstance(ip, IP6):
136 "Not an IP packet received {0}".format(ip.__repr__()))
137 elif not isinstance(ip, ip_format):
139 "Not an IP packet received {0}".format(ip.__repr__()))
141 lisp = ether.getlayer(lisp_layer)
143 raise RuntimeError("Lisp layer not present or parsing failed.")
145 # Compare data from packets
146 if src_ip == lisp.src:
147 print("Source IP matches source EID.")
149 raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
150 .format(src_ip, lisp.src))
152 if dst_ip == lisp.dst:
153 print("Destination IP matches destination EID.")
155 raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
156 .format(dst_ip, lisp.dst))
158 if src_rloc == ip.src:
159 print("Source RLOC matches configuration.")
161 raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
162 .format(src_rloc, ip.src))
164 if dst_rloc == ip.dst:
165 print("Destination RLOC matches configuration.")
167 raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
168 .format(dst_rloc, ip.dst))
173 if __name__ == "__main__":