2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
"""Traffic script that sends an ARP, ICMPv4 or ICMPv6 packet from one TG
interface to the DUT and receives the mirrored copies on the other TG
interface. Source and destination IP addresses and source and destination
MAC addresses are checked in the received packets.
"""
23 from scapy.layers.inet import IP, ICMP, ARP
24 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
25 from scapy.layers.l2 import Ether
27 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
28 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def valid_ipv4(address):
    """Check if IP address has the correct IPv4 address format.

    The value is converted to text before parsing. The ``unicode`` builtin
    exists only on Python 2, so the text type is taken from a text literal
    instead; that keeps the check working on Python 3 as well.

    :param address: IP address.
    :type address: str
    :return: True in case of correct IPv4 address format,
        otherwise return False.
    :rtype: bool
    """
    text_type = type(u"")  # ``unicode`` on Python 2, ``str`` on Python 3
    try:
        ipaddress.IPv4Address(text_type(address))
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
def valid_ipv6(address):
    """Check if IP address has the correct IPv6 address format.

    The value is converted to text before parsing. The ``unicode`` builtin
    exists only on Python 2, so the text type is taken from a text literal
    instead; that keeps the check working on Python 3 as well.

    :param address: IP address.
    :type address: str
    :return: True in case of correct IPv6 address format,
        otherwise return False.
    :rtype: bool
    """
    text_type = type(u"")  # ``unicode`` on Python 2, ``str`` on Python 3
    try:
        ipaddress.IPv6Address(text_type(address))
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
# Target hardware address placed in the ARP request; the sender does not
# know it yet, hence all zeroes. Shared by the builder and the checker so
# the two cannot drift apart.
_ARP_UNKNOWN_MAC = "00:00:00:00:00:00"


def _build_request(ptype, src_mac, dst_mac, src_ip, dst_ip):
    """Build the request packet for the given payload type.

    :param ptype: Payload type, one of "ARP", "ICMP" or "ICMPv6".
    :param src_mac: Source MAC address (TG interface).
    :param dst_mac: Destination MAC address (DUT interface).
    :param src_ip: Source IP address.
    :param dst_ip: Destination IP address.
    :type ptype: str
    :type src_mac: str
    :type dst_mac: str
    :type src_ip: str
    :type dst_ip: str
    :return: Scapy packet to be sent.
    :raises ValueError: If the IP addresses are not valid for the payload
        type.
    :raises RuntimeError: If the payload type is not supported.
    """
    l2_header = Ether(src=src_mac, dst=dst_mac)
    if ptype == "ARP":
        return l2_header / ARP(hwsrc=src_mac, hwdst=_ARP_UNKNOWN_MAC,
                               psrc=src_ip, pdst=dst_ip, op="who-has")
    if ptype == "ICMP":
        if not (valid_ipv4(src_ip) and valid_ipv4(dst_ip)):
            raise ValueError("IP addresses not in correct format")
        return (l2_header /
                IP(src=src_ip, dst=dst_ip) /
                ICMP(type="echo-request"))
    if ptype == "ICMPv6":
        if not (valid_ipv6(src_ip) and valid_ipv6(dst_ip)):
            raise ValueError("IPv6 addresses not in correct format")
        return (l2_header /
                IPv6(src=src_ip, dst=dst_ip) /
                ICMPv6EchoRequest())
    raise RuntimeError("Unexpected payload type.")


def _check_mirrored_rx(ether, ptype, src_mac, dst_mac, src_ip, dst_ip):
    """Compare the mirrored copy of the request with what was sent.

    Used as a fallback when the raw packets differ; only the header fields
    set by this script are compared.

    :param ether: Mirrored packet received on the mirroring interface.
    :param ptype: Payload type, one of "ARP", "ICMP" or "ICMPv6".
    :param src_mac: Source MAC address used in the request.
    :param dst_mac: Destination MAC address used in the request.
    :param src_ip: Source IP address used in the request.
    :param dst_ip: Destination IP address used in the request.
    :raises RuntimeError: If any compared field differs.
    """
    if ether.src != src_mac or ether.dst != dst_mac:
        raise RuntimeError("MAC mismatch in mirrored Rx packet.")

    if ptype == "ARP":
        if not ether.haslayer(ARP):
            raise RuntimeError("Mirrored Rx packet is not an ARP packet.")
        arp = ether['ARP']
        if arp.op != 1:  # 1=who-has
            raise RuntimeError("Mirrored Rx packet is not an ARP request.")
        # The request carries the all-zero target hardware address, not the
        # DUT MAC, so that is the value the mirrored copy must contain.
        if arp.hwsrc != src_mac or arp.hwdst != _ARP_UNKNOWN_MAC:
            raise RuntimeError("MAC mismatch in mirrored Rx ARP packet.")
        if arp.psrc != src_ip or arp.pdst != dst_ip:
            raise RuntimeError("IP address mismatch in mirrored "
                               "Rx ARP packet.")
    elif ptype == "ICMP":
        if not ether.haslayer(IP):
            raise RuntimeError("Mirrored Rx packet is not an IPv4 packet.")
        if ether['IP'].src != src_ip or ether['IP'].dst != dst_ip:
            raise RuntimeError("IP address mismatch in mirrored "
                               "Rx IPv4 packet.")
        if not ether.haslayer(ICMP):
            raise RuntimeError("Mirrored Rx packet is not an ICMP packet.")
        if ether['ICMP'].type != 8:  # 8=echo-request
            raise RuntimeError("Mirrored Rx packet is not an ICMP "
                               "echo request.")
    elif ptype == "ICMPv6":
        if not ether.haslayer(IPv6):
            raise RuntimeError("Mirrored Rx packet is not an IPv6 packet.")
        if ether['IPv6'].src != src_ip or ether['IPv6'].dst != dst_ip:
            raise RuntimeError("IP address mismatch in mirrored "
                               "Rx IPv6 packet.")
        if not ether.haslayer(ICMPv6EchoRequest):
            raise RuntimeError("Mirrored Rx packet is not an ICMPv6 "
                               "echo request.")


