CSIT-1288: Prepare data to be sent by Jenkins
[csit.git] / resources / traffic_scripts / send_icmp_wait_for_reply.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP ICMPv4 or ICMPv6."""
16
17 import sys
18 import ipaddress
19
20 from scapy.all import Ether
21 from scapy.layers.inet import ICMP, IP
22 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def is_icmp_reply(pkt, ipformat):
29     """Return True if pkt is echo reply, else return False. If exception occurs
30     return False.
31
32     :param pkt: Packet.
33     :param ipformat: Dictionary of names to distinguish IPv4 and IPv6.
34     :type pkt: dict
35     :type ipformat: dict
36     :rtype: bool
37     """
38     # pylint: disable=bare-except
39     try:
40         if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \
41                 ipformat['Type']:
42             return True
43         else:
44             return False
45     except: # pylint: disable=bare-except
46         return False
47
48
49 def address_check(request, reply, ipformat):
50     """Compare request packet source address with reply destination address
51     and vice versa. If exception occurs return False.
52
53     :param request: Sent packet containing request.
54     :param reply: Received packet containing reply.
55     :param ipformat: Dictionary of names to distinguish IPv4 and IPv6.
56     :type request: dict
57     :type reply: dict
58     :type ipformat: dict
59     :rtype: bool
60     """
61     # pylint: disable=bare-except
62     try:
63         r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst
64         r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src
65         return r_src and r_dst
66     except: # pylint: disable=bare-except
67         return False
68
69
70 def valid_ipv4(ip):
71     """Check if IP address has the correct IPv4 address format.
72
73     :param ip: IP address.
74     :type ip: str
75     :return: True in case of correct IPv4 address format,
76              otherwise return False.
77     :rtype: bool
78     """
79     try:
80         ipaddress.IPv4Address(unicode(ip))
81         return True
82     except (AttributeError, ipaddress.AddressValueError):
83         return False
84
85
86 def valid_ipv6(ip):
87     """Check if IP address has the correct IPv6 address format.
88
89     :param ip: IP address.
90     :type ip: str
91     :return: True in case of correct IPv6 address format,
92              otherwise return False.
93     :rtype: bool
94     """
95     try:
96         ipaddress.IPv6Address(unicode(ip))
97         return True
98     except (AttributeError, ipaddress.AddressValueError):
99         return False
100
101
102 def main():
103     """Send ICMP echo request and wait for ICMP echo reply. It ignores all other
104     packets."""
105     args = TrafficScriptArg(['dst_mac', 'src_mac', 'dst_ip', 'src_ip',
106                              'timeout'])
107
108     dst_mac = args.get_arg('dst_mac')
109     src_mac = args.get_arg('src_mac')
110     dst_ip = args.get_arg('dst_ip')
111     src_ip = args.get_arg('src_ip')
112     tx_if = args.get_arg('tx_if')
113     rx_if = args.get_arg('rx_if')
114     timeout = int(args.get_arg('timeout'))
115     wait_step = 1
116
117     rxq = RxQueue(rx_if)
118     txq = TxQueue(tx_if)
119     sent_packets = []
120
121     # Create empty ip ICMP packet
122     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
123         icmp_request = (Ether(src=src_mac, dst=dst_mac) /
124                         IP(src=src_ip, dst=dst_ip) /
125                         ICMP())
126         ip_format = {'IPType': 'IP', 'ICMP_req': 'ICMP',
127                      'ICMP_rep': 'ICMP', 'Type': 0}
128     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
129         icmp_request = (Ether(src=src_mac, dst=dst_mac) /
130                         IPv6(src=src_ip, dst=dst_ip) /
131                         ICMPv6EchoRequest())
132         ip_format = {'IPType': 'IPv6', 'ICMP_req': 'ICMPv6 Echo Request',
133                      'ICMP_rep': 'ICMPv6 Echo Reply', 'Type': 129}
134     else:
135         raise ValueError("IP not in correct format")
136
137     # Send created packet on the interface
138     sent_packets.append(icmp_request)
139     txq.send(icmp_request)
140
141     for _ in range(1000):
142         while True:
143             icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
144             if icmp_reply is None:
145                 timeout -= wait_step
146                 if timeout < 0:
147                     raise RuntimeError("ICMP echo Rx timeout")
148
149             elif icmp_reply.haslayer(ICMPv6ND_NS):
150                 # read another packet in the queue in case of ICMPv6ND_NS packet
151                 continue
152             else:
153                 # otherwise process the current packet
154                 break
155
156         if is_icmp_reply(icmp_reply, ip_format):
157             if address_check(icmp_request, icmp_reply, ip_format):
158                 break
159     else:
160         raise RuntimeError("Max packet count limit reached")
161
162     print "ICMP echo reply received."
163
164     sys.exit(0)
165
166
167 if __name__ == "__main__":
168     main()