VIRL test: Dot1Q-L2BD-vhost_user test (CSIT-502)
[csit.git] / resources / traffic_scripts / send_ip_icmp.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to
16 the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
17 """
18
19 import sys
20 import ipaddress
21
22 from scapy.layers.inet import ICMP, IP
23 from scapy.layers.l2 import Ether
24 from scapy.layers.l2 import Dot1Q
25 from scapy.layers.inet6 import ICMPv6EchoRequest
26 from scapy.layers.inet6 import IPv6
27
28 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
29 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
30
31
32 def valid_ipv4(ip):
33     """Check if IP address has the correct IPv4 address format.
34
35     :param ip: IP address.
36     :type ip: str
37     :return: True in case of correct IPv4 address format,
38     otherwise return false.
39     :rtype: bool
40     """
41     try:
42         ipaddress.IPv4Address(unicode(ip))
43         return True
44     except (AttributeError, ipaddress.AddressValueError):
45         return False
46
47
48 def valid_ipv6(ip):
49     """Check if IP address has the correct IPv6 address format.
50
51     :param ip: IP address.
52     :type ip: str
53     :return: True in case of correct IPv6 address format,
54     otherwise return false.
55     :rtype: bool
56     """
57     try:
58         ipaddress.IPv6Address(unicode(ip))
59         return True
60     except (AttributeError, ipaddress.AddressValueError):
61         return False
62
63
64 def main():
65     """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to
66     the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
67     """
68     args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'],
69                             ['encaps', 'vlan1', 'vlan2', 'encaps_rx',
70                              'vlan1_rx', 'vlan2_rx'])
71
72     src_mac = args.get_arg('src_mac')
73     dst_mac = args.get_arg('dst_mac')
74     src_ip = args.get_arg('src_ip')
75     dst_ip = args.get_arg('dst_ip')
76
77     encaps = args.get_arg('encaps')
78     vlan1 = args.get_arg('vlan1')
79     vlan2 = args.get_arg('vlan2')
80     encaps_rx = args.get_arg('encaps_rx')
81     vlan1_rx = args.get_arg('vlan1_rx')
82     vlan2_rx = args.get_arg('vlan2_rx')
83
84     tx_if = args.get_arg('tx_if')
85     rx_if = args.get_arg('rx_if')
86
87     rxq = RxQueue(rx_if)
88     txq = TxQueue(tx_if)
89
90     sent_packets = []
91     ip_format = ''
92     icmp_format = ''
93     # Create empty ip ICMP packet and add padding before sending
94     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
95         if encaps == 'Dot1q':
96             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
97                        Dot1Q(vlan=int(vlan1)) /
98                        IP(src=src_ip, dst=dst_ip) /
99                        ICMP())
100         elif encaps == 'Dot1ad':
101             pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) /
102                        Dot1Q(vlan=int(vlan1), type=0x8100) /
103                        Dot1Q(vlan=int(vlan2)) /
104                        IP(src=src_ip, dst=dst_ip) /
105                        ICMP())
106         else:
107             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
108                        IP(src=src_ip, dst=dst_ip) /
109                        ICMP())
110         ip_format = 'IP'
111         icmp_format = 'ICMP'
112     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
113         if encaps == 'Dot1q':
114             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
115                        Dot1Q(vlan=int(vlan1)) /
116                        IPv6(src=src_ip, dst=dst_ip) /
117                        ICMPv6EchoRequest())
118         elif encaps == 'Dot1ad':
119             pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) /
120                        Dot1Q(vlan=int(vlan1), type=0x8100) /
121                        Dot1Q(vlan=int(vlan2)) /
122                        IPv6(src=src_ip, dst=dst_ip) /
123                        ICMPv6EchoRequest())
124         else:
125             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
126                        IPv6(src=src_ip, dst=dst_ip) /
127                        ICMPv6EchoRequest())
128         ip_format = 'IPv6'
129         icmp_format = 'ICMPv6EchoRequest'
130     else:
131         raise ValueError("IP(s) not in correct format")
132
133     # Send created packet on one interface and receive on the other
134     sent_packets.append(pkt_raw)
135     txq.send(pkt_raw)
136
137     ether = rxq.recv(2)
138
139     # Check whether received packet contains layers Ether, IP and ICMP
140     if ether is None:
141         raise RuntimeError('ICMP echo Rx timeout')
142
143     if encaps_rx:
144         if encaps_rx == 'Dot1q':
145             if not vlan1_rx:
146                 vlan1_rx = vlan1
147             if not ether.haslayer(Dot1Q):
148                 raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'
149                                    .format(ether.__repr__()))
150             elif ether[Dot1Q].vlan != int(vlan1_rx):
151                 raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) '
152                                    'received ({} expected):\n{}'.format(
153                     ether[Dot1Q].vlan, vlan1_rx, ether.__repr__()))
154         elif encaps_rx == 'Dot1ad':
155             if not vlan1_rx:
156                 vlan1_rx = vlan1
157             if not vlan2_rx:
158                 vlan2_rx = vlan2
159             # TODO
160             raise RuntimeError('Encapsulation {0} not implemented yet.'
161                                .format(encaps_rx))
162         else:
163             raise RuntimeError('Unsupported/unknown encapsulation expected: {0}'
164                                .format(encaps_rx))
165
166     if not ether.haslayer(ip_format):
167         raise RuntimeError('Not an IP packet received:\n{0}'
168                            .format(ether.__repr__()))
169
170     if not ether.haslayer(icmp_format):
171         raise RuntimeError('Not an ICMP packet received:\n{0}'
172                            .format(ether.__repr__()))
173
174     sys.exit(0)
175
176 if __name__ == "__main__":
177     main()