Remove leftovers related to Xenial
[csit.git] / resources / traffic_scripts / ipsec.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2019 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Traffic script for IPsec verification."""
17
18 import sys
19 import logging
20
21 from ipaddress import ip_address
22 from scapy.layers.inet import IP
23 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
24 from scapy.layers.ipsec import SecurityAssociation, ESP
25 from scapy.layers.l2 import Ether
26 from scapy.packet import Raw
27
28 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
29 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
30
31
32 def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
33     """Check received IPsec packet.
34
35     :param pkt_recv: Received packet to verify.
36     :param ip_layer: Scapy IP layer.
37     :param dst_tun: IPsec tunnel destination address.
38     :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
39     :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
40     :param sa_in: IPsec SA for packet decryption.
41     :type pkt_recv: scapy.Ether
42     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
43     :type dst_tun: str
44     :type src_ip: str
45     :type dst_ip: str
46     :type sa_in: scapy.layers.ipsec.SecurityAssociation
47     :raises RuntimeError: If received packet is invalid.
48     """
49     if not pkt_recv.haslayer(ip_layer):
50         raise RuntimeError(
51             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
52         )
53
54     if pkt_recv[ip_layer].dst != dst_tun:
55         raise RuntimeError(
56             f"Received packet has invalid destination address: "
57             f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
58         )
59
60     if not pkt_recv.haslayer(ESP):
61         raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
62
63     ip_pkt = pkt_recv[ip_layer]
64     d_pkt = sa_in.decrypt(ip_pkt)
65
66     if d_pkt[ip_layer].dst != dst_ip:
67         raise RuntimeError(
68             f"Decrypted packet has invalid destination address: "
69             f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
70         )
71
72     if d_pkt[ip_layer].src != src_ip:
73         raise RuntimeError(
74             f"Decrypted packet has invalid source address: "
75             f"{d_pkt[ip_layer].src} should be: {src_ip}"
76         )
77
78     if ip_layer == IP and d_pkt[ip_layer].proto != 61:
79         raise RuntimeError(
80             f"Decrypted packet has invalid IP protocol: "
81             f"{d_pkt[ip_layer].proto} should be: 61"
82         )
83
84
85 def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
86     """Check received IP/IPv6 packet.
87
88     :param pkt_recv: Received packet to verify.
89     :param ip_layer: Scapy IP layer.
90     :param src_ip: Source IP/IPv6 address.
91     :param dst_ip: Destination IP/IPv6 address.
92     :type pkt_recv: scapy.Ether
93     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
94     :type src_ip: str
95     :type dst_ip: str
96     :raises RuntimeError: If received packet is invalid.
97     """
98     if not pkt_recv.haslayer(ip_layer):
99         raise RuntimeError(
100             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
101         )
102
103     if pkt_recv[ip_layer].dst != dst_ip:
104         raise RuntimeError(
105             f"Received packet has invalid destination address: "
106             f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
107         )
108
109     if pkt_recv[ip_layer].src != src_ip:
110         raise RuntimeError(
111             f"Received packet has invalid destination address: "
112             f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
113         )
114
115     if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
116         raise RuntimeError(
117             f"Received packet has invalid IP protocol: "
118             f"{pkt_recv[ip_layer].proto} should be: 61"
119         )
120
121
122 # TODO: Pylint says too-many-locals and too-many-statements. Refactor!
123 def main():
124     """Send and receive IPsec packet."""
125
126     args = TrafficScriptArg(
127         [
128             u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
129             u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg",
130             u"integ_key", u"l_spi", u"r_spi"
131         ],
132         [u"src_tun", u"dst_tun"]
133     )
134
135     tx_txq = TxQueue(args.get_arg(u"tx_if"))
136     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
137     rx_txq = TxQueue(args.get_arg(u"rx_if"))
138     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
139
140     tx_src_mac = args.get_arg(u"tx_src_mac")
141     tx_dst_mac = args.get_arg(u"tx_dst_mac")
142     rx_src_mac = args.get_arg(u"rx_src_mac")
143     rx_dst_mac = args.get_arg(u"rx_dst_mac")
144     src_ip = args.get_arg(u"src_ip")
145     dst_ip = args.get_arg(u"dst_ip")
146     crypto_alg = args.get_arg(u"crypto_alg")
147     crypto_key = args.get_arg(u"crypto_key")
148     integ_alg = args.get_arg(u"integ_alg")
149     integ_key = args.get_arg(u"integ_key")
150     l_spi = int(args.get_arg(u"l_spi"))
151     r_spi = int(args.get_arg(u"r_spi"))
152     src_tun = args.get_arg(u"src_tun")
153     dst_tun = args.get_arg(u"dst_tun")
154
155     ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
156
157     tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \
158         else None
159     tunnel_in = ip_layer(src=dst_tun, dst=src_tun) if src_tun and dst_tun \
160         else None
161
162     if not (src_tun and dst_tun):
163         src_tun = src_ip
164
165     sa_in = SecurityAssociation(
166         ESP, spi=r_spi, crypt_algo=crypto_alg,
167         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
168         auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
169     )
170
171     sa_out = SecurityAssociation(
172         ESP, spi=l_spi, crypt_algo=crypto_alg,
173         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
174         auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
175     )
176
177     ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
178         else ip_layer(src=src_ip, dst=dst_ip)
179     ip_pkt = ip_layer(ip_pkt)
180
181     e_pkt = sa_out.encrypt(ip_pkt)
182     tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
183                    e_pkt)
184
185     sent_packets = list()
186     tx_pkt_send /= Raw()
187     sent_packets.append(tx_pkt_send)
188     tx_txq.send(tx_pkt_send)
189
190     while True:
191         rx_pkt_recv = rx_rxq.recv(2)
192
193         if rx_pkt_recv is None:
194             raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
195
196         if rx_pkt_recv.haslayer(ICMPv6ND_NS):
197             # read another packet in the queue if the current one is ICMPv6ND_NS
198             continue
199         else:
200             # otherwise process the current packet
201             break
202
203     check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip)
204
205     rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
206         else ip_layer(src=dst_ip, dst=src_ip)
207     rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
208                    rx_ip_pkt)
209
210     rx_pkt_send /= Raw()
211     rx_txq.send(rx_pkt_send)
212
213     while True:
214         tx_pkt_recv = tx_rxq.recv(2, sent_packets)
215
216         if tx_pkt_recv is None:
217             raise RuntimeError(u"ESP packet Rx timeout")
218
219         if tx_pkt_recv.haslayer(ICMPv6ND_NS):
220             # read another packet in the queue if the current one is ICMPv6ND_NS
221             continue
222         else:
223             # otherwise process the current packet
224             break
225
226     check_ipsec(tx_pkt_recv, ip_layer, src_tun, dst_ip, src_ip, sa_in)
227
228     sys.exit(0)
229
230
231 if __name__ == u"__main__":
232     main()