2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends ICMPv6 Neighbor Discovery proxy packets."""
17 from scapy.layers.inet import Ether
18 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
19 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
22 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
23 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26 def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
27 """Send ICMPv6 Neighbor Solicitation packet and expect a response
30 :param tx_if: Interface on TG.
31 :param src_mac: MAC address of TG interface.
32 :param dst_mac: MAC address of proxy interface.
33 :param src_ip: IP address of TG interface.
34 :param dst_ip: IP address of proxied interface.
40 :raises RuntimeError: If the received packet is not correct.
48 icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
50 ICMPv6ND_NS(tgt=dst_ip))
52 sent_packets.append(icmpv6nd_solicit_pkt)
53 txq.send(icmpv6nd_solicit_pkt)
58 pkt = rxq.recv(3, ignore=sent_packets)
59 if ether.haslayer(ICMPv6ND_NS):
60 # read another packet in the queue in case of ICMPv6ND_NS packet
63 # otherwise process the current packet
70 raise RuntimeError('ICMPv6ND Proxy response timeout.')
72 if ether.src != dst_mac:
73 raise RuntimeError("Source MAC address error: {} != {}".
74 format(ether.src, dst_mac))
75 print "Source MAC address: OK."
77 if ether.dst != src_mac:
78 raise RuntimeError("Destination MAC address error: {} != {}".
79 format(ether.dst, src_mac))
80 print "Destination MAC address: OK."
82 if ether[IPv6].src != dst_ip:
83 raise RuntimeError("Source IP address error: {} != {}".
84 format(ether[IPv6].src, dst_ip))
85 print "Source IP address: OK."
87 if ether[IPv6].dst != src_ip:
88 raise RuntimeError("Destination IP address error: {} != {}".
89 format(ether[IPv6].dst, src_ip))
90 print "Destination IP address: OK."
93 target_addr = ether[IPv6][ICMPv6ND_NA].tgt
94 except (KeyError, AttributeError):
95 raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
97 if target_addr != dst_ip:
98 raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}".
99 format(target_addr, dst_ip))
100 print "Target address field: OK."
103 def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
104 proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip):
105 """Send ICMPv6 Echo Request, receive it and send a reply.
107 :param src_if: First TG interface on link to DUT.
108 :param dst_if: Second TG interface on link to DUT.
109 :param src_mac: MAC address of first interface.
110 :param dst_mac: MAC address of second interface.
111 :param proxy_to_src_mac: MAC address of first proxy interface on DUT.
112 :param proxy_to_dst_mac: MAC address of second proxy interface on DUT.
113 :param src_ip: IP address of first interface.
114 :param dst_ip: IP address of second interface.
119 :type proxy_to_src_mac: str
120 :type proxy_to_dst_mac: str
123 :raises RuntimeError: If a received packet is not correct.
125 rxq = RxQueue(dst_if)
126 txq = TxQueue(src_if)
128 icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
129 IPv6(src=src_ip, dst=dst_ip) /
132 txq.send(icmpv6_ping_pkt)
138 if ether.haslayer(ICMPv6ND_NS):
139 # read another packet in the queue in case of ICMPv6ND_NS packet
142 # otherwise process the current packet
149 raise RuntimeError('ICMPv6 Echo Request timeout.')
151 ether[IPv6]["ICMPv6 Echo Request"]
153 raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
154 print "ICMP Echo: OK."
156 rxq = RxQueue(src_if)
157 txq = TxQueue(dst_if)
159 icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
160 IPv6(src=dst_ip, dst=src_ip) /
163 txq.send(icmpv6_ping_pkt)
169 if ether.haslayer(ICMPv6ND_NS):
170 # read another packet in the queue in case of ICMPv6ND_NS packet
173 # otherwise process the current packet
180 raise RuntimeError('DHCPv6 SOLICIT timeout')
182 ether[IPv6]["ICMPv6 Echo Reply"]
184 raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
186 print "ICMP Reply: OK."
190 """Send ICMPv6 Neighbor Solicitation and echo packets through the proxy."""
192 args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac',
193 'proxy_to_src_mac', 'proxy_to_dst_mac'])
195 src_if = args.get_arg('tx_if')
196 dst_if = args.get_arg('rx_if')
197 src_ip = args.get_arg("src_ip")
198 dst_ip = args.get_arg("dst_ip")
199 src_mac = args.get_arg('src_mac')
200 dst_mac = args.get_arg('dst_mac')
201 proxy_to_src_mac = args.get_arg('proxy_to_src_mac')
202 proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac')
204 # Neighbor solicitation
205 imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
207 # Verify route (ICMP echo/reply)
208 ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac,
209 proxy_to_dst_mac, src_ip, dst_ip)
212 if __name__ == "__main__":