Make all perf tests CRITICAL
[csit.git] / resources / traffic_scripts / dhcp / send_dhcp_messages.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends DHCP packets."""
16
17 import sys
18
19 from scapy.all import Ether
20 from scapy.layers.inet import IP, UDP
21 from scapy.layers.inet import UDP_SERVICES
22 from scapy.layers.dhcp import DHCP, BOOTP
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def dhcp_discover(args):
29     """Send DHCP DISCOVER packet."""
30
31     tx_if = args.get_arg('tx_if')
32     rx_if = args.get_arg('rx_if')
33
34     rxq = RxQueue(rx_if)
35     txq = TxQueue(tx_if)
36
37     tx_src_ip = "0.0.0.0"
38     tx_dst_ip = "255.255.255.255"
39
40     server_ip = args.get_arg('server_ip')
41     proxy_ip = args.get_arg('proxy_ip')
42     client_mac = args.get_arg('client_mac')
43
44     sent_packets = []
45
46     dhcp_discover = Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") / \
47                     IP(src=tx_src_ip, dst=tx_dst_ip) / \
48                     UDP(sport=68, dport=67) / \
49                     BOOTP(op=1,) / \
50                     DHCP(options=[("message-type", "discover"),
51                                   "end"])
52
53     sent_packets.append(dhcp_discover)
54     txq.send(dhcp_discover)
55
56     ether = rxq.recv(2)
57
58     if ether is None:
59         raise RuntimeError('DHCP DISCOVER timeout')
60
61     if ether[IP].src != proxy_ip:
62         raise RuntimeError("Source IP address error.")
63     print "Source IP address: OK."
64
65     if ether[IP].dst != server_ip:
66         raise RuntimeError("Destination IP address error.")
67     print "Destination IP address: OK."
68
69     if ether[UDP].dport != UDP_SERVICES.bootps:
70         raise RuntimeError("UDP destination port error.")
71     print "UDP destination port: OK."
72
73     if ether[UDP].sport != UDP_SERVICES.bootpc:
74         raise RuntimeError("UDP source port error.")
75     print "UDP source port: OK."
76
77     if ether[DHCP].options[1][0] != 'relay_agent_Information':  # option 82
78         raise RuntimeError("Relay agent information error.")
79     option_82 = ether[DHCP].options[1][1]
80
81     if ether[DHCP].options[0][1] != 1:  # 1 - DISCOVER message
82         raise RuntimeError("DHCP DISCOVER message error.")
83     print "DHCP DISCOVER message OK."
84     dhcp_offer(args, option_82)
85
86
87 def dhcp_offer(args, option_82):
88     """Send DHCP OFFER packet."""
89
90     rx_if = args.get_arg('tx_if')
91     tx_if = args.get_arg('rx_if')
92
93     rxq = RxQueue(rx_if)
94     txq = TxQueue(tx_if)
95
96     tx_dst_ip = "255.255.255.255"
97     server_ip = args.get_arg('server_ip')
98     server_mac = args.get_arg('server_mac')
99     client_ip = args.get_arg('client_ip')
100     proxy_ip = args.get_arg('proxy_ip')
101
102     sent_packets = []
103
104     dhcp_offer = Ether(src=server_mac, dst="ff:ff:ff:ff:ff:ff") / \
105                  IP(src=server_ip, dst=tx_dst_ip) / \
106                  UDP(sport=67, dport=68) / \
107                  BOOTP(op=2,
108                        yiaddr=client_ip,
109                        siaddr=server_ip) / \
110                  DHCP(options=
111                       [("message-type", "offer"),
112                        ("server_id", server_ip),
113                        ("relay_agent_Information", option_82),
114                        "end"])
115
116     txq.send(dhcp_offer)
117     sent_packets.append(dhcp_offer)
118
119     ether = rxq.recv(2)
120
121     if ether is None:
122         raise RuntimeError('DHCP OFFER timeout')
123
124     if ether[IP].dst != tx_dst_ip:
125         raise RuntimeError("Destination IP address error.")
126     print "Destination IP address: OK."
127
128     if ether[IP].src != proxy_ip:
129         raise RuntimeError("Source IP address error.")
130     print "Source IP address: OK."
131
132     if ether[UDP].dport != UDP_SERVICES.bootpc:
133         raise RuntimeError("UDP destination port error.")
134     print "UDP destination port: OK."
135
136     if ether[UDP].sport != UDP_SERVICES.bootps:
137         raise RuntimeError("UDP source port error.")
138     print "UDP source port: OK."
139
140     if ether[BOOTP].yiaddr != client_ip:
141         raise RuntimeError("Client IP address error.")
142     print "Client IP address: OK."
143
144     if ether[BOOTP].siaddr != server_ip:
145         raise RuntimeError("DHCP server IP address error.")
146     print "DHCP server IP address: OK."
147
148     if ether[DHCP].options[0][1] != 2:  # 2 - OFFER message
149         raise RuntimeError("DHCP OFFER message error.")
150     print "DHCP OFFER message OK."
151     dhcp_request(args)
152
153
154 def dhcp_request(args):
155     """Send DHCP REQUEST packet."""
156
157     tx_if = args.get_arg('tx_if')
158     rx_if = args.get_arg('rx_if')
159
160     rxq = RxQueue(rx_if)
161     txq = TxQueue(tx_if)
162
163     tx_dst_ip = "255.255.255.255"
164     server_ip = args.get_arg('server_ip')
165     client_ip = args.get_arg('client_ip')
166     client_mac = args.get_arg('client_mac')
167     proxy_ip = args.get_arg('proxy_ip')
168
169     sent_packets = []
170
171     dhcp_request = Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") / \
172                    IP(src="0.0.0.0", dst=tx_dst_ip) / \
173                    UDP(sport=68, dport=67) / \
174                    BOOTP(op=1,
175                          giaddr=proxy_ip,
176                          siaddr=server_ip) / \
177                    DHCP(options=[("message-type", "request"),
178                                  ("server_id", server_ip),
179                                  ("requested_addr", client_ip),
180                                  "end"])
181
182     sent_packets.append(dhcp_request)
183     txq.send(dhcp_request)
184
185     ether = rxq.recv(2)
186
187     if ether is None:
188         raise RuntimeError('DHCP REQUEST timeout')
189
190     if ether[IP].dst != server_ip:
191         raise RuntimeError("Destination IP address error.")
192     print "Destination IP address: OK."
193
194     if ether[IP].src != proxy_ip:
195         raise RuntimeError("Source IP address error.")
196     print "Source IP address: OK."
197
198     if ether[UDP].dport != UDP_SERVICES.bootps:
199         raise RuntimeError("UDP destination port error.")
200     print "UDP destination port: OK."
201
202     if ether[UDP].sport != UDP_SERVICES.bootpc:
203         raise RuntimeError("UDP source port error.")
204     print "UDP source port: OK."
205
206     if ether[BOOTP].siaddr != server_ip:
207         raise RuntimeError("DHCP server IP address error.")
208     print "DHCP server IP address: OK."
209
210     if ether[DHCP].options[2][1] != client_ip:
211         raise RuntimeError("Requested IP address error.")
212     print "Requested IP address: OK."
213
214     if ether[DHCP].options[3][0] != 'relay_agent_Information':  # option 82
215         raise RuntimeError("Relay agent information error.")
216     option_82 = ether[DHCP].options[3][1]
217
218     if ether[DHCP].options[0][1] != 3:  # 2 - REQUEST message
219         raise RuntimeError("DHCP REQUEST message error.")
220     print "DHCP REQUEST message: OK."
221     dhcp_ack(args, option_82)
222
223
224 def dhcp_ack(args, option_82):
225     """Send DHCP ACK packet."""
226
227     rx_if = args.get_arg('tx_if')
228     tx_if = args.get_arg('rx_if')
229
230     rxq = RxQueue(rx_if)
231     txq = TxQueue(tx_if)
232
233     tx_dst_ip = "255.255.255.255"
234     server_ip = args.get_arg('server_ip')
235     server_mac = args.get_arg('server_mac')
236     client_ip = args.get_arg('client_ip')
237     proxy_ip = args.get_arg('proxy_ip')
238     lease_time = 43200  # 12 hours
239
240     sent_packets = []
241
242     dhcp_ack = Ether(src=server_mac, dst="ff:ff:ff:ff:ff:ff") / \
243                IP(src=server_ip, dst=tx_dst_ip) / \
244                UDP(sport=67, dport=68) / \
245                BOOTP(op=2,
246                      yiaddr=client_ip,
247                      siaddr=server_ip) / \
248                DHCP(options=
249                     [("message-type", "ack"),
250                      ("server_id", server_ip),
251                      ("lease_time", lease_time),
252                      ("relay_agent_Information", option_82),
253                      "end"])
254
255     txq.send(dhcp_ack)
256     sent_packets.append(dhcp_ack)
257
258     ether = rxq.recv(2)
259
260     if ether is None:
261         raise RuntimeError('DHCP ACK timeout')
262
263     if ether[IP].dst != tx_dst_ip:
264         raise RuntimeError("Destination IP address error.")
265     print "Destination IP address: OK."
266
267     if ether[IP].src != proxy_ip:
268         raise RuntimeError("Source IP address error.")
269     print "Source IP address: OK."
270
271     if ether[UDP].dport != UDP_SERVICES.bootpc:
272         raise RuntimeError("UDP destination port error.")
273     print "UDP destination port: OK."
274
275     if ether[UDP].sport != UDP_SERVICES.bootps:
276         raise RuntimeError("UDP source port error.")
277     print "UDP source port: OK."
278
279     if ether[BOOTP].yiaddr != client_ip:
280         raise RuntimeError("Client IP address error.")
281     print "Client IP address: OK."
282
283     if ether[BOOTP].siaddr != server_ip:
284         raise RuntimeError("DHCP server IP address error.")
285     print "DHCP server IP address: OK."
286
287     if ether[DHCP].options[2][1] != lease_time:
288         raise RuntimeError("DHCP lease time error.")
289     print "DHCP lease time OK."
290
291     if ether[DHCP].options[0][1] != 5:  # 5 - ACK message
292         raise RuntimeError("DHCP ACK message error.")
293     print "DHCP ACK message OK."
294
295
296 def main():
297     """Send DHCP messages."""
298
299     args = TrafficScriptArg(['server_ip', 'server_mac', 'client_ip',
300                              'client_mac', 'proxy_ip'])
301
302     dhcp_discover(args)
303
304     sys.exit(0)
305
306 if __name__ == "__main__":
307     main()