3 # Copyright (c) 2019 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script for IPsec verification."""
21 from ipaddress import ip_address
22 from scapy.layers.inet import IP
23 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
24 from scapy.layers.ipsec import SecurityAssociation, ESP
25 from scapy.layers.l2 import Ether
26 from scapy.packet import Raw
28 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
29 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
32 def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
33 """Check received IPsec packet.
35 :param pkt_recv: Received packet to verify.
36 :param ip_layer: Scapy IP layer.
37 :param dst_tun: IPsec tunnel destination address.
38 :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
39 :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
40 :param sa_in: IPsec SA for packet decryption.
41 :type pkt_recv: scapy.Ether
42 :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
46 :type sa_in: scapy.layers.ipsec.SecurityAssociation
47 :raises RuntimeError: If received packet is invalid.
49 if not pkt_recv.haslayer(ip_layer):
51 f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
54 if pkt_recv[ip_layer].dst != dst_tun:
56 f"Received packet has invalid destination address: "
57 f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
60 if not pkt_recv.haslayer(ESP):
61 raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
63 ip_pkt = pkt_recv[ip_layer]
64 d_pkt = sa_in.decrypt(ip_pkt)
66 if d_pkt[ip_layer].dst != dst_ip:
68 f"Decrypted packet has invalid destination address: "
69 f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
72 if d_pkt[ip_layer].src != src_ip:
74 f"Decrypted packet has invalid source address: "
75 f"{d_pkt[ip_layer].src} should be: {src_ip}"
78 if ip_layer == IP and d_pkt[ip_layer].proto != 61:
80 f"Decrypted packet has invalid IP protocol: "
81 f"{d_pkt[ip_layer].proto} should be: 61"
85 def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
86 """Check received IP/IPv6 packet.
88 :param pkt_recv: Received packet to verify.
89 :param ip_layer: Scapy IP layer.
90 :param src_ip: Source IP/IPv6 address.
91 :param dst_ip: Destination IP/IPv6 address.
92 :type pkt_recv: scapy.Ether
93 :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
96 :raises RuntimeError: If received packet is invalid.
98 if not pkt_recv.haslayer(ip_layer):
100 f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
103 if pkt_recv[ip_layer].dst != dst_ip:
105 f"Received packet has invalid destination address: "
106 f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
109 if pkt_recv[ip_layer].src != src_ip:
111 f"Received packet has invalid destination address: "
112 f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
115 if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
117 f"Received packet has invalid IP protocol: "
118 f"{pkt_recv[ip_layer].proto} should be: 61"
122 # TODO: Pylint says too-many-locals and too-many-statements. Refactor!
124 """Send and receive IPsec packet."""
126 args = TrafficScriptArg(
128 u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
129 u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg",
130 u"integ_key", u"l_spi", u"r_spi"
132 [u"src_tun", u"dst_tun"]
135 tx_txq = TxQueue(args.get_arg(u"tx_if"))
136 tx_rxq = RxQueue(args.get_arg(u"tx_if"))
137 rx_txq = TxQueue(args.get_arg(u"rx_if"))
138 rx_rxq = RxQueue(args.get_arg(u"rx_if"))
140 tx_src_mac = args.get_arg(u"tx_src_mac")
141 tx_dst_mac = args.get_arg(u"tx_dst_mac")
142 rx_src_mac = args.get_arg(u"rx_src_mac")
143 rx_dst_mac = args.get_arg(u"rx_dst_mac")
144 src_ip = args.get_arg(u"src_ip")
145 dst_ip = args.get_arg(u"dst_ip")
146 crypto_alg = args.get_arg(u"crypto_alg")
147 crypto_key = args.get_arg(u"crypto_key")
148 integ_alg = args.get_arg(u"integ_alg")
149 integ_key = args.get_arg(u"integ_key")
150 l_spi = int(args.get_arg(u"l_spi"))
151 r_spi = int(args.get_arg(u"r_spi"))
152 src_tun = args.get_arg(u"src_tun")
153 dst_tun = args.get_arg(u"dst_tun")
155 ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
157 tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \
159 tunnel_in = ip_layer(src=dst_tun, dst=src_tun) if src_tun and dst_tun \
162 if not (src_tun and dst_tun):
165 sa_in = SecurityAssociation(
166 ESP, spi=r_spi, crypt_algo=crypto_alg,
167 crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
168 auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
171 sa_out = SecurityAssociation(
172 ESP, spi=l_spi, crypt_algo=crypto_alg,
173 crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
174 auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
177 ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
178 else ip_layer(src=src_ip, dst=dst_ip)
179 ip_pkt = ip_layer(ip_pkt)
181 e_pkt = sa_out.encrypt(ip_pkt)
182 tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
185 sent_packets = list()
187 sent_packets.append(tx_pkt_send)
188 tx_txq.send(tx_pkt_send)
191 rx_pkt_recv = rx_rxq.recv(2)
193 if rx_pkt_recv is None:
194 raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
196 if rx_pkt_recv.haslayer(ICMPv6ND_NS):
197 # read another packet in the queue if the current one is ICMPv6ND_NS
200 # otherwise process the current packet
203 check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip)
205 rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
206 else ip_layer(src=dst_ip, dst=src_ip)
207 rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
211 rx_txq.send(rx_pkt_send)
214 tx_pkt_recv = tx_rxq.recv(2, sent_packets)
216 if tx_pkt_recv is None:
217 raise RuntimeError(u"ESP packet Rx timeout")
219 if tx_pkt_recv.haslayer(ICMPv6ND_NS):
220 # read another packet in the queue if the current one is ICMPv6ND_NS
223 # otherwise process the current packet
226 check_ipsec(tx_pkt_recv, ip_layer, src_tun, dst_ip, src_ip, sa_in)
231 if __name__ == u"__main__":