3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script for IPsec verification."""
20 from ipaddress import ip_address
21 from scapy.layers.inet import IP
22 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
23 from scapy.layers.ipsec import SecurityAssociation, ESP
24 from scapy.layers.l2 import Ether
25 from scapy.packet import Raw
27 from .PacketVerifier import RxQueue, TxQueue
28 from .TrafficScriptArg import TrafficScriptArg
31 def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
32 """Check received IPsec packet.
34 :param pkt_recv: Received packet to verify.
35 :param ip_layer: Scapy IP layer.
36 :param dst_tun: IPsec tunnel destination address.
37 :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
38 :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
39 :param sa_in: IPsec SA for packet decryption.
40 :type pkt_recv: scapy.Ether
41 :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
45 :type sa_in: scapy.layers.ipsec.SecurityAssociation
46 :raises RuntimeError: If received packet is invalid.
48 if not pkt_recv.haslayer(ip_layer):
50 f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
53 if pkt_recv[ip_layer].dst != dst_tun:
55 f"Received packet has invalid destination address: "
56 f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
59 if not pkt_recv.haslayer(ESP):
60 raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
62 ip_pkt = pkt_recv[ip_layer]
63 d_pkt = sa_in.decrypt(ip_pkt)
65 if d_pkt[ip_layer].dst != dst_ip:
67 f"Decrypted packet has invalid destination address: "
68 f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
71 if d_pkt[ip_layer].src != src_ip:
73 f"Decrypted packet has invalid source address: "
74 f"{d_pkt[ip_layer].src} should be: {src_ip}"
77 if ip_layer == IP and d_pkt[ip_layer].proto != 61:
79 f"Decrypted packet has invalid IP protocol: "
80 f"{d_pkt[ip_layer].proto} should be: 61"
84 def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
85 """Check received IP/IPv6 packet.
87 :param pkt_recv: Received packet to verify.
88 :param ip_layer: Scapy IP layer.
89 :param src_ip: Source IP/IPv6 address.
90 :param dst_ip: Destination IP/IPv6 address.
91 :type pkt_recv: scapy.Ether
92 :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
95 :raises RuntimeError: If received packet is invalid.
97 if not pkt_recv.haslayer(ip_layer):
99 f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
102 if pkt_recv[ip_layer].dst != dst_ip:
104 f"Received packet has invalid destination address: "
105 f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
108 if pkt_recv[ip_layer].src != src_ip:
110 f"Received packet has invalid destination address: "
111 f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
114 if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
116 f"Received packet has invalid IP protocol: "
117 f"{pkt_recv[ip_layer].proto} should be: 61"
121 # TODO: Pylint says too-many-locals and too-many-statements. Refactor!
123 """Send and receive IPsec packet."""
125 args = TrafficScriptArg(
127 u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
128 u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg",
129 u"integ_key", u"l_spi", u"r_spi"
131 [u"src_tun", u"dst_tun"]
134 tx_txq = TxQueue(args.get_arg(u"tx_if"))
135 tx_rxq = RxQueue(args.get_arg(u"tx_if"))
136 rx_txq = TxQueue(args.get_arg(u"rx_if"))
137 rx_rxq = RxQueue(args.get_arg(u"rx_if"))
139 tx_src_mac = args.get_arg(u"tx_src_mac")
140 tx_dst_mac = args.get_arg(u"tx_dst_mac")
141 rx_src_mac = args.get_arg(u"rx_src_mac")
142 rx_dst_mac = args.get_arg(u"rx_dst_mac")
143 src_ip = args.get_arg(u"src_ip")
144 dst_ip = args.get_arg(u"dst_ip")
145 crypto_alg = args.get_arg(u"crypto_alg")
146 crypto_key = args.get_arg(u"crypto_key")
147 integ_alg = args.get_arg(u"integ_alg")
148 integ_key = args.get_arg(u"integ_key")
149 l_spi = int(args.get_arg(u"l_spi"))
150 r_spi = int(args.get_arg(u"r_spi"))
151 src_tun = args.get_arg(u"src_tun")
152 dst_tun = args.get_arg(u"dst_tun")
154 ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
156 tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \
158 tunnel_in = ip_layer(src=dst_tun, dst=src_tun) if src_tun and dst_tun \
161 if not (src_tun and dst_tun):
164 sa_in = SecurityAssociation(
165 ESP, spi=r_spi, crypt_algo=crypto_alg,
166 crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
167 auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
170 sa_out = SecurityAssociation(
171 ESP, spi=l_spi, crypt_algo=crypto_alg,
172 crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
173 auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
176 ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
177 else ip_layer(src=src_ip, dst=dst_ip)
178 ip_pkt = ip_layer(ip_pkt)
180 e_pkt = sa_out.encrypt(ip_pkt)
181 tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
184 sent_packets = list()
186 sent_packets.append(tx_pkt_send)
187 tx_txq.send(tx_pkt_send)
190 rx_pkt_recv = rx_rxq.recv(2)
192 if rx_pkt_recv is None:
193 raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
195 if rx_pkt_recv.haslayer(ICMPv6ND_NS):
196 # read another packet in the queue if the current one is ICMPv6ND_NS
199 # otherwise process the current packet
202 check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip)
204 rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
205 else ip_layer(src=dst_ip, dst=src_ip)
206 rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
210 rx_txq.send(rx_pkt_send)
213 tx_pkt_recv = tx_rxq.recv(2, sent_packets)
215 if tx_pkt_recv is None:
216 raise RuntimeError(u"ESP packet Rx timeout")
218 if tx_pkt_recv.haslayer(ICMPv6ND_NS):
219 # read another packet in the queue if the current one is ICMPv6ND_NS
222 # otherwise process the current packet
225 check_ipsec(tx_pkt_recv, ip_layer, src_tun, dst_ip, src_ip, sa_in)
230 if __name__ == u"__main__":