0ba9f4aefc977c6b0bf72cad05e2b92aeea0eea7
[csit.git] / GPL / traffic_scripts / nat.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Traffic script for NAT verification."""
17
18 import sys
19
20 import ipaddress
21
22 from scapy.layers.inet import IP, TCP, UDP
23 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
24 from scapy.layers.l2 import Ether
25 from scapy.packet import Raw
26
27 from .PacketVerifier import RxQueue, TxQueue
28 from .TrafficScriptArg import TrafficScriptArg
29
30
31 def valid_ipv4(ip):
32     try:
33         ipaddress.IPv4Address(ip)
34         return True
35     except (AttributeError, ipaddress.AddressValueError):
36         return False
37
38
39 def valid_ipv6(ip):
40     try:
41         ipaddress.IPv6Address(ip)
42         return True
43     except (AttributeError, ipaddress.AddressValueError):
44         return False
45
46
47 def main():
48     """Send, receive and check IP/IPv6 packets with UDP/TCP layer passing
49     through NAT.
50     """
51     args = TrafficScriptArg(
52         [
53             u"tx_src_mac", u"rx_dst_mac", u"src_ip_in", u"src_ip_out",
54             u"dst_ip", u"tx_dst_mac", u"rx_src_mac", u"protocol",
55             u"src_port_in", u"src_port_out", u"dst_port"
56         ]
57     )
58
59     tx_src_mac = args.get_arg(u"tx_src_mac")
60     tx_dst_mac = args.get_arg(u"tx_dst_mac")
61     rx_dst_mac = args.get_arg(u"rx_dst_mac")
62     rx_src_mac = args.get_arg(u"rx_src_mac")
63     src_ip_in = args.get_arg(u"src_ip_in")
64     src_ip_out = args.get_arg(u"src_ip_out")
65     dst_ip = args.get_arg(u"dst_ip")
66     protocol = args.get_arg(u"protocol")
67     sport_in = int(args.get_arg(u"src_port_in"))
68     try:
69         sport_out = int(args.get_arg(u"src_port_out"))
70     except ValueError:
71         sport_out = None
72     dst_port = int(args.get_arg(u"dst_port"))
73
74     tx_txq = TxQueue(args.get_arg(u"tx_if"))
75     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
76     rx_txq = TxQueue(args.get_arg(u"rx_if"))
77     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
78
79     sent_packets = list()
80     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
81
82     if valid_ipv4(src_ip_in) and valid_ipv4(dst_ip):
83         ip_layer = IP
84     elif valid_ipv6(src_ip_in) and valid_ipv6(dst_ip):
85         ip_layer = IPv6
86     else:
87         raise ValueError(u"IP not in correct format")
88     pkt_raw /= ip_layer(src=src_ip_in, dst=dst_ip)
89
90     if protocol == u"UDP":
91         pkt_raw /= UDP(sport=sport_in, dport=dst_port)
92         proto_layer = UDP
93     elif protocol == u"TCP":
94         # flags=0x2 => SYN flag set
95         pkt_raw /= TCP(sport=sport_in, dport=dst_port, flags=0x2)
96         proto_layer = TCP
97     else:
98         raise ValueError(u"Incorrect protocol")
99
100     pkt_raw /= Raw()
101     sent_packets.append(pkt_raw)
102     tx_txq.send(pkt_raw)
103
104     while True:
105         ether = rx_rxq.recv(2)
106
107         if ether is None:
108             raise RuntimeError(u"IP packet Rx timeout")
109
110         if ether.haslayer(ICMPv6ND_NS):
111             # read another packet in the queue if the current one is ICMPv6ND_NS
112             continue
113         else:
114             # otherwise process the current packet
115             break
116
117     if rx_dst_mac != ether[Ether].dst or rx_src_mac != ether[Ether].src:
118         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
119
120     ip_pkt = ether.payload
121     if not isinstance(ip_pkt, ip_layer):
122         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
123     if ip_pkt.src != src_ip_out:
124         raise RuntimeError(
125             f"Matching Src IP address unsuccessful: "
126             f"{src_ip_out} != {ip_pkt.src}"
127         )
128     if ip_pkt.dst != dst_ip:
129         raise RuntimeError(
130             f"Matching Dst IP address unsuccessful: {dst_ip} != {ip_pkt.dst}"
131         )
132
133     proto_pkt = ip_pkt.payload
134     if not isinstance(proto_pkt, proto_layer):
135         raise RuntimeError(
136             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
137         )
138     if sport_out is not None:
139         if proto_pkt.sport != sport_out:
140             raise RuntimeError(
141                 f"Matching Src {proto_layer!s} port unsuccessful: "
142                 f"{sport_out} != {proto_pkt.sport}"
143             )
144     else:
145         sport_out = proto_pkt.sport
146     if proto_pkt.dport != dst_port:
147         raise RuntimeError(
148             f"Matching Dst {proto_layer!s} port unsuccessful: "
149             f"{dst_port} != {proto_pkt.dport}"
150         )
151     if proto_layer == TCP:
152         if proto_pkt.flags != 0x2:
153             raise RuntimeError(
154                 f"Not a TCP SYN packet received: {proto_pkt!r}"
155             )
156
157     pkt_raw = Ether(src=rx_dst_mac, dst=rx_src_mac)
158     pkt_raw /= ip_layer(src=dst_ip, dst=src_ip_out)
159     pkt_raw /= proto_layer(sport=dst_port, dport=sport_out)
160     if proto_layer == TCP:
161         # flags=0x12 => SYN, ACK flags set
162         pkt_raw[TCP].flags = 0x12
163     pkt_raw /= Raw()
164     rx_txq.send(pkt_raw)
165
166     while True:
167         ether = tx_rxq.recv(2, ignore=sent_packets)
168
169         if ether is None:
170             raise RuntimeError(u"IP packet Rx timeout")
171
172         if ether.haslayer(ICMPv6ND_NS):
173             # read another packet in the queue if the current one is ICMPv6ND_NS
174             continue
175         else:
176             # otherwise process the current packet
177             break
178
179     if ether[Ether].dst != tx_src_mac or ether[Ether].src != tx_dst_mac:
180         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
181
182     ip_pkt = ether.payload
183     if not isinstance(ip_pkt, ip_layer):
184         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
185     if ip_pkt.src != dst_ip:
186         raise RuntimeError(
187             f"Matching Src IP address unsuccessful: {dst_ip} != {ip_pkt.src}"
188         )
189     if ip_pkt.dst != src_ip_in:
190         raise RuntimeError(
191             f"Matching Dst IP address unsuccessful: {src_ip_in} != {ip_pkt.dst}"
192         )
193
194     proto_pkt = ip_pkt.payload
195     if not isinstance(proto_pkt, proto_layer):
196         raise RuntimeError(
197             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
198         )
199     if proto_pkt.sport != dst_port:
200         raise RuntimeError(
201             f"Matching Src {proto_layer!s} port unsuccessful: "
202             f"{dst_port} != {proto_pkt.sport}"
203         )
204     if proto_pkt.dport != sport_in:
205         raise RuntimeError(
206             f"Matching Dst {proto_layer!s} port unsuccessful: "
207             f"{sport_in} != {proto_pkt.dport}"
208         )
209     if proto_layer == TCP:
210         if proto_pkt.flags != 0x12:
211             raise RuntimeError(
212                 f"Not a TCP SYN-ACK packet received: {proto_pkt!r}"
213             )
214
215     sys.exit(0)
216
217
218 if __name__ == u"__main__":
219     main()