Python3: PIP requirement
[csit.git] / resources / traffic_scripts / dhcp / check_dhcp_request_ack.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends a DHCP ACK message when DHCP REQUEST message is
16 received."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP
22 from scapy.layers.dhcp import BOOTP, DHCP
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def is_discover(pkt):
29     """If DHCP message type option is set to dhcp discover return True,
30     else return False. False is returned also if exception occurs."""
31     dhcp_discover = 1
32     try:
33         dhcp_options = pkt['BOOTP']['DHCP options'].options
34         message_type = filter(lambda x: x[0] == 'message-type',
35                               dhcp_options)
36         message_type = message_type[0][1]
37         return message_type == dhcp_discover
38     except:
39         return False
40
41
42 def is_request(pkt):
43     """If DHCP message type option is DHCP REQUEST return True,
44     else return False. False is returned also if exception occurs."""
45     dhcp_request = 3
46     try:
47         dhcp_options = pkt['BOOTP']['DHCP options'].options
48         message_type = filter(lambda x: x[0] == 'message-type',
49                               dhcp_options)
50         message_type = message_type[0][1]
51         return message_type == dhcp_request
52     except:
53         return False
54
55
56 def main():
57     """Main function of the script file."""
58     args = TrafficScriptArg(['server_mac', 'server_ip', 'client_ip',
59                              'client_mask', 'lease_time'])
60
61     server_if = args.get_arg('rx_if')
62     server_mac = args.get_arg('server_mac')
63     server_ip = args.get_arg('server_ip')
64
65     client_ip = args.get_arg('client_ip')
66     client_mask = args.get_arg('client_mask')
67
68     lease_time = int(args.get_arg('lease_time'))
69
70     rxq = RxQueue(server_if)
71     txq = TxQueue(server_if)
72     sent_packets = []
73
74     for _ in range(10):
75         dhcp_discover = rxq.recv(10)
76         if is_discover(dhcp_discover):
77             break
78     else:
79         raise RuntimeError("DHCP DISCOVER Rx error.")
80
81     dhcp_offer = Ether(src=server_mac, dst=dhcp_discover.src)
82     dhcp_offer /= IP(src=server_ip, dst="255.255.255.255")
83     dhcp_offer /= UDP(sport=67, dport=68)
84     dhcp_offer /= BOOTP(op=2,
85                         xid=dhcp_discover['BOOTP'].xid,
86                         yiaddr=client_ip,
87                         siaddr=server_ip,
88                         chaddr=dhcp_discover['BOOTP'].chaddr)
89     dhcp_offer_options = [("message-type", "offer"),  # Option 53
90                           ("subnet_mask", client_mask),  # Option 1
91                           ("server_id", server_ip),  # Option 54, dhcp server
92                           ("lease_time", lease_time),  # Option 51
93                           "end"]
94     dhcp_offer /= DHCP(options=dhcp_offer_options)
95
96     txq.send(dhcp_offer)
97     sent_packets.append(dhcp_offer)
98
99     max_other_pkts = 10
100     for _ in range(0, max_other_pkts):
101         dhcp_request = rxq.recv(5, sent_packets)
102         if not dhcp_request:
103             raise RuntimeError("DHCP REQUEST Rx timeout.")
104         if is_request(dhcp_request):
105             break
106     else:
107         raise RuntimeError("Max RX packet limit reached.")
108
109     # Send dhcp ack
110     dhcp_ack = Ether(src=server_mac, dst=dhcp_request.src)
111     dhcp_ack /= IP(src=server_ip, dst="255.255.255.255")
112     dhcp_ack /= UDP(sport=67, dport=68)
113     dhcp_ack /= BOOTP(op=2,
114                       xid=dhcp_request['BOOTP'].xid,
115                       yiaddr=client_ip,
116                       siaddr=server_ip,
117                       flags=dhcp_request['BOOTP'].flags,
118                       chaddr=dhcp_request['BOOTP'].chaddr)
119     dhcp_ack_options = [("message-type", "ack"),  # Option 53. 5: ACK, 6: NAK
120                         ("subnet_mask", client_mask),  # Option 1
121                         ("server_id", server_ip),  # Option 54, dhcp server
122                         ("lease_time", lease_time),  # Option 51,
123                         "end"]
124     dhcp_ack /= DHCP(options=dhcp_ack_options)
125
126     txq.send(dhcp_ack)
127     sent_packets.append(dhcp_ack)
128
129     sys.exit(0)
130
131 if __name__ == "__main__":
132     main()