Python3: resources and libraries
[csit.git] / resources / traffic_scripts / ipsec.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2019 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Traffic script for IPsec verification."""
17
18 import sys
19 import logging
20
21 from ipaddress import ip_address
22 # pylint: disable=no-name-in-module
23 # pylint: disable=import-error
24 logging.getLogger(u"scapy.runtime").setLevel(logging.ERROR)
25 from scapy.layers.inet import IP
26 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
27 from scapy.layers.ipsec import SecurityAssociation, ESP
28 from scapy.layers.l2 import Ether
29 from scapy.packet import Raw
30
31 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
32 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
33
34
35 def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
36     """Check received IPsec packet.
37
38     :param pkt_recv: Received packet to verify.
39     :param ip_layer: Scapy IP layer.
40     :param dst_tun: IPsec tunnel destination address.
41     :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
42     :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
43     :param sa_in: IPsec SA for packet decryption.
44     :type pkt_recv: scapy.Ether
45     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
46     :type dst_tun: str
47     :type src_ip: str
48     :type dst_ip: str
49     :type sa_in: scapy.layers.ipsec.SecurityAssociation
50     :raises RuntimeError: If received packet is invalid.
51     """
52     if not pkt_recv.haslayer(ip_layer):
53         raise RuntimeError(
54             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
55         )
56
57     if pkt_recv[ip_layer].dst != dst_tun:
58         raise RuntimeError(
59             f"Received packet has invalid destination address: "
60             f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
61         )
62
63     if not pkt_recv.haslayer(ESP):
64         raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
65
66     ip_pkt = pkt_recv[ip_layer]
67     d_pkt = sa_in.decrypt(ip_pkt)
68
69     if d_pkt[ip_layer].dst != dst_ip:
70         raise RuntimeError(
71             f"Decrypted packet has invalid destination address: "
72             f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
73         )
74
75     if d_pkt[ip_layer].src != src_ip:
76         raise RuntimeError(
77             f"Decrypted packet has invalid source address: "
78             f"{d_pkt[ip_layer].src} should be: {src_ip}"
79         )
80
81     if ip_layer == IP and d_pkt[ip_layer].proto != 61:
82         raise RuntimeError(
83             f"Decrypted packet has invalid IP protocol: "
84             f"{d_pkt[ip_layer].proto} should be: 61"
85         )
86
87
88 def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
89     """Check received IP/IPv6 packet.
90
91     :param pkt_recv: Received packet to verify.
92     :param ip_layer: Scapy IP layer.
93     :param src_ip: Source IP/IPv6 address.
94     :param dst_ip: Destination IP/IPv6 address.
95     :type pkt_recv: scapy.Ether
96     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
97     :type src_ip: str
98     :type dst_ip: str
99     :raises RuntimeError: If received packet is invalid.
100     """
101     if not pkt_recv.haslayer(ip_layer):
102         raise RuntimeError(
103             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
104         )
105
106     if pkt_recv[ip_layer].dst != dst_ip:
107         raise RuntimeError(
108             f"Received packet has invalid destination address: "
109             f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
110         )
111
112     if pkt_recv[ip_layer].src != src_ip:
113         raise RuntimeError(
114             f"Received packet has invalid destination address: "
115             f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
116         )
117
118     if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
119         raise RuntimeError(
120             f"Received packet has invalid IP protocol: "
121             f"{pkt_recv[ip_layer].proto} should be: 61"
122         )
123
124
125 # pylint: disable=too-many-locals
126 # pylint: disable=too-many-statements
127 def main():
128     """Send and receive IPsec packet."""
129
130     args = TrafficScriptArg(
131         [
132             u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
133             u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg",
134             u"integ_key", u"l_spi", u"r_spi"
135         ],
136         [u"src_tun", u"dst_tun"]
137     )
138
139     tx_txq = TxQueue(args.get_arg(u"tx_if"))
140     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
141     rx_txq = TxQueue(args.get_arg(u"rx_if"))
142     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
143
144     tx_src_mac = args.get_arg(u"tx_src_mac")
145     tx_dst_mac = args.get_arg(u"tx_dst_mac")
146     rx_src_mac = args.get_arg(u"rx_src_mac")
147     rx_dst_mac = args.get_arg(u"rx_dst_mac")
148     src_ip = args.get_arg(u"src_ip")
149     dst_ip = args.get_arg(u"dst_ip")
150     crypto_alg = args.get_arg(u"crypto_alg")
151     crypto_key = args.get_arg(u"crypto_key")
152     integ_alg = args.get_arg(u"integ_alg")
153     integ_key = args.get_arg(u"integ_key")
154     l_spi = int(args.get_arg(u"l_spi"))
155     r_spi = int(args.get_arg(u"r_spi"))
156     src_tun = args.get_arg(u"src_tun")
157     dst_tun = args.get_arg(u"dst_tun")
158
159     ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
160
161     tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \
162         else None
163     tunnel_in = ip_layer(src=dst_tun, dst=src_tun) if src_tun and dst_tun \
164         else None
165
166     if not (src_tun and dst_tun):
167         src_tun = src_ip
168
169     sa_in = SecurityAssociation(
170         ESP, spi=r_spi, crypt_algo=crypto_alg,
171         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
172         auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
173     )
174
175     sa_out = SecurityAssociation(
176         ESP, spi=l_spi, crypt_algo=crypto_alg,
177         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
178         auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
179     )
180
181     ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
182         else ip_layer(src=src_ip, dst=dst_ip)
183     ip_pkt = ip_layer(ip_pkt)
184
185     e_pkt = sa_out.encrypt(ip_pkt)
186     tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
187                    e_pkt)
188
189     sent_packets = list()
190     tx_pkt_send /= Raw()
191     sent_packets.append(tx_pkt_send)
192     tx_txq.send(tx_pkt_send)
193
194     while True:
195         rx_pkt_recv = rx_rxq.recv(2)
196
197         if rx_pkt_recv is None:
198             raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
199
200         if rx_pkt_recv.haslayer(ICMPv6ND_NS):
201             # read another packet in the queue if the current one is ICMPv6ND_NS
202             continue
203         else:
204             # otherwise process the current packet
205             break
206
207     check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip)
208
209     rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
210         else ip_layer(src=dst_ip, dst=src_ip)
211     rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
212                    rx_ip_pkt)
213
214     rx_pkt_send /= Raw()
215     rx_txq.send(rx_pkt_send)
216
217     while True:
218         tx_pkt_recv = tx_rxq.recv(2, sent_packets)
219
220         if tx_pkt_recv is None:
221             raise RuntimeError(u"ESP packet Rx timeout")
222
223         if tx_pkt_recv.haslayer(ICMPv6ND_NS):
224             # read another packet in the queue if the current one is ICMPv6ND_NS
225             continue
226         else:
227             # otherwise process the current packet
228             break
229
230     check_ipsec(tx_pkt_recv, ip_layer, src_tun, dst_ip, src_ip, sa_in)
231
232     sys.exit(0)
233
234
235 if __name__ == u"__main__":
236     main()