Python3: PIP requirement
[csit.git] / resources / traffic_scripts / dhcp / send_and_check_proxy_discover.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends DHCP DISCOVER packet
16  and check if is received on interface."""
17
18 import sys
19
20 from scapy.all import Ether
21 from scapy.layers.inet import IP, UDP
22 from scapy.layers.inet import UDP_SERVICES
23 from scapy.layers.dhcp import DHCP, BOOTP
24
25 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
26 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
27
28
29 def is_discover(pkt):
30     """If DHCP message type option is set to dhcp discover return True,
31     else return False. False is returned also if exception occurs."""
32     dhcp_discover = 1
33     try:
34         dhcp_options = pkt['BOOTP']['DHCP options'].options
35         message_type = filter(lambda x: x[0] == 'message-type',
36                               dhcp_options)
37         message_type = message_type[0][1]
38         return message_type == dhcp_discover
39     except:
40         return False
41
42
43 def main():
44     """Send DHCP DISCOVER packet."""
45
46     args = TrafficScriptArg(['tx_src_ip', 'tx_dst_ip'])
47
48     tx_if = args.get_arg('tx_if')
49     rx_if = args.get_arg('rx_if')
50
51     rxq = RxQueue(rx_if)
52     txq = TxQueue(tx_if)
53
54     tx_src_ip = args.get_arg('tx_src_ip')
55     tx_dst_ip = args.get_arg('tx_dst_ip')
56
57     sent_packets = []
58
59     dhcp_discover = Ether(dst="ff:ff:ff:ff:ff:ff") / \
60                     IP(src=tx_src_ip, dst=tx_dst_ip) / \
61                     UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \
62                     BOOTP(op=1,) / \
63                     DHCP(options=[("message-type", "discover"),
64                                   "end"])
65
66     sent_packets.append(dhcp_discover)
67     txq.send(dhcp_discover)
68
69     for _ in range(10):
70         dhcp_discover = rxq.recv(2)
71         if is_discover(dhcp_discover):
72             break
73     else:
74         raise RuntimeError("DHCP DISCOVER Rx timeout")
75
76     sys.exit(0)
77
78 if __name__ == "__main__":
79     main()