CSIT-471: Fix check of DHCPv6 message types.
[csit.git] / resources / traffic_scripts / dhcp / send_dhcp_v6_messages.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends DHCPv6 proxy packets."""
16
17 from scapy.layers.dhcp6 import *
18 from scapy.layers.inet6 import IPv6, UDP, UDP_SERVICES
19
20 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
21 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
22
23
24 def _check_udp_checksum(pkt):
25     """Check udp checksum in ip packet.
26     Return true if checksum is correct."""
27     new = pkt.__class__(str(pkt))
28     del new['UDP'].chksum
29     new = new.__class__(str(new))
30     return new['UDP'].chksum == pkt['UDP'].chksum
31
32
33 def _get_dhcpv6_msgtype(msg_index):
34     """Return DHCPv6 message type string.
35
36     :param msg_index: Index of message type.
37     :return: Message type.
38     :type msg_index: int
39     :rtype msg_str: str
40     """
41     dhcp6_messages = {
42         1: "SOLICIT",
43         2: "ADVERTISE",
44         3: "REQUEST",
45         4: "CONFIRM",
46         5: "RENEW",
47         6: "REBIND",
48         7: "REPLY",
49         8: "RELEASE",
50         9: "DECLINE",
51         10: "RECONFIGURE",
52         11: "INFORMATION-REQUEST",
53         12: "RELAY-FORW",
54         13: "RELAY-REPL"
55     }
56     return dhcp6_messages[msg_index]
57
58
59 def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
60                    server_ip, server_mac, client_duid, client_mac):
61     """Send and check DHCPv6 SOLICIT proxy packet.
62
63     :param tx_if: Client interface.
64     :param rx_if: DHCPv6 server interface.
65     :param dhcp_multicast_ip: Servers and relay agents multicast address.
66     :param link_local_ip: Client link-local address.
67     :param proxy_ip: IP address of DHCPv6 proxy server.
68     :param server_ip: IP address of DHCPv6 server.
69     :param server_mac: MAC address of DHCPv6 server.
70     :param client_duid: Client DHCP Unique Identifier.
71     :param client_mac: Client MAC address.
72     :type tx_if: str
73     :type rx_if: str
74     :type dhcp_multicast_ip: str
75     :type link_local_ip: str
76     :type proxy_ip: str
77     :type server_ip: str
78     :type server_mac: str
79     :type client_duid: str
80     :type client_mac: str
81     :return interface_id: ID of proxy interface.
82     :rtype interface_id: str
83     """
84
85     rxq = RxQueue(rx_if)
86     txq = TxQueue(tx_if)
87
88     sent_packets = []
89
90     dhcp6_solicit_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \
91                         IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \
92                         UDP(sport=UDP_SERVICES.dhcpv6_client,
93                             dport=UDP_SERVICES.dhcpv6_server) / \
94                         DHCP6_Solicit() / \
95                         DHCP6OptClientId(duid=client_duid)
96
97     sent_packets.append(dhcp6_solicit_pkt)
98     txq.send(dhcp6_solicit_pkt)
99
100     ether = rxq.recv(2)
101
102     if ether is None:
103         raise RuntimeError('DHCPv6 SOLICIT timeout')
104
105     if ether.dst != server_mac:
106         raise RuntimeError("Destination MAC address error: {} != {}".format(
107             ether.dst, server_mac))
108     print "Destination MAC address: OK."
109
110     if ether['IPv6'].src != proxy_ip:
111         raise RuntimeError("Source IP address error: {} != {}".format(
112             ether['IPv6'].src, proxy_ip))
113     print "Source IP address: OK."
114
115     if ether['IPv6'].dst != server_ip:
116         raise RuntimeError("Destination IP address error: {} != {}".format(
117             ether['IPv6'].dst, server_ip))
118     print "Destination IP address: OK."
119
120     msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
121         ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype)
122     if msgtype != 'RELAY-FORW':
123         raise RuntimeError("Message type error: {} != RELAY-FORW".format(
124             msgtype))
125     print "Message type: OK."
126
127     linkaddr = ether['IPv6']['UDP']\
128         ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr
129     if linkaddr != proxy_ip:
130         raise RuntimeError("Proxy IP address error: {} != {}".format(
131            linkaddr, proxy_ip))
132     print "Proxy IP address: OK."
133
134     try:
135         interface_id =  ether['IPv6']['UDP']\
136             ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\
137             ['Unknown DHCPv6 OPtion']['DHCP6 Interface-Id Option'].ifaceid
138     except Exception:
139         raise RuntimeError("DHCP6 Interface-Id error!")
140
141     return interface_id
142
143
144 def dhcpv6_advertise(rx_if, tx_if, link_local_ip, proxy_ip,
145                      server_ip, server_mac, proxy_to_server_mac, interface_id):
146     """Send and check DHCPv6 ADVERTISE proxy packet.
147
148     :param rx_if: DHCPv6 server interface.
149     :param tx_if: Client interface.
150     :param link_local_ip: Client link-local address.
151     :param proxy_ip: IP address of DHCPv6 proxy server.
152     :param server_ip: IP address of DHCPv6 server.
153     :param server_mac: MAC address of DHCPv6 server.
154     :param proxy_to_server_mac: MAC address of DHCPv6 proxy interface.
155     :param interface_id: ID of proxy interface.
156     :type rx_if: str
157     :type tx_if: str
158     :type link_local_ip: str
159     :type proxy_ip: str
160     :type server_ip: str
161     :type server_mac: str
162     :type proxy_to_server_mac: str
163     :type interface_id: str
164     """
165
166     rxq = RxQueue(rx_if)
167     txq = TxQueue(tx_if)
168
169     sent_packets = []
170
171     dhcp6_advertise_pkt = Ether(src=server_mac, dst=proxy_to_server_mac) / \
172                           IPv6(src=server_ip, dst=proxy_ip) / \
173                           UDP(sport=UDP_SERVICES.dhcpv6_server,
174                               dport=UDP_SERVICES.dhcpv6_client) / \
175                           DHCP6_RelayReply(peeraddr=link_local_ip,
176                                            linkaddr=proxy_ip) / \
177                           DHCP6OptIfaceId(ifaceid=interface_id) / \
178                           DHCP6OptRelayMsg() / \
179                           DHCP6_Advertise()
180
181     sent_packets.append(dhcp6_advertise_pkt)
182     txq.send(dhcp6_advertise_pkt)
183
184     ether = rxq.recv(2)
185
186     if ether is None:
187         raise RuntimeError('DHCPv6 ADVERTISE timeout')
188
189     if ether['IPv6'].src != proxy_ip:
190         raise RuntimeError("Source IP address error: {} != {}".format(
191             ether['IPv6'].src, proxy_ip))
192     print "Source IP address: OK."
193
194     if not _check_udp_checksum(ether['IPv6']):
195         raise RuntimeError("Checksum error!")
196     print "Checksum: OK."
197
198     msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
199                                   ['DHCPv6 Advertise Message'].msgtype)
200     if msgtype != 'ADVERTISE':
201         raise RuntimeError("Message type error: {} != ADVERTISE".format(
202             msgtype))
203     print "Message type: OK."
204
205
206 def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
207                    server_ip, client_duid, client_mac):
208     """Send and check DHCPv6 REQUEST proxy packet.
209
210     :param tx_if: Client interface.
211     :param rx_if: DHCPv6 server interface.
212     :param dhcp_multicast_ip: Servers and relay agents multicast address.
213     :param link_local_ip: Client link-local address.
214     :param proxy_ip: IP address of DHCPv6 proxy server.
215     :param server_ip: IP address of DHCPv6 server.
216     :param client_duid: Client DHCP Unique Identifier.
217     :param client_mac: Client MAC address.
218     :type tx_if: str
219     :type rx_if: str
220     :type dhcp_multicast_ip: str
221     :type link_local_ip: str
222     :type proxy_ip: str
223     :type server_ip: str
224     :type client_duid: str
225     :type client_mac: str
226     """
227
228     rxq = RxQueue(rx_if)
229     txq = TxQueue(tx_if)
230
231     sent_packets = []
232
233     dhcp6_request_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \
234                         IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \
235                         UDP(sport=UDP_SERVICES.dhcpv6_client,
236                             dport=UDP_SERVICES.dhcpv6_server) / \
237                         DHCP6_Request() / \
238                         DHCP6OptClientId(duid=client_duid)
239
240     sent_packets.append(dhcp6_request_pkt)
241     txq.send(dhcp6_request_pkt)
242
243     ether = rxq.recv(2)
244
245     if ether is None:
246         raise RuntimeError('DHCPv6 REQUEST timeout')
247
248     if ether['IPv6'].src != proxy_ip:
249         raise RuntimeError("Source IP address error: {} != {}".format(
250             ether['IPv6'].src, proxy_ip))
251     print "Source IP address: OK."
252
253     if ether['IPv6'].dst != server_ip:
254         raise RuntimeError("Destination IP address error: {} != {}".format(
255             ether['IPv6'].dst, server_ip))
256     print "Destination IP address: OK."
257
258     msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
259         ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype)
260     if msgtype != 'RELAY-FORW':
261         raise RuntimeError("Message type error: {} != RELAY-FORW".format(
262             msgtype))
263     print "Message type: OK."
264
265     linkaddr = ether['IPv6']['UDP']\
266         ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr
267     if linkaddr != proxy_ip:
268         raise RuntimeError("Proxy IP address error: {} != {}".format(
269            linkaddr, proxy_ip))
270     print "Proxy IP address: OK."
271
272
273 def dhcpv6_reply(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac,
274                  interface_id):
275     """Send and check DHCPv6 REPLY proxy packet.
276
277     :param rx_if: DHCPv6 server interface.
278     :param tx_if: Client interface.
279     :param link_local_ip: Client link-local address.
280     :param proxy_ip: IP address of DHCPv6 proxy server.
281     :param server_ip: IP address of DHCPv6 server.
282     :param server_mac: MAC address of DHCPv6 server.
283     :param interface_id: ID of proxy interface.
284     :type rx_if: str
285     :type tx_if: str
286     :type link_local_ip: str
287     :type proxy_ip: str
288     :type server_ip: str
289     :type server_mac: str
290     :type interface_id: str
291     """
292
293     rxq = RxQueue(rx_if)
294     txq = TxQueue(tx_if)
295
296     sent_packets = []
297
298     dhcp_reply_pkt = Ether(src=server_mac) / \
299                     IPv6(src=server_ip, dst=proxy_ip) / \
300                     UDP(sport=UDP_SERVICES.dhcpv6_server,
301                         dport=UDP_SERVICES.dhcpv6_client) / \
302                     DHCP6_RelayReply(peeraddr=link_local_ip,
303                                      linkaddr=proxy_ip) / \
304                     DHCP6OptIfaceId(ifaceid=interface_id) / \
305                     DHCP6OptRelayMsg() / \
306                     DHCP6_Reply()
307
308     sent_packets.append(dhcp_reply_pkt)
309     txq.send(dhcp_reply_pkt)
310
311     ether = rxq.recv(2)
312
313     if ether is None:
314         raise RuntimeError('DHCPv6 REPLY timeout')
315
316     if ether['IPv6'].src != proxy_ip:
317         raise RuntimeError("Source IP address error: {} != {}".format(
318             ether['IPv6'].src, proxy_ip))
319     print "Source IP address: OK."
320
321     if not _check_udp_checksum(ether['IPv6']):
322         raise RuntimeError("Checksum error!")
323     print "Checksum: OK."
324
325     msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
326                                   ['DHCPv6 Reply Message'].msgtype)
327     if msgtype != 'REPLY':
328         raise RuntimeError("Message type error: {} != REPLY".format(msgtype))
329     print "Message type: OK."
330
331
332 def main():
333     """Send DHCPv6 proxy messages."""
334
335     args = TrafficScriptArg(['tx_src_ip', 'tx_dst_ip', 'proxy_ip', 'proxy_mac',
336                              'server_ip', 'client_mac', 'server_mac',
337                              'proxy_to_server_mac'])
338
339     client_if = args.get_arg('tx_if')
340     server_if = args.get_arg('rx_if')
341     proxy_ip = args.get_arg('proxy_ip')
342     proxy_mac = args.get_arg('proxy_mac')
343     proxy_to_server_mac = args.get_arg('proxy_to_server_mac')
344     server_ip = args.get_arg('server_ip')
345     client_mac = args.get_arg('client_mac')
346     server_mac = args.get_arg('server_mac')
347
348     link_local_ip = "fe80::1"
349     dhcp_multicast_ip = "ff02::1:2"
350     client_duid = str(random.randint(0, 9999))
351
352     # SOLICIT
353     interface_id = dhcpv6_solicit(client_if, server_if, dhcp_multicast_ip,
354                                   link_local_ip, proxy_ip, server_ip,
355                                   server_mac, client_duid, client_mac)
356
357     # ADVERTISE
358     dhcpv6_advertise(client_if, server_if, link_local_ip, proxy_ip,
359                      server_ip, server_mac, proxy_to_server_mac, interface_id)
360
361     # REQUEST
362     dhcpv6_request(client_if, server_if, dhcp_multicast_ip, link_local_ip,
363                    proxy_ip, server_ip, client_duid, client_mac)
364
365     # REPLY
366     dhcpv6_reply(client_if, server_if, link_local_ip, proxy_ip, server_ip,
367                  server_mac, interface_id)
368
369     sys.exit(0)
370
371 if __name__ == "__main__":
372     main()