API: deprecated IPSEC APIs
[csit.git] / GPL / traffic_scripts / ipsec_interface.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
18 #
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
24
25 """Traffic script for IPsec verification."""
26
27 import sys
28
29 from ipaddress import ip_address
30 from scapy.layers.inet import IP
31 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
32 from scapy.layers.ipsec import SecurityAssociation, ESP
33 from scapy.layers.l2 import Ether
34 from scapy.packet import Raw
35
36 from .PacketVerifier import RxQueue, TxQueue
37 from .TrafficScriptArg import TrafficScriptArg
38
39
40 def check_ipsec(
41         pkt_recv, ip_layer, src_mac, dst_mac, src_tun, dst_tun, src_ip, dst_ip,
42         sa_in):
43     """Check received IPsec packet.
44
45     :param pkt_recv: Received packet to verify.
46     :param ip_layer: Scapy IP layer.
47     :param src_mac: Source MAC address.
48     :param dst_mac: Destination MAC address.
49     :param src_tun: IPsec tunnel source address.
50     :param dst_tun: IPsec tunnel destination address.
51     :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
52     :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
53     :param sa_in: IPsec SA for packet decryption.
54     :type pkt_recv: scapy.Ether
55     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
56     :type src_mac: str
57     :type dst_mac: str
58     :type src_tun: str
59     :type dst_tun: str
60     :type src_ip: str
61     :type dst_ip: str
62     :type sa_in: scapy.layers.ipsec.SecurityAssociation
63     :raises RuntimeError: If received packet is invalid.
64     """
65     if pkt_recv[Ether].src != src_mac:
66         raise RuntimeError(
67             f"Received frame has invalid source MAC address: "
68             f"{pkt_recv[Ether].src} should be: {src_mac}"
69         )
70
71     if pkt_recv[Ether].dst != dst_mac:
72         raise RuntimeError(
73             f"Received frame has invalid destination MAC address: "
74             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
75         )
76
77     if not pkt_recv.haslayer(ip_layer):
78         raise RuntimeError(
79             f"Not an {ip_layer.__name__} packet received: {pkt_recv!r}"
80         )
81
82     if pkt_recv[ip_layer].src != src_tun:
83         raise RuntimeError(
84             f"Received packet has invalid source address: "
85             f"{pkt_recv[ip_layer].src} should be: {src_tun}"
86         )
87
88     if pkt_recv[ip_layer].dst != dst_tun:
89         raise RuntimeError(
90             f"Received packet has invalid destination address: "
91             f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
92         )
93
94     if not pkt_recv.haslayer(ESP):
95         raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
96
97     ip_pkt = pkt_recv[ip_layer]
98     d_pkt = sa_in.decrypt(ip_pkt)
99     print(u"Decrypted packet:")
100     d_pkt.show2()
101     print()
102
103     if d_pkt[ip_layer].dst != dst_ip:
104         raise RuntimeError(
105             f"Decrypted packet has invalid destination address: "
106             f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
107         )
108
109     if d_pkt[ip_layer].src != src_ip:
110         raise RuntimeError(
111             f"Decrypted packet has invalid source address: "
112             f"{d_pkt[ip_layer].src} should be: {src_ip}"
113         )
114
115     if ip_layer == IP and d_pkt[ip_layer].proto != 61:
116         raise RuntimeError(
117             f"Decrypted packet has invalid IP protocol: "
118             f"{d_pkt[ip_layer].proto} should be: 61"
119         )
120
121
122 def check_ip(pkt_recv, ip_layer, src_mac, dst_mac, src_ip, dst_ip):
123     """Check received IP/IPv6 packet.
124
125     :param pkt_recv: Received packet to verify.
126     :param ip_layer: Scapy IP layer.
127     :param src_mac: Source MAC address.
128     :param dst_mac: Destination MAC address.
129     :param src_ip: Source IP/IPv6 address.
130     :param dst_ip: Destination IP/IPv6 address.
131     :type pkt_recv: scapy.Ether
132     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
133     :type src_mac: str
134     :type dst_mac: str
135     :type src_ip: str
136     :type dst_ip: str
137     :raises RuntimeError: If received packet is invalid.
138     """
139     if pkt_recv[Ether].src != src_mac:
140         raise RuntimeError(
141             f"Received frame has invalid source MAC address: "
142             f"{pkt_recv[Ether].src} should be: {src_mac}"
143         )
144
145     if pkt_recv[Ether].dst != dst_mac:
146         raise RuntimeError(
147             f"Received frame has invalid destination MAC address: "
148             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
149         )
150
151     if not pkt_recv.haslayer(ip_layer):
152         raise RuntimeError(
153             f"Not an {ip_layer.__name__} packet received: {pkt_recv!r}"
154         )
155
156     if pkt_recv[ip_layer].dst != dst_ip:
157         raise RuntimeError(
158             f"Received packet has invalid destination address: "
159             f"{pkt_recv[ip_layer.__name__].dst} should be: {dst_ip}"
160         )
161
162     if pkt_recv[ip_layer].src != src_ip:
163         raise RuntimeError(
164             f"Received packet has invalid destination address: "
165             f"{pkt_recv[ip_layer.__name__].dst} should be: {src_ip}"
166         )
167
168     if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
169         raise RuntimeError(
170             f"Received packet has invalid IP protocol: "
171             f"{pkt_recv[ip_layer].proto} should be: 61"
172         )
173
174
175 def main():
176     """Send and receive IPsec packet."""
177
178     args = TrafficScriptArg(
179         [
180             u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
181             u"src_ip", u"dst_ip", u"src_tun", u"dst_tun", u"crypto_alg",
182             u"crypto_key", u"integ_alg", u"integ_key", u"l_spi", u"r_spi"
183         ]
184     )
185
186     tx_txq = TxQueue(args.get_arg(u"tx_if"))
187     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
188     rx_txq = TxQueue(args.get_arg(u"rx_if"))
189     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
190
191     tx_src_mac = args.get_arg(u"tx_src_mac")
192     tx_dst_mac = args.get_arg(u"tx_dst_mac")
193     rx_src_mac = args.get_arg(u"rx_src_mac")
194     rx_dst_mac = args.get_arg(u"rx_dst_mac")
195     src_ip = args.get_arg(u"src_ip")
196     dst_ip = args.get_arg(u"dst_ip")
197     src_tun = args.get_arg(u"src_tun")
198     dst_tun = args.get_arg(u"dst_tun")
199     crypto_alg = args.get_arg(u"crypto_alg")
200     crypto_key = args.get_arg(u"crypto_key")
201     integ_alg = args.get_arg(u"integ_alg")
202     integ_key = args.get_arg(u"integ_key")
203     l_spi = int(args.get_arg(u"l_spi"))
204     r_spi = int(args.get_arg(u"r_spi"))
205
206     ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
207     ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
208         else ip_layer(src=src_ip, dst=dst_ip)
209
210     tunnel_in = ip_layer(src=src_tun, dst=dst_tun)
211     tunnel_out = ip_layer(src=dst_tun, dst=src_tun)
212
213     sa_in = SecurityAssociation(
214         ESP, spi=l_spi, crypt_algo=crypto_alg,
215         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
216         auth_key=integ_key.encode(encoding=u"utf-8"),
217         tunnel_header=tunnel_in
218     )
219
220     sa_out = SecurityAssociation(
221         ESP, spi=r_spi, crypt_algo=crypto_alg,
222         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
223         auth_key=integ_key.encode(encoding=u"utf-8"),
224         tunnel_header=tunnel_out
225     )
226
227     sent_packets = list()
228     tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) / ip_pkt)
229     tx_pkt_send /= Raw()
230     size_limit = 78 if ip_layer == IPv6 else 64
231     if len(tx_pkt_send) < size_limit:
232         tx_pkt_send[Raw].load += (b"\0" * (size_limit - len(tx_pkt_send)))
233     sent_packets.append(tx_pkt_send)
234     tx_txq.send(tx_pkt_send)
235
236     while True:
237         rx_pkt_recv = rx_rxq.recv(2)
238
239         if rx_pkt_recv is None:
240             raise RuntimeError(f"{ip_layer.__name__} packet Rx timeout")
241
242         if rx_pkt_recv.haslayer(ICMPv6ND_NS):
243             # read another packet in the queue if the current one is ICMPv6ND_NS
244             continue
245         else:
246             # otherwise process the current packet
247             break
248
249     check_ipsec(
250         rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, src_tun, dst_tun, src_ip,
251         dst_ip, sa_in
252     )
253
254     ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
255         else ip_layer(src=dst_ip, dst=src_ip)
256     ip_pkt /= Raw()
257     if len(ip_pkt) < (size_limit - 14):
258         ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(ip_pkt)))
259     e_pkt = sa_out.encrypt(ip_pkt)
260     rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
261                    e_pkt)
262     rx_txq.send(rx_pkt_send)
263
264     while True:
265         tx_pkt_recv = tx_rxq.recv(2, ignore=sent_packets)
266
267         if tx_pkt_recv is None:
268             raise RuntimeError(f"{ip_layer.__name__} packet Rx timeout")
269
270         if tx_pkt_recv.haslayer(ICMPv6ND_NS):
271             # read another packet in the queue if the current one is ICMPv6ND_NS
272             continue
273         else:
274             # otherwise process the current packet
275             break
276
277     check_ip(tx_pkt_recv, ip_layer, tx_dst_mac, tx_src_mac, dst_ip, src_ip)
278
279     sys.exit(0)
280
281
282 if __name__ == u"__main__":
283     main()