fix(ipsec): Prepare IPsecUtil for upcoming changes
[csit.git] / GPL / traffic_scripts / ipsec_interface.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script for IPsec verification."""
27
28 import sys
29
30 from ipaddress import ip_address
31 from scapy.layers.inet import IP
32 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
33 from scapy.layers.ipsec import SecurityAssociation, ESP
34 from scapy.layers.l2 import Ether
35 from scapy.packet import Raw
36
37 from .PacketVerifier import RxQueue, TxQueue
38 from .TrafficScriptArg import TrafficScriptArg
39
40
41 def check_ipsec(
42         pkt_recv, ip_layer, src_mac, dst_mac, src_tun, dst_tun, src_ip, dst_ip,
43         sa_in):
44     """Check received IPsec packet.
45
46     :param pkt_recv: Received packet to verify.
47     :param ip_layer: Scapy IP layer.
48     :param src_mac: Source MAC address.
49     :param dst_mac: Destination MAC address.
50     :param src_tun: IPsec tunnel source address.
51     :param dst_tun: IPsec tunnel destination address.
52     :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
53     :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
54     :param sa_in: IPsec SA for packet decryption.
55     :type pkt_recv: scapy.Ether
56     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
57     :type src_mac: str
58     :type dst_mac: str
59     :type src_tun: str
60     :type dst_tun: str
61     :type src_ip: str
62     :type dst_ip: str
63     :type sa_in: scapy.layers.ipsec.SecurityAssociation
64     :raises RuntimeError: If received packet is invalid.
65     """
66     if pkt_recv[Ether].src != src_mac:
67         raise RuntimeError(
68             f"Received frame has invalid source MAC address: "
69             f"{pkt_recv[Ether].src} should be: {src_mac}"
70         )
71
72     if pkt_recv[Ether].dst != dst_mac:
73         raise RuntimeError(
74             f"Received frame has invalid destination MAC address: "
75             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
76         )
77
78     if not pkt_recv.haslayer(ip_layer):
79         raise RuntimeError(
80             f"Not an {ip_layer.__name__} packet received: {pkt_recv!r}"
81         )
82
83     if pkt_recv[ip_layer].src != src_tun:
84         raise RuntimeError(
85             f"Received packet has invalid source address: "
86             f"{pkt_recv[ip_layer].src} should be: {src_tun}"
87         )
88
89     if pkt_recv[ip_layer].dst != dst_tun:
90         raise RuntimeError(
91             f"Received packet has invalid destination address: "
92             f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
93         )
94
95     if not pkt_recv.haslayer(ESP):
96         raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
97
98     ip_pkt = pkt_recv[ip_layer]
99     d_pkt = sa_in.decrypt(ip_pkt)
100     print(u"Decrypted packet:")
101     d_pkt.show2()
102     print()
103
104     if d_pkt[ip_layer].dst != dst_ip:
105         raise RuntimeError(
106             f"Decrypted packet has invalid destination address: "
107             f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
108         )
109
110     if d_pkt[ip_layer].src != src_ip:
111         raise RuntimeError(
112             f"Decrypted packet has invalid source address: "
113             f"{d_pkt[ip_layer].src} should be: {src_ip}"
114         )
115
116     if ip_layer == IP and d_pkt[ip_layer].proto != 61:
117         raise RuntimeError(
118             f"Decrypted packet has invalid IP protocol: "
119             f"{d_pkt[ip_layer].proto} should be: 61"
120         )
121
122
123 def check_ip(pkt_recv, ip_layer, src_mac, dst_mac, src_ip, dst_ip):
124     """Check received IP/IPv6 packet.
125
126     :param pkt_recv: Received packet to verify.
127     :param ip_layer: Scapy IP layer.
128     :param src_mac: Source MAC address.
129     :param dst_mac: Destination MAC address.
130     :param src_ip: Source IP/IPv6 address.
131     :param dst_ip: Destination IP/IPv6 address.
132     :type pkt_recv: scapy.Ether
133     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
134     :type src_mac: str
135     :type dst_mac: str
136     :type src_ip: str
137     :type dst_ip: str
138     :raises RuntimeError: If received packet is invalid.
139     """
140     if pkt_recv[Ether].src != src_mac:
141         raise RuntimeError(
142             f"Received frame has invalid source MAC address: "
143             f"{pkt_recv[Ether].src} should be: {src_mac}"
144         )
145
146     if pkt_recv[Ether].dst != dst_mac:
147         raise RuntimeError(
148             f"Received frame has invalid destination MAC address: "
149             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
150         )
151
152     if not pkt_recv.haslayer(ip_layer):
153         raise RuntimeError(
154             f"Not an {ip_layer.__name__} packet received: {pkt_recv!r}"
155         )
156
157     if pkt_recv[ip_layer].dst != dst_ip:
158         raise RuntimeError(
159             f"Received packet has invalid destination address: "
160             f"{pkt_recv[ip_layer.__name__].dst} should be: {dst_ip}"
161         )
162
163     if pkt_recv[ip_layer].src != src_ip:
164         raise RuntimeError(
165             f"Received packet has invalid destination address: "
166             f"{pkt_recv[ip_layer.__name__].dst} should be: {src_ip}"
167         )
168
169     if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
170         raise RuntimeError(
171             f"Received packet has invalid IP protocol: "
172             f"{pkt_recv[ip_layer].proto} should be: 61"
173         )
174
175
176 def main():
177     """Send and receive IPsec packet."""
178
179     args = TrafficScriptArg(
180         [
181             u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
182             u"src_ip", u"dst_ip", u"src_tun", u"dst_tun", u"crypto_alg",
183             u"crypto_key", u"integ_alg", u"integ_key", u"l_spi", u"r_spi"
184         ]
185     )
186
187     tx_txq = TxQueue(args.get_arg(u"tx_if"))
188     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
189     rx_txq = TxQueue(args.get_arg(u"rx_if"))
190     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
191
192     tx_src_mac = args.get_arg(u"tx_src_mac")
193     tx_dst_mac = args.get_arg(u"tx_dst_mac")
194     rx_src_mac = args.get_arg(u"rx_src_mac")
195     rx_dst_mac = args.get_arg(u"rx_dst_mac")
196     src_ip = args.get_arg(u"src_ip")
197     dst_ip = args.get_arg(u"dst_ip")
198     src_tun = args.get_arg(u"src_tun")
199     dst_tun = args.get_arg(u"dst_tun")
200     crypto_alg = args.get_arg(u"crypto_alg")
201     crypto_key = args.get_arg(u"crypto_key")
202     integ_alg = args.get_arg(u"integ_alg")
203     integ_key = args.get_arg(u"integ_key")
204     l_spi = int(args.get_arg(u"l_spi"))
205     r_spi = int(args.get_arg(u"r_spi"))
206
207     ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
208     ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
209         else ip_layer(src=src_ip, dst=dst_ip)
210
211     tunnel_in = ip_layer(src=src_tun, dst=dst_tun)
212     tunnel_out = ip_layer(src=dst_tun, dst=src_tun)
213
214     sa_in = SecurityAssociation(
215         ESP, spi=l_spi, crypt_algo=crypto_alg,
216         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
217         auth_key=integ_key.encode(encoding=u"utf-8"),
218         tunnel_header=tunnel_in
219     )
220
221     sa_out = SecurityAssociation(
222         ESP, spi=r_spi, crypt_algo=crypto_alg,
223         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
224         auth_key=integ_key.encode(encoding=u"utf-8"),
225         tunnel_header=tunnel_out
226     )
227
228     sent_packets = list()
229     tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) / ip_pkt)
230     tx_pkt_send /= Raw()
231     size_limit = 78 if ip_layer == IPv6 else 64
232     if len(tx_pkt_send) < size_limit:
233         tx_pkt_send[Raw].load += (b"\0" * (size_limit - len(tx_pkt_send)))
234     sent_packets.append(tx_pkt_send)
235     tx_txq.send(tx_pkt_send)
236
237     while True:
238         rx_pkt_recv = rx_rxq.recv(2)
239
240         if rx_pkt_recv is None:
241             raise RuntimeError(f"{ip_layer.__name__} packet Rx timeout")
242
243         if rx_pkt_recv.haslayer(ICMPv6ND_NS):
244             # read another packet in the queue if the current one is ICMPv6ND_NS
245             continue
246         elif rx_pkt_recv.haslayer(ICMPv6MLReport2):
247             # read another packet in the queue if the current one is
248             # ICMPv6MLReport2
249             continue
250         elif rx_pkt_recv.haslayer(ICMPv6ND_RA):
251             # read another packet in the queue if the current one is
252             # ICMPv6ND_RA
253             continue
254
255         # otherwise process the current packet
256         break
257
258     check_ipsec(
259         rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, src_tun, dst_tun, src_ip,
260         dst_ip, sa_in
261     )
262
263     ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
264         else ip_layer(src=dst_ip, dst=src_ip)
265     ip_pkt /= Raw()
266     if len(ip_pkt) < (size_limit - 14):
267         ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(ip_pkt)))
268     e_pkt = sa_out.encrypt(ip_pkt)
269     rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
270                    e_pkt)
271     rx_txq.send(rx_pkt_send)
272
273     while True:
274         tx_pkt_recv = tx_rxq.recv(2, ignore=sent_packets)
275
276         if tx_pkt_recv is None:
277             raise RuntimeError(f"{ip_layer.__name__} packet Rx timeout")
278
279         if tx_pkt_recv.haslayer(ICMPv6ND_NS):
280             # read another packet in the queue if the current one is ICMPv6ND_NS
281             continue
282         elif tx_pkt_recv.haslayer(ICMPv6MLReport2):
283             # read another packet in the queue if the current one is
284             # ICMPv6MLReport2
285             continue
286         elif tx_pkt_recv.haslayer(ICMPv6MLReport2):
287             # read another packet in the queue if the current one is
288             # ICMPv6MLReport2
289             continue
290         elif tx_pkt_recv.haslayer(ICMPv6ND_RA):
291             # read another packet in the queue if the current one is
292             # ICMPv6ND_RA
293             continue
294
295         # otherwise process the current packet
296         break
297
298     check_ip(tx_pkt_recv, ip_layer, tx_dst_mac, tx_src_mac, dst_ip, src_ip)
299
300     sys.exit(0)
301
302
303 if __name__ == u"__main__":
304     main()