3 # Copyright (c) 2020 Cisco and/or its affiliates.
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later; you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
12 # http://www.apache.org/licenses/LICENSE-2.0
13 # https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+. If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
25 """Traffic script for IPsec verification."""
29 from ipaddress import ip_address
30 from scapy.layers.inet import IP
31 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
32 from scapy.layers.ipsec import SecurityAssociation, ESP
33 from scapy.layers.l2 import Ether
34 from scapy.packet import Raw
36 from .PacketVerifier import RxQueue, TxQueue
37 from .TrafficScriptArg import TrafficScriptArg
40 def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
41 """Check received IPsec packet.
43 :param pkt_recv: Received packet to verify.
44 :param ip_layer: Scapy IP layer.
45 :param dst_tun: IPsec tunnel destination address.
46 :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
47 :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
48 :param sa_in: IPsec SA for packet decryption.
49 :type pkt_recv: scapy.Ether
50 :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
54 :type sa_in: scapy.layers.ipsec.SecurityAssociation
55 :raises RuntimeError: If received packet is invalid.
57 if not pkt_recv.haslayer(ip_layer):
59 f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
62 if pkt_recv[ip_layer].dst != dst_tun:
64 f"Received packet has invalid destination address: "
65 f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
68 if not pkt_recv.haslayer(ESP):
69 raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
71 ip_pkt = pkt_recv[ip_layer]
72 d_pkt = sa_in.decrypt(ip_pkt)
74 if d_pkt[ip_layer].dst != dst_ip:
76 f"Decrypted packet has invalid destination address: "
77 f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
80 if d_pkt[ip_layer].src != src_ip:
82 f"Decrypted packet has invalid source address: "
83 f"{d_pkt[ip_layer].src} should be: {src_ip}"
86 if ip_layer == IP and d_pkt[ip_layer].proto != 61:
88 f"Decrypted packet has invalid IP protocol: "
89 f"{d_pkt[ip_layer].proto} should be: 61"
93 def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
94 """Check received IP/IPv6 packet.
96 :param pkt_recv: Received packet to verify.
97 :param ip_layer: Scapy IP layer.
98 :param src_ip: Source IP/IPv6 address.
99 :param dst_ip: Destination IP/IPv6 address.
100 :type pkt_recv: scapy.Ether
101 :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
104 :raises RuntimeError: If received packet is invalid.
106 if not pkt_recv.haslayer(ip_layer):
108 f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
111 if pkt_recv[ip_layer].dst != dst_ip:
113 f"Received packet has invalid destination address: "
114 f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
117 if pkt_recv[ip_layer].src != src_ip:
119 f"Received packet has invalid destination address: "
120 f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
123 if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
125 f"Received packet has invalid IP protocol: "
126 f"{pkt_recv[ip_layer].proto} should be: 61"
130 # TODO: Pylint says too-many-locals and too-many-statements. Refactor!
132 """Send and receive IPsec packet."""
134 args = TrafficScriptArg(
136 u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
137 u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg",
138 u"integ_key", u"l_spi", u"r_spi"
140 [u"src_tun", u"dst_tun"]
143 tx_txq = TxQueue(args.get_arg(u"tx_if"))
144 tx_rxq = RxQueue(args.get_arg(u"tx_if"))
145 rx_txq = TxQueue(args.get_arg(u"rx_if"))
146 rx_rxq = RxQueue(args.get_arg(u"rx_if"))
148 tx_src_mac = args.get_arg(u"tx_src_mac")
149 tx_dst_mac = args.get_arg(u"tx_dst_mac")
150 rx_src_mac = args.get_arg(u"rx_src_mac")
151 rx_dst_mac = args.get_arg(u"rx_dst_mac")
152 src_ip = args.get_arg(u"src_ip")
153 dst_ip = args.get_arg(u"dst_ip")
154 crypto_alg = args.get_arg(u"crypto_alg")
155 crypto_key = args.get_arg(u"crypto_key")
156 integ_alg = args.get_arg(u"integ_alg")
157 integ_key = args.get_arg(u"integ_key")
158 l_spi = int(args.get_arg(u"l_spi"))
159 r_spi = int(args.get_arg(u"r_spi"))
160 src_tun = args.get_arg(u"src_tun")
161 dst_tun = args.get_arg(u"dst_tun")
163 ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
165 tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \
167 tunnel_in = ip_layer(src=dst_tun, dst=src_tun) if src_tun and dst_tun \
170 if not (src_tun and dst_tun):
173 sa_in = SecurityAssociation(
174 ESP, spi=r_spi, crypt_algo=crypto_alg,
175 crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
176 auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
179 sa_out = SecurityAssociation(
180 ESP, spi=l_spi, crypt_algo=crypto_alg,
181 crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
182 auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
185 ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
186 else ip_layer(src=src_ip, dst=dst_ip)
187 ip_pkt = ip_layer(ip_pkt)
189 e_pkt = sa_out.encrypt(ip_pkt)
190 tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
193 sent_packets = list()
195 sent_packets.append(tx_pkt_send)
196 tx_txq.send(tx_pkt_send)
199 rx_pkt_recv = rx_rxq.recv(2)
201 if rx_pkt_recv is None:
202 raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
204 if rx_pkt_recv.haslayer(ICMPv6ND_NS):
205 # read another packet in the queue if the current one is ICMPv6ND_NS
208 # otherwise process the current packet
211 check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip)
213 rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
214 else ip_layer(src=dst_ip, dst=src_ip)
215 rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
219 rx_txq.send(rx_pkt_send)
222 tx_pkt_recv = tx_rxq.recv(2, sent_packets)
224 if tx_pkt_recv is None:
225 raise RuntimeError(u"ESP packet Rx timeout")
227 if tx_pkt_recv.haslayer(ICMPv6ND_NS):
228 # read another packet in the queue if the current one is ICMPv6ND_NS
231 # otherwise process the current packet
234 check_ipsec(tx_pkt_recv, ip_layer, src_tun, dst_ip, src_ip, sa_in)
239 if __name__ == u"__main__":