tcp/session: tx optimizations
[vpp.git] / test / test_ipsec_nat.py
1 #!/usr/bin/env python
2
3 import socket
4
5 from scapy.layers.l2 import Ether
6 from scapy.layers.inet import ICMP, IP, TCP, UDP
7 from scapy.layers.ipsec import SecurityAssociation, ESP
8 from util import ppp, ppc
9 from template_ipsec import TemplateIpsec
10
11
12 class IPSecNATTestCase(TemplateIpsec):
13     """ IPSec/NAT
14
15     TRANSPORT MODE:
16
17      ---   encrypt   ---
18     |pg2| <-------> |VPP|
19      ---   decrypt   ---
20
21     TUNNEL MODE:
22
23
24      public network  |   private network
25      ---   encrypt  ---   plain   ---
26     |pg0| <------- |VPP| <------ |pg1|
27      ---            ---           ---
28
29      ---   decrypt  ---   plain   ---
30     |pg0| -------> |VPP| ------> |pg1|
31      ---            ---           ---
32     """
33
34     @classmethod
35     def setUpClass(cls):
36         super(IPSecNATTestCase, cls).setUpClass()
37         cls.tcp_port_in = 6303
38         cls.tcp_port_out = 6303
39         cls.udp_port_in = 6304
40         cls.udp_port_out = 6304
41         cls.icmp_id_in = 6305
42         cls.icmp_id_out = 6305
43         cls.tun_if = cls.pg0
44         cls.config_esp_tun()
45         cls.logger.info(cls.vapi.ppcli("show ipsec"))
46         client = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
47         cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n)
48
49     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
50         return [
51             # TCP
52             Ether(src=src_mac, dst=dst_mac) /
53             IP(src=src_ip, dst=dst_ip) /
54             TCP(sport=self.tcp_port_in, dport=20),
55             # UDP
56             Ether(src=src_mac, dst=dst_mac) /
57             IP(src=src_ip, dst=dst_ip) /
58             UDP(sport=self.udp_port_in, dport=20),
59             # ICMP
60             Ether(src=src_mac, dst=dst_mac) /
61             IP(src=src_ip, dst=dst_ip) /
62             ICMP(id=self.icmp_id_in, type='echo-request')
63         ]
64
65     def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
66         return [
67             # TCP
68             Ether(src=src_mac, dst=dst_mac) /
69             sa.encrypt(IP(src=src_ip, dst=dst_ip) /
70                        TCP(dport=self.tcp_port_out, sport=20)),
71             # UDP
72             Ether(src=src_mac, dst=dst_mac) /
73             sa.encrypt(IP(src=src_ip, dst=dst_ip) /
74                        UDP(dport=self.udp_port_out, sport=20)),
75             # ICMP
76             Ether(src=src_mac, dst=dst_mac) /
77             sa.encrypt(IP(src=src_ip, dst=dst_ip) /
78                        ICMP(id=self.icmp_id_out, type='echo-request'))
79         ]
80
81     def verify_capture_plain(self, capture):
82         for packet in capture:
83             try:
84                 self.assert_packet_checksums_valid(packet)
85                 self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
86                                   "decrypted packet source address")
87                 self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
88                                   "decrypted packet destination address")
89                 if packet.haslayer(TCP):
90                     self.assertFalse(
91                         packet.haslayer(UDP),
92                         "unexpected UDP header in decrypted packet")
93                     self.assert_equal(packet[TCP].dport, self.tcp_port_in,
94                                       "decrypted packet TCP destination port")
95                 elif packet.haslayer(UDP):
96                     if packet[UDP].payload:
97                         self.assertFalse(
98                             packet[UDP][1].haslayer(UDP),
99                             "unexpected UDP header in decrypted packet")
100                     self.assert_equal(packet[UDP].dport, self.udp_port_in,
101                                       "decrypted packet UDP destination port")
102                 else:
103                     self.assertFalse(
104                         packet.haslayer(UDP),
105                         "unexpected UDP header in decrypted packet")
106                     self.assert_equal(packet[ICMP].id, self.icmp_id_in,
107                                       "decrypted packet ICMP ID")
108             except Exception:
109                 self.logger.error(
110                     ppp("Unexpected or invalid plain packet:", packet))
111                 raise
112
113     def verify_capture_encrypted(self, capture, sa):
114         for packet in capture:
115             try:
116                 self.assertIn(ESP, packet[IP])
117                 decrypt_pkt = sa.decrypt(packet[IP])
118                 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
119                                   "encrypted packet source address")
120                 self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
121                                   "encrypted packet destination address")
122                 # if decrypt_pkt.haslayer(TCP):
123                 #     self.tcp_port_out = decrypt_pkt[TCP].sport
124                 # elif decrypt_pkt.haslayer(UDP):
125                 #     self.udp_port_out = decrypt_pkt[UDP].sport
126                 # else:
127                 #     self.icmp_id_out = decrypt_pkt[ICMP].id
128             except Exception:
129                 self.logger.error(
130                     ppp("Unexpected or invalid encrypted packet:", packet))
131                 raise
132
133     @classmethod
134     def config_esp_tun(cls):
135         cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
136                                          cls.scapy_tun_spi,
137                                          cls.auth_algo_vpp_id, cls.auth_key,
138                                          cls.crypt_algo_vpp_id,
139                                          cls.crypt_key, cls.vpp_esp_protocol,
140                                          cls.pg1.remote_ip4n,
141                                          cls.tun_if.remote_ip4n)
142         cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
143                                          cls.vpp_tun_spi,
144                                          cls.auth_algo_vpp_id, cls.auth_key,
145                                          cls.crypt_algo_vpp_id,
146                                          cls.crypt_key, cls.vpp_esp_protocol,
147                                          cls.tun_if.remote_ip4n,
148                                          cls.pg1.remote_ip4n)
149         cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
150         cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
151                                              cls.tun_if.sw_if_index)
152         l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
153                                                      "0.0.0.0")
154         l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
155                                                    "255.255.255.255")
156         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
157                                          l_startaddr, l_stopaddr, r_startaddr,
158                                          r_stopaddr,
159                                          protocol=socket.IPPROTO_ESP)
160         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
161                                          l_startaddr, l_stopaddr, r_startaddr,
162                                          r_stopaddr, is_outbound=0,
163                                          protocol=socket.IPPROTO_ESP)
164         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
165                                          l_startaddr, l_stopaddr, r_startaddr,
166                                          r_stopaddr, remote_port_start=4500,
167                                          remote_port_stop=4500,
168                                          protocol=socket.IPPROTO_UDP)
169         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
170                                          l_startaddr, l_stopaddr, r_startaddr,
171                                          r_stopaddr, remote_port_start=4500,
172                                          remote_port_stop=4500,
173                                          protocol=socket.IPPROTO_UDP,
174                                          is_outbound=0)
175         l_startaddr = l_stopaddr = cls.tun_if.remote_ip4n
176         r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
177         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
178                                          l_startaddr, l_stopaddr, r_startaddr,
179                                          r_stopaddr, priority=10, policy=3,
180                                          is_outbound=0)
181         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
182                                          r_startaddr, r_stopaddr, l_startaddr,
183                                          l_stopaddr, priority=10, policy=3)
184
185     def test_ipsec_nat_tun(self):
186         """ IPSec/NAT tunnel test case """
187         scapy_tun_sa = SecurityAssociation(ESP,
188                                            spi=self.scapy_tun_spi,
189                                            crypt_algo=self.crypt_algo,
190                                            crypt_key=self.crypt_key,
191                                            auth_algo=self.auth_algo,
192                                            auth_key=self.auth_key,
193                                            tunnel_header=IP(
194                                                src=self.pg1.remote_ip4,
195                                                dst=self.tun_if.remote_ip4),
196                                            nat_t_header=UDP(
197                                                sport=4500,
198                                                dport=4500))
199         # in2out - from private network to public
200         pkts = self.create_stream_plain(
201             self.pg1.remote_mac, self.pg1.local_mac,
202             self.pg1.remote_ip4, self.tun_if.remote_ip4)
203         self.pg1.add_stream(pkts)
204         self.pg_enable_capture(self.pg_interfaces)
205         self.pg_start()
206         capture = self.tun_if.get_capture(len(pkts))
207         self.verify_capture_encrypted(capture, scapy_tun_sa)
208
209         vpp_tun_sa = SecurityAssociation(ESP,
210                                          spi=self.vpp_tun_spi,
211                                          crypt_algo=self.crypt_algo,
212                                          crypt_key=self.crypt_key,
213                                          auth_algo=self.auth_algo,
214                                          auth_key=self.auth_key,
215                                          tunnel_header=IP(
216                                              src=self.tun_if.remote_ip4,
217                                              dst=self.pg1.remote_ip4),
218                                          nat_t_header=UDP(
219                                              sport=4500,
220                                              dport=4500))
221
222         # out2in - from public network to private
223         pkts = self.create_stream_encrypted(
224             self.tun_if.remote_mac, self.tun_if.local_mac,
225             self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
226         self.logger.info(ppc("Sending packets:", pkts))
227         self.tun_if.add_stream(pkts)
228         self.pg_enable_capture(self.pg_interfaces)
229         self.pg_start()
230         capture = self.pg1.get_capture(len(pkts))
231         self.verify_capture_plain(capture)