feat(jobspec): Unify soak jobspecs
[csit.git] / GPL / traffic_scripts / send_icmp_wait_for_reply.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script that sends an IP ICMPv4 or ICMPv6."""
27
28 import sys
29
30 from scapy.layers.inet import ICMP, IP
31 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply,\
32     ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
33 from scapy.layers.l2 import Ether
34 from scapy.packet import Raw
35
36 from .PacketVerifier import RxQueue, TxQueue
37 from .TrafficScriptArg import TrafficScriptArg
38 from .ValidIp import valid_ipv4, valid_ipv6
39
40
41 def main():
42     """Send ICMP echo request and wait for ICMP echo reply. It ignores all other
43     packets."""
44     args = TrafficScriptArg(
45         [u"dst_mac", u"src_mac", u"dst_ip", u"src_ip", u"timeout"]
46     )
47
48     dst_mac = args.get_arg(u"dst_mac")
49     src_mac = args.get_arg(u"src_mac")
50     dst_ip = args.get_arg(u"dst_ip")
51     src_ip = args.get_arg(u"src_ip")
52     tx_if = args.get_arg(u"tx_if")
53     rx_if = args.get_arg(u"rx_if")
54     timeout = int(args.get_arg(u"timeout"))
55     wait_step = 1
56
57     rxq = RxQueue(rx_if)
58     txq = TxQueue(tx_if)
59     sent_packets = []
60
61     # Create empty ip ICMP packet
62     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
63         ip_layer = IP
64         icmp_req = ICMP
65         icmp_resp = ICMP
66         icmp_type = 0  # echo-reply
67     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
68         ip_layer = IP
69         icmp_req = ICMPv6EchoRequest
70         icmp_resp = ICMPv6EchoReply
71         icmp_type = 0  # Echo Reply
72     else:
73         raise ValueError(u"IP not in correct format")
74
75     icmp_request = (
76         Ether(src=src_mac, dst=dst_mac) /
77         ip_layer(src=src_ip, dst=dst_ip) /
78         icmp_req()
79     )
80
81     # Send created packet on the interface
82     icmp_request /= Raw()
83     sent_packets.append(icmp_request)
84     txq.send(icmp_request)
85
86     for _ in range(1000):
87         while True:
88             icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
89             if icmp_reply is None:
90                 timeout -= wait_step
91                 if timeout < 0:
92                     raise RuntimeError(u"ICMP echo Rx timeout")
93
94             elif icmp_reply.haslayer(ICMPv6ND_NS):
95                 # read another packet in the queue in case of ICMPv6ND_NS packet
96                 continue
97             elif icmp_reply.haslayer(ICMPv6MLReport2):
98                 # read another packet in the queue if the current one is
99                 # ICMPv6MLReport2
100                 continue
101             elif icmp_reply.haslayer(ICMPv6ND_RA):
102                 # read another packet in the queue if the current one is
103                 # ICMPv6ND_RA
104                 continue
105
106             break
107
108         if icmp_reply[ip_layer][icmp_resp].type == icmp_type:
109             if icmp_reply[ip_layer].src == dst_ip and \
110                     icmp_reply[ip_layer].dst == src_ip:
111                 break
112     else:
113         raise RuntimeError(u"Max packet count limit reached")
114
115     print(u"ICMP echo reply received.")
116
117     sys.exit(0)
118
119
120 if __name__ == u"__main__":
121     main()