FIX: Pylint reduce
[csit.git] / GPL / traffic_scripts / nat.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script for NAT verification."""
27
28 import sys
29
30 import ipaddress
31
32 from scapy.layers.inet import IP, TCP, UDP
33 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2
34 from scapy.layers.l2 import Ether
35 from scapy.packet import Raw
36
37 from .PacketVerifier import RxQueue, TxQueue
38 from .TrafficScriptArg import TrafficScriptArg
39
40
41 def valid_ipv4(ip_address):
42     """Check IPv4 address.
43
44     :param ip_address: IPv4 address to check.
45     :type ip_address: str
46     :returns: True if IP address is correct.
47     :rtype: bool
48     :raises AttributeError, AddressValueError: If IP address is not valid.
49     """
50     try:
51         ipaddress.IPv4Address(ip_address)
52         return True
53     except (AttributeError, ipaddress.AddressValueError):
54         return False
55
56
57 def valid_ipv6(ip_address):
58     """Check IPv6 address.
59
60     :param ip_address: IPv6 address to check.
61     :type ip_address: str
62     :returns: True if IP address is correct.
63     :rtype: bool
64     :raises AttributeError, AddressValueError: If IP address is not valid.
65     """
66     try:
67         ipaddress.IPv6Address(ip_address)
68         return True
69     except (AttributeError, ipaddress.AddressValueError):
70         return False
71
72
73 def main():
74     """Send, receive and check IP/IPv6 packets with UDP/TCP layer passing
75     through NAT.
76     """
77     args = TrafficScriptArg(
78         [
79             u"tx_src_mac", u"rx_dst_mac", u"src_ip_in", u"src_ip_out",
80             u"dst_ip", u"tx_dst_mac", u"rx_src_mac", u"protocol",
81             u"src_port_in", u"src_port_out", u"dst_port"
82         ]
83     )
84
85     tx_src_mac = args.get_arg(u"tx_src_mac")
86     tx_dst_mac = args.get_arg(u"tx_dst_mac")
87     rx_dst_mac = args.get_arg(u"rx_dst_mac")
88     rx_src_mac = args.get_arg(u"rx_src_mac")
89     src_ip_in = args.get_arg(u"src_ip_in")
90     src_ip_out = args.get_arg(u"src_ip_out")
91     dst_ip = args.get_arg(u"dst_ip")
92     protocol = args.get_arg(u"protocol")
93     sport_in = int(args.get_arg(u"src_port_in"))
94     try:
95         sport_out = int(args.get_arg(u"src_port_out"))
96     except ValueError:
97         sport_out = None
98     dst_port = int(args.get_arg(u"dst_port"))
99
100     tx_txq = TxQueue(args.get_arg(u"tx_if"))
101     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
102     rx_txq = TxQueue(args.get_arg(u"rx_if"))
103     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
104
105     sent_packets = list()
106     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
107
108     if valid_ipv4(src_ip_in) and valid_ipv4(dst_ip):
109         ip_layer = IP
110     elif valid_ipv6(src_ip_in) and valid_ipv6(dst_ip):
111         ip_layer = IPv6
112     else:
113         raise ValueError(u"IP not in correct format")
114     pkt_raw /= ip_layer(src=src_ip_in, dst=dst_ip)
115
116     if protocol == u"UDP":
117         pkt_raw /= UDP(sport=sport_in, dport=dst_port)
118         proto_layer = UDP
119     elif protocol == u"TCP":
120         # flags=0x2 => SYN flag set
121         pkt_raw /= TCP(sport=sport_in, dport=dst_port, flags=0x2)
122         proto_layer = TCP
123     else:
124         raise ValueError(u"Incorrect protocol")
125
126     pkt_raw /= Raw()
127     sent_packets.append(pkt_raw)
128     tx_txq.send(pkt_raw)
129
130     while True:
131         ether = rx_rxq.recv(2)
132
133         if ether is None:
134             raise RuntimeError(u"IP packet Rx timeout")
135
136         if ether.haslayer(ICMPv6ND_NS):
137             # read another packet in the queue if the current one is ICMPv6ND_NS
138             continue
139         else:
140             # otherwise process the current packet
141             break
142
143     if rx_dst_mac != ether[Ether].dst or rx_src_mac != ether[Ether].src:
144         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
145
146     ip_pkt = ether.payload
147     if not isinstance(ip_pkt, ip_layer):
148         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
149     if ip_pkt.src != src_ip_out:
150         raise RuntimeError(
151             f"Matching Src IP address unsuccessful: "
152             f"{src_ip_out} != {ip_pkt.src}"
153         )
154     if ip_pkt.dst != dst_ip:
155         raise RuntimeError(
156             f"Matching Dst IP address unsuccessful: {dst_ip} != {ip_pkt.dst}"
157         )
158
159     proto_pkt = ip_pkt.payload
160     if not isinstance(proto_pkt, proto_layer):
161         raise RuntimeError(
162             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
163         )
164     if sport_out is not None:
165         if proto_pkt.sport != sport_out:
166             raise RuntimeError(
167                 f"Matching Src {proto_layer!s} port unsuccessful: "
168                 f"{sport_out} != {proto_pkt.sport}"
169             )
170     else:
171         sport_out = proto_pkt.sport
172     if proto_pkt.dport != dst_port:
173         raise RuntimeError(
174             f"Matching Dst {proto_layer!s} port unsuccessful: "
175             f"{dst_port} != {proto_pkt.dport}"
176         )
177     if proto_layer == TCP:
178         if proto_pkt.flags != 0x2:
179             raise RuntimeError(
180                 f"Not a TCP SYN packet received: {proto_pkt!r}"
181             )
182
183     pkt_raw = Ether(src=rx_dst_mac, dst=rx_src_mac)
184     pkt_raw /= ip_layer(src=dst_ip, dst=src_ip_out)
185     pkt_raw /= proto_layer(sport=dst_port, dport=sport_out)
186     if proto_layer == TCP:
187         # flags=0x12 => SYN, ACK flags set
188         pkt_raw[TCP].flags = 0x12
189     pkt_raw /= Raw()
190     rx_txq.send(pkt_raw)
191
192     while True:
193         ether = tx_rxq.recv(2, ignore=sent_packets)
194
195         if ether is None:
196             raise RuntimeError(u"IP packet Rx timeout")
197
198         if ether.haslayer(ICMPv6ND_NS):
199             # read another packet in the queue if the current one is ICMPv6ND_NS
200             continue
201         elif ether.haslayer(ICMPv6MLReport2):
202             # read another packet in the queue if the current one is
203             # ICMPv6MLReport2
204             continue
205         else:
206             # otherwise process the current packet
207             break
208
209     if ether[Ether].dst != tx_src_mac or ether[Ether].src != tx_dst_mac:
210         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
211
212     ip_pkt = ether.payload
213     if not isinstance(ip_pkt, ip_layer):
214         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
215     if ip_pkt.src != dst_ip:
216         raise RuntimeError(
217             f"Matching Src IP address unsuccessful: {dst_ip} != {ip_pkt.src}"
218         )
219     if ip_pkt.dst != src_ip_in:
220         raise RuntimeError(
221             f"Matching Dst IP address unsuccessful: {src_ip_in} != {ip_pkt.dst}"
222         )
223
224     proto_pkt = ip_pkt.payload
225     if not isinstance(proto_pkt, proto_layer):
226         raise RuntimeError(
227             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
228         )
229     if proto_pkt.sport != dst_port:
230         raise RuntimeError(
231             f"Matching Src {proto_layer!s} port unsuccessful: "
232             f"{dst_port} != {proto_pkt.sport}"
233         )
234     if proto_pkt.dport != sport_in:
235         raise RuntimeError(
236             f"Matching Dst {proto_layer!s} port unsuccessful: "
237             f"{sport_in} != {proto_pkt.dport}"
238         )
239     if proto_layer == TCP:
240         if proto_pkt.flags != 0x12:
241             raise RuntimeError(
242                 f"Not a TCP SYN-ACK packet received: {proto_pkt!r}"
243             )
244
245     sys.exit(0)
246
247
248 if __name__ == u"__main__":
249     main()