9e0b9ea912e754816fc31950f6b20113ec8672ad
[vpp.git] / test / test_ipsec_nat.py
1 #!/usr/bin/env python
2
3 import socket
4
5 from scapy.layers.l2 import Ether
6 from scapy.layers.inet import ICMP, IP, TCP, UDP
7 from scapy.layers.ipsec import SecurityAssociation, ESP
8 from util import ppp, ppc
9 from template_ipsec import TemplateIpsec
10
11
12 class IPSecNATTestCase(TemplateIpsec):
13     """ IPSec/NAT
14     TUNNEL MODE:
15
16
17      public network  |   private network
18      ---   encrypt  ---   plain   ---
19     |pg0| <------- |VPP| <------ |pg1|
20      ---            ---           ---
21
22      ---   decrypt  ---   plain   ---
23     |pg0| -------> |VPP| ------> |pg1|
24      ---            ---           ---
25     """
26
27     tcp_port_in = 6303
28     tcp_port_out = 6303
29     udp_port_in = 6304
30     udp_port_out = 6304
31     icmp_id_in = 6305
32     icmp_id_out = 6305
33
34     @classmethod
35     def setUpClass(cls):
36         super(IPSecNATTestCase, cls).setUpClass()
37         cls.tun_if = cls.pg0
38         cls.config_esp_tun()
39         cls.logger.info(cls.vapi.ppcli("show ipsec"))
40         client = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
41         cls.vapi.ip_add_del_route(client, 32, cls.tun_if.remote_ip4n)
42
43     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
44         return [
45             # TCP
46             Ether(src=src_mac, dst=dst_mac) /
47             IP(src=src_ip, dst=dst_ip) /
48             TCP(sport=self.tcp_port_in, dport=20),
49             # UDP
50             Ether(src=src_mac, dst=dst_mac) /
51             IP(src=src_ip, dst=dst_ip) /
52             UDP(sport=self.udp_port_in, dport=20),
53             # ICMP
54             Ether(src=src_mac, dst=dst_mac) /
55             IP(src=src_ip, dst=dst_ip) /
56             ICMP(id=self.icmp_id_in, type='echo-request')
57         ]
58
59     def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
60         return [
61             # TCP
62             Ether(src=src_mac, dst=dst_mac) /
63             sa.encrypt(IP(src=src_ip, dst=dst_ip) /
64                        TCP(dport=self.tcp_port_out, sport=20)),
65             # UDP
66             Ether(src=src_mac, dst=dst_mac) /
67             sa.encrypt(IP(src=src_ip, dst=dst_ip) /
68                        UDP(dport=self.udp_port_out, sport=20)),
69             # ICMP
70             Ether(src=src_mac, dst=dst_mac) /
71             sa.encrypt(IP(src=src_ip, dst=dst_ip) /
72                        ICMP(id=self.icmp_id_out, type='echo-request'))
73         ]
74
75     def verify_capture_plain(self, capture):
76         for packet in capture:
77             try:
78                 self.assert_packet_checksums_valid(packet)
79                 self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
80                                   "decrypted packet source address")
81                 self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
82                                   "decrypted packet destination address")
83                 if packet.haslayer(TCP):
84                     self.assertFalse(
85                         packet.haslayer(UDP),
86                         "unexpected UDP header in decrypted packet")
87                     self.assert_equal(packet[TCP].dport, self.tcp_port_in,
88                                       "decrypted packet TCP destination port")
89                 elif packet.haslayer(UDP):
90                     if packet[UDP].payload:
91                         self.assertFalse(
92                             packet[UDP][1].haslayer(UDP),
93                             "unexpected UDP header in decrypted packet")
94                     self.assert_equal(packet[UDP].dport, self.udp_port_in,
95                                       "decrypted packet UDP destination port")
96                 else:
97                     self.assertFalse(
98                         packet.haslayer(UDP),
99                         "unexpected UDP header in decrypted packet")
100                     self.assert_equal(packet[ICMP].id, self.icmp_id_in,
101                                       "decrypted packet ICMP ID")
102             except Exception:
103                 self.logger.error(
104                     ppp("Unexpected or invalid plain packet:", packet))
105                 raise
106
107     def verify_capture_encrypted(self, capture, sa):
108         for packet in capture:
109             try:
110                 copy = packet.__class__(str(packet))
111                 del copy[UDP].len
112                 copy = packet.__class__(str(copy))
113                 self.assert_equal(packet[UDP].len, copy[UDP].len,
114                                   "UDP header length")
115                 self.assert_packet_checksums_valid(packet)
116                 self.assertIn(ESP, packet[IP])
117                 decrypt_pkt = sa.decrypt(packet[IP])
118                 self.assert_packet_checksums_valid(decrypt_pkt)
119                 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
120                                   "encrypted packet source address")
121                 self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
122                                   "encrypted packet destination address")
123             except Exception:
124                 self.logger.error(
125                     ppp("Unexpected or invalid encrypted packet:", packet))
126                 raise
127
128     @classmethod
129     def config_esp_tun(cls):
130         cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
131                                          cls.scapy_tun_spi,
132                                          cls.auth_algo_vpp_id, cls.auth_key,
133                                          cls.crypt_algo_vpp_id,
134                                          cls.crypt_key, cls.vpp_esp_protocol,
135                                          cls.pg1.remote_ip4n,
136                                          cls.tun_if.remote_ip4n,
137                                          udp_encap=1)
138         cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
139                                          cls.vpp_tun_spi,
140                                          cls.auth_algo_vpp_id, cls.auth_key,
141                                          cls.crypt_algo_vpp_id,
142                                          cls.crypt_key, cls.vpp_esp_protocol,
143                                          cls.tun_if.remote_ip4n,
144                                          cls.pg1.remote_ip4n,
145                                          udp_encap=1)
146         cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
147         cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
148                                              cls.tun_if.sw_if_index)
149         l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
150                                                      "0.0.0.0")
151         l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
152                                                    "255.255.255.255")
153         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
154                                          l_startaddr, l_stopaddr, r_startaddr,
155                                          r_stopaddr,
156                                          protocol=socket.IPPROTO_ESP)
157         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
158                                          l_startaddr, l_stopaddr, r_startaddr,
159                                          r_stopaddr, is_outbound=0,
160                                          protocol=socket.IPPROTO_ESP)
161         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
162                                          l_startaddr, l_stopaddr, r_startaddr,
163                                          r_stopaddr, remote_port_start=4500,
164                                          remote_port_stop=4500,
165                                          protocol=socket.IPPROTO_UDP)
166         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
167                                          l_startaddr, l_stopaddr, r_startaddr,
168                                          r_stopaddr, remote_port_start=4500,
169                                          remote_port_stop=4500,
170                                          protocol=socket.IPPROTO_UDP,
171                                          is_outbound=0)
172         l_startaddr = l_stopaddr = cls.tun_if.remote_ip4n
173         r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
174         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
175                                          l_startaddr, l_stopaddr, r_startaddr,
176                                          r_stopaddr, priority=10, policy=3,
177                                          is_outbound=0)
178         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
179                                          r_startaddr, r_stopaddr, l_startaddr,
180                                          l_stopaddr, priority=10, policy=3)
181
182     def test_ipsec_nat_tun(self):
183         """ IPSec/NAT tunnel test case """
184         scapy_tun_sa = SecurityAssociation(ESP, spi=self.scapy_tun_spi,
185                                            crypt_algo=self.crypt_algo,
186                                            crypt_key=self.crypt_key,
187                                            auth_algo=self.auth_algo,
188                                            auth_key=self.auth_key,
189                                            tunnel_header=IP(
190                                                src=self.pg1.remote_ip4,
191                                                dst=self.tun_if.remote_ip4),
192                                            nat_t_header=UDP(
193                                                sport=4500,
194                                                dport=4500))
195         # in2out - from private network to public
196         pkts = self.create_stream_plain(
197             self.pg1.remote_mac, self.pg1.local_mac,
198             self.pg1.remote_ip4, self.tun_if.remote_ip4)
199         self.pg1.add_stream(pkts)
200         self.pg_enable_capture(self.pg_interfaces)
201         self.pg_start()
202         capture = self.tun_if.get_capture(len(pkts))
203         self.verify_capture_encrypted(capture, scapy_tun_sa)
204
205         vpp_tun_sa = SecurityAssociation(ESP,
206                                          spi=self.vpp_tun_spi,
207                                          crypt_algo=self.crypt_algo,
208                                          crypt_key=self.crypt_key,
209                                          auth_algo=self.auth_algo,
210                                          auth_key=self.auth_key,
211                                          tunnel_header=IP(
212                                              src=self.tun_if.remote_ip4,
213                                              dst=self.pg1.remote_ip4),
214                                          nat_t_header=UDP(
215                                              sport=4500,
216                                              dport=4500))
217
218         # out2in - from public network to private
219         pkts = self.create_stream_encrypted(
220             self.tun_if.remote_mac, self.tun_if.local_mac,
221             self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
222         self.logger.info(ppc("Sending packets:", pkts))
223         self.tun_if.add_stream(pkts)
224         self.pg_enable_capture(self.pg_interfaces)
225         self.pg_start()
226         capture = self.pg1.get_capture(len(pkts))
227         self.verify_capture_plain(capture)