PAL: Add processing of throughput in Gbps
[csit.git] / GPL / traffic_scripts / ipsec_policy.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Traffic script for IPsec verification."""
17
18 import sys
19
20 from ipaddress import ip_address
21 from scapy.layers.inet import IP
22 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
23 from scapy.layers.ipsec import SecurityAssociation, ESP
24 from scapy.layers.l2 import Ether
25 from scapy.packet import Raw
26
27 from .PacketVerifier import RxQueue, TxQueue
28 from .TrafficScriptArg import TrafficScriptArg
29
30
31 def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
32     """Check received IPsec packet.
33
34     :param pkt_recv: Received packet to verify.
35     :param ip_layer: Scapy IP layer.
36     :param dst_tun: IPsec tunnel destination address.
37     :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet.
38     :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet.
39     :param sa_in: IPsec SA for packet decryption.
40     :type pkt_recv: scapy.Ether
41     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
42     :type dst_tun: str
43     :type src_ip: str
44     :type dst_ip: str
45     :type sa_in: scapy.layers.ipsec.SecurityAssociation
46     :raises RuntimeError: If received packet is invalid.
47     """
48     if not pkt_recv.haslayer(ip_layer):
49         raise RuntimeError(
50             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
51         )
52
53     if pkt_recv[ip_layer].dst != dst_tun:
54         raise RuntimeError(
55             f"Received packet has invalid destination address: "
56             f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
57         )
58
59     if not pkt_recv.haslayer(ESP):
60         raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
61
62     ip_pkt = pkt_recv[ip_layer]
63     d_pkt = sa_in.decrypt(ip_pkt)
64
65     if d_pkt[ip_layer].dst != dst_ip:
66         raise RuntimeError(
67             f"Decrypted packet has invalid destination address: "
68             f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
69         )
70
71     if d_pkt[ip_layer].src != src_ip:
72         raise RuntimeError(
73             f"Decrypted packet has invalid source address: "
74             f"{d_pkt[ip_layer].src} should be: {src_ip}"
75         )
76
77     if ip_layer == IP and d_pkt[ip_layer].proto != 61:
78         raise RuntimeError(
79             f"Decrypted packet has invalid IP protocol: "
80             f"{d_pkt[ip_layer].proto} should be: 61"
81         )
82
83
84 def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
85     """Check received IP/IPv6 packet.
86
87     :param pkt_recv: Received packet to verify.
88     :param ip_layer: Scapy IP layer.
89     :param src_ip: Source IP/IPv6 address.
90     :param dst_ip: Destination IP/IPv6 address.
91     :type pkt_recv: scapy.Ether
92     :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6
93     :type src_ip: str
94     :type dst_ip: str
95     :raises RuntimeError: If received packet is invalid.
96     """
97     if not pkt_recv.haslayer(ip_layer):
98         raise RuntimeError(
99             f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
100         )
101
102     if pkt_recv[ip_layer].dst != dst_ip:
103         raise RuntimeError(
104             f"Received packet has invalid destination address: "
105             f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
106         )
107
108     if pkt_recv[ip_layer].src != src_ip:
109         raise RuntimeError(
110             f"Received packet has invalid destination address: "
111             f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
112         )
113
114     if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
115         raise RuntimeError(
116             f"Received packet has invalid IP protocol: "
117             f"{pkt_recv[ip_layer].proto} should be: 61"
118         )
119
120
121 # TODO: Pylint says too-many-locals and too-many-statements. Refactor!
122 def main():
123     """Send and receive IPsec packet."""
124
125     args = TrafficScriptArg(
126         [
127             u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
128             u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg",
129             u"integ_key", u"l_spi", u"r_spi"
130         ],
131         [u"src_tun", u"dst_tun"]
132     )
133
134     tx_txq = TxQueue(args.get_arg(u"tx_if"))
135     tx_rxq = RxQueue(args.get_arg(u"tx_if"))
136     rx_txq = TxQueue(args.get_arg(u"rx_if"))
137     rx_rxq = RxQueue(args.get_arg(u"rx_if"))
138
139     tx_src_mac = args.get_arg(u"tx_src_mac")
140     tx_dst_mac = args.get_arg(u"tx_dst_mac")
141     rx_src_mac = args.get_arg(u"rx_src_mac")
142     rx_dst_mac = args.get_arg(u"rx_dst_mac")
143     src_ip = args.get_arg(u"src_ip")
144     dst_ip = args.get_arg(u"dst_ip")
145     crypto_alg = args.get_arg(u"crypto_alg")
146     crypto_key = args.get_arg(u"crypto_key")
147     integ_alg = args.get_arg(u"integ_alg")
148     integ_key = args.get_arg(u"integ_key")
149     l_spi = int(args.get_arg(u"l_spi"))
150     r_spi = int(args.get_arg(u"r_spi"))
151     src_tun = args.get_arg(u"src_tun")
152     dst_tun = args.get_arg(u"dst_tun")
153
154     ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
155
156     tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \
157         else None
158     tunnel_in = ip_layer(src=dst_tun, dst=src_tun) if src_tun and dst_tun \
159         else None
160
161     if not (src_tun and dst_tun):
162         src_tun = src_ip
163
164     sa_in = SecurityAssociation(
165         ESP, spi=r_spi, crypt_algo=crypto_alg,
166         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
167         auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
168     )
169
170     sa_out = SecurityAssociation(
171         ESP, spi=l_spi, crypt_algo=crypto_alg,
172         crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
173         auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
174     )
175
176     ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
177         else ip_layer(src=src_ip, dst=dst_ip)
178     ip_pkt = ip_layer(ip_pkt)
179
180     e_pkt = sa_out.encrypt(ip_pkt)
181     tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
182                    e_pkt)
183
184     sent_packets = list()
185     tx_pkt_send /= Raw()
186     sent_packets.append(tx_pkt_send)
187     tx_txq.send(tx_pkt_send)
188
189     while True:
190         rx_pkt_recv = rx_rxq.recv(2)
191
192         if rx_pkt_recv is None:
193             raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
194
195         if rx_pkt_recv.haslayer(ICMPv6ND_NS):
196             # read another packet in the queue if the current one is ICMPv6ND_NS
197             continue
198         else:
199             # otherwise process the current packet
200             break
201
202     check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip)
203
204     rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \
205         else ip_layer(src=dst_ip, dst=src_ip)
206     rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
207                    rx_ip_pkt)
208
209     rx_pkt_send /= Raw()
210     rx_txq.send(rx_pkt_send)
211
212     while True:
213         tx_pkt_recv = tx_rxq.recv(2, sent_packets)
214
215         if tx_pkt_recv is None:
216             raise RuntimeError(u"ESP packet Rx timeout")
217
218         if tx_pkt_recv.haslayer(ICMPv6ND_NS):
219             # read another packet in the queue if the current one is ICMPv6ND_NS
220             continue
221         else:
222             # otherwise process the current packet
223             break
224
225     check_ipsec(tx_pkt_recv, ip_layer, src_tun, dst_ip, src_ip, sa_in)
226
227     sys.exit(0)
228
229
230 if __name__ == u"__main__":
231     main()