Switch licenses in GPL directory
[csit.git] / GPL / traffic_scripts / send_ip_check_headers.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
18 #
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
24
25 """Traffic script that sends an IP IPv4/IPv6 packet from one interface
26 to the other. Source and destination IP addresses and source and destination
27 MAC addresses are checked in received packet.
28 """
29
30 import sys
31
32 import ipaddress
33
34 from robot.api import logger
35 from scapy.layers.inet import IP
36 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
37 from scapy.layers.l2 import Ether, Dot1Q
38 from scapy.packet import Raw
39
40 from .PacketVerifier import RxQueue, TxQueue
41 from .TrafficScriptArg import TrafficScriptArg
42
43
def valid_ipv4(ip):
    """Check whether the given value is a valid IPv4 address.

    :param ip: Value to be checked, as accepted by ipaddress.IPv4Address.
    :returns: True if the value can be parsed as an IPv4 address,
        False otherwise.
    :rtype: bool
    """
    try:
        ipaddress.IPv4Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        # The constructor rejected the value, so it is not an IPv4 address.
        return False
    return True
50
51
def valid_ipv6(ip):
    """Check whether the given value is a valid IPv6 address.

    :param ip: Value to be checked, as accepted by ipaddress.IPv6Address.
    :returns: True if the value can be parsed as an IPv6 address,
        False otherwise.
    :rtype: bool
    """
    try:
        ipaddress.IPv6Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        # The constructor rejected the value, so it is not an IPv6 address.
        return False
    return True
58
59
def main():
    """Send IP/IPv6 packet from one traffic generator interface to the other.

    Builds an Ethernet frame (optionally Dot1q or Dot1ad tagged) carrying an
    IPv4 or IPv6 header, sends it on ``tx_if`` and waits for a frame on
    ``rx_if``. Frames containing an ICMPv6 neighbor solicitation are skipped.
    The first other frame received must carry the expected MAC addresses,
    the expected VLAN tag (when ``encaps_rx`` is Dot1q) and the same source
    and destination IP addresses as the packet that was sent.

    On success the script terminates via ``sys.exit(0)``.

    :raises ValueError: If src_ip and dst_ip are not both valid IPv4 or both
        valid IPv6 addresses.
    :raises RuntimeError: If no packet is received, or the received packet
        does not match the expected MACs, VLAN, IP version or IP addresses.
    :raises NotImplementedError: If encaps_rx is Dot1ad (Rx verification of
        double-tagged frames is not implemented).
    """
    args = TrafficScriptArg(
        [
            u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
            u"dut_if2_mac"
        ],
        [
            u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
            u"vlan_rx", u"vlan_outer_rx"
        ]
    )

    # Tx frame goes from the TG source MAC to the first DUT interface;
    # the Rx frame is expected from the second DUT interface to the TG
    # destination MAC.
    tx_src_mac = args.get_arg(u"tg_src_mac")
    tx_dst_mac = args.get_arg(u"dut_if1_mac")
    rx_dst_mac = args.get_arg(u"tg_dst_mac")
    rx_src_mac = args.get_arg(u"dut_if2_mac")
    src_ip = args.get_arg(u"src_ip")
    dst_ip = args.get_arg(u"dst_ip")
    tx_if = args.get_arg(u"tx_if")
    rx_if = args.get_arg(u"rx_if")

    encaps_tx = args.get_arg(u"encaps_tx")
    vlan_tx = args.get_arg(u"vlan_tx")
    vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
    encaps_rx = args.get_arg(u"encaps_rx")
    vlan_rx = args.get_arg(u"vlan_rx")
    # Read for completeness; Dot1ad Rx verification is not implemented yet.
    vlan_outer_rx = args.get_arg(u"vlan_outer_rx")

    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)

    sent_packets = list()
    pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)

    if encaps_tx == u"Dot1q":
        pkt_raw /= Dot1Q(vlan=int(vlan_tx))
    elif encaps_tx == u"Dot1ad":
        # 0x88a8 is the EtherType used here for the outer (service) tag.
        pkt_raw.type = 0x88a8
        # Convert VLAN IDs to int, consistently with the Dot1q branch.
        pkt_raw /= Dot1Q(vlan=int(vlan_outer_tx))
        pkt_raw /= Dot1Q(vlan=int(vlan_tx))

    if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
        pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
        ip_format = IP
    elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
        pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
        ip_format = IPv6
    else:
        raise ValueError(u"IP not in correct format")

    pkt_raw /= Raw()
    sent_packets.append(pkt_raw)
    txq.send(pkt_raw)

    while True:
        if tx_if == rx_if:
            # Same interface for Tx and Rx: do not treat our own packet
            # as the received one.
            ether = rxq.recv(2, ignore=sent_packets)
        else:
            ether = rxq.recv(2)

        if ether is None:
            raise RuntimeError(u"IP packet Rx timeout")

        # Skip ICMPv6 neighbor solicitations; process anything else.
        if not ether.haslayer(ICMPv6ND_NS):
            break

    if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
        logger.trace(u"MAC matched")
    else:
        raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")

    if encaps_rx == u"Dot1q":
        if ether[Dot1Q].vlan == int(vlan_rx):
            logger.trace(u"VLAN matched")
        else:
            raise RuntimeError(
                f"Ethernet frame with wrong VLAN tag "
                f"({ether[Dot1Q].vlan}-received, "
                f"{vlan_rx}-expected):\n{ether!r}"
            )
        ip = ether[Dot1Q].payload
    elif encaps_rx == u"Dot1ad":
        raise NotImplementedError()
    else:
        ip = ether.payload

    if not isinstance(ip, ip_format):
        raise RuntimeError(f"Not an IP packet received {ip!r}")

    # Compare data from packets. Addresses are compared as parsed address
    # objects rather than raw strings, so that equivalent textual forms of
    # the same address (e.g. differently compressed IPv6) still match.
    if ipaddress.ip_address(src_ip) == ipaddress.ip_address(ip.src):
        logger.trace(u"Src IP matched")
    else:
        raise RuntimeError(
            f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
        )

    if ipaddress.ip_address(dst_ip) == ipaddress.ip_address(ip.dst):
        logger.trace(u"Dst IP matched")
    else:
        raise RuntimeError(
            f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
        )

    sys.exit(0)
170
171
# Run the traffic script only when this file is executed directly,
# not when it is imported as a module.
if __name__ == u"__main__":
    main()