Python3: PIP requirement
[csit.git] / resources / traffic_scripts / dhcp / send_and_check_proxy_messages.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends DHCP packets."""
16
17 import sys
18
19 from scapy.all import Ether
20 from scapy.layers.inet import IP, UDP
21 from scapy.layers.inet import UDP_SERVICES
22 from scapy.layers.dhcp import DHCP, BOOTP
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def dhcp_discover(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip,
29                   client_mac):
30     """Send and check DHCP DISCOVER proxy packet."""
31
32     rxq = RxQueue(rx_if)
33     txq = TxQueue(tx_if)
34
35     sent_packets = []
36
37     dhcp_discover = Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") / \
38                     IP(src=tx_src_ip, dst=tx_dst_ip) / \
39                     UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \
40                     BOOTP(op=1,) / \
41                     DHCP(options=[("message-type", "discover"),
42                                   "end"])
43
44     sent_packets.append(dhcp_discover)
45     txq.send(dhcp_discover)
46
47     ether = rxq.recv(2)
48
49     if ether is None:
50         raise RuntimeError('DHCP DISCOVER timeout')
51
52     if ether[IP].src != proxy_ip:
53         raise RuntimeError("Source IP address error.")
54     print "Source IP address: OK."
55
56     if ether[IP].dst != server_ip:
57         raise RuntimeError("Destination IP address error.")
58     print "Destination IP address: OK."
59
60     if ether[UDP].dport != UDP_SERVICES.bootps:
61         raise RuntimeError("UDP destination port error.")
62     print "UDP destination port: OK."
63
64     if ether[UDP].sport != UDP_SERVICES.bootpc:
65         raise RuntimeError("UDP source port error.")
66     print "UDP source port: OK."
67
68     if ether[DHCP].options[1][0] != 'relay_agent_Information':  # option 82
69         raise RuntimeError("Relay agent information error.")
70     option_82 = ether[DHCP].options[1][1]
71
72     if ether[DHCP].options[0][1] != 1:  # 1 - DISCOVER message
73         raise RuntimeError("DHCP DISCOVER message error.")
74     print "DHCP DISCOVER message OK."
75
76     return option_82
77
78
79 def dhcp_offer(rx_if, tx_if, tx_dst_ip, server_ip, proxy_ip, client_ip,
80                server_mac, option_82):
81     """Send and check DHCP OFFER proxy packet."""
82
83     rxq = RxQueue(rx_if)
84     txq = TxQueue(tx_if)
85
86     sent_packets = []
87
88     dhcp_offer = Ether(src=server_mac, dst="ff:ff:ff:ff:ff:ff") / \
89                  IP(src=server_ip, dst=tx_dst_ip) / \
90                  UDP(sport=UDP_SERVICES.bootps, dport=UDP_SERVICES.bootpc) / \
91                  BOOTP(op=2,
92                        yiaddr=client_ip,
93                        siaddr=server_ip) / \
94                  DHCP(options=
95                       [("message-type", "offer"),
96                        ("server_id", server_ip),
97                        ("relay_agent_Information", option_82),
98                        "end"])
99
100     txq.send(dhcp_offer)
101     sent_packets.append(dhcp_offer)
102
103     ether = rxq.recv(2)
104
105     if ether is None:
106         raise RuntimeError('DHCP OFFER timeout')
107
108     if ether[IP].dst != tx_dst_ip:
109         raise RuntimeError("Destination IP address error.")
110     print "Destination IP address: OK."
111
112     if ether[IP].src != proxy_ip:
113         raise RuntimeError("Source IP address error.")
114     print "Source IP address: OK."
115
116     if ether[UDP].dport != UDP_SERVICES.bootpc:
117         raise RuntimeError("UDP destination port error.")
118     print "UDP destination port: OK."
119
120     if ether[UDP].sport != UDP_SERVICES.bootps:
121         raise RuntimeError("UDP source port error.")
122     print "UDP source port: OK."
123
124     if ether[BOOTP].yiaddr != client_ip:
125         raise RuntimeError("Client IP address error.")
126     print "Client IP address: OK."
127
128     if ether[BOOTP].siaddr != server_ip:
129         raise RuntimeError("DHCP server IP address error.")
130     print "DHCP server IP address: OK."
131
132     if ether[DHCP].options[0][1] != 2:  # 2 - OFFER message
133         raise RuntimeError("DHCP OFFER message error.")
134     print "DHCP OFFER message OK."
135
136
137 def dhcp_request(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip,
138                  client_ip, client_mac):
139     """Send and check DHCP REQUEST proxy packet."""
140
141     rxq = RxQueue(rx_if)
142     txq = TxQueue(tx_if)
143
144     sent_packets = []
145
146     dhcp_request = Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") / \
147                    IP(src=tx_src_ip, dst=tx_dst_ip) / \
148                    UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \
149                    BOOTP(op=1,
150                          giaddr=proxy_ip,
151                          siaddr=server_ip) / \
152                    DHCP(options=[("message-type", "request"),
153                                  ("server_id", server_ip),
154                                  ("requested_addr", client_ip),
155                                  "end"])
156
157     sent_packets.append(dhcp_request)
158     txq.send(dhcp_request)
159
160     ether = rxq.recv(2)
161
162     if ether is None:
163         raise RuntimeError('DHCP REQUEST timeout')
164
165     if ether[IP].dst != server_ip:
166         raise RuntimeError("Destination IP address error.")
167     print "Destination IP address: OK."
168
169     if ether[IP].src != proxy_ip:
170         raise RuntimeError("Source IP address error.")
171     print "Source IP address: OK."
172
173     if ether[UDP].dport != UDP_SERVICES.bootps:
174         raise RuntimeError("UDP destination port error.")
175     print "UDP destination port: OK."
176
177     if ether[UDP].sport != UDP_SERVICES.bootpc:
178         raise RuntimeError("UDP source port error.")
179     print "UDP source port: OK."
180
181     if ether[BOOTP].siaddr != server_ip:
182         raise RuntimeError("DHCP server IP address error.")
183     print "DHCP server IP address: OK."
184
185     if ether[DHCP].options[2][1] != client_ip:
186         raise RuntimeError("Requested IP address error.")
187     print "Requested IP address: OK."
188
189     if ether[DHCP].options[3][0] != 'relay_agent_Information':  # option 82
190         raise RuntimeError("Relay agent information error.")
191
192     if ether[DHCP].options[0][1] != 3:  # 2 - REQUEST message
193         raise RuntimeError("DHCP REQUEST message error.")
194     print "DHCP REQUEST message: OK."
195
196
197 def dhcp_ack(rx_if, tx_if, tx_dst_ip, server_ip, proxy_ip, client_ip,
198              server_mac, option_82):
199     """Send and check DHCP ACK proxy packet."""
200
201     rxq = RxQueue(rx_if)
202     txq = TxQueue(tx_if)
203
204     lease_time = 43200  # 12 hours
205
206     sent_packets = []
207
208     dhcp_ack = Ether(src=server_mac, dst="ff:ff:ff:ff:ff:ff") / \
209                IP(src=server_ip, dst=tx_dst_ip) / \
210                UDP(sport=UDP_SERVICES.bootps, dport=UDP_SERVICES.bootpc) / \
211                BOOTP(op=2,
212                      yiaddr=client_ip,
213                      siaddr=server_ip) / \
214                DHCP(options=
215                     [("message-type", "ack"),
216                      ("server_id", server_ip),
217                      ("lease_time", lease_time),
218                      ("relay_agent_Information", option_82),
219                      "end"])
220
221     txq.send(dhcp_ack)
222     sent_packets.append(dhcp_ack)
223
224     ether = rxq.recv(2)
225
226     if ether is None:
227         raise RuntimeError('DHCP ACK timeout')
228
229     if ether[IP].dst != tx_dst_ip:
230         raise RuntimeError("Destination IP address error.")
231     print "Destination IP address: OK."
232
233     if ether[IP].src != proxy_ip:
234         raise RuntimeError("Source IP address error.")
235     print "Source IP address: OK."
236
237     if ether[UDP].dport != UDP_SERVICES.bootpc:
238         raise RuntimeError("UDP destination port error.")
239     print "UDP destination port: OK."
240
241     if ether[UDP].sport != UDP_SERVICES.bootps:
242         raise RuntimeError("UDP source port error.")
243     print "UDP source port: OK."
244
245     if ether[BOOTP].yiaddr != client_ip:
246         raise RuntimeError("Client IP address error.")
247     print "Client IP address: OK."
248
249     if ether[BOOTP].siaddr != server_ip:
250         raise RuntimeError("DHCP server IP address error.")
251     print "DHCP server IP address: OK."
252
253     if ether[DHCP].options[2][1] != lease_time:
254         raise RuntimeError("DHCP lease time error.")
255     print "DHCP lease time OK."
256
257     if ether[DHCP].options[0][1] != 5:  # 5 - ACK message
258         raise RuntimeError("DHCP ACK message error.")
259     print "DHCP ACK message OK."
260
261
262 def main():
263     """Send DHCP proxy messages."""
264
265     args = TrafficScriptArg(['server_ip', 'server_mac', 'client_ip',
266                              'client_mac', 'proxy_ip'])
267
268     tx_if = args.get_arg('tx_if')
269     rx_if = args.get_arg('rx_if')
270
271     tx_src_ip = "0.0.0.0"
272     tx_dst_ip = "255.255.255.255"
273
274     server_ip = args.get_arg('server_ip')
275     client_ip = args.get_arg('client_ip')
276     proxy_ip = args.get_arg('proxy_ip')
277     client_mac = args.get_arg('client_mac')
278     server_mac = args.get_arg('server_mac')
279
280     # DHCP DISCOVER
281     option_82 = dhcp_discover(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip,
282                               proxy_ip, client_mac)
283
284     # DHCP OFFER
285     dhcp_offer(tx_if, rx_if, tx_dst_ip, server_ip, proxy_ip, client_ip,
286                server_mac, option_82)
287
288     # DHCP REQUEST
289     dhcp_request(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip,
290                  client_ip, client_mac)
291
292     # DHCP ACK
293     dhcp_ack(tx_if, rx_if, tx_dst_ip, server_ip, proxy_ip, client_ip,
294              server_mac, option_82)
295
296     sys.exit(0)
297
298 if __name__ == "__main__":
299     main()