Traffic scripts: Fix docstrings
[csit.git] / GPL / traffic_scripts / nat.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script for NAT verification."""
27
28 import sys
29
30 import ipaddress
31
32 from scapy.layers.inet import IP, TCP, UDP
33 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
34 from scapy.layers.l2 import Ether
35 from scapy.packet import Raw
36
37 from .PacketVerifier import RxQueue, TxQueue
38 from .TrafficScriptArg import TrafficScriptArg
39
40
41 def valid_ipv4(ip_address):
42     """Check IPv4 address.
43
44     :param ip_address: IPv4 address to check.
45     :type ip_address: str
46     :returns: True if IP address is correct.
47     :rtype: bool
48     """
49     try:
50         ipaddress.IPv4Address(ip_address)
51         return True
52     except (AttributeError, ipaddress.AddressValueError):
53         return False
54
55
56 def valid_ipv6(ip_address):
57     """Check IPv6 address.
58
59     :param ip_address: IPv6 address to check.
60     :type ip_address: str
61     :returns: True if IP address is correct.
62     :rtype: bool
63     """
64     try:
65         ipaddress.IPv6Address(ip_address)
66         return True
67     except (AttributeError, ipaddress.AddressValueError):
68         return False
69
70
71 def main():
72     """Send, receive and check IP/IPv6 packets with UDP/TCP layer passing
73     through NAT.
74     """
75     args = TrafficScriptArg(
76         [
77             u"tx_src_mac", u"rx_dst_mac", u"src_ip_in", u"src_ip_out",
78             u"dst_ip", u"tx_dst_mac", u"rx_src_mac", u"protocol",
79             u"src_port_in", u"src_port_out", u"dst_port"
80         ]
81     )
82
83     tx_src_mac = args.get_arg(u"tx_src_mac")
84     tx_dst_mac = args.get_arg(u"tx_dst_mac")
85     rx_dst_mac = args.get_arg(u"rx_dst_mac")
86     rx_src_mac = args.get_arg(u"rx_src_mac")
87     src_ip_in = args.get_arg(u"src_ip_in")
88     src_ip_out = args.get_arg(u"src_ip_out")
89     dst_ip = args.get_arg(u"dst_ip")
90     protocol = args.get_arg(u"protocol")
91     sport_in = int(args.get_arg(u"src_port_in"))
92     try:
93         sport_out = int(args.get_arg(u"src_port_out"))
94     except ValueError:
95         sport_out = None
96     dst_port = int(args.get_arg(u"dst_port"))
97
98     tx_txq = TxQueue(args.get_arg(u"tx_if"))
99     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
100     rx_txq = TxQueue(args.get_arg(u"rx_if"))
101     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
102
103     sent_packets = list()
104     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
105
106     if valid_ipv4(src_ip_in) and valid_ipv4(dst_ip):
107         ip_layer = IP
108     elif valid_ipv6(src_ip_in) and valid_ipv6(dst_ip):
109         ip_layer = IPv6
110     else:
111         raise ValueError(u"IP not in correct format")
112     pkt_raw /= ip_layer(src=src_ip_in, dst=dst_ip)
113
114     if protocol == u"UDP":
115         pkt_raw /= UDP(sport=sport_in, dport=dst_port)
116         proto_layer = UDP
117     elif protocol == u"TCP":
118         # flags=0x2 => SYN flag set
119         pkt_raw /= TCP(sport=sport_in, dport=dst_port, flags=0x2)
120         proto_layer = TCP
121     else:
122         raise ValueError(u"Incorrect protocol")
123
124     pkt_raw /= Raw()
125     sent_packets.append(pkt_raw)
126     tx_txq.send(pkt_raw)
127
128     while True:
129         ether = rx_rxq.recv(2)
130
131         if ether is None:
132             raise RuntimeError(u"IP packet Rx timeout")
133
134         if ether.haslayer(ICMPv6ND_NS):
135             # read another packet in the queue if the current one is ICMPv6ND_NS
136             continue
137         elif ether.haslayer(ICMPv6MLReport2):
138             # read another packet in the queue if the current one is
139             # ICMPv6MLReport2
140             continue
141         elif ether.haslayer(ICMPv6ND_RA):
142             # read another packet in the queue if the current one is
143             # ICMPv6ND_RA
144             continue
145
146         break
147
148     if rx_dst_mac != ether[Ether].dst or rx_src_mac != ether[Ether].src:
149         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
150
151     ip_pkt = ether.payload
152     if not isinstance(ip_pkt, ip_layer):
153         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
154     if ip_pkt.src != src_ip_out:
155         raise RuntimeError(
156             f"Matching Src IP address unsuccessful: "
157             f"{src_ip_out} != {ip_pkt.src}"
158         )
159     if ip_pkt.dst != dst_ip:
160         raise RuntimeError(
161             f"Matching Dst IP address unsuccessful: {dst_ip} != {ip_pkt.dst}"
162         )
163
164     proto_pkt = ip_pkt.payload
165     if not isinstance(proto_pkt, proto_layer):
166         raise RuntimeError(
167             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
168         )
169     if sport_out is not None:
170         if proto_pkt.sport != sport_out:
171             raise RuntimeError(
172                 f"Matching Src {proto_layer!s} port unsuccessful: "
173                 f"{sport_out} != {proto_pkt.sport}"
174             )
175     else:
176         sport_out = proto_pkt.sport
177     if proto_pkt.dport != dst_port:
178         raise RuntimeError(
179             f"Matching Dst {proto_layer!s} port unsuccessful: "
180             f"{dst_port} != {proto_pkt.dport}"
181         )
182     if proto_layer == TCP:
183         if proto_pkt.flags != 0x2:
184             raise RuntimeError(
185                 f"Not a TCP SYN packet received: {proto_pkt!r}"
186             )
187
188     pkt_raw = Ether(src=rx_dst_mac, dst=rx_src_mac)
189     pkt_raw /= ip_layer(src=dst_ip, dst=src_ip_out)
190     pkt_raw /= proto_layer(sport=dst_port, dport=sport_out)
191     if proto_layer == TCP:
192         # flags=0x12 => SYN, ACK flags set
193         pkt_raw[TCP].flags = 0x12
194     pkt_raw /= Raw()
195     rx_txq.send(pkt_raw)
196
197     while True:
198         ether = tx_rxq.recv(2, ignore=sent_packets)
199
200         if ether is None:
201             raise RuntimeError(u"IP packet Rx timeout")
202
203         if ether.haslayer(ICMPv6ND_NS):
204             # read another packet in the queue if the current one is ICMPv6ND_NS
205             continue
206         elif ether.haslayer(ICMPv6MLReport2):
207             # read another packet in the queue if the current one is
208             # ICMPv6MLReport2
209             continue
210         elif ether.haslayer(ICMPv6ND_RA):
211             # read another packet in the queue if the current one is
212             # ICMPv6ND_RA
213             continue
214
215         break
216
217     if ether[Ether].dst != tx_src_mac or ether[Ether].src != tx_dst_mac:
218         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
219
220     ip_pkt = ether.payload
221     if not isinstance(ip_pkt, ip_layer):
222         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
223     if ip_pkt.src != dst_ip:
224         raise RuntimeError(
225             f"Matching Src IP address unsuccessful: {dst_ip} != {ip_pkt.src}"
226         )
227     if ip_pkt.dst != src_ip_in:
228         raise RuntimeError(
229             f"Matching Dst IP address unsuccessful: {src_ip_in} != {ip_pkt.dst}"
230         )
231
232     proto_pkt = ip_pkt.payload
233     if not isinstance(proto_pkt, proto_layer):
234         raise RuntimeError(
235             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
236         )
237     if proto_pkt.sport != dst_port:
238         raise RuntimeError(
239             f"Matching Src {proto_layer!s} port unsuccessful: "
240             f"{dst_port} != {proto_pkt.sport}"
241         )
242     if proto_pkt.dport != sport_in:
243         raise RuntimeError(
244             f"Matching Dst {proto_layer!s} port unsuccessful: "
245             f"{sport_in} != {proto_pkt.dport}"
246         )
247     if proto_layer == TCP:
248         if proto_pkt.flags != 0x12:
249             raise RuntimeError(
250                 f"Not a TCP SYN-ACK packet received: {proto_pkt!r}"
251             )
252
253     sys.exit(0)
254
255
256 if __name__ == u"__main__":
257     main()