Virl test: dot1q ip4base
[csit.git] / resources / traffic_scripts / send_icmp_check_headers.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface
16 to the other. Source and destination IP addresses and source and destination
17 MAC addresses are checked in received packet.
18 """
19
20 import sys
21
22 import ipaddress
23 from robot.api import logger
24 from scapy.layers.inet import ICMP, IP
25 from scapy.layers.inet6 import ICMPv6EchoRequest
26 from scapy.layers.inet6 import IPv6
27 from scapy.layers.l2 import Ether, Dot1Q
28
29 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
30 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
31
32
33 def valid_ipv4(ip):
34     try:
35         ipaddress.IPv4Address(unicode(ip))
36         return True
37     except (AttributeError, ipaddress.AddressValueError):
38         return False
39
40
41 def valid_ipv6(ip):
42     try:
43         ipaddress.IPv6Address(unicode(ip))
44         return True
45     except (AttributeError, ipaddress.AddressValueError):
46         return False
47
48
49 def main():
50     """Send IP ICMP packet from one traffic generator interface to the other."""
51     args = TrafficScriptArg(
52         ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
53          'dut_if2_mac'],
54         ['encaps_tx', 'vlan_tx', 'vlan_outer_tx',
55          'encaps_rx', 'vlan_rx', 'vlan_outer_rx'])
56
57     tx_src_mac = args.get_arg('tg_src_mac')
58     tx_dst_mac = args.get_arg('dut_if1_mac')
59     rx_dst_mac = args.get_arg('tg_dst_mac')
60     rx_src_mac = args.get_arg('dut_if2_mac')
61     src_ip = args.get_arg('src_ip')
62     dst_ip = args.get_arg('dst_ip')
63     tx_if = args.get_arg('tx_if')
64     rx_if = args.get_arg('rx_if')
65
66     encaps_tx = args.get_arg('encaps_tx')
67     vlan_tx = args.get_arg('vlan_tx')
68     vlan_outer_tx = args.get_arg('vlan_outer_tx')
69     encaps_rx = args.get_arg('encaps_rx')
70     vlan_rx = args.get_arg('vlan_rx')
71     vlan_outer_rx = args.get_arg('vlan_outer_rx')
72
73     rxq = RxQueue(rx_if)
74     txq = TxQueue(tx_if)
75     sent_packets = []
76     ip_format = ''
77     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
78     if encaps_tx == 'Dot1q':
79         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
80     elif encaps_tx == 'Dot1ad':
81         pkt_raw.type = 0x88a8
82         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
83         pkt_raw /= Dot1Q(vlan=vlan_tx)
84     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
85         pkt_raw /= IP(src=src_ip, dst=dst_ip)
86         pkt_raw /= ICMP()
87         ip_format = IP
88     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
89         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
90         pkt_raw /= ICMPv6EchoRequest()
91         ip_format = IPv6
92     else:
93         raise ValueError("IP not in correct format")
94
95     sent_packets.append(pkt_raw)
96     txq.send(pkt_raw)
97     if tx_if == rx_if:
98         ether = rxq.recv(2, ignore=sent_packets)
99     else:
100         ether = rxq.recv(2)
101
102     if ether is None:
103         raise RuntimeError("ICMP echo Rx timeout")
104
105     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
106         logger.trace("MAC matched")
107     else:
108         raise RuntimeError(
109             "Matching packet unsuccessful: {0}".format(ether.__repr__()))
110
111     if encaps_rx == 'Dot1q':
112         if ether[Dot1Q].vlan == int(vlan_rx):
113             logger.trace("VLAN matched")
114         else:
115             raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
116                                'received, {}-expected):\n{}'
117                                .format(ether[Dot1Q].vlan, vlan_rx,
118                                        ether.__repr__()))
119         ip = ether[Dot1Q].payload
120     elif encaps_rx == 'Dot1ad':
121         raise NotImplementedError()
122     else:
123         ip = ether.payload
124
125     if not isinstance(ip, ip_format):
126         raise RuntimeError(
127             "Not an IP packet received {0}".format(ip.__repr__()))
128
129     # Compare data from packets
130     if src_ip == ip.src:
131         logger.trace("Src IP matched")
132     else:
133         raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
134                            .format(src_ip, ip.src))
135
136     if dst_ip == ip.dst:
137         logger.trace("Dst IP matched")
138     else:
139         raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
140                            .format(dst_ip, ip.dst))
141
142     sys.exit(0)
143
144
145 if __name__ == "__main__":
146     main()