FIX: vpp_device ignore packets list
[csit.git] / GPL / traffic_scripts / nat.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script for NAT verification."""
27
28 import sys
29
30 import ipaddress
31
32 from scapy.layers.inet import IP, TCP, UDP
33 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
34 from scapy.layers.l2 import Ether
35 from scapy.packet import Raw
36
37 from .PacketVerifier import RxQueue, TxQueue
38 from .TrafficScriptArg import TrafficScriptArg
39
40
41 def valid_ipv4(ip_address):
42     """Check IPv4 address.
43
44     :param ip_address: IPv4 address to check.
45     :type ip_address: str
46     :returns: True if IP address is correct.
47     :rtype: bool
48     :raises AttributeError, AddressValueError: If IP address is not valid.
49     """
50     try:
51         ipaddress.IPv4Address(ip_address)
52         return True
53     except (AttributeError, ipaddress.AddressValueError):
54         return False
55
56
57 def valid_ipv6(ip_address):
58     """Check IPv6 address.
59
60     :param ip_address: IPv6 address to check.
61     :type ip_address: str
62     :returns: True if IP address is correct.
63     :rtype: bool
64     :raises AttributeError, AddressValueError: If IP address is not valid.
65     """
66     try:
67         ipaddress.IPv6Address(ip_address)
68         return True
69     except (AttributeError, ipaddress.AddressValueError):
70         return False
71
72
73 def main():
74     """Send, receive and check IP/IPv6 packets with UDP/TCP layer passing
75     through NAT.
76     """
77     args = TrafficScriptArg(
78         [
79             u"tx_src_mac", u"rx_dst_mac", u"src_ip_in", u"src_ip_out",
80             u"dst_ip", u"tx_dst_mac", u"rx_src_mac", u"protocol",
81             u"src_port_in", u"src_port_out", u"dst_port"
82         ]
83     )
84
85     tx_src_mac = args.get_arg(u"tx_src_mac")
86     tx_dst_mac = args.get_arg(u"tx_dst_mac")
87     rx_dst_mac = args.get_arg(u"rx_dst_mac")
88     rx_src_mac = args.get_arg(u"rx_src_mac")
89     src_ip_in = args.get_arg(u"src_ip_in")
90     src_ip_out = args.get_arg(u"src_ip_out")
91     dst_ip = args.get_arg(u"dst_ip")
92     protocol = args.get_arg(u"protocol")
93     sport_in = int(args.get_arg(u"src_port_in"))
94     try:
95         sport_out = int(args.get_arg(u"src_port_out"))
96     except ValueError:
97         sport_out = None
98     dst_port = int(args.get_arg(u"dst_port"))
99
100     tx_txq = TxQueue(args.get_arg(u"tx_if"))
101     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
102     rx_txq = TxQueue(args.get_arg(u"rx_if"))
103     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
104
105     sent_packets = list()
106     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
107
108     if valid_ipv4(src_ip_in) and valid_ipv4(dst_ip):
109         ip_layer = IP
110     elif valid_ipv6(src_ip_in) and valid_ipv6(dst_ip):
111         ip_layer = IPv6
112     else:
113         raise ValueError(u"IP not in correct format")
114     pkt_raw /= ip_layer(src=src_ip_in, dst=dst_ip)
115
116     if protocol == u"UDP":
117         pkt_raw /= UDP(sport=sport_in, dport=dst_port)
118         proto_layer = UDP
119     elif protocol == u"TCP":
120         # flags=0x2 => SYN flag set
121         pkt_raw /= TCP(sport=sport_in, dport=dst_port, flags=0x2)
122         proto_layer = TCP
123     else:
124         raise ValueError(u"Incorrect protocol")
125
126     pkt_raw /= Raw()
127     sent_packets.append(pkt_raw)
128     tx_txq.send(pkt_raw)
129
130     while True:
131         ether = rx_rxq.recv(2)
132
133         if ether is None:
134             raise RuntimeError(u"IP packet Rx timeout")
135
136         if ether.haslayer(ICMPv6ND_NS):
137             # read another packet in the queue if the current one is ICMPv6ND_NS
138             continue
139         elif ether.haslayer(ICMPv6MLReport2):
140             # read another packet in the queue if the current one is
141             # ICMPv6MLReport2
142             continue
143         elif ether.haslayer(ICMPv6ND_RA):
144             # read another packet in the queue if the current one is
145             # ICMPv6ND_RA
146             continue
147
148         break
149
150     if rx_dst_mac != ether[Ether].dst or rx_src_mac != ether[Ether].src:
151         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
152
153     ip_pkt = ether.payload
154     if not isinstance(ip_pkt, ip_layer):
155         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
156     if ip_pkt.src != src_ip_out:
157         raise RuntimeError(
158             f"Matching Src IP address unsuccessful: "
159             f"{src_ip_out} != {ip_pkt.src}"
160         )
161     if ip_pkt.dst != dst_ip:
162         raise RuntimeError(
163             f"Matching Dst IP address unsuccessful: {dst_ip} != {ip_pkt.dst}"
164         )
165
166     proto_pkt = ip_pkt.payload
167     if not isinstance(proto_pkt, proto_layer):
168         raise RuntimeError(
169             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
170         )
171     if sport_out is not None:
172         if proto_pkt.sport != sport_out:
173             raise RuntimeError(
174                 f"Matching Src {proto_layer!s} port unsuccessful: "
175                 f"{sport_out} != {proto_pkt.sport}"
176             )
177     else:
178         sport_out = proto_pkt.sport
179     if proto_pkt.dport != dst_port:
180         raise RuntimeError(
181             f"Matching Dst {proto_layer!s} port unsuccessful: "
182             f"{dst_port} != {proto_pkt.dport}"
183         )
184     if proto_layer == TCP:
185         if proto_pkt.flags != 0x2:
186             raise RuntimeError(
187                 f"Not a TCP SYN packet received: {proto_pkt!r}"
188             )
189
190     pkt_raw = Ether(src=rx_dst_mac, dst=rx_src_mac)
191     pkt_raw /= ip_layer(src=dst_ip, dst=src_ip_out)
192     pkt_raw /= proto_layer(sport=dst_port, dport=sport_out)
193     if proto_layer == TCP:
194         # flags=0x12 => SYN, ACK flags set
195         pkt_raw[TCP].flags = 0x12
196     pkt_raw /= Raw()
197     rx_txq.send(pkt_raw)
198
199     while True:
200         ether = tx_rxq.recv(2, ignore=sent_packets)
201
202         if ether is None:
203             raise RuntimeError(u"IP packet Rx timeout")
204
205         if ether.haslayer(ICMPv6ND_NS):
206             # read another packet in the queue if the current one is ICMPv6ND_NS
207             continue
208         elif ether.haslayer(ICMPv6MLReport2):
209             # read another packet in the queue if the current one is
210             # ICMPv6MLReport2
211             continue
212         elif ether.haslayer(ICMPv6ND_RA):
213             # read another packet in the queue if the current one is
214             # ICMPv6ND_RA
215             continue
216
217         break
218
219     if ether[Ether].dst != tx_src_mac or ether[Ether].src != tx_dst_mac:
220         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
221
222     ip_pkt = ether.payload
223     if not isinstance(ip_pkt, ip_layer):
224         raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
225     if ip_pkt.src != dst_ip:
226         raise RuntimeError(
227             f"Matching Src IP address unsuccessful: {dst_ip} != {ip_pkt.src}"
228         )
229     if ip_pkt.dst != src_ip_in:
230         raise RuntimeError(
231             f"Matching Dst IP address unsuccessful: {src_ip_in} != {ip_pkt.dst}"
232         )
233
234     proto_pkt = ip_pkt.payload
235     if not isinstance(proto_pkt, proto_layer):
236         raise RuntimeError(
237             f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
238         )
239     if proto_pkt.sport != dst_port:
240         raise RuntimeError(
241             f"Matching Src {proto_layer!s} port unsuccessful: "
242             f"{dst_port} != {proto_pkt.sport}"
243         )
244     if proto_pkt.dport != sport_in:
245         raise RuntimeError(
246             f"Matching Dst {proto_layer!s} port unsuccessful: "
247             f"{sport_in} != {proto_pkt.dport}"
248         )
249     if proto_layer == TCP:
250         if proto_pkt.flags != 0x12:
251             raise RuntimeError(
252                 f"Not a TCP SYN-ACK packet received: {proto_pkt!r}"
253             )
254
255     sys.exit(0)
256
257
258 if __name__ == u"__main__":
259     main()