Ansible: Enable consul TLS
[csit.git] / GPL / traffic_scripts / ipsec_interface.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
18 #
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
24
25 """Traffic script for IPsec verification."""
26
27 import sys
28
29 from ipaddress import ip_address
30 from scapy.layers.inet import IP
31 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
32 from scapy.layers.ipsec import SecurityAssociation, ESP
33 from scapy.layers.l2 import Ether
34 from scapy.packet import Raw
35
36 from .PacketVerifier import RxQueue, TxQueue
37 from .TrafficScriptArg import TrafficScriptArg
38
39
40 def check_ipsec(
41         pkt_recv, ip_layer, src_mac, dst_mac, src_tun, dst_tun, src_ip, dst_ip,
42         sa_in):
43     """Check received IPsec packet.
44
45     :param pkt_recv: Received packet to verify.
46     :param ip_layer: Scapy IP layer.
47     :param src_mac: Source MAC address.
48     :param dst_mac: Destination MAC address.
49     :param src_tun: IPsec tunnel source address.
50     :param dst_tun: IPsec tunnel destination address.
51     :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
52     :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
53     :param sa_in: IPsec SA for packet decryption.
54     :type pkt_recv: scapy.Ether
55     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
56     :type src_mac: str
57     :type dst_mac: str
58     :type src_tun: str
59     :type dst_tun: str
60     :type src_ip: str
61     :type dst_ip: str
62     :type sa_in: scapy.layers.ipsec.SecurityAssociation
63     :raises RuntimeError: If received packet is invalid.
64     """
65     if pkt_recv[Ether].src != src_mac:
66         raise RuntimeError(
67             f"Received frame has invalid source MAC address: "
68             f"{pkt_recv[Ether].src} should be: {src_mac}"
69         )
70
71     if pkt_recv[Ether].dst != dst_mac:
72         raise RuntimeError(
73             f"Received frame has invalid destination MAC address: "
74             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
75         )
76
77     if not pkt_recv.haslayer(ip_layer):
78         raise RuntimeError(
79             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
80         )
81
82     if pkt_recv[ip_layer].src != src_tun:
83         raise RuntimeError(
84             f"Received packet has invalid source address: "
85             f"{pkt_recv[ip_layer].src} should be: {src_tun}"
86         )
87
88     if pkt_recv[ip_layer].dst != dst_tun:
89         raise RuntimeError(
90             f"Received packet has invalid destination address: "
91             f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
92         )
93
94     if not pkt_recv.haslayer(ESP):
95         raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
96
97     ip_pkt = pkt_recv[ip_layer]
98     d_pkt = sa_in.decrypt(ip_pkt)
99
100     if d_pkt[ip_layer].dst != dst_ip:
101         raise RuntimeError(
102             f"Decrypted packet has invalid destination address: "
103             f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
104         )
105
106     if d_pkt[ip_layer].src != src_ip:
107         raise RuntimeError(
108             f"Decrypted packet has invalid source address: "
109             f"{d_pkt[ip_layer].src} should be: {src_ip}"
110         )
111
112     if ip_layer == IP and d_pkt[ip_layer].proto != 61:
113         raise RuntimeError(
114             f"Decrypted packet has invalid IP protocol: "
115             f"{d_pkt[ip_layer].proto} should be: 61"
116         )
117
118
119 def check_ip(pkt_recv, ip_layer, src_mac, dst_mac, src_ip, dst_ip):
120     """Check received IP/IPv6 packet.
121
122     :param pkt_recv: Received packet to verify.
123     :param ip_layer: Scapy IP layer.
124     :param src_mac: Source MAC address.
125     :param dst_mac: Destination MAC address.
126     :param src_ip: Source IP/IPv6 address.
127     :param dst_ip: Destination IP/IPv6 address.
128     :type pkt_recv: scapy.Ether
129     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
130     :type src_mac: str
131     :type dst_mac: str
132     :type src_ip: str
133     :type dst_ip: str
134     :raises RuntimeError: If received packet is invalid.
135     """
136     if pkt_recv[Ether].src != src_mac:
137         raise RuntimeError(
138             f"Received frame has invalid source MAC address: "
139             f"{pkt_recv[Ether].src} should be: {src_mac}"
140         )
141
142     if pkt_recv[Ether].dst != dst_mac:
143         raise RuntimeError(
144             f"Received frame has invalid destination MAC address: "
145             f"{pkt_recv[Ether].dst} should be: {dst_mac}"
146         )
147
148     if not pkt_recv.haslayer(ip_layer):
149         raise RuntimeError(
150             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
151         )
152
153     if pkt_recv[ip_layer].dst != dst_ip:
154         raise RuntimeError(
155             f"Received packet has invalid destination address: "
156             f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
157         )
158
159     if pkt_recv[ip_layer].src != src_ip:
160         raise RuntimeError(
161             f"Received packet has invalid destination address: "
162             f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
163         )
164
165     if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
166         raise RuntimeError(
167             f"Received packet has invalid IP protocol: "
168             f"{pkt_recv[ip_layer].proto} should be: 61"
169         )
170
171
172 def main():
173     """Send and receive IPsec packet."""
174
175     args = TrafficScriptArg(
176         [
177             u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
178             u"src_ip", u"dst_ip", u"src_tun", u"dst_tun", u"crypto_alg",
179             u"crypto_key", u"integ_alg", u"integ_key", u"l_spi", u"r_spi"
180         ]
181     )
182
183     tx_txq = TxQueue(args.get_arg(u"tx_if"))
184     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
185     rx_txq = TxQueue(args.get_arg(u"rx_if"))
186     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
187
188     tx_src_mac = args.get_arg(u"tx_src_mac")
189     tx_dst_mac = args.get_arg(u"tx_dst_mac")
190     rx_src_mac = args.get_arg(u"rx_src_mac")
191     rx_dst_mac = args.get_arg(u"rx_dst_mac")
192     src_ip = args.get_arg(u"src_ip")
193     dst_ip = args.get_arg(u"dst_ip")
194     src_tun = args.get_arg(u"src_tun")
195     dst_tun = args.get_arg(u"dst_tun")
196     crypto_alg = args.get_arg(u"crypto_alg")
197     crypto_key = args.get_arg(u"crypto_key")
198     integ_alg = args.get_arg(u"integ_alg")
199     integ_key = args.get_arg(u"integ_key")
200     l_spi = int(args.get_arg(u"l_spi"))
201     r_spi = int(args.get_arg(u"r_spi"))
202
203     ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
204     ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
205         else ip_layer(src=src_ip, dst=dst_ip)
206
207     tunnel_in = ip_layer(src=src_tun, dst=dst_tun)
208     tunnel_out = ip_layer(src=dst_tun, dst=src_tun)
209
210     sa_in = SecurityAssociation(
211         ESP, spi=l_spi, crypt_algo=crypto_alg,
212         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
213         auth_key=integ_key.encode(encoding=u"utf-8"),
214         tunnel_header=tunnel_in
215     )
216
217     sa_out = SecurityAssociation(
218         ESP, spi=r_spi, crypt_algo=crypto_alg,
219         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
220         auth_key=integ_key.encode(encoding=u"utf-8"),
221         tunnel_header=tunnel_out
222     )
223
224     sent_packets = list()
225     tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) / ip_pkt)
226     tx_pkt_send /= Raw()
227     size_limit = 78 if ip_layer == IPv6 else 64
228     if len(tx_pkt_send) < size_limit:
229         tx_pkt_send[Raw].load += (b"\0" * (size_limit - len(tx_pkt_send)))
230     sent_packets.append(tx_pkt_send)
231     tx_txq.send(tx_pkt_send)
232
233     while True:
234         rx_pkt_recv = rx_rxq.recv(2)
235
236         if rx_pkt_recv is None:
237             raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
238
239         if rx_pkt_recv.haslayer(ICMPv6ND_NS):
240             # read another packet in the queue if the current one is ICMPv6ND_NS
241             continue
242         else:
243             # otherwise process the current packet
244             break
245
246     check_ipsec(
247         rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, src_tun, dst_tun, src_ip,
248         dst_ip, sa_in
249     )
250
251     ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
252         else ip_layer(src=dst_ip, dst=src_ip)
253     ip_pkt /= Raw()
254     if len(ip_pkt) < (size_limit - 14):
255         ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(ip_pkt)))
256     e_pkt = sa_out.encrypt(ip_pkt)
257     rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
258                    e_pkt)
259     rx_txq.send(rx_pkt_send)
260
261     while True:
262         tx_pkt_recv = tx_rxq.recv(2, ignore=sent_packets)
263
264         if tx_pkt_recv is None:
265             raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
266
267         if tx_pkt_recv.haslayer(ICMPv6ND_NS):
268             # read another packet in the queue if the current one is ICMPv6ND_NS
269             continue
270         else:
271             # otherwise process the current packet
272             break
273
274     check_ip(tx_pkt_recv, ip_layer, tx_dst_mac, tx_src_mac, dst_ip, src_ip)
275
276     sys.exit(0)
277
278
279 if __name__ == u"__main__":
280     main()