FIX: do not use ping in vpp-device tests
[csit.git] / resources / traffic_scripts / send_ip_check_headers.py
1 #!/usr/bin/env python
2 # Copyright (c) 2019 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP IPv4/IPv6 packet from one interface
16 to the other. Source and destination IP addresses and source and destination
17 MAC addresses are checked in received packet.
18 """
19
20 import sys
21
22 import ipaddress
23 from robot.api import logger
24 from scapy.layers.inet import IP
25 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
26 from scapy.layers.l2 import Ether, Dot1Q
27
28 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
29 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
30
31
32 def valid_ipv4(ip):
33     try:
34         ipaddress.IPv4Address(unicode(ip))
35         return True
36     except (AttributeError, ipaddress.AddressValueError):
37         return False
38
39
40 def valid_ipv6(ip):
41     try:
42         ipaddress.IPv6Address(unicode(ip))
43         return True
44     except (AttributeError, ipaddress.AddressValueError):
45         return False
46
47
48 def main():
49     """Send IP/IPv6 packet from one traffic generator interface to the other."""
50     args = TrafficScriptArg(
51         ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
52          'dut_if2_mac'],
53         ['encaps_tx', 'vlan_tx', 'vlan_outer_tx',
54          'encaps_rx', 'vlan_rx', 'vlan_outer_rx'])
55
56     tx_src_mac = args.get_arg('tg_src_mac')
57     tx_dst_mac = args.get_arg('dut_if1_mac')
58     rx_dst_mac = args.get_arg('tg_dst_mac')
59     rx_src_mac = args.get_arg('dut_if2_mac')
60     src_ip = args.get_arg('src_ip')
61     dst_ip = args.get_arg('dst_ip')
62     tx_if = args.get_arg('tx_if')
63     rx_if = args.get_arg('rx_if')
64
65     encaps_tx = args.get_arg('encaps_tx')
66     vlan_tx = args.get_arg('vlan_tx')
67     vlan_outer_tx = args.get_arg('vlan_outer_tx')
68     encaps_rx = args.get_arg('encaps_rx')
69     vlan_rx = args.get_arg('vlan_rx')
70     vlan_outer_rx = args.get_arg('vlan_outer_rx')
71
72     rxq = RxQueue(rx_if)
73     txq = TxQueue(tx_if)
74     sent_packets = []
75     ip_format = ''
76     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
77     if encaps_tx == 'Dot1q':
78         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
79     elif encaps_tx == 'Dot1ad':
80         pkt_raw.type = 0x88a8
81         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
82         pkt_raw /= Dot1Q(vlan=vlan_tx)
83     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
84         pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
85         ip_format = IP
86     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
87         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
88         ip_format = IPv6
89     else:
90         raise ValueError("IP not in correct format")
91
92     sent_packets.append(pkt_raw)
93     txq.send(pkt_raw)
94
95     while True:
96         if tx_if == rx_if:
97             ether = rxq.recv(2, ignore=sent_packets)
98         else:
99             ether = rxq.recv(2)
100
101         if ether is None:
102             raise RuntimeError('IP packet Rx timeout')
103
104         if ether.haslayer(ICMPv6ND_NS):
105             # read another packet in the queue if the current one is ICMPv6ND_NS
106             continue
107         else:
108             # otherwise process the current packet
109             break
110
111     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
112         logger.trace("MAC matched")
113     else:
114         raise RuntimeError("Matching packet unsuccessful: {0}".
115                            format(ether.__repr__()))
116
117     if encaps_rx == 'Dot1q':
118         if ether[Dot1Q].vlan == int(vlan_rx):
119             logger.trace("VLAN matched")
120         else:
121             raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
122                                'received, {}-expected):\n{}'.
123                                format(ether[Dot1Q].vlan, vlan_rx,
124                                       ether.__repr__()))
125         ip = ether[Dot1Q].payload
126     elif encaps_rx == 'Dot1ad':
127         raise NotImplementedError()
128     else:
129         ip = ether.payload
130
131     if not isinstance(ip, ip_format):
132         raise RuntimeError("Not an IP packet received {0}".
133                            format(ip.__repr__()))
134
135     # Compare data from packets
136     if src_ip == ip.src:
137         logger.trace("Src IP matched")
138     else:
139         raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
140                            format(src_ip, ip.src))
141
142     if dst_ip == ip.dst:
143         logger.trace("Dst IP matched")
144     else:
145         raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
146                            format(dst_ip, ip.dst))
147
148     sys.exit(0)
149
150
151 if __name__ == "__main__":
152     main()