Python3: PIP requirement
[csit.git] / resources / traffic_scripts / dhcp / check_dhcp_request.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an DHCP OFFER message and checks if the DHCP
16 REQUEST contains all required fields."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP, UDP_SERVICES
22 from scapy.layers.dhcp import BOOTP, DHCP
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def is_discover(pkt):
29     """If DHCP message type option is set to dhcp discover return True,
30     else return False. False is returned also if exception occurs."""
31     dhcp_discover = 1
32     try:
33         dhcp_options = pkt['BOOTP']['DHCP options'].options
34         message_type = filter(lambda x: x[0] == 'message-type',
35                               dhcp_options)
36         message_type = message_type[0][1]
37         return message_type == dhcp_discover
38     except:
39         return False
40
41
42 def is_request(pkt):
43     """If DHCP message type option is DHCP REQUEST return True,
44     else return False. False is returned also if exception occurs."""
45     dhcp_request = 3
46     try:
47         dhcp_options = pkt['BOOTP']['DHCP options'].options
48         message_type = filter(lambda x: x[0] == 'message-type',
49                               dhcp_options)
50         message_type = message_type[0][1]
51         return message_type == dhcp_request
52     except:
53         return False
54
55
56 def main():
57     """Main function of the script file."""
58     args = TrafficScriptArg(['client_mac', 'server_mac', 'server_ip',
59                              'client_ip', 'client_mask'],
60                             ['hostname', 'offer_xid'])
61
62     server_if = args.get_arg('rx_if')
63     server_mac = args.get_arg('server_mac')
64     server_ip = args.get_arg('server_ip')
65
66     client_mac = args.get_arg('client_mac')
67     client_ip = args.get_arg('client_ip')
68     client_mask = args.get_arg('client_mask')
69
70     hostname = args.get_arg('hostname')
71     offer_xid = args.get_arg('offer_xid')
72
73     rx_src_ip = '0.0.0.0'
74     rx_dst_ip = '255.255.255.255'
75
76     rxq = RxQueue(server_if)
77     txq = TxQueue(server_if)
78     sent_packets = []
79
80     for _ in range(10):
81         dhcp_discover = rxq.recv(10)
82         if is_discover(dhcp_discover):
83             break
84     else:
85         raise RuntimeError("DHCP DISCOVER Rx error.")
86
87     dhcp_offer = Ether(src=server_mac, dst=dhcp_discover.src)
88     dhcp_offer /= IP(src=server_ip, dst="255.255.255.255")
89     dhcp_offer /= UDP(sport=67, dport=68)
90     dhcp_offer /= BOOTP(op=2,
91                         # if offer_xid differs from xid value in DHCP DISCOVER
92                         # the DHCP OFFER has to be discarded
93                         xid=int(offer_xid) if offer_xid
94                         else dhcp_discover['BOOTP'].xid,
95                         yiaddr=client_ip,
96                         siaddr=server_ip,
97                         chaddr=dhcp_discover['BOOTP'].chaddr)
98     dhcp_offer_options = [("message-type", "offer"),  # Option 53
99                           ("subnet_mask", client_mask),  # Option 1
100                           ("server_id", server_ip),  # Option 54, dhcp server
101                           ("lease_time", 43200),  # Option 51
102                           "end"]
103     dhcp_offer /= DHCP(options=dhcp_offer_options)
104
105     txq.send(dhcp_offer)
106     sent_packets.append(dhcp_offer)
107
108     max_other_pkts = 10
109     for _ in range(0, max_other_pkts):
110         dhcp_request = rxq.recv(5, sent_packets)
111         if not dhcp_request:
112             raise RuntimeError("DHCP REQUEST Rx timeout.")
113         if is_request(dhcp_request):
114             break
115     else:
116         raise RuntimeError("Max RX packet limit reached.")
117
118     if offer_xid:
119         # if offer_xid differs from xid value in DHCP DISCOVER the DHCP OFFER
120         # has to be discarded
121         raise RuntimeError("DHCP REQUEST received. DHCP OFFER with wrong XID "
122                            "has not been discarded.")
123
124     # CHECK ETHER, IP, UDP
125     if dhcp_request.dst != dhcp_discover.dst:
126         raise RuntimeError("Destination MAC error.")
127     print "Destination MAC: OK."
128
129     if dhcp_request.src != dhcp_discover.src:
130         raise RuntimeError("Source MAC error.")
131     print "Source MAC: OK."
132
133     if dhcp_request['IP'].dst != rx_dst_ip:
134         raise RuntimeError("Destination IP error.")
135     print "Destination IP: OK."
136
137     if dhcp_request['IP'].src != rx_src_ip:
138         raise RuntimeError("Source IP error.")
139     print "Source IP: OK."
140
141     if dhcp_request['IP']['UDP'].dport != UDP_SERVICES.bootps:
142         raise RuntimeError("BOOTPs error.")
143     print "BOOTPs: OK."
144
145     if dhcp_request['IP']['UDP'].sport != UDP_SERVICES.bootpc:
146         raise RuntimeError("BOOTPc error.")
147     print "BOOTPc: OK."
148
149     # CHECK BOOTP
150     if dhcp_request['BOOTP'].op != dhcp_discover['BOOTP'].op:
151         raise RuntimeError("BOOTP operation error.")
152     print "BOOTP operation: OK"
153
154     if dhcp_request['BOOTP'].xid != dhcp_discover['BOOTP'].xid:
155         raise RuntimeError("BOOTP XID error.")
156     print "BOOTP XID: OK"
157
158     if dhcp_request['BOOTP'].ciaddr != '0.0.0.0':
159         raise RuntimeError("BOOTP ciaddr error.")
160     print "BOOTP ciaddr: OK"
161
162     ca = dhcp_request['BOOTP'].chaddr[:dhcp_request['BOOTP'].hlen].encode('hex')
163     if ca != client_mac.replace(':', ''):
164         raise RuntimeError("BOOTP client hardware address error.")
165     print "BOOTP client hardware address: OK"
166
167     if dhcp_request['BOOTP'].options != dhcp_discover['BOOTP'].options:
168         raise RuntimeError("DHCP options error.")
169     print "DHCP options: OK"
170
171     # CHECK DHCP OPTIONS
172     dhcp_options = dhcp_request['DHCP options'].options
173
174     hn = filter(lambda x: x[0] == 'hostname', dhcp_options)
175     if hostname:
176         try:
177             if hn[0][1] != hostname:
178                 raise RuntimeError("Client's hostname doesn't match.")
179         except IndexError:
180             raise RuntimeError("Option list doesn't contain hostname option.")
181     else:
182         if len(hn) != 0:
183             raise RuntimeError("Option list contains hostname option.")
184     print "Option 12 hostname: OK"
185
186     # Option 50
187     ra = filter(lambda x: x[0] == 'requested_addr', dhcp_options)[0][1]
188     if ra != client_ip:
189         raise RuntimeError("Option 50 requested_addr error.")
190     print "Option 50 requested_addr: OK"
191
192     # Option 53
193     mt = filter(lambda x: x[0] == 'message-type', dhcp_options)[0][1]
194     if mt != 3:  # request
195         raise RuntimeError("Option 53 message-type error.")
196     print "Option 53 message-type: OK"
197
198     # Option 54
199     sid = filter(lambda x: x[0] == 'server_id', dhcp_options)[0][1]
200     if sid != server_ip:
201         raise RuntimeError("Option 54 server_id error.")
202     print "Option 54 server_id: OK"
203
204     # Option 55
205     prl = filter(lambda x: x[0] == 'param_req_list', dhcp_options)[0][1]
206     if prl != '\x01\x1c\x02\x03\x0f\x06w\x0c,/\x1ay*':
207         raise RuntimeError("Option 55 param_req_list error.")
208     print "Option 55 param_req_list: OK"
209
210     # Option 255
211     if 'end' not in dhcp_options:
212         raise RuntimeError("end option error.")
213     print "end option: OK"
214
215     sys.exit(0)
216
217 if __name__ == "__main__":
218     main()