CSIT-755: Presentation and analytics layer
[csit.git] / resources / traffic_scripts / send_icmp_check_headers.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface
16 to the other. Source and destination IP addresses and source and destination
17 MAC addresses are checked in received packet.
18 """
19
20 import sys
21
22 import ipaddress
23 from robot.api import logger
24 from scapy.layers.inet import ICMP, IP
25 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
26 from scapy.layers.l2 import Ether, Dot1Q
27
28 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
29 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
30
31
32 def valid_ipv4(ip):
33     try:
34         ipaddress.IPv4Address(unicode(ip))
35         return True
36     except (AttributeError, ipaddress.AddressValueError):
37         return False
38
39
40 def valid_ipv6(ip):
41     try:
42         ipaddress.IPv6Address(unicode(ip))
43         return True
44     except (AttributeError, ipaddress.AddressValueError):
45         return False
46
47
48 def main():
49     """Send IP ICMP packet from one traffic generator interface to the other."""
50     args = TrafficScriptArg(
51         ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
52          'dut_if2_mac'],
53         ['encaps_tx', 'vlan_tx', 'vlan_outer_tx',
54          'encaps_rx', 'vlan_rx', 'vlan_outer_rx'])
55
56     tx_src_mac = args.get_arg('tg_src_mac')
57     tx_dst_mac = args.get_arg('dut_if1_mac')
58     rx_dst_mac = args.get_arg('tg_dst_mac')
59     rx_src_mac = args.get_arg('dut_if2_mac')
60     src_ip = args.get_arg('src_ip')
61     dst_ip = args.get_arg('dst_ip')
62     tx_if = args.get_arg('tx_if')
63     rx_if = args.get_arg('rx_if')
64
65     encaps_tx = args.get_arg('encaps_tx')
66     vlan_tx = args.get_arg('vlan_tx')
67     vlan_outer_tx = args.get_arg('vlan_outer_tx')
68     encaps_rx = args.get_arg('encaps_rx')
69     vlan_rx = args.get_arg('vlan_rx')
70     vlan_outer_rx = args.get_arg('vlan_outer_rx')
71
72     rxq = RxQueue(rx_if)
73     txq = TxQueue(tx_if)
74     sent_packets = []
75     ip_format = ''
76     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
77     if encaps_tx == 'Dot1q':
78         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
79     elif encaps_tx == 'Dot1ad':
80         pkt_raw.type = 0x88a8
81         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
82         pkt_raw /= Dot1Q(vlan=vlan_tx)
83     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
84         pkt_raw /= IP(src=src_ip, dst=dst_ip)
85         pkt_raw /= ICMP()
86         ip_format = IP
87     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
88         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
89         pkt_raw /= ICMPv6EchoRequest()
90         ip_format = IPv6
91     else:
92         raise ValueError("IP not in correct format")
93
94     sent_packets.append(pkt_raw)
95     txq.send(pkt_raw)
96
97     while True:
98         if tx_if == rx_if:
99             ether = rxq.recv(2, ignore=sent_packets)
100         else:
101             ether = rxq.recv(2)
102         if ether is None:
103             raise RuntimeError('ICMP echo Rx timeout')
104
105         if ether.haslayer(ICMPv6ND_NS):
106             # read another packet in the queue if the current one is ICMPv6ND_NS
107             continue
108         else:
109             # otherwise process the current packet
110             break
111
112     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
113         logger.trace("MAC matched")
114     else:
115         raise RuntimeError("Matching packet unsuccessful: {0}".
116                            format(ether.__repr__()))
117
118     if encaps_rx == 'Dot1q':
119         if ether[Dot1Q].vlan == int(vlan_rx):
120             logger.trace("VLAN matched")
121         else:
122             raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
123                                'received, {}-expected):\n{}'.
124                                format(ether[Dot1Q].vlan, vlan_rx,
125                                       ether.__repr__()))
126         ip = ether[Dot1Q].payload
127     elif encaps_rx == 'Dot1ad':
128         raise NotImplementedError()
129     else:
130         ip = ether.payload
131
132     if not isinstance(ip, ip_format):
133         raise RuntimeError("Not an IP packet received {0}".
134                            format(ip.__repr__()))
135
136     # Compare data from packets
137     if src_ip == ip.src:
138         logger.trace("Src IP matched")
139     else:
140         raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
141                            format(src_ip, ip.src))
142
143     if dst_ip == ip.dst:
144         logger.trace("Dst IP matched")
145     else:
146         raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
147                            format(dst_ip, ip.dst))
148
149     sys.exit(0)
150
151
152 if __name__ == "__main__":
153     main()