IPSEC Tests: to per-test setup and tearDown
[vpp.git] / test / test_ipsec_nat.py
1 #!/usr/bin/env python
2
3 import socket
4
5 from scapy.layers.l2 import Ether
6 from scapy.layers.inet import ICMP, IP, TCP, UDP
7 from scapy.layers.ipsec import SecurityAssociation, ESP
8 from util import ppp, ppc
9 from template_ipsec import TemplateIpsec
10
11
class IPSecNATTestCase(TemplateIpsec):
    """ IPSec/NAT
    TUNNEL MODE:


     public network  |   private network
     ---   encrypt  ---   plain   ---
    |pg0| <------- |VPP| <------ |pg1|
     ---            ---           ---

     ---   decrypt  ---   plain   ---
    |pg0| -------> |VPP| ------> |pg1|
     ---            ---           ---
    """

    # L4 identifiers used when building the test streams and checked again
    # when verifying the decrypted (plain) capture.
    tcp_port_in = 6303
    tcp_port_out = 6303
    udp_port_in = 6304
    udp_port_out = 6304
    icmp_id_in = 6305
    icmp_id_out = 6305

    def setUp(self):
        """Per-test configuration of the tunnel on pg0.

        Runs the parent setUp, selects pg0 as the tunnel interface, issues
        the SPD and SPD-to-interface API calls for ``self.tun_spd_id``,
        installs the SAs and SPD entries through ``config_esp_tun`` and
        finally issues a route API call for the remote tunnel host with the
        tunnel interface's remote address as the next argument.
        """
        super(IPSecNATTestCase, self).setUp()
        self.tun_if = self.pg0
        self.vapi.ipsec_spd_add_del(self.tun_spd_id)
        self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id,
                                              self.tun_if.sw_if_index)
        p = self.ipv4_params
        self.config_esp_tun(p)
        self.logger.info(self.vapi.ppcli("show ipsec"))
        # Packed (network-order) form of the remote tunnel host address.
        src = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
        self.vapi.ip_add_del_route(src, p.addr_len,
                                   self.tun_if.remote_addr_n[p.addr_type],
                                   is_ipv6=p.is_ipv6)

    def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
        """Build one unencrypted TCP, UDP and ICMP packet.

        :param src_mac: Ethernet source address for every packet.
        :param dst_mac: Ethernet destination address for every packet.
        :param src_ip: IPv4 source address for every packet.
        :param dst_ip: IPv4 destination address for every packet.
        :returns: list of three scapy packets (TCP, UDP, ICMP echo-request)
            using the class-level ``*_in`` port/id values as source.
        """
        return [
            # TCP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            TCP(sport=self.tcp_port_in, dport=20),
            # UDP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            UDP(sport=self.udp_port_in, dport=20),
            # ICMP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            ICMP(id=self.icmp_id_in, type='echo-request')
        ]

    def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
        """Build one TCP, UDP and ICMP packet, each encrypted with ``sa``.

        :param src_mac: Ethernet source address for every packet.
        :param dst_mac: Ethernet destination address for every packet.
        :param src_ip: inner IPv4 source address.
        :param dst_ip: inner IPv4 destination address.
        :param sa: scapy SecurityAssociation whose ``encrypt`` wraps the
            inner IP packet.
        :returns: list of three Ethernet frames carrying the encrypted
            payloads; the inner packets use the class-level ``*_out``
            port/id values as destination.
        """
        return [
            # TCP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       TCP(dport=self.tcp_port_out, sport=20)),
            # UDP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       UDP(dport=self.udp_port_out, sport=20)),
            # ICMP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       ICMP(id=self.icmp_id_out, type='echo-request'))
        ]

    def verify_capture_plain(self, capture):
        """Check every packet of a decrypted capture.

        Each packet must have valid checksums, come from the tunnel
        interface's remote address, be destined to pg1's remote address,
        carry the expected TCP/UDP destination port or ICMP id, and must
        not contain a left-over UDP header from the encapsulation.

        :param capture: iterable of captured scapy packets.
        :raises Exception: any assertion/lookup failure is logged together
            with the offending packet and re-raised.
        """
        for packet in capture:
            try:
                self.assert_packet_checksums_valid(packet)
                self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
                                  "decrypted packet source address")
                self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
                                  "decrypted packet destination address")
                if packet.haslayer(TCP):
                    self.assertFalse(
                        packet.haslayer(UDP),
                        "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[TCP].dport, self.tcp_port_in,
                                      "decrypted packet TCP destination port")
                elif packet.haslayer(UDP):
                    # The packet itself is UDP, so only look for a second
                    # UDP header beneath the first one.
                    if packet[UDP].payload:
                        self.assertFalse(
                            packet[UDP][1].haslayer(UDP),
                            "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[UDP].dport, self.udp_port_in,
                                      "decrypted packet UDP destination port")
                else:
                    # Neither TCP nor UDP: the remaining stream packet is
                    # the ICMP echo-request.
                    self.assertFalse(
                        packet.haslayer(UDP),
                        "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[ICMP].id, self.icmp_id_in,
                                      "decrypted packet ICMP ID")
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid plain packet:", packet))
                raise

    def verify_capture_encrypted(self, capture, sa):
        """Check every packet of an encrypted (UDP-encapsulated) capture.

        For each packet the UDP length field is compared against the value
        obtained after rebuilding the packet without it, checksums are
        validated, an ESP layer must be present, and the packet decrypted
        with ``sa`` must go from pg1's remote address to the tunnel
        interface's remote address.

        :param capture: iterable of captured scapy packets.
        :param sa: scapy SecurityAssociation used to decrypt the packets.
        :raises Exception: any assertion/lookup failure is logged together
            with the offending packet and re-raised.
        """
        for packet in capture:
            try:
                # bytes() instead of str(): identical on Python 2 (where
                # bytes is str) and yields the wire representation rather
                # than text on Python 3.
                copy = packet.__class__(bytes(packet))
                # Drop the captured length so the rebuild below produces a
                # freshly computed one to compare against.
                del copy[UDP].len
                copy = packet.__class__(bytes(copy))
                self.assert_equal(packet[UDP].len, copy[UDP].len,
                                  "UDP header length")
                self.assert_packet_checksums_valid(packet)
                self.assertIn(ESP, packet[IP])
                decrypt_pkt = sa.decrypt(packet[IP])
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                  "encrypted packet source address")
                self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
                                  "encrypted packet destination address")
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid encrypted packet:", packet))
                raise

    def config_esp_tun(self, params):
        """Install the SAD and SPD entries for the UDP-encapsulated tunnel.

        :param params: parameter object providing the address family, SA
            ids, SPIs, algorithm ids, keys and the any/broadcast addresses
            read at the top of this method.
        """
        addr_type = params.addr_type
        scapy_tun_sa_id = params.scapy_tun_sa_id
        scapy_tun_spi = params.scapy_tun_spi
        vpp_tun_sa_id = params.vpp_tun_sa_id
        vpp_tun_spi = params.vpp_tun_spi
        auth_algo_vpp_id = params.auth_algo_vpp_id
        auth_key = params.auth_key
        crypt_algo_vpp_id = params.crypt_algo_vpp_id
        crypt_key = params.crypt_key
        addr_any = params.addr_any
        addr_bcast = params.addr_bcast

        # Two SAs with UDP encapsulation enabled and mirrored tunnel
        # endpoints: pg1 remote -> tunnel remote for the scapy SA id, and
        # tunnel remote -> pg1 remote for the VPP SA id.
        self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
                                          auth_algo_vpp_id, auth_key,
                                          crypt_algo_vpp_id, crypt_key,
                                          self.vpp_esp_protocol,
                                          self.pg1.remote_addr_n[addr_type],
                                          self.tun_if.remote_addr_n[addr_type],
                                          udp_encap=1)
        self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi,
                                          auth_algo_vpp_id, auth_key,
                                          crypt_algo_vpp_id, crypt_key,
                                          self.vpp_esp_protocol,
                                          self.tun_if.remote_addr_n[addr_type],
                                          self.pg1.remote_addr_n[addr_type],
                                          udp_encap=1)

        # SPD entries spanning the whole address range (any .. broadcast)
        # for ESP and for UDP with remote port 4500, once with the default
        # direction and once with is_outbound=0. No policy is passed, so
        # the API wrapper's default applies.
        # NOTE(review): presumably these let already-encapsulated traffic
        # bypass further IPsec processing - confirm against the wrapper's
        # default policy.
        l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any)
        l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast)
        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
                                          l_startaddr, l_stopaddr, r_startaddr,
                                          r_stopaddr,
                                          protocol=socket.IPPROTO_ESP)
        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
                                          l_startaddr, l_stopaddr, r_startaddr,
                                          r_stopaddr, is_outbound=0,
                                          protocol=socket.IPPROTO_ESP)
        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
                                          l_startaddr, l_stopaddr, r_startaddr,
                                          r_stopaddr, remote_port_start=4500,
                                          remote_port_stop=4500,
                                          protocol=socket.IPPROTO_UDP)
        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
                                          l_startaddr, l_stopaddr, r_startaddr,
                                          r_stopaddr, remote_port_start=4500,
                                          remote_port_stop=4500,
                                          protocol=socket.IPPROTO_UDP,
                                          is_outbound=0)

        # Host-specific entries (single-address ranges) with priority 10
        # and policy 3, one per direction; the second call swaps the
        # local/remote ranges.
        # NOTE(review): policy=3 is presumably the "protect" action -
        # confirm against the VPP API definition.
        l_startaddr = l_stopaddr = self.tun_if.remote_addr_n[addr_type]
        r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type]
        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id,
                                          l_startaddr, l_stopaddr, r_startaddr,
                                          r_stopaddr, priority=10, policy=3,
                                          is_outbound=0)
        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
                                          r_startaddr, r_stopaddr, l_startaddr,
                                          l_stopaddr, priority=10, policy=3)

    def test_ipsec_nat_tun(self):
        """ IPSec/NAT tunnel test case """
        p = self.ipv4_params
        # scapy-side SA used to decrypt what VPP sends out of the tunnel
        # interface; it expects a UDP 4500 -> 4500 header around ESP.
        scapy_tun_sa = SecurityAssociation(ESP, spi=p.scapy_tun_spi,
                                           crypt_algo=p.crypt_algo,
                                           crypt_key=p.crypt_key,
                                           auth_algo=p.auth_algo,
                                           auth_key=p.auth_key,
                                           tunnel_header=IP(
                                               src=self.pg1.remote_ip4,
                                               dst=self.tun_if.remote_ip4),
                                           nat_t_header=UDP(
                                               sport=4500,
                                               dport=4500))
        # in2out - from private network to public
        pkts = self.create_stream_plain(
            self.pg1.remote_mac, self.pg1.local_mac,
            self.pg1.remote_ip4, self.tun_if.remote_ip4)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.tun_if.get_capture(len(pkts))
        self.verify_capture_encrypted(capture, scapy_tun_sa)

        # scapy-side SA used to encrypt the packets injected on the tunnel
        # interface; tunnel endpoints are the reverse of the SA above.
        vpp_tun_sa = SecurityAssociation(ESP,
                                         spi=p.vpp_tun_spi,
                                         crypt_algo=p.crypt_algo,
                                         crypt_key=p.crypt_key,
                                         auth_algo=p.auth_algo,
                                         auth_key=p.auth_key,
                                         tunnel_header=IP(
                                             src=self.tun_if.remote_ip4,
                                             dst=self.pg1.remote_ip4),
                                         nat_t_header=UDP(
                                             sport=4500,
                                             dport=4500))

        # out2in - from public network to private
        pkts = self.create_stream_encrypted(
            self.tun_if.remote_mac, self.tun_if.local_mac,
            self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
        self.logger.info(ppc("Sending packets:", pkts))
        self.tun_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_plain(capture)