2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends DHCPv6 proxy packets."""
17 from scapy.layers.dhcp6 import *
18 from scapy.layers.inet6 import IPv6, UDP, UDP_SERVICES
20 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
21 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
24 def _check_udp_checksum(pkt):
25 """Check udp checksum in ip packet.
26 Return true if checksum is correct."""
27 new = pkt.__class__(str(pkt))
29 new = new.__class__(str(new))
30 return new['UDP'].chksum == pkt['UDP'].chksum
33 def _get_dhcpv6_msgtype(msg_index):
34 """Return DHCPv6 message type string.
36 :param msg_index: Index of message type.
37 :return: Message type.
52 11: "INFORMATION-REQUEST",
56 return dhcp6_messages[msg_index]
59 def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
60 server_ip, server_mac, client_duid, client_mac):
61 """Send and check DHCPv6 SOLICIT proxy packet.
63 :param tx_if: Client interface.
64 :param rx_if: DHCPv6 server interface.
65 :param dhcp_multicast_ip: Servers and relay agents multicast address.
66 :param link_local_ip: Client link-local address.
67 :param proxy_ip: IP address of DHCPv6 proxy server.
68 :param server_ip: IP address of DHCPv6 server.
69 :param server_mac: MAC address of DHCPv6 server.
70 :param client_duid: Client DHCP Unique Identifier.
71 :param client_mac: Client MAC address.
74 :type dhcp_multicast_ip: str
75 :type link_local_ip: str
79 :type client_duid: str
81 :return interface_id: ID of proxy interface.
82 :rtype interface_id: str
90 dhcp6_solicit_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \
91 IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \
92 UDP(sport=UDP_SERVICES.dhcpv6_client,
93 dport=UDP_SERVICES.dhcpv6_server) / \
95 DHCP6OptClientId(duid=client_duid)
97 sent_packets.append(dhcp6_solicit_pkt)
98 txq.send(dhcp6_solicit_pkt)
103 raise RuntimeError('DHCPv6 SOLICIT timeout')
105 if ether.dst != server_mac:
106 raise RuntimeError("Destination MAC address error: {} != {}".format(
107 ether.dst, server_mac))
108 print "Destination MAC address: OK."
110 if ether['IPv6'].src != proxy_ip:
111 raise RuntimeError("Source IP address error: {} != {}".format(
112 ether['IPv6'].src, proxy_ip))
113 print "Source IP address: OK."
115 if ether['IPv6'].dst != server_ip:
116 raise RuntimeError("Destination IP address error: {} != {}".format(
117 ether['IPv6'].dst, server_ip))
118 print "Destination IP address: OK."
120 msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
121 ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype)
122 if msgtype != 'RELAY-FORW':
123 raise RuntimeError("Message type error: {} != RELAY-FORW".format(
125 print "Message type: OK."
127 linkaddr = ether['IPv6']['UDP']\
128 ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr
129 if linkaddr != proxy_ip:
130 raise RuntimeError("Proxy IP address error: {} != {}".format(
132 print "Proxy IP address: OK."
135 interface_id = ether['IPv6']['UDP']\
136 ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\
137 ['Unknown DHCPv6 OPtion']['DHCP6 Interface-Id Option'].ifaceid
139 raise RuntimeError("DHCP6 Interface-Id error!")
144 def dhcpv6_advertise(rx_if, tx_if, link_local_ip, proxy_ip,
145 server_ip, server_mac, proxy_to_server_mac, interface_id):
146 """Send and check DHCPv6 ADVERTISE proxy packet.
148 :param rx_if: DHCPv6 server interface.
149 :param tx_if: Client interface.
150 :param link_local_ip: Client link-local address.
151 :param proxy_ip: IP address of DHCPv6 proxy server.
152 :param server_ip: IP address of DHCPv6 server.
153 :param server_mac: MAC address of DHCPv6 server.
154 :param proxy_to_server_mac: MAC address of DHCPv6 proxy interface.
155 :param interface_id: ID of proxy interface.
158 :type link_local_ip: str
161 :type server_mac: str
162 :type proxy_to_server_mac: str
163 :type interface_id: str
171 dhcp6_advertise_pkt = Ether(src=server_mac, dst=proxy_to_server_mac) / \
172 IPv6(src=server_ip, dst=proxy_ip) / \
173 UDP(sport=UDP_SERVICES.dhcpv6_server,
174 dport=UDP_SERVICES.dhcpv6_client) / \
175 DHCP6_RelayReply(peeraddr=link_local_ip,
176 linkaddr=proxy_ip) / \
177 DHCP6OptIfaceId(ifaceid=interface_id) / \
178 DHCP6OptRelayMsg() / \
181 sent_packets.append(dhcp6_advertise_pkt)
182 txq.send(dhcp6_advertise_pkt)
187 raise RuntimeError('DHCPv6 ADVERTISE timeout')
189 if ether['IPv6'].src != proxy_ip:
190 raise RuntimeError("Source IP address error: {} != {}".format(
191 ether['IPv6'].src, proxy_ip))
192 print "Source IP address: OK."
194 if not _check_udp_checksum(ether['IPv6']):
195 raise RuntimeError("Checksum error!")
196 print "Checksum: OK."
198 msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
199 ['DHCPv6 Advertise Message'].msgtype)
200 if msgtype != 'ADVERTISE':
201 raise RuntimeError("Message type error: {} != ADVERTISE".format(
203 print "Message type: OK."
206 def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
207 server_ip, client_duid, client_mac):
208 """Send and check DHCPv6 REQUEST proxy packet.
210 :param tx_if: Client interface.
211 :param rx_if: DHCPv6 server interface.
212 :param dhcp_multicast_ip: Servers and relay agents multicast address.
213 :param link_local_ip: Client link-local address.
214 :param proxy_ip: IP address of DHCPv6 proxy server.
215 :param server_ip: IP address of DHCPv6 server.
216 :param client_duid: Client DHCP Unique Identifier.
217 :param client_mac: Client MAC address.
220 :type dhcp_multicast_ip: str
221 :type link_local_ip: str
224 :type client_duid: str
225 :type client_mac: str
233 dhcp6_request_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \
234 IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \
235 UDP(sport=UDP_SERVICES.dhcpv6_client,
236 dport=UDP_SERVICES.dhcpv6_server) / \
238 DHCP6OptClientId(duid=client_duid)
240 sent_packets.append(dhcp6_request_pkt)
241 txq.send(dhcp6_request_pkt)
246 raise RuntimeError('DHCPv6 REQUEST timeout')
248 if ether['IPv6'].src != proxy_ip:
249 raise RuntimeError("Source IP address error: {} != {}".format(
250 ether['IPv6'].src, proxy_ip))
251 print "Source IP address: OK."
253 if ether['IPv6'].dst != server_ip:
254 raise RuntimeError("Destination IP address error: {} != {}".format(
255 ether['IPv6'].dst, server_ip))
256 print "Destination IP address: OK."
258 msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
259 ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype)
260 if msgtype != 'RELAY-FORW':
261 raise RuntimeError("Message type error: {} != RELAY-FORW".format(
263 print "Message type: OK."
265 linkaddr = ether['IPv6']['UDP']\
266 ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr
267 if linkaddr != proxy_ip:
268 raise RuntimeError("Proxy IP address error: {} != {}".format(
270 print "Proxy IP address: OK."
273 def dhcpv6_reply(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac,
275 """Send and check DHCPv6 REPLY proxy packet.
277 :param rx_if: DHCPv6 server interface.
278 :param tx_if: Client interface.
279 :param link_local_ip: Client link-local address.
280 :param proxy_ip: IP address of DHCPv6 proxy server.
281 :param server_ip: IP address of DHCPv6 server.
282 :param server_mac: MAC address of DHCPv6 server.
283 :param interface_id: ID of proxy interface.
286 :type link_local_ip: str
289 :type server_mac: str
290 :type interface_id: str
298 dhcp_reply_pkt = Ether(src=server_mac) / \
299 IPv6(src=server_ip, dst=proxy_ip) / \
300 UDP(sport=UDP_SERVICES.dhcpv6_server,
301 dport=UDP_SERVICES.dhcpv6_client) / \
302 DHCP6_RelayReply(peeraddr=link_local_ip,
303 linkaddr=proxy_ip) / \
304 DHCP6OptIfaceId(ifaceid=interface_id) / \
305 DHCP6OptRelayMsg() / \
308 sent_packets.append(dhcp_reply_pkt)
309 txq.send(dhcp_reply_pkt)
314 raise RuntimeError('DHCPv6 REPLY timeout')
316 if ether['IPv6'].src != proxy_ip:
317 raise RuntimeError("Source IP address error: {} != {}".format(
318 ether['IPv6'].src, proxy_ip))
319 print "Source IP address: OK."
321 if not _check_udp_checksum(ether['IPv6']):
322 raise RuntimeError("Checksum error!")
323 print "Checksum: OK."
325 msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
326 ['DHCPv6 Reply Message'].msgtype)
327 if msgtype != 'REPLY':
328 raise RuntimeError("Message type error: {} != REPLY".format(msgtype))
329 print "Message type: OK."
333 """Send DHCPv6 proxy messages."""
335 args = TrafficScriptArg(['tx_src_ip', 'tx_dst_ip', 'proxy_ip', 'proxy_mac',
336 'server_ip', 'client_mac', 'server_mac',
337 'proxy_to_server_mac'])
339 client_if = args.get_arg('tx_if')
340 server_if = args.get_arg('rx_if')
341 proxy_ip = args.get_arg('proxy_ip')
342 proxy_mac = args.get_arg('proxy_mac')
343 proxy_to_server_mac = args.get_arg('proxy_to_server_mac')
344 server_ip = args.get_arg('server_ip')
345 client_mac = args.get_arg('client_mac')
346 server_mac = args.get_arg('server_mac')
348 link_local_ip = "fe80::1"
349 dhcp_multicast_ip = "ff02::1:2"
350 client_duid = str(random.randint(0, 9999))
353 interface_id = dhcpv6_solicit(client_if, server_if, dhcp_multicast_ip,
354 link_local_ip, proxy_ip, server_ip,
355 server_mac, client_duid, client_mac)
358 dhcpv6_advertise(client_if, server_if, link_local_ip, proxy_ip,
359 server_ip, server_mac, proxy_to_server_mac, interface_id)
362 dhcpv6_request(client_if, server_if, dhcp_multicast_ip, link_local_ip,
363 proxy_ip, server_ip, client_duid, client_mac)
366 dhcpv6_reply(client_if, server_if, link_local_ip, proxy_ip, server_ip,
367 server_mac, interface_id)
371 if __name__ == "__main__":