3 # Copyright (c) 2019 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script for IPsec verification."""
21 from ipaddress import ip_address
22 # pylint: disable=no-name-in-module
23 # pylint: disable=import-error
24 logging.getLogger(u"scapy.runtime").setLevel(logging.ERROR)
25 from scapy.layers.inet import IP
26 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
27 from scapy.layers.ipsec import SecurityAssociation, ESP
28 from scapy.layers.l2 import Ether
29 from scapy.packet import Raw
31 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
32 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
    """Verify a received ESP packet and the packet it decrypts to.

    The outer header must be of the expected IP version, be addressed to
    the tunnel destination and carry ESP. The packet is then decrypted with
    the supplied SA and the addresses of the decrypted packet (plus the
    protocol number for IPv4) are compared with the expected values.

    :param pkt_recv: Received packet to verify.
    :param ip_layer: Scapy IP layer expected in the packet.
    :param dst_tun: Expected destination address of the received (outer)
        header.
    :param src_ip: Expected source IP/IPv6 address of the decrypted packet.
    :param dst_ip: Expected destination IP/IPv6 address of the decrypted
        packet.
    :param sa_in: IPsec SA used for packet decryption.
    :type pkt_recv: scapy.Ether
    :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
    :type dst_tun: str
    :type src_ip: str
    :type dst_ip: str
    :type sa_in: scapy.layers.ipsec.SecurityAssociation
    :raises RuntimeError: If received packet is invalid.
    """
    if not pkt_recv.haslayer(ip_layer):
        raise RuntimeError(
            f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
        )

    # Outer header: this is what travelled on the wire and what gets
    # handed to the SA for decryption.
    outer_hdr = pkt_recv[ip_layer]
    if outer_hdr.dst != dst_tun:
        raise RuntimeError(
            f"Received packet has invalid destination address: "
            f"{outer_hdr.dst} should be: {dst_tun}"
        )

    if not pkt_recv.haslayer(ESP):
        raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")

    # Inner header: the result of decrypting the ESP payload.
    inner_hdr = sa_in.decrypt(outer_hdr)[ip_layer]

    if inner_hdr.dst != dst_ip:
        raise RuntimeError(
            f"Decrypted packet has invalid destination address: "
            f"{inner_hdr.dst} should be: {dst_ip}"
        )

    if inner_hdr.src != src_ip:
        raise RuntimeError(
            f"Decrypted packet has invalid source address: "
            f"{inner_hdr.src} should be: {src_ip}"
        )

    # The protocol number is only checked for IPv4; 61 is the value this
    # script puts into the packets it generates.
    if ip_layer == IP and inner_hdr.proto != 61:
        raise RuntimeError(
            f"Decrypted packet has invalid IP protocol: "
            f"{inner_hdr.proto} should be: 61"
        )
def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
    """Check received IP/IPv6 packet.

    Verifies that the packet contains the expected IP layer, that its
    destination and source addresses match the expected ones and, for IPv4
    only, that the protocol field equals 61 (the value this script sets on
    the packets it sends).

    :param pkt_recv: Received packet to verify.
    :param ip_layer: Scapy IP layer.
    :param src_ip: Expected source IP/IPv6 address.
    :param dst_ip: Expected destination IP/IPv6 address.
    :type pkt_recv: scapy.Ether
    :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
    :type src_ip: str
    :type dst_ip: str
    :raises RuntimeError: If received packet is invalid.
    """
    if not pkt_recv.haslayer(ip_layer):
        raise RuntimeError(
            f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
        )

    ip_hdr = pkt_recv[ip_layer]

    if ip_hdr.dst != dst_ip:
        raise RuntimeError(
            f"Received packet has invalid destination address: "
            f"{ip_hdr.dst} should be: {dst_ip}"
        )

    if ip_hdr.src != src_ip:
        # Report the *source* address here; the message previously named
        # the destination and printed the destination value, which made a
        # source mismatch look like a destination problem.
        raise RuntimeError(
            f"Received packet has invalid source address: "
            f"{ip_hdr.src} should be: {src_ip}"
        )

    # The protocol number is only checked for IPv4.
    if ip_layer == IP and ip_hdr.proto != 61:
        raise RuntimeError(
            f"Received packet has invalid IP protocol: "
            f"{ip_hdr.proto} should be: 61"
        )
125 # pylint: disable=too-many-locals
126 # pylint: disable=too-many-statements
128 """Send and receive IPsec packet."""
130 args = TrafficScriptArg(
132 u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
133 u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg",
134 u"integ_key", u"l_spi", u"r_spi"
136 [u"src_tun", u"dst_tun"]
139 tx_txq = TxQueue(args.get_arg(u"tx_if"))
140 tx_rxq = RxQueue(args.get_arg(u"tx_if"))
141 rx_txq = TxQueue(args.get_arg(u"rx_if"))
142 rx_rxq = RxQueue(args.get_arg(u"rx_if"))
144 tx_src_mac = args.get_arg(u"tx_src_mac")
145 tx_dst_mac = args.get_arg(u"tx_dst_mac")
146 rx_src_mac = args.get_arg(u"rx_src_mac")
147 rx_dst_mac = args.get_arg(u"rx_dst_mac")
148 src_ip = args.get_arg(u"src_ip")
149 dst_ip = args.get_arg(u"dst_ip")
150 crypto_alg = args.get_arg(u"crypto_alg")
151 crypto_key = args.get_arg(u"crypto_key")
152 integ_alg = args.get_arg(u"integ_alg")
153 integ_key = args.get_arg(u"integ_key")
154 l_spi = int(args.get_arg(u"l_spi"))
155 r_spi = int(args.get_arg(u"r_spi"))
156 src_tun = args.get_arg(u"src_tun")
157 dst_tun = args.get_arg(u"dst_tun")
159 ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
161 tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \
163 tunnel_in = ip_layer(src=dst_tun, dst=src_tun) if src_tun and dst_tun \
166 if not (src_tun and dst_tun):
169 sa_in = SecurityAssociation(
170 ESP, spi=r_spi, crypt_algo=crypto_alg,
171 crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
172 auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
175 sa_out = SecurityAssociation(
176 ESP, spi=l_spi, crypt_algo=crypto_alg,
177 crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
178 auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
181 ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
182 else ip_layer(src=src_ip, dst=dst_ip)
183 ip_pkt = ip_layer(ip_pkt)
185 e_pkt = sa_out.encrypt(ip_pkt)
186 tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
189 sent_packets = list()
191 sent_packets.append(tx_pkt_send)
192 tx_txq.send(tx_pkt_send)
195 rx_pkt_recv = rx_rxq.recv(2)
197 if rx_pkt_recv is None:
198 raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
200 if rx_pkt_recv.haslayer(ICMPv6ND_NS):
201 # read another packet in the queue if the current one is ICMPv6ND_NS
204 # otherwise process the current packet
207 check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip)
209 rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
210 else ip_layer(src=dst_ip, dst=src_ip)
211 rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
215 rx_txq.send(rx_pkt_send)
218 tx_pkt_recv = tx_rxq.recv(2, sent_packets)
220 if tx_pkt_recv is None:
221 raise RuntimeError(u"ESP packet Rx timeout")
223 if tx_pkt_recv.haslayer(ICMPv6ND_NS):
224 # read another packet in the queue if the current one is ICMPv6ND_NS
227 # otherwise process the current packet
230 check_ipsec(tx_pkt_recv, ip_layer, src_tun, dst_ip, src_ip, sa_in)
235 if __name__ == u"__main__":