Traffic scripts: Fix docstrings
[csit.git] / GPL / traffic_scripts / send_icmp_wait_for_reply.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script that sends an IP ICMPv4 or ICMPv6."""
27
28 import sys
29 import ipaddress
30
31 from scapy.layers.inet import ICMP, IP
32 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply,\
33     ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
34 from scapy.layers.l2 import Ether
35 from scapy.packet import Raw
36
37 from .PacketVerifier import RxQueue, TxQueue
38 from .TrafficScriptArg import TrafficScriptArg
39
40
def valid_ipv4(ip_address):
    """Tell whether the given value parses as an IPv4 address.

    :param ip_address: Candidate IPv4 address.
    :type ip_address: str
    :returns: True if ipaddress.IPv4Address accepts the value, False if it
        rejects it.
    :rtype: bool
    """
    is_valid = True
    try:
        # Only the constructor's verdict matters; the object is discarded.
        ipaddress.IPv4Address(ip_address)
    except (AttributeError, ipaddress.AddressValueError):
        is_valid = False
    return is_valid
54
55
def valid_ipv6(ip_address):
    """Tell whether the given value parses as an IPv6 address.

    :param ip_address: Candidate IPv6 address.
    :type ip_address: str
    :returns: True if ipaddress.IPv6Address accepts the value, False if it
        rejects it.
    :rtype: bool
    """
    try:
        # Only the constructor's verdict matters; the object is discarded.
        ipaddress.IPv6Address(ip_address)
    except (AttributeError, ipaddress.AddressValueError):
        return False
    else:
        return True
69
70
def main():
    """Send an ICMP echo request and wait for the matching ICMP echo reply.

    An ICMPv4 or ICMPv6 echo request is built, depending on the format of
    the src_ip and dst_ip arguments, and sent through a TxQueue on tx_if.
    Packets are then read from an RxQueue on rx_if until an echo reply
    travelling from dst_ip to src_ip is seen. All other received packets
    are ignored. On success the script prints a message and exits with
    status 0.

    :raises ValueError: If src_ip and dst_ip are not both valid IPv4 or both
        valid IPv6 addresses.
    :raises RuntimeError: If nothing is received before the timeout budget
        is used up, or if 1000 packets were inspected without a match.
    """
    # Local import: the IPv6 header class is required for ICMPv6 traffic
    # but is not among the module-level scapy imports.
    from scapy.layers.inet6 import IPv6

    args = TrafficScriptArg(
        [u"dst_mac", u"src_mac", u"dst_ip", u"src_ip", u"timeout"]
    )

    dst_mac = args.get_arg(u"dst_mac")
    src_mac = args.get_arg(u"src_mac")
    dst_ip = args.get_arg(u"dst_ip")
    src_ip = args.get_arg(u"src_ip")
    # NOTE(review): tx_if / rx_if are not in the list above; presumably
    # TrafficScriptArg always provides them - confirm against that class.
    tx_if = args.get_arg(u"tx_if")
    rx_if = args.get_arg(u"rx_if")
    timeout = int(args.get_arg(u"timeout"))
    wait_step = 1

    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)
    sent_packets = []

    # Select the layer classes and the echo reply type for the IP version.
    if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
        ip_layer = IP
        icmp_req = ICMP
        icmp_resp = ICMP
        icmp_type = 0  # ICMPv4 echo-reply
    elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
        ip_layer = IPv6
        icmp_req = ICMPv6EchoRequest
        icmp_resp = ICMPv6EchoReply
        icmp_type = 129  # ICMPv6 Echo Reply
    else:
        raise ValueError(u"IP not in correct format")

    # The reply travels in the opposite direction to the request. Parsed
    # address objects are compared so that equivalent textual forms of the
    # same address (e.g. zero-padded IPv6 groups) still match.
    expected_src = ipaddress.ip_address(dst_ip)
    expected_dst = ipaddress.ip_address(src_ip)

    icmp_request = (
        Ether(src=src_mac, dst=dst_mac) /
        ip_layer(src=src_ip, dst=dst_ip) /
        icmp_req()
    )

    # Send created packet on the interface
    icmp_request /= Raw()
    sent_packets.append(icmp_request)
    txq.send(icmp_request)

    # IPv6 control traffic that is skipped without counting against the
    # packet limit.
    skipped_layers = (ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA)

    for _ in range(1000):
        while True:
            icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
            if icmp_reply is None:
                # Nothing arrived within wait_step; keep polling until the
                # timeout budget is exhausted.
                timeout -= wait_step
                if timeout < 0:
                    raise RuntimeError(u"ICMP echo Rx timeout")
                continue

            if any(icmp_reply.haslayer(layer) for layer in skipped_layers):
                # read another packet in the queue
                continue

            break

        # Ignore packets that do not carry the expected IP / ICMP layers
        # instead of failing on the layer lookup.
        if not (icmp_reply.haslayer(ip_layer)
                and icmp_reply.haslayer(icmp_resp)):
            continue

        reply_ip = icmp_reply[ip_layer]
        if reply_ip[icmp_resp].type != icmp_type:
            continue

        if ipaddress.ip_address(reply_ip.src) == expected_src and \
                ipaddress.ip_address(reply_ip.dst) == expected_dst:
            break
    else:
        raise RuntimeError(u"Max packet count limit reached")

    print(u"ICMP echo reply received.")

    sys.exit(0)
148
149
# Run the traffic script only when this file is executed as a program,
# not when it is imported as a module.
if __name__ == u"__main__":
    main()