ee157260fa2ba3e8128045abacb304021ec9ecee
[csit.git] / GPL / traffic_scripts / ipsec_interface.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script for IPsec verification."""
27
28 import sys
29
30 from ipaddress import ip_address
31 from scapy.layers.inet import IP
32 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2
33 from scapy.layers.ipsec import SecurityAssociation, ESP
34 from scapy.layers.l2 import Ether
35 from scapy.packet import Raw
36
37 from .PacketVerifier import RxQueue, TxQueue
38 from .TrafficScriptArg import TrafficScriptArg
39
40
41 def check_ipsec(
42         pkt_recv, ip_layer, src_mac, dst_mac, src_tun, dst_tun, src_ip, dst_ip,
43         sa_in):
44     """Check received IPsec packet.
45
46     :param pkt_recv: Received packet to verify.
47     :param ip_layer: Scapy IP layer.
48     :param src_mac: Source MAC address.
49     :param dst_mac: Destination MAC address.
50     :param src_tun: IPsec tunnel source address.
51     :param dst_tun: IPsec tunnel destination address.
52     :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
53     :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
54     :param sa_in: IPsec SA for packet decryption.
55     :type pkt_recv: scapy.Ether
56     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
57     :type src_mac: str
58     :type dst_mac: str
59     :type src_tun: str
60     :type dst_tun: str
61     :type src_ip: str
62     :type dst_ip: str
63     :type sa_in: scapy.layers.ipsec.SecurityAssociation
64     :raises RuntimeError: If received packet is invalid.
65     """
66     if pkt_recv[Ether].src != src_mac:
67         raise RuntimeError(
68             f"Received frame has invalid source MAC address: "
69             f"{pkt_recv[Ether].src} should be: {src_mac}"
70         )
71
72     if pkt_recv[Ether].dst != dst_mac:
73         raise RuntimeError(
74             f"Received frame has invalid destination MAC address: "
75             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
76         )
77
78     if not pkt_recv.haslayer(ip_layer):
79         raise RuntimeError(
80             f"Not an {ip_layer.__name__} packet received: {pkt_recv!r}"
81         )
82
83     if pkt_recv[ip_layer].src != src_tun:
84         raise RuntimeError(
85             f"Received packet has invalid source address: "
86             f"{pkt_recv[ip_layer].src} should be: {src_tun}"
87         )
88
89     if pkt_recv[ip_layer].dst != dst_tun:
90         raise RuntimeError(
91             f"Received packet has invalid destination address: "
92             f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
93         )
94
95     if not pkt_recv.haslayer(ESP):
96         raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
97
98     ip_pkt = pkt_recv[ip_layer]
99     d_pkt = sa_in.decrypt(ip_pkt)
100     print(u"Decrypted packet:")
101     d_pkt.show2()
102     print()
103
104     if d_pkt[ip_layer].dst != dst_ip:
105         raise RuntimeError(
106             f"Decrypted packet has invalid destination address: "
107             f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
108         )
109
110     if d_pkt[ip_layer].src != src_ip:
111         raise RuntimeError(
112             f"Decrypted packet has invalid source address: "
113             f"{d_pkt[ip_layer].src} should be: {src_ip}"
114         )
115
116     if ip_layer == IP and d_pkt[ip_layer].proto != 61:
117         raise RuntimeError(
118             f"Decrypted packet has invalid IP protocol: "
119             f"{d_pkt[ip_layer].proto} should be: 61"
120         )
121
122
123 def check_ip(pkt_recv, ip_layer, src_mac, dst_mac, src_ip, dst_ip):
124     """Check received IP/IPv6 packet.
125
126     :param pkt_recv: Received packet to verify.
127     :param ip_layer: Scapy IP layer.
128     :param src_mac: Source MAC address.
129     :param dst_mac: Destination MAC address.
130     :param src_ip: Source IP/IPv6 address.
131     :param dst_ip: Destination IP/IPv6 address.
132     :type pkt_recv: scapy.Ether
133     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
134     :type src_mac: str
135     :type dst_mac: str
136     :type src_ip: str
137     :type dst_ip: str
138     :raises RuntimeError: If received packet is invalid.
139     """
140     if pkt_recv[Ether].src != src_mac:
141         raise RuntimeError(
142             f"Received frame has invalid source MAC address: "
143             f"{pkt_recv[Ether].src} should be: {src_mac}"
144         )
145
146     if pkt_recv[Ether].dst != dst_mac:
147         raise RuntimeError(
148             f"Received frame has invalid destination MAC address: "
149             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
150         )
151
152     if not pkt_recv.haslayer(ip_layer):
153         raise RuntimeError(
154             f"Not an {ip_layer.__name__} packet received: {pkt_recv!r}"
155         )
156
157     if pkt_recv[ip_layer].dst != dst_ip:
158         raise RuntimeError(
159             f"Received packet has invalid destination address: "
160             f"{pkt_recv[ip_layer.__name__].dst} should be: {dst_ip}"
161         )
162
163     if pkt_recv[ip_layer].src != src_ip:
164         raise RuntimeError(
165             f"Received packet has invalid destination address: "
166             f"{pkt_recv[ip_layer.__name__].dst} should be: {src_ip}"
167         )
168
169     if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
170         raise RuntimeError(
171             f"Received packet has invalid IP protocol: "
172             f"{pkt_recv[ip_layer].proto} should be: 61"
173         )
174
175
176 def main():
177     """Send and receive IPsec packet."""
178
179     args = TrafficScriptArg(
180         [
181             u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
182             u"src_ip", u"dst_ip", u"src_tun", u"dst_tun", u"crypto_alg",
183             u"crypto_key", u"integ_alg", u"integ_key", u"l_spi", u"r_spi"
184         ]
185     )
186
187     tx_txq = TxQueue(args.get_arg(u"tx_if"))
188     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
189     rx_txq = TxQueue(args.get_arg(u"rx_if"))
190     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
191
192     tx_src_mac = args.get_arg(u"tx_src_mac")
193     tx_dst_mac = args.get_arg(u"tx_dst_mac")
194     rx_src_mac = args.get_arg(u"rx_src_mac")
195     rx_dst_mac = args.get_arg(u"rx_dst_mac")
196     src_ip = args.get_arg(u"src_ip")
197     dst_ip = args.get_arg(u"dst_ip")
198     src_tun = args.get_arg(u"src_tun")
199     dst_tun = args.get_arg(u"dst_tun")
200     crypto_alg = args.get_arg(u"crypto_alg")
201     crypto_key = args.get_arg(u"crypto_key")
202     integ_alg = args.get_arg(u"integ_alg")
203     integ_key = args.get_arg(u"integ_key")
204     l_spi = int(args.get_arg(u"l_spi"))
205     r_spi = int(args.get_arg(u"r_spi"))
206
207     ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
208     ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
209         else ip_layer(src=src_ip, dst=dst_ip)
210
211     tunnel_in = ip_layer(src=src_tun, dst=dst_tun)
212     tunnel_out = ip_layer(src=dst_tun, dst=src_tun)
213
214     sa_in = SecurityAssociation(
215         ESP, spi=l_spi, crypt_algo=crypto_alg,
216         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
217         auth_key=integ_key.encode(encoding=u"utf-8"),
218         tunnel_header=tunnel_in
219     )
220
221     sa_out = SecurityAssociation(
222         ESP, spi=r_spi, crypt_algo=crypto_alg,
223         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
224         auth_key=integ_key.encode(encoding=u"utf-8"),
225         tunnel_header=tunnel_out
226     )
227
228     sent_packets = list()
229     tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) / ip_pkt)
230     tx_pkt_send /= Raw()
231     size_limit = 78 if ip_layer == IPv6 else 64
232     if len(tx_pkt_send) < size_limit:
233         tx_pkt_send[Raw].load += (b"\0" * (size_limit - len(tx_pkt_send)))
234     sent_packets.append(tx_pkt_send)
235     tx_txq.send(tx_pkt_send)
236
237     while True:
238         rx_pkt_recv = rx_rxq.recv(2)
239
240         if rx_pkt_recv is None:
241             raise RuntimeError(f"{ip_layer.__name__} packet Rx timeout")
242
243         if rx_pkt_recv.haslayer(ICMPv6ND_NS):
244             # read another packet in the queue if the current one is ICMPv6ND_NS
245             continue
246         else:
247             # otherwise process the current packet
248             break
249
250     check_ipsec(
251         rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, src_tun, dst_tun, src_ip,
252         dst_ip, sa_in
253     )
254
255     ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
256         else ip_layer(src=dst_ip, dst=src_ip)
257     ip_pkt /= Raw()
258     if len(ip_pkt) < (size_limit - 14):
259         ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(ip_pkt)))
260     e_pkt = sa_out.encrypt(ip_pkt)
261     rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
262                    e_pkt)
263     rx_txq.send(rx_pkt_send)
264
265     while True:
266         tx_pkt_recv = tx_rxq.recv(2, ignore=sent_packets)
267
268         if tx_pkt_recv is None:
269             raise RuntimeError(f"{ip_layer.__name__} packet Rx timeout")
270
271         if tx_pkt_recv.haslayer(ICMPv6ND_NS):
272             # read another packet in the queue if the current one is ICMPv6ND_NS
273             continue
274         elif tx_pkt_recv.haslayer(ICMPv6MLReport2):
275             # read another packet in the queue if the current one is
276             # ICMPv6MLReport2
277             continue
278         else:
279             # otherwise process the current packet
280             break
281
282     check_ip(tx_pkt_recv, ip_layer, tx_dst_mac, tx_src_mac, dst_ip, src_ip)
283
284     sys.exit(0)
285
286
287 if __name__ == u"__main__":
288     main()