def _check_mirrored_tx(ether, ether_repl, ptype):
    """Compare the mirrored copy of the reply with the reply itself.

    Used as a fallback when the raw packets differ; only selected header
    fields are compared.

    :param ether: Mirrored packet received on the mirroring interface.
    :param ether_repl: Reply received on the TG Tx interface.
    :param ptype: Payload type, one of "ARP", "ICMP" or "ICMPv6".
    :raises RuntimeError: If any compared field differs.
    """
    if ether.src != ether_repl.src or ether.dst != ether_repl.dst:
        raise RuntimeError("MAC mismatch in mirrored Tx packet.")

    if ptype == "ARP":
        if not ether.haslayer(ARP):
            raise RuntimeError("Mirrored Tx packet is not an ARP packet.")
        arp, arp_repl = ether['ARP'], ether_repl['ARP']
        if arp.op != arp_repl.op:  # 2=is_at
            raise RuntimeError("ARP operational code mismatch "
                               "in mirrored Tx packet.")
        if arp.hwsrc != arp_repl.hwsrc or arp.hwdst != arp_repl.hwdst:
            raise RuntimeError("MAC mismatch in mirrored Tx ARP packet.")
        if arp.psrc != arp_repl.psrc or arp.pdst != arp_repl.pdst:
            raise RuntimeError("IP address mismatch in mirrored "
                               "Tx ARP packet.")
    elif ptype == "ICMP":
        if not ether.haslayer(IP):
            raise RuntimeError("Mirrored Tx packet is not an IPv4 packet.")
        if ether['IP'].src != ether_repl['IP'].src \
                or ether['IP'].dst != ether_repl['IP'].dst:
            raise RuntimeError("IP address mismatch in mirrored "
                               "Tx IPv4 packet.")
        if not ether.haslayer(ICMP):
            raise RuntimeError("Mirrored Tx packet is not an ICMP packet.")
        if ether['ICMP'].type != ether_repl['ICMP'].type:  # 0=echo-reply
            raise RuntimeError("ICMP packet type mismatch "
                               "in mirrored Tx packet.")
    elif ptype == "ICMPv6":
        if not ether.haslayer(IPv6):
            raise RuntimeError("Mirrored Tx packet is not an IPv6 packet.")
        if ether['IPv6'].src != ether_repl['IPv6'].src \
                or ether['IPv6'].dst != ether_repl['IPv6'].dst:
            raise RuntimeError("IP address mismatch in mirrored "
                               "Tx IPv6 packet.")
        # Index 2 is the layer right above IPv6; comparing layer names
        # checks that both packets carry the same ICMPv6 message type.
        if ether[2].name != ether_repl[2].name:
            raise RuntimeError("ICMPv6 message type mismatch "
                               "in mirrored Tx packet.")


def main():
    """Send a request from TG to DUT and verify both mirrored copies.

    An ARP, ICMPv4 or ICMPv6 request (selected by the ``ptype`` argument)
    is sent from the TG Tx interface to DUT. The script then expects,
    in this order:

    - a copy of the request on the TG interface given by ``rx_if``,
    - the reply from DUT on the TG Tx interface,
    - a copy of that reply on the ``rx_if`` interface.

    Each copy is first compared byte-for-byte; only when that fails are
    the individual header fields compared.

    :raises ValueError: If the IP addresses are not valid for the payload
        type.
    :raises RuntimeError: If the payload type is not supported, a packet
        is not received in time, or a mirrored copy does not match.
    """
    args = TrafficScriptArg(
        ['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', 'ptype'])

    src_mac = args.get_arg('tg_src_mac')
    dst_mac = args.get_arg('dut_if1_mac')
    src_ip = args.get_arg('src_ip')
    dst_ip = args.get_arg('dst_ip')
    tx_if = args.get_arg('tx_if')
    rx_if = args.get_arg('rx_if')
    ptype = args.get_arg('ptype')

    rxq_mirrored = RxQueue(rx_if)
    rxq_tx = RxQueue(tx_if)
    txq = TxQueue(tx_if)

    pkt_raw = _build_request(ptype, src_mac, dst_mac, src_ip, dst_ip)

    txq.send(pkt_raw)
    # Packets listed here are passed to recv() on the Tx port below.
    sent = [auto_pad(pkt_raw)]

    # Receive copy of Rx packet.
    ether = rxq_mirrored.recv(2)
    if ether is None:
        raise RuntimeError("Rx timeout of mirrored Rx packet")
    pkt = auto_pad(pkt_raw)
    if str(ether) != str(pkt):
        print("Mirrored Rx packet doesn't match the original Rx packet.")
        _check_mirrored_rx(ether, ptype, src_mac, dst_mac, src_ip, dst_ip)
    print("Mirrored Rx packet check OK.\n")

    # Receive reply on TG Tx port.
    ether_repl = rxq_tx.recv(2, sent)
    if ether_repl is None:
        raise RuntimeError("Reply not received on TG Tx port.")
    print("Reply received on TG Tx port.\n")

    # Receive copy of Tx packet.
    ether = rxq_mirrored.recv(2)
    if ether is None:
        raise RuntimeError("Rx timeout of mirrored Tx packet")
    if str(ether) != str(ether_repl):
        print("Mirrored Tx packet doesn't match the received Tx packet.")
        _check_mirrored_tx(ether, ether_repl, ptype)
    print("Mirrored Tx packet check OK.\n")
203 if __name__ == "__main__":