fix(ip6scale): Unify rnd profiles
[csit.git] / GPL / traffic_scripts / ipsec_interface.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2022 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script for IPsec verification."""
27
28 import sys
29
30 from ipaddress import ip_address
31 from scapy.layers.inet import IP
32 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
33 from scapy.layers.ipsec import SecurityAssociation, ESP
34 from scapy.layers.l2 import Ether
35 from scapy.packet import Raw
36
37 from .PacketVerifier import RxQueue, TxQueue
38 from .TrafficScriptArg import TrafficScriptArg
39
40
41 def check_ipsec(
42         pkt_recv, ip_layer, src_mac, dst_mac, src_tun, dst_tun, src_ip, dst_ip,
43         sa_in):
44     """Check received IPsec packet.
45
46     :param pkt_recv: Received packet to verify.
47     :param ip_layer: Scapy IP layer.
48     :param src_mac: Source MAC address.
49     :param dst_mac: Destination MAC address.
50     :param src_tun: IPsec tunnel source address.
51     :param dst_tun: IPsec tunnel destination address.
52     :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
53     :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
54     :param sa_in: IPsec SA for packet decryption.
55     :type pkt_recv: scapy.Ether
56     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
57     :type src_mac: str
58     :type dst_mac: str
59     :type src_tun: str
60     :type dst_tun: str
61     :type src_ip: str
62     :type dst_ip: str
63     :type sa_in: scapy.layers.ipsec.SecurityAssociation
64     :raises RuntimeError: If received packet is invalid.
65     """
66     if pkt_recv[Ether].src != src_mac:
67         raise RuntimeError(
68             f"Received frame has invalid source MAC address: "
69             f"{pkt_recv[Ether].src} should be: {src_mac}"
70         )
71
72     if pkt_recv[Ether].dst != dst_mac:
73         raise RuntimeError(
74             f"Received frame has invalid destination MAC address: "
75             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
76         )
77
78     if not pkt_recv.haslayer(ip_layer):
79         raise RuntimeError(
80             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
81         )
82
83     if pkt_recv[ip_layer].src != src_tun:
84         raise RuntimeError(
85             f"Received packet has invalid source address: "
86             f"{pkt_recv[ip_layer].src} should be: {src_tun}"
87         )
88
89     if pkt_recv[ip_layer].dst != dst_tun:
90         raise RuntimeError(
91             f"Received packet has invalid destination address: "
92             f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
93         )
94
95     if not pkt_recv.haslayer(ESP):
96         raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
97
98     ip_pkt = pkt_recv[ip_layer]
99     d_pkt = sa_in.decrypt(ip_pkt)
100
101     if d_pkt[ip_layer].dst != dst_ip:
102         raise RuntimeError(
103             f"Decrypted packet has invalid destination address: "
104             f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
105         )
106
107     if d_pkt[ip_layer].src != src_ip:
108         raise RuntimeError(
109             f"Decrypted packet has invalid source address: "
110             f"{d_pkt[ip_layer].src} should be: {src_ip}"
111         )
112
113     if ip_layer == IP and d_pkt[ip_layer].proto != 61:
114         raise RuntimeError(
115             f"Decrypted packet has invalid IP protocol: "
116             f"{d_pkt[ip_layer].proto} should be: 61"
117         )
118
119
120 def check_ip(pkt_recv, ip_layer, src_mac, dst_mac, src_ip, dst_ip):
121     """Check received IP/IPv6 packet.
122
123     :param pkt_recv: Received packet to verify.
124     :param ip_layer: Scapy IP layer.
125     :param src_mac: Source MAC address.
126     :param dst_mac: Destination MAC address.
127     :param src_ip: Source IP/IPv6 address.
128     :param dst_ip: Destination IP/IPv6 address.
129     :type pkt_recv: scapy.Ether
130     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
131     :type src_mac: str
132     :type dst_mac: str
133     :type src_ip: str
134     :type dst_ip: str
135     :raises RuntimeError: If received packet is invalid.
136     """
137     if pkt_recv[Ether].src != src_mac:
138         raise RuntimeError(
139             f"Received frame has invalid source MAC address: "
140             f"{pkt_recv[Ether].src} should be: {src_mac}"
141         )
142
143     if pkt_recv[Ether].dst != dst_mac:
144         raise RuntimeError(
145             f"Received frame has invalid destination MAC address: "
146             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
147         )
148
149     if not pkt_recv.haslayer(ip_layer):
150         raise RuntimeError(
151             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
152         )
153
154     if pkt_recv[ip_layer].dst != dst_ip:
155         raise RuntimeError(
156             f"Received packet has invalid destination address: "
157             f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
158         )
159
160     if pkt_recv[ip_layer].src != src_ip:
161         raise RuntimeError(
162             f"Received packet has invalid destination address: "
163             f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
164         )
165
166     if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
167         raise RuntimeError(
168             f"Received packet has invalid IP protocol: "
169             f"{pkt_recv[ip_layer].proto} should be: 61"
170         )
171
172
173 def main():
174     """Send and receive IPsec packet."""
175
176     args = TrafficScriptArg(
177         [
178             u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
179             u"src_ip", u"dst_ip", u"src_tun", u"dst_tun", u"crypto_alg",
180             u"crypto_key", u"integ_alg", u"integ_key", u"l_spi", u"r_spi"
181         ]
182     )
183
184     tx_txq = TxQueue(args.get_arg(u"tx_if"))
185     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
186     rx_txq = TxQueue(args.get_arg(u"rx_if"))
187     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
188
189     tx_src_mac = args.get_arg(u"tx_src_mac")
190     tx_dst_mac = args.get_arg(u"tx_dst_mac")
191     rx_src_mac = args.get_arg(u"rx_src_mac")
192     rx_dst_mac = args.get_arg(u"rx_dst_mac")
193     src_ip = args.get_arg(u"src_ip")
194     dst_ip = args.get_arg(u"dst_ip")
195     src_tun = args.get_arg(u"src_tun")
196     dst_tun = args.get_arg(u"dst_tun")
197     crypto_alg = args.get_arg(u"crypto_alg")
198     crypto_key = args.get_arg(u"crypto_key")
199     integ_alg = args.get_arg(u"integ_alg")
200     integ_key = args.get_arg(u"integ_key")
201     l_spi = int(args.get_arg(u"l_spi"))
202     r_spi = int(args.get_arg(u"r_spi"))
203
204     ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
205     ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
206         else ip_layer(src=src_ip, dst=dst_ip)
207
208     tunnel_in = ip_layer(src=src_tun, dst=dst_tun)
209     tunnel_out = ip_layer(src=dst_tun, dst=src_tun)
210
211     sa_in = SecurityAssociation(
212         ESP, spi=l_spi, crypt_algo=crypto_alg,
213         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
214         auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
215     )
216
217     sa_out = SecurityAssociation(
218         ESP, spi=r_spi, crypt_algo=crypto_alg,
219         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
220         auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
221     )
222
223     sent_packets = list()
224     tx_pkt_send = (
225         Ether(src=tx_src_mac, dst=tx_dst_mac) / ip_pkt)
226     tx_pkt_send /= Raw()
227     size_limit = 78 if ip_layer == IPv6 else 64
228     if len(tx_pkt_send) < size_limit:
229         tx_pkt_send[Raw].load += (b"\0" * (size_limit - len(tx_pkt_send)))
230     sent_packets.append(tx_pkt_send)
231     tx_txq.send(tx_pkt_send)
232
233     while True:
234         rx_pkt_recv = rx_rxq.recv(2)
235
236         if rx_pkt_recv is None:
237             raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
238
239         if rx_pkt_recv.haslayer(ICMPv6ND_NS):
240             # read another packet in the queue if the current one is ICMPv6ND_NS
241             continue
242         if rx_pkt_recv.haslayer(ICMPv6MLReport2):
243             # read another packet in the queue if the current one is
244             # ICMPv6MLReport2
245             continue
246         if rx_pkt_recv.haslayer(ICMPv6ND_RA):
247             # read another packet in the queue if the current one is
248             # ICMPv6ND_RA
249             continue
250
251         # otherwise process the current packet
252         break
253
254     check_ipsec(
255         rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, src_tun, dst_tun, src_ip,
256         dst_ip, sa_in
257     )
258
259     rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
260         else ip_layer(src=dst_ip, dst=src_ip)
261     rx_ip_pkt /= Raw()
262     if len(rx_ip_pkt) < (size_limit - 14):
263         rx_ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(rx_ip_pkt)))
264     rx_pkt_send = (
265         Ether(src=rx_dst_mac, dst=rx_src_mac) / sa_out.encrypt(rx_ip_pkt))
266     rx_txq.send(rx_pkt_send)
267
268     while True:
269         tx_pkt_recv = tx_rxq.recv(2, ignore=sent_packets)
270
271         if tx_pkt_recv is None:
272             raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
273
274         if tx_pkt_recv.haslayer(ICMPv6ND_NS):
275             # read another packet in the queue if the current one is ICMPv6ND_NS
276             continue
277         if tx_pkt_recv.haslayer(ICMPv6MLReport2):
278             # read another packet in the queue if the current one is
279             # ICMPv6MLReport2
280             continue
281         if tx_pkt_recv.haslayer(ICMPv6ND_RA):
282             # read another packet in the queue if the current one is
283             # ICMPv6ND_RA
284             continue
285
286         break
287
288     check_ip(tx_pkt_recv, ip_layer, tx_dst_mac, tx_src_mac, dst_ip, src_ip)
289
290
291 if __name__ == u"__main__":
292     main()