2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends IP ICMPv4/ICMPv6 packets and checks
16 whether the traffic is divided into two paths."""
21 from scapy.all import Ether
22 from scapy.layers.inet import ICMP, IP
23 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
25 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
26 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
# Fragment of the IPv4 validity check. The enclosing `def`/`try` lines and
# the return statements are not visible in this excerpt; presumably this is
# the valid_ipv4() helper called further below -- confirm.
# Constructing an IPv4Address validates the text form of `ip`.
# NOTE(review): `unicode` is a Python 2 builtin; it does not exist on
# Python 3.
31 ipaddress.IPv4Address(unicode(ip))
# AddressValueError is what the constructor raises for a malformed address.
# AttributeError is caught as well -- presumably to tolerate non-string
# input (TODO confirm).
33 except (AttributeError, ipaddress.AddressValueError):
# Fragment of the IPv6 validity check. The enclosing `def`/`try` lines and
# the return statements are not visible in this excerpt; presumably this is
# the valid_ipv6() helper called further below -- confirm.
# Constructing an IPv6Address validates the text form of `ip`.
# NOTE(review): `unicode` is a Python 2 builtin; it does not exist on
# Python 3.
39 ipaddress.IPv6Address(unicode(ip))
# AddressValueError is what the constructor raises for a malformed address.
# AttributeError is caught as well -- presumably to tolerate non-string
# input (TODO confirm).
41 except (AttributeError, ipaddress.AddressValueError):
46 """Send 100 IP ICMP packets traffic and check if it is divided into
# Fragment of the script's main routine; its `def` line and a number of
# interior lines are not visible in this excerpt. The visible lines build
# ICMP echo packets from 100 source addresses and then verify, through the
# errors raised below, that the received frames are split across two paths.
#
# Declare the script-specific arguments to be parsed.
48 args = TrafficScriptArg(
49 ['src_ip', 'dst_ip', 'tg_if1_mac', 'dut_if1_mac', 'dut_if2_mac',
50 'path_1_mac', 'path_2_mac'])
# Fetch each declared argument from the parsed argument object.
52 src_ip = args.get_arg('src_ip')
53 dst_ip = args.get_arg('dst_ip')
54 tg_if1_mac = args.get_arg('tg_if1_mac')
55 dut_if1_mac = args.get_arg('dut_if1_mac')
56 dut_if2_mac = args.get_arg('dut_if2_mac')
57 path_1_mac = args.get_arg('path_1_mac')
58 path_2_mac = args.get_arg('path_2_mac')
# NOTE(review): 'tx_if' and 'rx_if' are not in the list passed above;
# presumably TrafficScriptArg provides them by default -- confirm.
59 tx_if = args.get_arg('tx_if')
60 rx_if = args.get_arg('rx_if')
# Select per-address-family settings from the source address. The branch
# bodies are not visible here; presumably they set `separator` and
# `ip_format`, both of which are used below -- confirm.
71 if valid_ipv4(src_ip):
73 elif valid_ipv6(src_ip):
# Reached when the source address is neither valid IPv4 nor valid IPv6.
76 raise ValueError("Source address not in correct format")
# Keep everything up to and including the last separator so that a new
# final address component can be appended for each packet.
78 src_ip_base = (src_ip.rsplit(separator, 1))[0] + separator
# Build 100 packets whose source address ends in str(i) for i = 1..100.
80 for i in range(1, 101):
# Source and destination must both be of the same address family.
81 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
82 pkt_raw = (Ether(src=tg_if1_mac, dst=dut_if1_mac) /
83 IP(src=src_ip_base+str(i), dst=dst_ip) /
86 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
87 pkt_raw = (Ether(src=tg_if1_mac, dst=dut_if1_mac) /
88 IPv6(src=src_ip_base+str(i), dst=dst_ip) /
92 raise ValueError("IP not in correct format")
# `sent_packets` is created on a line not visible in this excerpt.
94 sent_packets.append(pkt_raw)
# Receive-side checks. The receive calls and the conditions guarding the
# two timeout errors are not visible here; presumably `ether` is the frame
# read from the RX queue -- confirm.
100 raise RuntimeError('ICMPv6 echo reply Rx timeout')
102 if ether.haslayer(ICMPv6ND_NS):
103 # read another packet in the queue in case of ICMPv6ND_NS packet
106 # otherwise process the current packet
110 raise RuntimeError("ICMP echo Rx timeout")
# The received frame must contain the layer type held in `ip_format`.
111 if not ether.haslayer(ip_format):
112 raise RuntimeError("Not an IP packet received {0}".
113 format(ether.__repr__()))
# The frame's source MAC must be the dut_if2_mac argument.
115 if ether[Ether].src != dut_if2_mac:
116 raise RuntimeError("Source MAC address error")
# Classify the frame by destination MAC. The branch bodies are not visible
# here; presumably they increment path_1_counter / path_2_counter -- confirm.
118 if ether[Ether].dst == path_1_mac:
120 elif ether[Ether].dst == path_2_mac:
123 raise RuntimeError("Destination MAC address error")
# The two path counters together must account for all 100 packets.
# NOTE(review): the message below misspells "received"; it is left
# unchanged here because it is runtime text.
125 if (path_1_counter + path_2_counter) != 100:
126 raise RuntimeError("Packet loss: recevied only {} packets of 100 ".
127 format(path_1_counter + path_2_counter))
# A zero counter means that path carried no traffic, i.e. no split happened.
129 if path_1_counter == 0:
130 raise RuntimeError("Path 1 error!")
132 if path_2_counter == 0:
133 raise RuntimeError("Path 2 error!")
# Report the per-path distribution.
# NOTE(review): these are Python 2 print statements.
135 print "Path_1 counter: {}".format(path_1_counter)
136 print "Path_2 counter: {}".format(path_2_counter)
# Script entry guard: the guarded body runs only when the file is executed
# directly. The guarded statement itself is not visible in this excerpt.
141 if __name__ == "__main__":