2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends a DHCP ACK message when DHCP REQUEST message is
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP
22 from scapy.layers.dhcp import BOOTP, DHCP
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def is_discover(pkt):
    """Tell whether a packet is a DHCP DISCOVER message.

    :param pkt: Received packet. It must support the lookup
        ``pkt['BOOTP']['DHCP options']`` and expose the parsed option list
        as ``.options``.
    :returns: True if the 'message-type' option is set to DHCP DISCOVER,
        else False. False is returned also if an exception occurs (no
        packet, missing layer, missing option, ...).
    :rtype: bool
    """
    dhcp_discover = 1  # DHCPDISCOVER message type code (RFC 2132, 9.6)
    try:
        dhcp_options = pkt['BOOTP']['DHCP options'].options
        # Collect the matches into a real list. Indexing the result of
        # filter() only works on Python 2; on Python 3 filter() returns an
        # iterator, the subscript raises and the check silently fails.
        message_types = [option for option in dhcp_options
                         if option[0] == 'message-type']
        return message_types[0][1] == dhcp_discover
    except Exception:  # by contract any failure means "not a DISCOVER"
        return False
def is_request(pkt):
    """Tell whether a packet is a DHCP REQUEST message.

    :param pkt: Received packet. It must support the lookup
        ``pkt['BOOTP']['DHCP options']`` and expose the parsed option list
        as ``.options``.
    :returns: True if the 'message-type' option is DHCP REQUEST, else
        False. False is returned also if an exception occurs (no packet,
        missing layer, missing option, ...).
    :rtype: bool
    """
    dhcp_request = 3  # DHCPREQUEST message type code (RFC 2132, 9.6)
    try:
        dhcp_options = pkt['BOOTP']['DHCP options'].options
        # Collect the matches into a real list. Indexing the result of
        # filter() only works on Python 2; on Python 3 filter() returns an
        # iterator, the subscript raises and the check silently fails.
        message_types = [option for option in dhcp_options
                         if option[0] == 'message-type']
        return message_types[0][1] == dhcp_request
    except Exception:  # by contract any failure means "not a REQUEST"
        return False
# NOTE(review): this view of the function is fragmentary -- the "def" header,
# several branch keywords, the initialisation of "sent_packets" and
# "max_other_pkts", and the statements between some of the lines below are
# not visible here. The comments describe only what the visible lines do.
57 """Main function of the script file."""
# Declare the script-specific arguments and read them back one by one.
58 args = TrafficScriptArg(['server_mac', 'server_ip', 'client_ip',
59 'client_mask', 'lease_time'])
# 'rx_if' is read although it is not in the list above -- presumably a
# default argument provided by TrafficScriptArg; confirm against that class.
61 server_if = args.get_arg('rx_if')
62 server_mac = args.get_arg('server_mac')
63 server_ip = args.get_arg('server_ip')
# NOTE(review): client_ip is not used on any visible line; presumably it is
# consumed by the BOOTP fields that are missing from this view.
65 client_ip = args.get_arg('client_ip')
66 client_mask = args.get_arg('client_mask')
# The lease time is converted to int before being used as a DHCP option.
68 lease_time = int(args.get_arg('lease_time'))
# Receive and transmit queues are both bound to the same interface.
70 rxq = RxQueue(server_if)
71 txq = TxQueue(server_if)
# Receive the first packet (10 is presumably a timeout in seconds -- confirm
# against RxQueue.recv) and require it to be a DHCP DISCOVER.
75 dhcp_discover = rxq.recv(10)
76 if is_discover(dhcp_discover):
79 raise RuntimeError("DHCP DISCOVER Rx error.")
# Build the DHCP OFFER: Ethernet frame from the server MAC back to the source
# of the DISCOVER, IP destination set to limited broadcast, UDP port 67 -> 68.
81 dhcp_offer = Ether(src=server_mac, dst=dhcp_discover.src)
82 dhcp_offer /= IP(src=server_ip, dst="255.255.255.255")
83 dhcp_offer /= UDP(sport=67, dport=68)
# BOOTP op=2; xid and chaddr are copied from the DISCOVER.
84 dhcp_offer /= BOOTP(op=2,
85 xid=dhcp_discover['BOOTP'].xid,
88 chaddr=dhcp_discover['BOOTP'].chaddr)
89 dhcp_offer_options = [("message-type", "offer"), # Option 53
90 ("subnet_mask", client_mask), # Option 1
91 ("server_id", server_ip), # Option 54, dhcp server
92 ("lease_time", lease_time), # Option 51
94 dhcp_offer /= DHCP(options=dhcp_offer_options)
# The OFFER is recorded in sent_packets, which is later handed to rxq.recv()
# (presumably so the script's own packets are ignored on receive -- confirm).
97 sent_packets.append(dhcp_offer)
# Examine at most max_other_pkts received packets while looking for the
# DHCP REQUEST.
100 for _ in range(0, max_other_pkts):
101 dhcp_request = rxq.recv(5, sent_packets)
103 raise RuntimeError("DHCP REQUEST Rx timeout.")
104 if is_request(dhcp_request):
107 raise RuntimeError("Max RX packet limit reached.")
# Build the DHCP ACK the same way as the OFFER, this time answering the
# REQUEST: Ethernet destination is the source of the REQUEST.
110 dhcp_ack = Ether(src=server_mac, dst=dhcp_request.src)
111 dhcp_ack /= IP(src=server_ip, dst="255.255.255.255")
112 dhcp_ack /= UDP(sport=67, dport=68)
# BOOTP op=2; xid, flags and chaddr are copied from the REQUEST.
113 dhcp_ack /= BOOTP(op=2,
114 xid=dhcp_request['BOOTP'].xid,
117 flags=dhcp_request['BOOTP'].flags,
118 chaddr=dhcp_request['BOOTP'].chaddr)
119 dhcp_ack_options = [("message-type", "ack"), # Option 53. 5: ACK, 6: NAK
120 ("subnet_mask", client_mask), # Option 1
121 ("server_id", server_ip), # Option 54, dhcp server
122 ("lease_time", lease_time), # Option 51,
124 dhcp_ack /= DHCP(options=dhcp_ack_options)
# The ACK is recorded in sent_packets as well.
127 sent_packets.append(dhcp_ack)
131 if __name__ == "__main__":