816c0053e9c87e14122b4e0f529bee76e4b4b81e
[csit.git] / resources / traffic_scripts / send_ip_check_headers.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface
16 to the other. Source and destination IP addresses and source and destination
17 MAC addresses are checked in received packet.
18 """
19
20 import sys
21
22 import ipaddress
23 from robot.api import logger
24 from scapy.layers.inet import IP
25 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
26 from scapy.layers.l2 import Ether, Dot1Q
27
28 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
29 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
30
31
32 def valid_ipv4(ip):
33     try:
34         ipaddress.IPv4Address(unicode(ip))
35         return True
36     except (AttributeError, ipaddress.AddressValueError):
37         return False
38
39
40 def valid_ipv6(ip):
41     try:
42         ipaddress.IPv6Address(unicode(ip))
43         return True
44     except (AttributeError, ipaddress.AddressValueError):
45         return False
46
47
48 def main():
49     """Send IP ICMP packet from one traffic generator interface to the other."""
50     args = TrafficScriptArg(
51         ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
52          'dut_if2_mac'],
53         ['encaps_tx', 'vlan_tx', 'vlan_outer_tx',
54          'encaps_rx', 'vlan_rx', 'vlan_outer_rx'])
55
56     tx_src_mac = args.get_arg('tg_src_mac')
57     tx_dst_mac = args.get_arg('dut_if1_mac')
58     rx_dst_mac = args.get_arg('tg_dst_mac')
59     rx_src_mac = args.get_arg('dut_if2_mac')
60     src_ip = args.get_arg('src_ip')
61     dst_ip = args.get_arg('dst_ip')
62     tx_if = args.get_arg('tx_if')
63     rx_if = args.get_arg('rx_if')
64
65     encaps_tx = args.get_arg('encaps_tx')
66     vlan_tx = args.get_arg('vlan_tx')
67     vlan_outer_tx = args.get_arg('vlan_outer_tx')
68     encaps_rx = args.get_arg('encaps_rx')
69     vlan_rx = args.get_arg('vlan_rx')
70     vlan_outer_rx = args.get_arg('vlan_outer_rx')
71
72     rxq = RxQueue(rx_if)
73     txq = TxQueue(tx_if)
74     sent_packets = []
75     ip_format = ''
76     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
77     if encaps_tx == 'Dot1q':
78         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
79     elif encaps_tx == 'Dot1ad':
80         pkt_raw.type = 0x88a8
81         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
82         pkt_raw /= Dot1Q(vlan=vlan_tx)
83     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
84         pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
85         ip_format = IP
86     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
87         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
88         ip_format = IPv6
89     else:
90         raise ValueError("IP not in correct format")
91
92     sent_packets.append(pkt_raw)
93     txq.send(pkt_raw)
94
95     while True:
96         if tx_if == rx_if:
97             ether = rxq.recv(2, ignore=sent_packets)
98         else:
99             ether = rxq.recv(2)
100         if ether is None:
101             raise RuntimeError('ICMP echo Rx timeout')
102
103         if ether.haslayer(ICMPv6ND_NS):
104             # read another packet in the queue if the current one is ICMPv6ND_NS
105             continue
106         else:
107             # otherwise process the current packet
108             break
109
110     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
111         logger.trace("MAC matched")
112     else:
113         raise RuntimeError("Matching packet unsuccessful: {0}".
114                            format(ether.__repr__()))
115
116     if encaps_rx == 'Dot1q':
117         if ether[Dot1Q].vlan == int(vlan_rx):
118             logger.trace("VLAN matched")
119         else:
120             raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
121                                'received, {}-expected):\n{}'.
122                                format(ether[Dot1Q].vlan, vlan_rx,
123                                       ether.__repr__()))
124         ip = ether[Dot1Q].payload
125     elif encaps_rx == 'Dot1ad':
126         raise NotImplementedError()
127     else:
128         ip = ether.payload
129
130     if not isinstance(ip, ip_format):
131         raise RuntimeError("Not an IP packet received {0}".
132                            format(ip.__repr__()))
133
134     # Compare data from packets
135     if src_ip == ip.src:
136         logger.trace("Src IP matched")
137     else:
138         raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
139                            format(src_ip, ip.src))
140
141     if dst_ip == ip.dst:
142         logger.trace("Dst IP matched")
143     else:
144         raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
145                            format(dst_ip, ip.dst))
146
147     sys.exit(0)
148
149
150 if __name__ == "__main__":
151     main()