2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends a DHCP OFFER message and checks if the DHCP
16 REQUEST contains all required fields."""
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP, UDP_SERVICES
22 from scapy.layers.dhcp import BOOTP, DHCP
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
29 """If DHCP message type option is set to DHCP DISCOVER return True,
30 else return False. False is also returned if an exception occurs."""
33 dhcp_options = pkt['BOOTP']['DHCP options'].options
34 message_type = filter(lambda x: x[0] == 'message-type',
36 message_type = message_type[0][1]
37 return message_type == dhcp_discover
43 """If DHCP message type option is DHCP REQUEST return True,
44 else return False. False is also returned if an exception occurs."""
47 dhcp_options = pkt['BOOTP']['DHCP options'].options
48 message_type = filter(lambda x: x[0] == 'message-type',
50 message_type = message_type[0][1]
51 return message_type == dhcp_request
57 """Main function of the script file."""
58 args = TrafficScriptArg(['client_mac', 'server_mac', 'server_ip',
59 'client_ip', 'client_mask'],
60 ['hostname', 'offer_xid'])
62 server_if = args.get_arg('rx_if')
63 server_mac = args.get_arg('server_mac')
64 server_ip = args.get_arg('server_ip')
66 client_mac = args.get_arg('client_mac')
67 client_ip = args.get_arg('client_ip')
68 client_mask = args.get_arg('client_mask')
70 hostname = args.get_arg('hostname')
71 offer_xid = args.get_arg('offer_xid')
74 rx_dst_ip = '255.255.255.255'
76 rxq = RxQueue(server_if)
77 txq = TxQueue(server_if)
81 dhcp_discover = rxq.recv(10)
82 if is_discover(dhcp_discover):
85 raise RuntimeError("DHCP DISCOVER Rx error.")
87 dhcp_offer = Ether(src=server_mac, dst=dhcp_discover.src)
88 dhcp_offer /= IP(src=server_ip, dst="255.255.255.255")
89 dhcp_offer /= UDP(sport=67, dport=68)
90 dhcp_offer /= BOOTP(op=2,
91 # if offer_xid differs from xid value in DHCP DISCOVER
92 # the DHCP OFFER has to be discarded
93 xid=int(offer_xid) if offer_xid
94 else dhcp_discover['BOOTP'].xid,
97 chaddr=dhcp_discover['BOOTP'].chaddr)
98 dhcp_offer_options = [("message-type", "offer"), # Option 53
99 ("subnet_mask", client_mask), # Option 1
100 ("server_id", server_ip), # Option 54, dhcp server
101 ("lease_time", 43200), # Option 51
103 dhcp_offer /= DHCP(options=dhcp_offer_options)
106 sent_packets.append(dhcp_offer)
109 for _ in range(0, max_other_pkts):
110 dhcp_request = rxq.recv(5, sent_packets)
112 raise RuntimeError("DHCP REQUEST Rx timeout.")
113 if is_request(dhcp_request):
116 raise RuntimeError("Max RX packet limit reached.")
119 # if offer_xid differs from xid value in DHCP DISCOVER the DHCP OFFER
120 # has to be discarded
121 raise RuntimeError("DHCP REQUEST received. DHCP OFFER with wrong XID "
122 "has not been discarded.")
124 # CHECK ETHER, IP, UDP
125 if dhcp_request.dst != dhcp_discover.dst:
126 raise RuntimeError("Destination MAC error.")
127 print "Destination MAC: OK."
129 if dhcp_request.src != dhcp_discover.src:
130 raise RuntimeError("Source MAC error.")
131 print "Source MAC: OK."
133 if dhcp_request['IP'].dst != rx_dst_ip:
134 raise RuntimeError("Destination IP error.")
135 print "Destination IP: OK."
137 if dhcp_request['IP'].src != rx_src_ip:
138 raise RuntimeError("Source IP error.")
139 print "Source IP: OK."
141 if dhcp_request['IP']['UDP'].dport != UDP_SERVICES.bootps:
142 raise RuntimeError("BOOTPs error.")
145 if dhcp_request['IP']['UDP'].sport != UDP_SERVICES.bootpc:
146 raise RuntimeError("BOOTPc error.")
150 if dhcp_request['BOOTP'].op != dhcp_discover['BOOTP'].op:
151 raise RuntimeError("BOOTP operation error.")
152 print "BOOTP operation: OK"
154 if dhcp_request['BOOTP'].xid != dhcp_discover['BOOTP'].xid:
155 raise RuntimeError("BOOTP XID error.")
156 print "BOOTP XID: OK"
158 if dhcp_request['BOOTP'].ciaddr != '0.0.0.0':
159 raise RuntimeError("BOOTP ciaddr error.")
160 print "BOOTP ciaddr: OK"
162 ca = dhcp_request['BOOTP'].chaddr[:dhcp_request['BOOTP'].hlen].encode('hex')
163 if ca != client_mac.replace(':', ''):
164 raise RuntimeError("BOOTP client hardware address error.")
165 print "BOOTP client hardware address: OK"
167 if dhcp_request['BOOTP'].options != dhcp_discover['BOOTP'].options:
168 raise RuntimeError("DHCP options error.")
169 print "DHCP options: OK"
172 dhcp_options = dhcp_request['DHCP options'].options
174 hn = filter(lambda x: x[0] == 'hostname', dhcp_options)
177 if hn[0][1] != hostname:
178 raise RuntimeError("Client's hostname doesn't match.")
180 raise RuntimeError("Option list doesn't contain hostname option.")
183 raise RuntimeError("Option list contains hostname option.")
184 print "Option 12 hostname: OK"
187 ra = filter(lambda x: x[0] == 'requested_addr', dhcp_options)[0][1]
189 raise RuntimeError("Option 50 requested_addr error.")
190 print "Option 50 requested_addr: OK"
193 mt = filter(lambda x: x[0] == 'message-type', dhcp_options)[0][1]
194 if mt != 3: # request
195 raise RuntimeError("Option 53 message-type error.")
196 print "Option 53 message-type: OK"
199 sid = filter(lambda x: x[0] == 'server_id', dhcp_options)[0][1]
201 raise RuntimeError("Option 54 server_id error.")
202 print "Option 54 server_id: OK"
205 prl = filter(lambda x: x[0] == 'param_req_list', dhcp_options)[0][1]
206 if prl != '\x01\x1c\x02\x03\x0f\x06w\x0c,/\x1ay*':
207 raise RuntimeError("Option 55 param_req_list error.")
208 print "Option 55 param_req_list: OK"
211 if 'end' not in dhcp_options:
212 raise RuntimeError("end option error.")
213 print "end option: OK"
217 if __name__ == "__main__":