2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
"""Traffic script that sends ICMPv6 Neighbor Discovery proxy packets."""
17 from scapy.layers.inet import Ether
18 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
21 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
22 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
    """Send an ICMPv6 Neighbor Solicitation and verify the proxy's answer.

    A solicitation for dst_ip is sent out of tx_if and the answer is awaited
    on the same interface. The answer must come from dst_mac / dst_ip, be
    addressed to src_mac / src_ip, and be a Neighbor Advertisement whose
    target address equals dst_ip.

    :param tx_if: Interface on TG.
    :param src_mac: MAC address of TG interface.
    :param dst_mac: MAC address of proxy interface.
    :param src_ip: IP address of TG interface.
    :param dst_ip: IP address of proxied interface.
    :type tx_if: str
    :type src_mac: str
    :type dst_mac: str
    :type src_ip: str
    :type dst_ip: str
    :raises RuntimeError: If no answer is received or the received packet
        is not correct.
    """
    rxq = RxQueue(tx_if)
    txq = TxQueue(tx_if)

    # NOTE(review): the IPv6 destination and hop limit are not set here;
    # presumably scapy fills them in for a Neighbor Solicitation - confirm.
    icmpv6nd_solicit_pkt = (
        Ether(src=src_mac, dst=dst_mac) /
        IPv6(src=src_ip) /
        ICMPv6ND_NS(tgt=dst_ip))

    # The request goes out on the interface we listen on, so it has to be
    # ignored when reading from the receive queue.
    sent_packets = [icmpv6nd_solicit_pkt]
    txq.send(icmpv6nd_solicit_pkt)

    # NOTE(review): number of receive attempts is an assumption - confirm.
    for _ in range(5):
        ether = rxq.recv(3, ignore=sent_packets)
        if ether is not None:
            break
    else:
        raise RuntimeError('ICMPv6ND Proxy response timeout.')

    if ether.src != dst_mac:
        raise RuntimeError("Source MAC address error: {} != {}".format(
            ether.src, dst_mac))
    print("Source MAC address: OK.")

    if ether.dst != src_mac:
        raise RuntimeError("Destination MAC address error: {} != {}".format(
            ether.dst, src_mac))
    print("Destination MAC address: OK.")

    # Without this guard a non-IPv6 frame would surface as a scapy lookup
    # error instead of the documented RuntimeError.
    if not ether.haslayer(IPv6):
        raise RuntimeError("Not an IPv6 packet.")
    ipv6 = ether['IPv6']

    if ipv6.src != dst_ip:
        raise RuntimeError("Source IP address error: {} != {}".format(
            ipv6.src, dst_ip))
    print("Source IP address: OK.")

    if ipv6.dst != src_ip:
        raise RuntimeError("Destination IP address error: {} != {}".format(
            ipv6.dst, src_ip))
    print("Destination IP address: OK.")

    try:
        target_addr = ipv6[
            'ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
    except (KeyError, IndexError, AttributeError):
        # scapy reports a missing layer with IndexError; it is caught
        # together with the originally handled exception types.
        raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")

    if target_addr != dst_ip:
        raise RuntimeError("ICMPv6 field 'Target address' error:"
                           " {} != {}".format(target_addr, dst_ip))
    print("Target address field: OK.")
def _wait_for_packet(rxq, timeout_msg, attempts=5, timeout=3):
    """Return the first packet read from rxq, or raise on timeout.

    :param rxq: Receive queue to read from.
    :param timeout_msg: Message of the error raised when nothing arrives.
    :param attempts: Number of receive attempts.
    :param timeout: Timeout passed to each receive attempt.
    :type rxq: RxQueue
    :type timeout_msg: str
    :type attempts: int
    :type timeout: int
    :returns: Received packet.
    :raises RuntimeError: If every receive attempt returns None.
    """
    # NOTE(review): default attempt count and timeout are assumptions -
    # confirm against the intended test timing.
    for _ in range(attempts):
        pkt = rxq.recv(timeout)
        if pkt is not None:
            return pkt
    raise RuntimeError(timeout_msg)


def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
              proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip):
    """Send ICMPv6 Echo Request, receive it and send a reply.

    The request is sent from src_if towards proxy_to_src_mac and has to
    arrive on dst_if. The reply is then sent from dst_if towards
    proxy_to_dst_mac and has to arrive on src_if.

    :param src_if: First TG interface on link to DUT.
    :param dst_if: Second TG interface on link to DUT.
    :param src_mac: MAC address of first interface.
    :param dst_mac: MAC address of second interface.
    :param proxy_to_src_mac: MAC address of first proxy interface on DUT.
    :param proxy_to_dst_mac: MAC address of second proxy interface on DUT.
    :param src_ip: IP address of first interface.
    :param dst_ip: IP address of second interface.
    :type src_if: str
    :type dst_if: str
    :type src_mac: str
    :type dst_mac: str
    :type proxy_to_src_mac: str
    :type proxy_to_dst_mac: str
    :type src_ip: str
    :type dst_ip: str
    :raises RuntimeError: If a packet is not received in time or a received
        packet is not correct.
    """
    # Echo Request: first TG interface -> DUT -> second TG interface.
    rxq = RxQueue(dst_if)
    txq = TxQueue(src_if)

    icmpv6_ping_pkt = (
        Ether(src=src_mac, dst=proxy_to_src_mac) /
        IPv6(src=src_ip, dst=dst_ip) /
        ICMPv6EchoRequest())
    txq.send(icmpv6_ping_pkt)

    ether = _wait_for_packet(rxq, 'ICMPv6 Echo Request timeout.')
    if not (ether.haslayer(IPv6) and ether.haslayer(ICMPv6EchoRequest)):
        raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
    print("ICMP Echo: OK.")

    # Echo Reply: second TG interface -> DUT -> first TG interface.
    rxq = RxQueue(src_if)
    txq = TxQueue(dst_if)

    icmpv6_ping_pkt = (
        Ether(src=dst_mac, dst=proxy_to_dst_mac) /
        IPv6(src=dst_ip, dst=src_ip) /
        ICMPv6EchoReply())
    txq.send(icmpv6_ping_pkt)

    # The timeout message names the packet actually awaited here; it used
    # to mention an unrelated DHCPv6 SOLICIT.
    ether = _wait_for_packet(rxq, 'ICMPv6 Echo Reply timeout.')
    if not (ether.haslayer(IPv6) and ether.haslayer(ICMPv6EchoReply)):
        raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")

    print("ICMP Reply: OK.")
def main():
    """Send an ICMPv6 neighbor solicitation, then verify the route by ping.

    Interfaces, IP addresses and MAC addresses are read from the traffic
    script arguments and handed to imcpv6nd_solicit and ipv6_ping.
    """
    # 'tx_if' and 'rx_if' are read below without being listed here;
    # presumably TrafficScriptArg provides them by default - confirm.
    args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac',
                             'proxy_to_src_mac', 'proxy_to_dst_mac'])

    src_if = args.get_arg('tx_if')
    dst_if = args.get_arg('rx_if')
    src_ip = args.get_arg("src_ip")
    dst_ip = args.get_arg("dst_ip")
    src_mac = args.get_arg('src_mac')
    dst_mac = args.get_arg('dst_mac')
    proxy_to_src_mac = args.get_arg('proxy_to_src_mac')
    proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac')

    # Neighbor solicitation
    imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)

    # Verify route (ICMP echo/reply)
    ipv6_ping(src_if, dst_if, src_mac, dst_mac,
              proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)
193 if __name__ == "__main__":