3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script for NAT verification."""
import ipaddress

from scapy.layers.inet import IP, TCP, UDP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.layers.l2 import Ether
from scapy.packet import Raw

from .PacketVerifier import RxQueue, TxQueue
from .TrafficScriptArg import TrafficScriptArg
def valid_ipv4(ip):
    """Check whether the given value is a valid IPv4 address.

    The value is handed to ``ipaddress.IPv4Address``; if construction
    succeeds the address is considered valid.

    :param ip: Address to check.
    :type ip: str
    :returns: True if the value is a valid IPv4 address, False otherwise.
    :rtype: bool
    """
    try:
        # Only the constructor can raise; keep the try body minimal.
        ipaddress.IPv4Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
def valid_ipv6(ip):
    """Check whether the given value is a valid IPv6 address.

    The value is handed to ``ipaddress.IPv6Address``; if construction
    succeeds the address is considered valid.

    :param ip: Address to check.
    :type ip: str
    :returns: True if the value is a valid IPv6 address, False otherwise.
    :rtype: bool
    """
    try:
        # Only the constructor can raise; keep the try body minimal.
        ipaddress.IPv6Address(ip)
    except (AttributeError, ipaddress.AddressValueError):
        return False
    return True
def _recv_skipping_ns(rxq, **recv_kwargs):
    """Receive the next packet that is not an ICMPv6 Neighbor Solicitation.

    :param rxq: Receive queue to read from.
    :param recv_kwargs: Extra keyword arguments passed through to
        ``rxq.recv`` (e.g. ``ignore``).
    :type rxq: RxQueue
    :type recv_kwargs: dict
    :returns: First received packet without an ICMPv6ND_NS layer.
    :raises RuntimeError: If no packet is received (``recv`` returned None).
    """
    while True:
        ether = rxq.recv(2, **recv_kwargs)
        if ether is None:
            raise RuntimeError(u"IP packet Rx timeout")
        # Neighbor solicitations are unrelated to the test; read another
        # packet from the queue, otherwise process the current packet.
        if not ether.haslayer(ICMPv6ND_NS):
            return ether


def main():
    """Send, receive and check IP/IPv6 packets with UDP/TCP layer passing
    through NAT.

    A packet (UDP, or TCP SYN) with the inside source address/port is sent
    on the TX interface. The packet received on the RX interface must carry
    the expected outside source address (and outside source port, when one
    was given). A reply (UDP, or TCP SYN-ACK) is then sent back from the RX
    interface and the packet received on the TX interface must be addressed
    to the original inside address/port.

    :raises ValueError: If the IP addresses are neither both IPv4 nor both
        IPv6, or if the protocol is not UDP or TCP.
    :raises RuntimeError: If a packet is not received in time or any of the
        checked fields does not match.
    """
    args = TrafficScriptArg(
        [
            u"tx_src_mac", u"rx_dst_mac", u"src_ip_in", u"src_ip_out",
            u"dst_ip", u"tx_dst_mac", u"rx_src_mac", u"protocol",
            u"src_port_in", u"src_port_out", u"dst_port"
        ]
    )

    tx_src_mac = args.get_arg(u"tx_src_mac")
    tx_dst_mac = args.get_arg(u"tx_dst_mac")
    rx_dst_mac = args.get_arg(u"rx_dst_mac")
    rx_src_mac = args.get_arg(u"rx_src_mac")
    src_ip_in = args.get_arg(u"src_ip_in")
    src_ip_out = args.get_arg(u"src_ip_out")
    dst_ip = args.get_arg(u"dst_ip")
    protocol = args.get_arg(u"protocol")
    sport_in = int(args.get_arg(u"src_port_in"))
    try:
        sport_out = int(args.get_arg(u"src_port_out"))
    except ValueError:
        # No numeric outside port given: it is learned from the first
        # received packet instead of being verified.
        sport_out = None
    dst_port = int(args.get_arg(u"dst_port"))

    tx_txq = TxQueue(args.get_arg(u"tx_if"))
    tx_rxq = RxQueue(args.get_arg(u"tx_if"))
    rx_txq = TxQueue(args.get_arg(u"rx_if"))
    rx_rxq = RxQueue(args.get_arg(u"rx_if"))

    # --- Build and send the inside -> outside packet. ---
    pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)

    if valid_ipv4(src_ip_in) and valid_ipv4(dst_ip):
        ip_layer = IP
    elif valid_ipv6(src_ip_in) and valid_ipv6(dst_ip):
        ip_layer = IPv6
    else:
        raise ValueError(u"IP not in correct format")
    pkt_raw /= ip_layer(src=src_ip_in, dst=dst_ip)

    if protocol == u"UDP":
        pkt_raw /= UDP(sport=sport_in, dport=dst_port)
        proto_layer = UDP
    elif protocol == u"TCP":
        # flags=0x2 => SYN flag set
        pkt_raw /= TCP(sport=sport_in, dport=dst_port, flags=0x2)
        proto_layer = TCP
    else:
        raise ValueError(u"Incorrect protocol")

    # NOTE(review): empty Raw payload assumed from the file-level Raw
    # import -- confirm against the complete file.
    pkt_raw /= Raw()
    # Remember what was sent so it can be ignored when reading tx_if later.
    sent_packets = list()
    sent_packets.append(pkt_raw)
    # NOTE(review): TxQueue.send() is not visible in this extract -- confirm.
    tx_txq.send(pkt_raw)

    # --- Verify the translated packet on the outside interface. ---
    ether = _recv_skipping_ns(rx_rxq)

    if rx_dst_mac != ether[Ether].dst or rx_src_mac != ether[Ether].src:
        raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")

    ip_pkt = ether.payload
    if not isinstance(ip_pkt, ip_layer):
        raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
    if ip_pkt.src != src_ip_out:
        raise RuntimeError(
            f"Matching Src IP address unsuccessful: "
            f"{src_ip_out} != {ip_pkt.src}"
        )
    if ip_pkt.dst != dst_ip:
        raise RuntimeError(
            f"Matching Dst IP address unsuccessful: {dst_ip} != {ip_pkt.dst}"
        )

    proto_pkt = ip_pkt.payload
    if not isinstance(proto_pkt, proto_layer):
        raise RuntimeError(
            f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
        )
    if sport_out is not None:
        if proto_pkt.sport != sport_out:
            raise RuntimeError(
                f"Matching Src {proto_layer!s} port unsuccessful: "
                f"{sport_out} != {proto_pkt.sport}"
            )
    else:
        # Outside port was not specified; use the one the packet arrived
        # with when building the reply.
        sport_out = proto_pkt.sport
    if proto_pkt.dport != dst_port:
        raise RuntimeError(
            f"Matching Dst {proto_layer!s} port unsuccessful: "
            f"{dst_port} != {proto_pkt.dport}"
        )
    if proto_layer == TCP:
        if proto_pkt.flags != 0x2:
            raise RuntimeError(
                f"Not a TCP SYN packet received: {proto_pkt!r}"
            )

    # --- Build and send the outside -> inside reply. ---
    pkt_raw = Ether(src=rx_dst_mac, dst=rx_src_mac)
    pkt_raw /= ip_layer(src=dst_ip, dst=src_ip_out)
    pkt_raw /= proto_layer(sport=dst_port, dport=sport_out)
    if proto_layer == TCP:
        # flags=0x12 => SYN, ACK flags set
        pkt_raw[TCP].flags = 0x12
    pkt_raw /= Raw()
    rx_txq.send(pkt_raw)

    # --- Verify the reverse-translated packet on the inside interface. ---
    ether = _recv_skipping_ns(tx_rxq, ignore=sent_packets)

    if ether[Ether].dst != tx_src_mac or ether[Ether].src != tx_dst_mac:
        raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")

    ip_pkt = ether.payload
    if not isinstance(ip_pkt, ip_layer):
        raise RuntimeError(f"Not an {ip_layer!s} packet received: {ip_pkt!r}")
    if ip_pkt.src != dst_ip:
        raise RuntimeError(
            f"Matching Src IP address unsuccessful: {dst_ip} != {ip_pkt.src}"
        )
    if ip_pkt.dst != src_ip_in:
        raise RuntimeError(
            f"Matching Dst IP address unsuccessful: {src_ip_in} != {ip_pkt.dst}"
        )

    proto_pkt = ip_pkt.payload
    if not isinstance(proto_pkt, proto_layer):
        raise RuntimeError(
            f"Not a {proto_layer!s} packet received: {proto_pkt!r}"
        )
    if proto_pkt.sport != dst_port:
        raise RuntimeError(
            f"Matching Src {proto_layer!s} port unsuccessful: "
            f"{dst_port} != {proto_pkt.sport}"
        )
    if proto_pkt.dport != sport_in:
        raise RuntimeError(
            f"Matching Dst {proto_layer!s} port unsuccessful: "
            f"{sport_in} != {proto_pkt.dport}"
        )
    if proto_layer == TCP:
        if proto_pkt.flags != 0x12:
            raise RuntimeError(
                f"Not a TCP SYN-ACK packet received: {proto_pkt!r}"
            )


if __name__ == u"__main__":
    main()