140c205d4e11569b55a5e5e261f9df52fdfa151e
[csit.git] / resources / traffic_scripts / send_ip_icmp.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to
16 the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
17 """
18
19 import sys
20 import ipaddress
21
22 from scapy.layers.inet import ICMP, IP
23 from scapy.layers.l2 import Ether
24 from scapy.layers.l2 import Dot1Q
25 from scapy.layers.inet6 import ICMPv6EchoRequest
26 from scapy.layers.inet6 import IPv6
27
28 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
29 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
30
31
32 def valid_ipv4(ip):
33     """Check if IP address has the correct IPv4 address format.
34
35     :param ip: IP address.
36     :type ip: str
37     :return: True in case of correct IPv4 address format,
38     otherwise return false.
39     :rtype: bool
40     """
41     try:
42         ipaddress.IPv4Address(unicode(ip))
43         return True
44     except (AttributeError, ipaddress.AddressValueError):
45         return False
46
47
48 def valid_ipv6(ip):
49     """Check if IP address has the correct IPv6 address format.
50
51     :param ip: IP address.
52     :type ip: str
53     :return: True in case of correct IPv6 address format,
54     otherwise return false.
55     :rtype: bool
56     """
57     try:
58         ipaddress.IPv6Address(unicode(ip))
59         return True
60     except (AttributeError, ipaddress.AddressValueError):
61         return False
62
63
64 def main():
65     """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to
66     the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
67     """
68     args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'],
69                             ['encaps', 'vlan1', 'vlan2'])
70
71     src_mac = args.get_arg('src_mac')
72     dst_mac = args.get_arg('dst_mac')
73     src_ip = args.get_arg('src_ip')
74     dst_ip = args.get_arg('dst_ip')
75
76     encaps = args.get_arg('encaps')
77     vlan1 = args.get_arg('vlan1')
78     vlan2 = args.get_arg('vlan2')
79
80     tx_if = args.get_arg('tx_if')
81     rx_if = args.get_arg('rx_if')
82
83     rxq = RxQueue(rx_if)
84     txq = TxQueue(tx_if)
85
86     sent_packets = []
87     ip_format = ''
88     icmp_format = ''
89     # Create empty ip ICMP packet and add padding before sending
90     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
91         if encaps == 'Dot1q':
92             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
93                        Dot1Q(vlan=int(vlan1)) /
94                        IP(src=src_ip, dst=dst_ip) /
95                        ICMP())
96         elif encaps == 'Dot1ad':
97             pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) /
98                        Dot1Q(vlan=int(vlan1), type=0x8100) /
99                        Dot1Q(vlan=int(vlan2)) /
100                        IP(src=src_ip, dst=dst_ip) /
101                        ICMP())
102         else:
103             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
104                        IP(src=src_ip, dst=dst_ip) /
105                        ICMP())
106         ip_format = 'IP'
107         icmp_format = 'ICMP'
108     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
109         if encaps == 'Dot1q':
110             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
111                        Dot1Q(vlan=int(vlan1)) /
112                        IPv6(src=src_ip, dst=dst_ip) /
113                        ICMPv6EchoRequest())
114         elif encaps == 'Dot1ad':
115             pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) /
116                        Dot1Q(vlan=int(vlan1), type=0x8100) /
117                        Dot1Q(vlan=int(vlan2)) /
118                        IPv6(src=src_ip, dst=dst_ip) /
119                        ICMPv6EchoRequest())
120         else:
121             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
122                        IPv6(src=src_ip, dst=dst_ip) /
123                        ICMPv6EchoRequest())
124         ip_format = 'IPv6'
125         icmp_format = 'ICMPv6EchoRequest'
126     else:
127         raise ValueError("IP(s) not in correct format")
128
129     # Send created packet on one interface and receive on the other
130     sent_packets.append(pkt_raw)
131     txq.send(pkt_raw)
132
133     ether = rxq.recv(2)
134
135     # Check whether received packet contains layers Ether, IP and ICMP
136     if ether is None:
137         raise RuntimeError('ICMP echo Rx timeout')
138
139     if not ether.haslayer(ip_format):
140         raise RuntimeError('Not an IP packet received {0}'
141                            .format(ether.__repr__()))
142
143     if not ether.haslayer(icmp_format):
144         raise RuntimeError('Not an ICMP packet received {0}'
145                            .format(ether.__repr__()))
146
147     sys.exit(0)
148
149 if __name__ == "__main__":
150     main()