X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fdhcp%2Fcheck_dhcp_discover.py;fp=resources%2Ftraffic_scripts%2Fdhcp%2Fcheck_dhcp_discover.py;h=0000000000000000000000000000000000000000;hp=2fdc5b7fbfbd8ed4b8834f6c66793692161177d6;hb=d68951ac245150eeefa6e0f4156e4c1b5c9e9325;hpb=ed0258a440cfad7023d643f717ab78ac568dc59b diff --git a/resources/traffic_scripts/dhcp/check_dhcp_discover.py b/resources/traffic_scripts/dhcp/check_dhcp_discover.py deleted file mode 100755 index 2fdc5b7fbf..0000000000 --- a/resources/traffic_scripts/dhcp/check_dhcp_discover.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that receives an DHCP packet on given interface and check if -is correct DHCP DISCOVER message. -""" - -import sys - -from scapy.layers.inet import UDP_SERVICES - -from resources.libraries.python.PacketVerifier import RxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - args = TrafficScriptArg(['rx_src_mac'], ['hostname']) - - rx_if = args.get_arg('rx_if') - rx_src_mac = args.get_arg('rx_src_mac') - hostname = args.get_arg('hostname') - - rx_dst_mac = 'ff:ff:ff:ff:ff:ff' - rx_src_ip = '0.0.0.0' - rx_dst_ip = '255.255.255.255' - boot_request = 1 - dhcp_magic = 'c\x82Sc' - - rxq = RxQueue(rx_if) - - ether = rxq.recv(10) - - if ether is None: - raise RuntimeError("DHCP DISCOVER Rx timeout.") - - if ether.dst != rx_dst_mac: - raise RuntimeError("Destination MAC address error.") - print "Destination MAC address: OK." - - if ether.src != rx_src_mac: - raise RuntimeError("Source MAC address error.") - print "Source MAC address: OK." - - if ether['IP'].dst != rx_dst_ip: - raise RuntimeError("Destination IP address error.") - print "Destination IP address: OK." - - if ether['IP'].src != rx_src_ip: - raise RuntimeError("Source IP address error.") - print "Source IP address: OK." - - if ether['IP']['UDP'].dport != UDP_SERVICES.bootps: - raise RuntimeError("UDP destination port error.") - print "UDP destination port: OK." - - if ether['IP']['UDP'].sport != UDP_SERVICES.bootpc: - raise RuntimeError("UDP source port error.") - print "UDP source port: OK." - - bootp = ether['BOOTP'] - - if bootp.op != boot_request: - raise RuntimeError("BOOTP message type error.") - print "BOOTP message type: OK" - - if bootp.ciaddr != '0.0.0.0': - raise RuntimeError("BOOTP client IP address error.") - print "BOOTP client IP address: OK" - - if bootp.yiaddr != '0.0.0.0': - raise RuntimeError("BOOTP 'your' (client) IP address error.") - print "BOOTP 'your' (client) IP address: OK" - - if bootp.siaddr != '0.0.0.0': - raise RuntimeError("BOOTP next server IP address error.") - print "BOOTP next server IP address: OK" - - if bootp.giaddr != '0.0.0.0': - raise RuntimeError("BOOTP relay agent IP address error.") - print "BOOTP relay agent IP address: OK" - - chaddr = bootp.chaddr[:bootp.hlen].encode('hex') - if chaddr != rx_src_mac.replace(':', ''): - raise RuntimeError("BOOTP client hardware address error.") - print "BOOTP client hardware address: OK" - - # Check hostname - if bootp.sname != 64*'\x00': - raise RuntimeError("BOOTP server name error.") - print "BOOTP server name: OK" - - # Check boot file - if bootp.file != 128*'\x00': - raise RuntimeError("BOOTP boot file name error.") - print "BOOTP boot file name: OK" - - # Check bootp magic - if bootp.options != dhcp_magic: - raise RuntimeError("DHCP magic error.") - print "DHCP magic: OK" - - # Check options - dhcp_options = ether['DHCP options'].options - - # Option 12 - hn = filter(lambda x: x[0] == 'hostname', dhcp_options) - if hostname: - try: - if hn[0][1] != hostname: - raise RuntimeError("Client's hostname doesn't match.") - except IndexError: - raise RuntimeError("Option list doesn't contain hostname option.") - else: - if len(hn) != 0: - raise RuntimeError("Option list contains hostname option.") - print "Option 12 hostname: OK" - - # Option 53 - mt = filter(lambda x: x[0] == 'message-type', dhcp_options)[0][1] - if mt != 1: - raise RuntimeError("Option 53 message-type error.") - print "Option 53 message-type: OK" - - # Option 55 - prl = filter(lambda x: x[0] == 'param_req_list', dhcp_options)[0][1] - if prl != '\x01\x1c\x02\x03\x0f\x06w\x0c,/\x1ay*': - raise RuntimeError("Option 55 param_req_list error.") - print "Option 55 param_req_list: OK" - - # Option 255 - if 'end' not in dhcp_options: - raise RuntimeError("end option error.") - print "end option: OK" - - sys.exit(0) - - -if __name__ == "__main__": - main()