94d5deabe7a31ee59d27444b363733e9c4dd5d0c
[csit.git] / resources / traffic_scripts / arp_request.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Send an ARP request and verify the reply"""
17
18 import sys
19
20 from scapy.all import Ether, ARP
21
22 from resources.libraries.python.PacketVerifier import Interface
23 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
24
25
26 def parse_arguments():
27     """Parse arguments of the script passed through command line
28
29     :return: tuple of parsed arguments
30     """
31     args = TrafficScriptArg(['src_if', 'src_mac', 'dst_mac',
32                              'src_ip', 'dst_ip'])
33
34     # check for mandatory parameters
35     params = (args.get_arg('tx_if'),
36               args.get_arg('src_mac'),
37               args.get_arg('dst_mac'),
38               args.get_arg('src_ip'),
39               args.get_arg('dst_ip'))
40     if None in params:
41         raise Exception('Missing mandatory parameter(s)!')
42
43     return params
44
45
46 def arp_request_test():
47     """Send ARP request, expect a reply and verify its fields.
48
49     returns: test status
50     """
51     test_passed = False
52     (src_if, src_mac, dst_mac, src_ip, dst_ip) = parse_arguments()
53
54     interface = Interface(src_if)
55
56     # build an ARP request
57     arp_request = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
58                    ARP(psrc=src_ip, hwsrc=src_mac, pdst=dst_ip,
59                        hwdst='ff:ff:ff:ff:ff:ff'))
60
61     # send the request
62     interface.send_pkt(arp_request)
63
64     try:
65         # wait for APR reply
66         ether = interface.recv_pkt()
67
68         # verify received packet
69
70         if not ether.haslayer(ARP):
71             raise RuntimeError('Unexpected packet: does not contain ARP ' +
72                                'header "{}"'.format(ether.__repr__()))
73
74         arp = ether['ARP']
75         arp_reply = 2
76
77         if arp.op != arp_reply:
78             raise RuntimeError('expected op={}, received {}'.format(arp_reply,
79                                                                     arp.op))
80         if arp.ptype != 0x800:
81             raise RuntimeError('expected ptype=0x800, received {}'.
82                                format(arp.ptype))
83         if arp.hwlen != 6:
84             raise RuntimeError('expected hwlen=6, received {}'.
85                                format(arp.hwlen))
86         if arp.plen != 4:
87             raise RuntimeError('expected plen=4, received {}'.format(arp.plen))
88         if arp.hwsrc != dst_mac:
89             raise RuntimeError('expected hwsrc={}, received {}'.
90                                format(dst_mac, arp.hwsrc))
91         if arp.psrc != dst_ip:
92             raise RuntimeError('expected psrc={}, received {}'.
93                                format(dst_ip, arp.psrc))
94         if arp.hwdst != src_mac:
95             raise RuntimeError('expected hwdst={}, received {}'.
96                                format(src_mac, arp.hwdst))
97         if arp.pdst != src_ip:
98             raise RuntimeError('expected pdst={}, received {}'.
99                                format(src_ip, arp.pdst))
100         test_passed = True
101
102     except RuntimeError as ex:
103         print 'Error occurred: {}'.format(ex)
104
105     return test_passed
106
107
108 def main():
109     """Run the test and collect result"""
110     if arp_request_test():
111         sys.exit(0)
112     else:
113         sys.exit(1)
114
115 if __name__ == '__main__':
116     main()