2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
"""Traffic script that sends an ARP, ICMPv4 or ICMPv6 packet from one TG
interface to the DUT and receives the mirrored copies of that packet and of
the reply on the other TG interface. Source and destination IP addresses and
source and destination MAC addresses are checked in the received packets.
"""
23 from scapy.layers.inet import IP, ICMP, ARP
24 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
25 from scapy.layers.l2 import Ether
27 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
28 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def valid_ipv4(address):
    """Check if IP address has the correct IPv4 address format.

    The argument is converted to a text string before it is parsed, so any
    object with a string representation may be passed.

    :param address: IP address.
    :type address: str
    :return: True in case of correct IPv4 address format,
        otherwise return False.
    :rtype: bool
    """
    # ``unicode`` is a builtin only on Python 2; fall back to ``str`` so the
    # check returns a bool on Python 3 instead of raising NameError.
    try:
        to_text = unicode
    except NameError:
        to_text = str

    try:
        ipaddress.IPv4Address(to_text(address))
    except (AttributeError, UnicodeError, ipaddress.AddressValueError):
        # UnicodeError: a byte string that cannot be decoded to text is
        # certainly not a valid address, so report it as such.
        return False
    return True
def valid_ipv6(address):
    """Check if IP address has the correct IPv6 address format.

    The argument is converted to a text string before it is parsed, so any
    object with a string representation may be passed.

    :param address: IP address.
    :type address: str
    :return: True in case of correct IPv6 address format,
        otherwise return False.
    :rtype: bool
    """
    # ``unicode`` is a builtin only on Python 2; fall back to ``str`` so the
    # check returns a bool on Python 3 instead of raising NameError.
    try:
        to_text = unicode
    except NameError:
        to_text = str

    try:
        ipaddress.IPv6Address(to_text(address))
    except (AttributeError, UnicodeError, ipaddress.AddressValueError):
        # UnicodeError: a byte string that cannot be decoded to text is
        # certainly not a valid address, so report it as such.
        return False
    return True
