Switch licenses in GPL directory
[csit.git] / GPL / traffic_scripts / nat.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
18 #
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
24
25 """Traffic script for NAT verification."""
26
27 import sys
28
29 import ipaddress
30
31 from scapy.layers.inet import IP, TCP, UDP
32 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
33 from scapy.layers.l2 import Ether
34 from scapy.packet import Raw
35
36 from .PacketVerifier import RxQueue, TxQueue
37 from .TrafficScriptArg import TrafficScriptArg
38
39
40 def valid_ipv4(ip):
41     try:
42         ipaddress.IPv4Address(ip)
43         return True
44     except (AttributeError, ipaddress.AddressValueError):
45         return False
46
47
48 def valid_ipv6(ip):
49     try:
50         ipaddress.IPv6Address(ip)
51         return True
52     except (AttributeError, ipaddress.AddressValueError):
53         return False
54
55
56 def main():
57     """Send, receive and check IP/IPv6 packets with UDP/TCP layer passing
58     through NAT.
59     """
60     args = TrafficScriptArg(
61         [
62             u"tx_src_mac", u"rx_dst_mac", u"src_ip_in", u"src_ip_out",
63             u"dst_ip", u"tx_dst_mac", u"rx_src_mac", u"protocol",
64             u"src_port_in", u"src_port_out", u"dst_port"
65         ]
66     )
67
68     tx_src_mac = args.get_arg(u"tx_src_mac")
69     tx_dst_mac = args.get_arg(u"tx_dst_mac")
70     rx_dst_mac = args.get_arg(u"rx_dst_mac")
71     rx_src_mac = args.get_arg(u"rx_src_mac")
72     src_ip_in = args.get_arg(u"src_ip_in")
73     src_ip_out = args.get_arg(u"src_ip_out")
74     dst_ip = args.get_arg(u"dst_ip")
75     protocol = args.get_arg(u"protocol")
76     sport_in = int(args.get_arg(u"src_port_in"))
77     try:
78         sport_out = int(args.get_arg(u"src_port_out"))
79     except ValueError:
80         sport_out = None
81     dst_port = int(args.get_arg(u"dst_port"))
82
83     tx_txq = TxQueue(args.get_arg(u"tx_if"))
84     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
85     rx_txq = TxQueue(args.get_arg(u"rx_if"))
86     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
87
88     sent_packets = list()
89     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
90
91     if valid_ipv4(src_ip_in) and valid_ipv4(dst_ip):
92         ip_layer = IP
93     elif valid_ipv6(src_ip_in) and valid_ipv6(dst_ip):
94         ip_layer = IPv6
95     else:
96         raise ValueError(u"IP not in correct format")
97     pkt_raw /= ip_layer(src=src_ip_in, dst=dst_ip)
98
99     if protocol == u"UDP":
100         pkt_raw /= UDP(sport=sport_in, dport=dst_port)
101         proto_layer = UDP
102     elif protocol == u"TCP":
103         # flags=0x2 => SYN flag set
104         pkt_raw /= TCP(sport=sport_in, dport=dst_port, flags=0x2)
105         proto_layer = TCP
106     else:
107         raise ValueError(u"Incorrect protocol")
108
109     pkt_raw /= Raw()
110     sent_packets.append(pkt_raw)
111     tx_txq.send(pkt_raw)
112
113     while True:
114         ether = rx_rxq.recv(2)
115
116         if ether is None:
117             raise RuntimeError(u"IP packet Rx timeout")
118
119         if ether.haslayer(ICMPv6ND_NS):
120             # read another packet in the queue if the current one is ICMPv6ND_NS
121             continue
122         else:
123             # otherwise process the current packet
124             break
125
126     if rx_dst_mac != ether[Ether].dst or rx_src_mac != ether[Ether].src:
127         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
128
129     ip_pkt = ether.payload
130     if not isinstance(ip_pkt, ip_layer):
131         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
132     if ip_pkt.src != src_ip_out:
133         raise RuntimeError(
134             f"Matching Src IP address unsuccessful: "
135             f"{src_ip_out} != {ip_pkt.src}"
136         )
137     if ip_pkt.dst != dst_ip:
138         raise RuntimeError(
139             f"Matching Dst IP address unsuccessful: {dst_ip} != {ip_pkt.dst}"
140         )
141
142     proto_pkt = ip_pkt.payload
143     if not isinstance(proto_pkt, proto_layer):
144         raise RuntimeError(
145             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
146         )
147     if sport_out is not None:
148         if proto_pkt.sport != sport_out:
149             raise RuntimeError(
150                 f"Matching Src {proto_layer!s} port unsuccessful: "
151                 f"{sport_out} != {proto_pkt.sport}"
152             )
153     else:
154         sport_out = proto_pkt.sport
155     if proto_pkt.dport != dst_port:
156         raise RuntimeError(
157             f"Matching Dst {proto_layer!s} port unsuccessful: "
158             f"{dst_port} != {proto_pkt.dport}"
159         )
160     if proto_layer == TCP:
161         if proto_pkt.flags != 0x2:
162             raise RuntimeError(
163                 f"Not a TCP SYN packet received: {proto_pkt!r}"
164             )
165
166     pkt_raw = Ether(src=rx_dst_mac, dst=rx_src_mac)
167     pkt_raw /= ip_layer(src=dst_ip, dst=src_ip_out)
168     pkt_raw /= proto_layer(sport=dst_port, dport=sport_out)
169     if proto_layer == TCP:
170         # flags=0x12 => SYN, ACK flags set
171         pkt_raw[TCP].flags = 0x12
172     pkt_raw /= Raw()
173     rx_txq.send(pkt_raw)
174
175     while True:
176         ether = tx_rxq.recv(2, ignore=sent_packets)
177
178         if ether is None:
179             raise RuntimeError(u"IP packet Rx timeout")
180
181         if ether.haslayer(ICMPv6ND_NS):
182             # read another packet in the queue if the current one is ICMPv6ND_NS
183             continue
184         else:
185             # otherwise process the current packet
186             break
187
188     if ether[Ether].dst != tx_src_mac or ether[Ether].src != tx_dst_mac:
189         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
190
191     ip_pkt = ether.payload
192     if not isinstance(ip_pkt, ip_layer):
193         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
194     if ip_pkt.src != dst_ip:
195         raise RuntimeError(
196             f"Matching Src IP address unsuccessful: {dst_ip} != {ip_pkt.src}"
197         )
198     if ip_pkt.dst != src_ip_in:
199         raise RuntimeError(
200             f"Matching Dst IP address unsuccessful: {src_ip_in} != {ip_pkt.dst}"
201         )
202
203     proto_pkt = ip_pkt.payload
204     if not isinstance(proto_pkt, proto_layer):
205         raise RuntimeError(
206             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
207         )
208     if proto_pkt.sport != dst_port:
209         raise RuntimeError(
210             f"Matching Src {proto_layer!s} port unsuccessful: "
211             f"{dst_port} != {proto_pkt.sport}"
212         )
213     if proto_pkt.dport != sport_in:
214         raise RuntimeError(
215             f"Matching Dst {proto_layer!s} port unsuccessful: "
216             f"{sport_in} != {proto_pkt.dport}"
217         )
218     if proto_layer == TCP:
219         if proto_pkt.flags != 0x12:
220             raise RuntimeError(
221                 f"Not a TCP SYN-ACK packet received: {proto_pkt!r}"
222             )
223
224     sys.exit(0)
225
226
227 if __name__ == u"__main__":
228     main()