2 # Copyright (c) 2017 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
16 a LISP-encapsulated packet on the other interface and verifies received packet.
22 from scapy.layers.inet import ICMP, IP, UDP
23 from scapy.layers.inet6 import ICMPv6EchoRequest
24 from scapy.layers.inet6 import IPv6
25 from scapy.layers.l2 import Ether
26 from scapy.all import bind_layers, Packet
27 from scapy.fields import FlagsField, BitField, IntField
29 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
30 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
33 class LispHeader(Packet):
34 """Scapy header for the LISP Layer."""
37 FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]),
38 BitField("nonce/map_version", 0, size=24),
39 IntField("instance_id/locator_status_bits", 0)]
42 class LispInnerIP(IP):
43 """Scapy inner LISP layer for IPv4-in-IPv4."""
44 name = "Lisp Inner Layer - IPv4"
47 class LispInnerIPv6(IPv6):
48 """Scapy inner LISP layer for IPv6-in-IPv6."""
49 name = "Lisp Inner Layer - IPv6"
54 ipaddress.IPv4Address(unicode(ip))
56 except (AttributeError, ipaddress.AddressValueError):
62 ipaddress.IPv6Address(unicode(ip))
64 except (AttributeError, ipaddress.AddressValueError):
69 """Send IP ICMP packet from one traffic generator interface to the other.
71 :raises RuntimeError: If the received packet is not correct."""
73 args = TrafficScriptArg(
74 ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
75 'dut_if2_mac', 'src_rloc', 'dst_rloc'])
77 tx_src_mac = args.get_arg('tg_src_mac')
78 tx_dst_mac = args.get_arg('dut_if1_mac')
79 rx_dst_mac = args.get_arg('tg_dst_mac')
80 rx_src_mac = args.get_arg('dut_if2_mac')
81 src_ip = args.get_arg('src_ip')
82 dst_ip = args.get_arg('dst_ip')
83 src_rloc = args.get_arg("src_rloc")
84 dst_rloc = args.get_arg("dst_rloc")
85 tx_if = args.get_arg('tx_if')
86 rx_if = args.get_arg('rx_if')
91 pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
93 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
94 pkt_raw /= IP(src=src_ip, dst=dst_ip)
97 lisp_layer = LispInnerIP
98 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
99 pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
100 pkt_raw /= ICMPv6EchoRequest()
102 lisp_layer = LispInnerIPv6
104 raise ValueError("IP not in correct format")
106 bind_layers(UDP, LispHeader, dport=4341)
107 bind_layers(LispHeader, lisp_layer)
108 sent_packets.append(pkt_raw)
112 ether = rxq.recv(2, ignore=sent_packets)
117 raise RuntimeError("ICMP echo Rx timeout")
119 if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
120 print("MAC addresses match.")
123 "Matching packet unsuccessful: {0}".format(ether.__repr__()))
127 if not isinstance(ip, ip_format):
129 "Not an IP packet received {0}".format(ip.__repr__()))
131 lisp = ether.getlayer(lisp_layer)
133 raise RuntimeError("Lisp layer not present or parsing failed.")
135 # Compare data from packets
136 if src_ip == lisp.src:
137 print("Source IP matches source EID.")
139 raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
140 .format(src_ip, lisp.src))
142 if dst_ip == lisp.dst:
143 print("Destination IP matches destination EID.")
145 raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
146 .format(dst_ip, lisp.dst))
148 if src_rloc == ip.src:
149 print("Source RLOC matches configuration.")
151 raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
152 .format(src_rloc, ip.src))
154 if dst_rloc == ip.dst:
155 print("Destination RLOC matches configuration.")
157 raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
158 .format(dst_rloc, ip.dst))
163 if __name__ == "__main__":