2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends DHCPv6 proxy packets."""
17 from scapy.layers.dhcp6 import *
18 from scapy.layers.inet6 import IPv6, UDP, UDP_SERVICES
20 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
21 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def _check_udp_checksum(pkt):
    """Check the UDP checksum carried by an IP packet.

    The packet is re-dissected from its serialised form, the UDP checksum
    field of that copy is cleared so that it gets calculated again when the
    copy is built, and the calculated value is compared with the value
    carried by the original packet.

    :param pkt: IP packet that contains a UDP layer.
    :returns: True if the carried checksum equals the recalculated one.
    :rtype: bool
    """
    new = pkt.__class__(str(pkt))
    # Clear the dissected value. Otherwise the rebuild below simply re-emits
    # the received checksum and the comparison can never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    return new['UDP'].chksum == pkt['UDP'].chksum
def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
                   server_ip, server_mac, client_duid, client_mac):
    """Send a DHCPv6 SOLICIT packet and check the relayed packet.

    The packet is sent from the client interface to the DHCPv6 multicast
    address. The packet received on the server interface is checked for
    destination MAC, source and destination IPv6 address, presence of the
    Relay Forward message and its link address.

    :param tx_if: Client interface.
    :param rx_if: DHCPv6 server interface.
    :param dhcp_multicast_ip: Servers and relay agents multicast address.
    :param link_local_ip: Client link-local address.
    :param proxy_ip: IP address of DHCPv6 proxy server.
    :param server_ip: IP address of DHCPv6 server.
    :param server_mac: MAC address of DHCPv6 server.
    :param client_duid: Client DHCP Unique Identifier.
    :param client_mac: Client MAC address.
    :type tx_if: str
    :type rx_if: str
    :type dhcp_multicast_ip: str
    :type link_local_ip: str
    :type proxy_ip: str
    :type server_ip: str
    :type server_mac: str
    :type client_duid: str
    :type client_mac: str
    :returns: Interface-Id option value found in the relayed packet.
    :rtype: str
    :raises RuntimeError: If no packet is received or any check fails.
    """
    relay_fwd = 'DHCPv6 Relay Forward Message (Relay Agent/Server Message)'

    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)

    # Bookkeeping of transmitted packets; not read back in this function.
    sent_packets = []

    # NOTE(review): the DHCP6_Solicit layer is reconstructed from the purpose
    # of this function; that line is not visible in this view - confirm.
    dhcp6_solicit_pkt = (
        Ether(src=client_mac, dst="33:33:00:01:00:02") /
        IPv6(src=link_local_ip, dst=dhcp_multicast_ip) /
        UDP(sport=UDP_SERVICES.dhcpv6_client,
            dport=UDP_SERVICES.dhcpv6_server) /
        DHCP6_Solicit() /
        DHCP6OptClientId(duid=client_duid))

    sent_packets.append(dhcp6_solicit_pkt)
    txq.send(dhcp6_solicit_pkt)

    # NOTE(review): the receive call is not visible in this view; a timeout
    # of 2 seconds is assumed - confirm against RxQueue.recv.
    ether = rxq.recv(2)
    if ether is None:
        raise RuntimeError('DHCPv6 SOLICIT timeout')

    # print(...) with one string argument behaves the same on Python 2 and 3.
    if ether.dst != server_mac:
        raise RuntimeError("Destination MAC address error: {} != {}".format(
            ether.dst, server_mac))
    print("Destination MAC address: OK.")

    if ether['IPv6'].src != proxy_ip:
        raise RuntimeError("Source IP address error: {} != {}".format(
            ether['IPv6'].src, proxy_ip))
    print("Source IP address: OK.")

    if ether['IPv6'].dst != server_ip:
        raise RuntimeError("Destination IP address error: {} != {}".format(
            ether['IPv6'].dst, server_ip))
    print("Destination IP address: OK.")

    # Looking up a layer that is not present raises IndexError, so turn that
    # into the intended error instead of relying on truthiness alone.
    try:
        relay = ether['IPv6']['UDP'][relay_fwd]
    except IndexError:
        relay = None
    if not relay:
        raise RuntimeError("Relay Agent/Server Message error.")
    print("Relay Agent/Server Message: OK.")

    linkaddr = relay.linkaddr
    if linkaddr != proxy_ip:
        raise RuntimeError("Proxy IP address error: {} != {}".format(
            linkaddr, proxy_ip))
    print("Proxy IP address: OK.")

    # The layer names below are matched at runtime; keep them verbatim.
    try:
        interface_id = relay['Unknown DHCPv6 OPtion'][
            'DHCP6 Interface-Id Option'].ifaceid
    except (IndexError, AttributeError):
        raise RuntimeError("DHCP6 Interface-Id error!")

    return interface_id
def dhcpv6_advertise(rx_if, tx_if, link_local_ip, proxy_ip,
                     server_ip, server_mac, proxy_to_server_mac, interface_id):
    """Send a DHCPv6 ADVERTISE relay reply and check the forwarded packet.

    The packet is built as coming from the DHCPv6 server (source MAC and IP
    of the server) and addressed to the proxy. The packet received afterwards
    is checked for source IPv6 address, UDP checksum and message type.

    :param rx_if: Interface on which the forwarded packet is expected; the
        script entry point passes the client interface here.
    :param tx_if: Interface from which the server packet is sent; the script
        entry point passes the DHCPv6 server interface here.
    :param link_local_ip: Client link-local address.
    :param proxy_ip: IP address of DHCPv6 proxy server.
    :param server_ip: IP address of DHCPv6 server.
    :param server_mac: MAC address of DHCPv6 server.
    :param proxy_to_server_mac: MAC address of DHCPv6 proxy interface.
    :param interface_id: ID of proxy interface.
    :type rx_if: str
    :type tx_if: str
    :type link_local_ip: str
    :type proxy_ip: str
    :type server_ip: str
    :type server_mac: str
    :type proxy_to_server_mac: str
    :type interface_id: str
    :raises RuntimeError: If no packet is received or any check fails.
    """
    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)

    # Bookkeeping of transmitted packets; not read back in this function.
    sent_packets = []

    # NOTE(review): the relayed DHCP6_Advertise layer is reconstructed from
    # the message type checked below; that line is not visible - confirm.
    dhcp6_advertise_pkt = (
        Ether(src=server_mac, dst=proxy_to_server_mac) /
        IPv6(src=server_ip, dst=proxy_ip) /
        UDP(sport=UDP_SERVICES.dhcpv6_server,
            dport=UDP_SERVICES.dhcpv6_client) /
        DHCP6_RelayReply(peeraddr=link_local_ip,
                         linkaddr=proxy_ip) /
        DHCP6OptIfaceId(ifaceid=interface_id) /
        DHCP6OptRelayMsg() /
        DHCP6_Advertise())

    sent_packets.append(dhcp6_advertise_pkt)
    txq.send(dhcp6_advertise_pkt)

    # NOTE(review): the receive call is not visible in this view; a timeout
    # of 2 seconds is assumed - confirm against RxQueue.recv.
    ether = rxq.recv(2)
    if ether is None:
        raise RuntimeError('DHCPv6 ADVERTISE timeout')

    # print(...) with one string argument behaves the same on Python 2 and 3.
    if ether['IPv6'].src != proxy_ip:
        raise RuntimeError("Source IP address error: {} != {}".format(
            ether['IPv6'].src, proxy_ip))
    print("Source IP address: OK.")

    if not _check_udp_checksum(ether['IPv6']):
        raise RuntimeError("Checksum error!")
    print("Checksum: OK.")

    msgtype = ether['IPv6']['UDP']['DHCPv6 Advertise Message'].msgtype
    # scapy enum fields read back as the numeric code (ADVERTISE is 2 in
    # RFC 3315), so accept the code as well as the symbolic name.
    if msgtype not in (2, 'ADVERTISE'):
        raise RuntimeError("Message type error: {} != ADVERTISE".format(
            msgtype))
    print("Message type: OK.")
def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
                   server_ip, client_duid, client_mac):
    """Send a DHCPv6 REQUEST packet and check the relayed packet.

    The packet is sent from the client interface to the DHCPv6 multicast
    address. The packet received on the server interface is checked for
    source and destination IPv6 address, presence of the Relay Forward
    message and its link address.

    :param tx_if: Client interface.
    :param rx_if: DHCPv6 server interface.
    :param dhcp_multicast_ip: Servers and relay agents multicast address.
    :param link_local_ip: Client link-local address.
    :param proxy_ip: IP address of DHCPv6 proxy server.
    :param server_ip: IP address of DHCPv6 server.
    :param client_duid: Client DHCP Unique Identifier.
    :param client_mac: Client MAC address.
    :type tx_if: str
    :type rx_if: str
    :type dhcp_multicast_ip: str
    :type link_local_ip: str
    :type proxy_ip: str
    :type server_ip: str
    :type client_duid: str
    :type client_mac: str
    :raises RuntimeError: If no packet is received or any check fails.
    """
    relay_fwd = 'DHCPv6 Relay Forward Message (Relay Agent/Server Message)'

    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)

    # Bookkeeping of transmitted packets; not read back in this function.
    sent_packets = []

    # NOTE(review): the DHCP6_Request layer is reconstructed from the purpose
    # of this function; that line is not visible in this view - confirm.
    dhcp6_request_pkt = (
        Ether(src=client_mac, dst="33:33:00:01:00:02") /
        IPv6(src=link_local_ip, dst=dhcp_multicast_ip) /
        UDP(sport=UDP_SERVICES.dhcpv6_client,
            dport=UDP_SERVICES.dhcpv6_server) /
        DHCP6_Request() /
        DHCP6OptClientId(duid=client_duid))

    sent_packets.append(dhcp6_request_pkt)
    txq.send(dhcp6_request_pkt)

    # NOTE(review): the receive call is not visible in this view; a timeout
    # of 2 seconds is assumed - confirm against RxQueue.recv.
    ether = rxq.recv(2)
    if ether is None:
        raise RuntimeError('DHCPv6 REQUEST timeout')

    # print(...) with one string argument behaves the same on Python 2 and 3.
    if ether['IPv6'].src != proxy_ip:
        raise RuntimeError("Source IP address error: {} != {}".format(
            ether['IPv6'].src, proxy_ip))
    print("Source IP address: OK.")

    if ether['IPv6'].dst != server_ip:
        raise RuntimeError("Destination IP address error: {} != {}".format(
            ether['IPv6'].dst, server_ip))
    print("Destination IP address: OK.")

    # Looking up a layer that is not present raises IndexError, so turn that
    # into the intended error instead of relying on truthiness alone.
    try:
        relay = ether['IPv6']['UDP'][relay_fwd]
    except IndexError:
        relay = None
    if not relay:
        raise RuntimeError("Relay Agent/Server Message error.")
    print("Relay Agent/Server Message: OK.")

    linkaddr = relay.linkaddr
    if linkaddr != proxy_ip:
        raise RuntimeError("Proxy IP address error: {} != {}".format(
            linkaddr, proxy_ip))
    print("Proxy IP address: OK.")
def dhcpv6_reply(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac,
                 interface_id):
    """Send a DHCPv6 REPLY relay reply and check the forwarded packet.

    The packet is built as coming from the DHCPv6 server and addressed to the
    proxy IP address. The packet received afterwards is checked for source
    IPv6 address, UDP checksum and message type.

    :param rx_if: Interface on which the forwarded packet is expected; the
        script entry point passes the client interface here.
    :param tx_if: Interface from which the server packet is sent; the script
        entry point passes the DHCPv6 server interface here.
    :param link_local_ip: Client link-local address.
    :param proxy_ip: IP address of DHCPv6 proxy server.
    :param server_ip: IP address of DHCPv6 server.
    :param server_mac: MAC address of DHCPv6 server.
    :param interface_id: ID of proxy interface.
    :type rx_if: str
    :type tx_if: str
    :type link_local_ip: str
    :type proxy_ip: str
    :type server_ip: str
    :type server_mac: str
    :type interface_id: str
    :raises RuntimeError: If no packet is received or any check fails.
    """
    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)

    # Bookkeeping of transmitted packets; not read back in this function.
    sent_packets = []

    # No destination MAC is set on the Ethernet header here (dhcpv6_advertise
    # sets one explicitly); this function takes no such parameter.
    # NOTE(review): the relayed DHCP6_Reply layer is reconstructed from the
    # message type checked below; that line is not visible - confirm.
    dhcp_reply_pkt = (
        Ether(src=server_mac) /
        IPv6(src=server_ip, dst=proxy_ip) /
        UDP(sport=UDP_SERVICES.dhcpv6_server,
            dport=UDP_SERVICES.dhcpv6_client) /
        DHCP6_RelayReply(peeraddr=link_local_ip,
                         linkaddr=proxy_ip) /
        DHCP6OptIfaceId(ifaceid=interface_id) /
        DHCP6OptRelayMsg() /
        DHCP6_Reply())

    sent_packets.append(dhcp_reply_pkt)
    txq.send(dhcp_reply_pkt)

    # NOTE(review): the receive call is not visible in this view; a timeout
    # of 2 seconds is assumed - confirm against RxQueue.recv.
    ether = rxq.recv(2)
    if ether is None:
        raise RuntimeError('DHCPv6 REPLY timeout')

    # print(...) with one string argument behaves the same on Python 2 and 3.
    if ether['IPv6'].src != proxy_ip:
        raise RuntimeError("Source IP address error: {} != {}".format(
            ether['IPv6'].src, proxy_ip))
    print("Source IP address: OK.")

    if not _check_udp_checksum(ether['IPv6']):
        raise RuntimeError("Checksum error!")
    print("Checksum: OK.")

    msgtype = ether['IPv6']['UDP']['DHCPv6 Reply Message'].msgtype
    # scapy enum fields read back as the numeric code (REPLY is 7 in
    # RFC 3315), so accept the code as well as the symbolic name.
    if msgtype not in (7, 'REPLY'):
        raise RuntimeError("Message type error: {} != REPLY".format(msgtype))
    print("Message type: OK.")
def main():
    """Send DHCPv6 proxy messages.

    Reads the script arguments and runs the four exchanges in order:
    SOLICIT, ADVERTISE, REQUEST and REPLY. The Interface-Id returned by the
    SOLICIT step is reused for the two server-originated messages.
    """
    # Imported here so the script does not depend on `random` leaking in
    # through the wildcard scapy import.
    import random

    args = TrafficScriptArg(['tx_src_ip', 'tx_dst_ip', 'proxy_ip', 'proxy_mac',
                             'server_ip', 'client_mac', 'server_mac',
                             'proxy_to_server_mac'])

    client_if = args.get_arg('tx_if')
    server_if = args.get_arg('rx_if')
    proxy_ip = args.get_arg('proxy_ip')
    # Read as in the original script; not used by the exchanges below.
    proxy_mac = args.get_arg('proxy_mac')
    proxy_to_server_mac = args.get_arg('proxy_to_server_mac')
    server_ip = args.get_arg('server_ip')
    client_mac = args.get_arg('client_mac')
    server_mac = args.get_arg('server_mac')

    link_local_ip = "fe80::1"
    dhcp_multicast_ip = "ff02::1:2"
    client_duid = str(random.randint(0, 9999))

    # SOLICIT: client -> proxy -> server; yields the proxy Interface-Id.
    interface_id = dhcpv6_solicit(client_if, server_if, dhcp_multicast_ip,
                                  link_local_ip, proxy_ip, server_ip,
                                  server_mac, client_duid, client_mac)

    # ADVERTISE: server -> proxy -> client. The first argument is the
    # receiving interface, hence the client interface comes first.
    dhcpv6_advertise(client_if, server_if, link_local_ip, proxy_ip,
                     server_ip, server_mac, proxy_to_server_mac, interface_id)

    # REQUEST: client -> proxy -> server.
    dhcpv6_request(client_if, server_if, dhcp_multicast_ip, link_local_ip,
                   proxy_ip, server_ip, client_duid, client_mac)

    # REPLY: server -> proxy -> client.
    dhcpv6_reply(client_if, server_if, link_local_ip, proxy_ip, server_ip,
                 server_mac, interface_id)


if __name__ == "__main__":
    main()