2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an IP ICMPv4 or ICMPv6."""
20 from scapy.all import Ether
21 from scapy.layers.inet import ICMP, IP
22 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
28 def is_icmp_reply(pkt, ipformat):
29 """Return True if pkt is echo reply, else return False. If exception occurs
33 :param ipformat: Dictionary of names to distinguish IPv4 and IPv6.
38 # pylint: disable=bare-except
40 if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \
45 except: # pylint: disable=bare-except
49 def address_check(request, reply, ipformat):
50 """Compare request packet source address with reply destination address
51 and vice versa. If exception occurs return False.
53 :param request: Sent packet containing request.
54 :param reply: Received packet containing reply.
55 :param ipformat: Dictionary of names to distinguish IPv4 and IPv6.
61 # pylint: disable=bare-except
63 r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst
64 r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src
65 return r_src and r_dst
66 except: # pylint: disable=bare-except
71 """Check if IP address has the correct IPv4 address format.
73 :param ip: IP address.
75 :return: True in case of correct IPv4 address format,
76 otherwise return False.
80 ipaddress.IPv4Address(unicode(ip))
82 except (AttributeError, ipaddress.AddressValueError):
87 """Check if IP address has the correct IPv6 address format.
89 :param ip: IP address.
91 :return: True in case of correct IPv6 address format,
92 otherwise return False.
96 ipaddress.IPv6Address(unicode(ip))
98 except (AttributeError, ipaddress.AddressValueError):
103 """Send ICMP echo request and wait for ICMP echo reply. It ignores all other
105 args = TrafficScriptArg(['dst_mac', 'src_mac', 'dst_ip', 'src_ip',
108 dst_mac = args.get_arg('dst_mac')
109 src_mac = args.get_arg('src_mac')
110 dst_ip = args.get_arg('dst_ip')
111 src_ip = args.get_arg('src_ip')
112 tx_if = args.get_arg('tx_if')
113 rx_if = args.get_arg('rx_if')
114 timeout = int(args.get_arg('timeout'))
121 # Create empty ip ICMP packet
122 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
123 icmp_request = (Ether(src=src_mac, dst=dst_mac) /
124 IP(src=src_ip, dst=dst_ip) /
126 ip_format = {'IPType': 'IP', 'ICMP_req': 'ICMP',
127 'ICMP_rep': 'ICMP', 'Type': 0}
128 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
129 icmp_request = (Ether(src=src_mac, dst=dst_mac) /
130 IPv6(src=src_ip, dst=dst_ip) /
132 ip_format = {'IPType': 'IPv6', 'ICMP_req': 'ICMPv6 Echo Request',
133 'ICMP_rep': 'ICMPv6 Echo Reply', 'Type': 129}
135 raise ValueError("IP not in correct format")
137 # Send created packet on the interface
138 sent_packets.append(icmp_request)
139 txq.send(icmp_request)
141 for _ in range(1000):
143 icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
144 if icmp_reply is None:
147 raise RuntimeError("ICMP echo Rx timeout")
149 elif icmp_reply.haslayer(ICMPv6ND_NS):
150 # read another packet in the queue in case of ICMPv6ND_NS packet
153 # otherwise process the current packet
156 if is_icmp_reply(icmp_reply, ip_format):
157 if address_check(icmp_request, icmp_reply, ip_format):
160 raise RuntimeError("Max packet count limit reached")
162 print "ICMP echo reply received."
167 if __name__ == "__main__":