def main():
    """Send an ARP, ICMPv4 or ICMPv6 packet from one TG interface to the DUT
    and verify the mirrored copies received on the second TG interface.

    Steps performed:

    1. Build the packet selected by ``ptype`` ("ARP", "ICMP" or "ICMPv6")
       and send it on ``tx_if``.
    2. Receive the mirrored copy of that packet on ``rx_if`` (ICMPv6
       neighbor solicitations are skipped) and check it.
    3. Receive the reply on ``tx_if``.
    4. Receive the mirrored copy of the reply on ``rx_if`` and check it
       against the reply.

    When a mirrored packet is not identical to the expected one, a notice is
    printed and the individual header fields are compared instead.

    :raises ValueError: If ``src_ip`` or ``dst_ip`` is not in the address
        format required by ``ptype``.
    :raises RuntimeError: If ``ptype`` is not supported, an expected packet
        is not received in time, or a mirrored packet does not match.
    """
    # NOTE(review): this function was reconstructed from a partial listing.
    # The tail of the TrafficScriptArg argument list, the ICMPv6 payload
    # layer and the second halves of the split error messages were not
    # visible and are inferred from the surrounding code -- confirm them
    # against the original script.
    args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
                             'ptype'])

    src_mac = args.get_arg('tg_src_mac')
    dst_mac = args.get_arg('dut_if1_mac')
    src_ip = args.get_arg('src_ip')
    dst_ip = args.get_arg('dst_ip')
    tx_if = args.get_arg('tx_if')
    rx_if = args.get_arg('rx_if')
    ptype = args.get_arg('ptype')

    rxq_mirrored = RxQueue(rx_if)
    rxq_tx = RxQueue(tx_if)
    txq = TxQueue(tx_if)

    sent = []

    # Target hardware address placed in the ARP request. The mirrored copy
    # must carry this same value, so it is also used in the check below.
    arp_target_mac = "00:00:00:00:00:00"

    if ptype == "ARP":
        pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
                   ARP(hwsrc=src_mac, hwdst=arp_target_mac,
                       psrc=src_ip, pdst=dst_ip, op="who-has"))
    elif ptype == "ICMP":
        if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
            pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
                       IP(src=src_ip, dst=dst_ip) /
                       ICMP(type="echo-request"))
        else:
            raise ValueError("IP addresses not in correct format")
    elif ptype == "ICMPv6":
        if valid_ipv6(src_ip) and valid_ipv6(dst_ip):
            pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
                       IPv6(src=src_ip, dst=dst_ip) /
                       ICMPv6EchoRequest())
        else:
            raise ValueError("IPv6 addresses not in correct format")
    else:
        raise RuntimeError("Unexpected payload type.")

    txq.send(pkt_raw)
    sent.append(auto_pad(pkt_raw))

    # Receive copy of Rx packet.
    while True:
        ether = rxq_mirrored.recv(2)
        if ether is None:
            raise RuntimeError("Rx timeout of mirrored Rx packet")
        # Read another packet from the queue if the current one is an
        # ICMPv6 neighbor solicitation; otherwise process the current one.
        if not ether.haslayer(ICMPv6ND_NS):
            break

    pkt = auto_pad(pkt_raw)
    if str(ether) != str(pkt):
        print("Mirrored Rx packet doesn't match the original Rx packet.")
        if ether.src != src_mac or ether.dst != dst_mac:
            raise RuntimeError("MAC mismatch in mirrored Rx packet.")
        if ptype == "ARP":
            if not ether.haslayer(ARP):
                raise RuntimeError("Mirrored Rx packet is not an ARP packet.")
            if ether['ARP'].op != 1:  # 1=who-has
                raise RuntimeError("Mirrored Rx packet is not an ARP request.")
            # Fix: the request is sent with hwdst set to arp_target_mac, so
            # comparing hwdst with the DUT MAC rejected a correct copy.
            if ether['ARP'].hwsrc != src_mac \
                    or ether['ARP'].hwdst != arp_target_mac:
                raise RuntimeError("MAC mismatch in mirrored Rx ARP packet.")
            if ether['ARP'].psrc != src_ip or ether['ARP'].pdst != dst_ip:
                raise RuntimeError("IP address mismatch in mirrored "
                                   "Rx ARP packet.")
        elif ptype == "ICMP":
            if not ether.haslayer(IP):
                raise RuntimeError("Mirrored Rx packet is not an IPv4 packet.")
            if ether['IP'].src != src_ip or ether['IP'].dst != dst_ip:
                raise RuntimeError("IP address mismatch in mirrored "
                                   "Rx IPv4 packet.")
            if not ether.haslayer(ICMP):
                raise RuntimeError("Mirrored Rx packet is not an ICMP packet.")
            if ether['ICMP'].type != 8:  # 8=echo-request
                raise RuntimeError("Mirrored Rx packet is not an ICMP "
                                   "echo request.")
        elif ptype == "ICMPv6":
            if not ether.haslayer(IPv6):
                raise RuntimeError("Mirrored Rx packet is not an IPv6 packet.")
            if ether['IPv6'].src != src_ip or ether['IPv6'].dst != dst_ip:
                raise RuntimeError("IP address mismatch in mirrored "
                                   "Rx IPv6 packet.")
            if not ether.haslayer(ICMPv6EchoRequest):
                raise RuntimeError("Mirrored Rx packet is not an ICMPv6 "
                                   "echo request.")
    print("Mirrored Rx packet check OK.\n")

    # Receive reply on TG Tx port.
    ether_repl = rxq_tx.recv(2, sent)

    if ether_repl is None:
        raise RuntimeError("Reply not received on TG Tx port.")
    print("Reply received on TG Tx port.\n")

    # Receive copy of Tx packet.
    ether = rxq_mirrored.recv(2)
    if ether is None:
        raise RuntimeError("Rx timeout of mirrored Tx packet")

    if str(ether) != str(ether_repl):
        print("Mirrored Tx packet doesn't match the received Tx packet.")
        if ether.src != ether_repl.src or ether.dst != ether_repl.dst:
            raise RuntimeError("MAC mismatch in mirrored Tx packet.")
        if ptype == "ARP":
            if not ether.haslayer(ARP):
                raise RuntimeError("Mirrored Tx packet is not an ARP packet.")
            if ether['ARP'].op != ether_repl['ARP'].op:  # 2=is_at
                raise RuntimeError("ARP operational code mismatch "
                                   "in mirrored Tx packet.")
            if ether['ARP'].hwsrc != ether_repl['ARP'].hwsrc\
                    or ether['ARP'].hwdst != ether_repl['ARP'].hwdst:
                raise RuntimeError("MAC mismatch in mirrored Tx ARP packet.")
            if ether['ARP'].psrc != ether_repl['ARP'].psrc\
                    or ether['ARP'].pdst != ether_repl['ARP'].pdst:
                raise RuntimeError("IP address mismatch in mirrored "
                                   "Tx ARP packet.")
        elif ptype == "ICMP":
            if not ether.haslayer(IP):
                raise RuntimeError("Mirrored Tx packet is not an IPv4 packet.")
            if ether['IP'].src != ether_repl['IP'].src\
                    or ether['IP'].dst != ether_repl['IP'].dst:
                raise RuntimeError("IP address mismatch in mirrored "
                                   "Tx IPv4 packet.")
            if not ether.haslayer(ICMP):
                raise RuntimeError("Mirrored Tx packet is not an ICMP packet.")
            if ether['ICMP'].type != ether_repl['ICMP'].type:  # 0=echo-reply
                raise RuntimeError("ICMP packet type mismatch "
                                   "in mirrored Tx packet.")
        elif ptype == "ICMPv6":
            if not ether.haslayer(IPv6):
                raise RuntimeError("Mirrored Tx packet is not an IPv6 packet.")
            if ether['IPv6'].src != ether_repl['IPv6'].src\
                    or ether['IPv6'].dst != ether_repl['IPv6'].dst:
                raise RuntimeError("IP address mismatch in mirrored "
                                   "Tx IPv6 packet.")
            # Layer index 2 is the layer above Ether/IPv6; only its scapy
            # layer name is compared between the copy and the reply.
            if ether[2].name != ether_repl[2].name:
                raise RuntimeError("ICMPv6 message type mismatch "
                                   "in mirrored Tx packet.")
    print("Mirrored Tx packet check OK.\n")
214 if __name__ == "__main__":