Performance: Fix l3fwd in 3node
[csit.git] / GPL / traffic_scripts / send_icmp_wait_for_reply.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
"""Traffic script that sends an ICMPv4 or ICMPv6 echo request and waits for
the matching echo reply."""
17
18 import sys
19 import ipaddress
20
21 from scapy.layers.inet import ICMP, IP
22 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
23     ICMPv6ND_NS
24 from scapy.layers.l2 import Ether
25 from scapy.packet import Raw
26
27 from .PacketVerifier import RxQueue, TxQueue
28 from .TrafficScriptArg import TrafficScriptArg
29
30
def valid_ipv4(ip):
    """Tell whether the given value parses as an IPv4 address.

    The value is handed to ipaddress.IPv4Address; a successful construction
    means the format is accepted.

    :param ip: IP address to validate.
    :type ip: str
    :return: True if ipaddress.IPv4Address accepts the value,
             False if it raises AddressValueError or AttributeError.
    :rtype: bool
    """
    try:
        ipaddress.IPv4Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        is_valid = False
    else:
        is_valid = True
    return is_valid
45
46
def valid_ipv6(ip):
    """Tell whether the given value parses as an IPv6 address.

    The value is handed to ipaddress.IPv6Address; a successful construction
    means the format is accepted.

    :param ip: IP address to validate.
    :type ip: str
    :return: True if ipaddress.IPv6Address accepts the value,
             False if it raises AddressValueError or AttributeError.
    :rtype: bool
    """
    try:
        ipaddress.IPv6Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        is_valid = False
    else:
        is_valid = True
    return is_valid
61
62
def main():
    """Send an ICMP echo request and wait for the matching ICMP echo reply.

    The script arguments dst_mac, src_mac, dst_ip, src_ip, timeout, tx_if
    and rx_if are read through TrafficScriptArg. An ICMPv4 or ICMPv6 echo
    request (chosen by the address family of src_ip and dst_ip) is sent on
    tx_if, then packets are read from rx_if until an echo reply travelling
    from dst_ip to src_ip is seen. All other received packets are ignored.

    On success a message is printed and the process exits with status 0.

    :raises ValueError: If src_ip and dst_ip are not both valid IPv4
        addresses or both valid IPv6 addresses.
    :raises RuntimeError: If the receive timeout is exhausted, or if 1000
        packets were inspected without finding the expected echo reply.
    """
    args = TrafficScriptArg(
        [u"dst_mac", u"src_mac", u"dst_ip", u"src_ip", u"timeout"]
    )

    dst_mac = args.get_arg(u"dst_mac")
    src_mac = args.get_arg(u"src_mac")
    dst_ip = args.get_arg(u"dst_ip")
    src_ip = args.get_arg(u"src_ip")
    # NOTE(review): tx_if / rx_if are not listed in the constructor call
    # above; presumably TrafficScriptArg provides them by default - confirm.
    tx_if = args.get_arg(u"tx_if")
    rx_if = args.get_arg(u"rx_if")
    timeout = int(args.get_arg(u"timeout"))
    wait_step = 1

    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)
    sent_packets = []

    # Select the packet layers and the expected reply type by address family.
    if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
        ip_layer = IP
        icmp_req = ICMP
        icmp_resp = ICMP
        icmp_type = 0  # ICMPv4 echo-reply
    elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
        ip_layer = IPv6
        icmp_req = ICMPv6EchoRequest
        icmp_resp = ICMPv6EchoReply
        icmp_type = 129  # ICMPv6 Echo Reply (RFC 4443)
    else:
        raise ValueError(u"IP not in correct format")

    # The reply travels in the opposite direction to the request. Compare
    # parsed addresses rather than raw strings so that an equivalent textual
    # form (letter case, zero compression) does not cause a false mismatch.
    expected_src = ipaddress.ip_address(dst_ip)
    expected_dst = ipaddress.ip_address(src_ip)

    icmp_request = (
        Ether(src=src_mac, dst=dst_mac) /
        ip_layer(src=src_ip, dst=dst_ip) /
        icmp_req()
    )

    # Send created packet on the interface
    icmp_request /= Raw()
    sent_packets.append(icmp_request)
    txq.send(icmp_request)

    # Inspect at most 1000 received packets looking for the echo reply.
    for _ in range(1000):
        while True:
            icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
            if icmp_reply is None:
                # Nothing received in this step; charge it to the overall
                # timeout budget and retry until the budget is exhausted.
                timeout -= wait_step
                if timeout < 0:
                    raise RuntimeError(u"ICMP echo Rx timeout")

            elif icmp_reply.haslayer(ICMPv6ND_NS):
                # read another packet in the queue in case of ICMPv6ND_NS packet
                continue
            else:
                # otherwise process the current packet
                break

        # Skip packets that are not echo replies of the expected family;
        # indexing a missing layer would raise IndexError instead of
        # ignoring the packet.
        if not (icmp_reply.haslayer(ip_layer)
                and icmp_reply.haslayer(icmp_resp)):
            continue

        reply_ip = icmp_reply[ip_layer]
        if reply_ip[icmp_resp].type != icmp_type:
            continue

        if ipaddress.ip_address(reply_ip.src) == expected_src and \
                ipaddress.ip_address(reply_ip.dst) == expected_dst:
            break
    else:
        raise RuntimeError(u"Max packet count limit reached")

    print(u"ICMP echo reply received.")

    sys.exit(0)
133
134
# Run the traffic script only when this file is executed directly,
# not when it is imported as a module.
if __name__ == u"__main__":
    main()