builtinurl: mark api as deprecated
[vpp.git] / test / test_ipsec_nat.py
1 #!/usr/bin/env python3
2
3 import socket
4
5 import scapy.compat
6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import ICMP, IP, TCP, UDP
8 from scapy.layers.ipsec import SecurityAssociation, ESP
9
10 from util import ppp, ppc
11 from template_ipsec import TemplateIpsec
12 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from vpp_ip import DpoProto
15 from vpp_papi import VppEnum
16
17
class IPSecNATTestCase(TemplateIpsec):
    """IPSec/NAT

    TUNNEL MODE::

         public network  |   private network
         ---   encrypt  ---   plain   ---
        |pg0| <------- |VPP| <------ |pg1|
         ---            ---           ---

         ---   decrypt  ---   plain   ---
        |pg0| -------> |VPP| ------> |pg1|
         ---            ---           ---

    Both security associations are configured with the UDP-encapsulation
    SAD flag, and the scapy side of the test wraps ESP in a UDP 4500/4500
    NAT-T header.
    """

    # L4 identifiers carried by the inner (plain) packets.  The "in" and
    # "out" values are equal, so the same number is used on both sides of
    # the tunnel.
    tcp_port_in = 6303
    tcp_port_out = 6303
    udp_port_in = 6304
    udp_port_out = 6304
    icmp_id_in = 6305
    icmp_id_out = 6305

    @classmethod
    def setUpClass(cls):
        """Delegate class-level set-up to the parent class."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the parent class."""
        super().tearDownClass()

    def setUp(self):
        """Prepare the per-test configuration.

        Uses pg0 as the tunnel interface, creates an SPD and binds it to
        that interface, installs the ESP tunnel SAs/policies for the IPv4
        parameter set, and adds a route for ``remote_tun_if_host`` via the
        tunnel interface's remote address.
        """
        super().setUp()
        self.tun_if = self.pg0

        spd = VppIpsecSpd(self, self.tun_spd_id)
        self.tun_spd = spd
        spd.add_vpp_config()
        VppIpsecSpdItfBinding(self, spd, self.tun_if).add_vpp_config()

        params = self.ipv4_params
        self.config_esp_tun(params)
        self.logger.info(self.vapi.ppcli("show ipsec all"))

        if params.is_ipv6:
            dpo_proto = DpoProto.DPO_PROTO_IP6
        else:
            dpo_proto = DpoProto.DPO_PROTO_IP4
        # NOTE(review): 0xFFFFFFFF is presumably the "no specific interface"
        # index for the path - confirm against VppRoutePath.
        next_hop = VppRoutePath(
            self.tun_if.remote_addr[params.addr_type], 0xFFFFFFFF, proto=dpo_proto
        )
        route = VppIpRoute(
            self, params.remote_tun_if_host, params.addr_len, [next_hop]
        )
        route.add_vpp_config()

    def tearDown(self):
        """Delegate per-test tear-down to the parent class."""
        super().tearDown()

    def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
        """Return three unencrypted packets: one TCP, one UDP, one ICMP.

        :param src_mac: Ethernet source address of every packet.
        :param dst_mac: Ethernet destination address of every packet.
        :param src_ip: IPv4 source address of every packet.
        :param dst_ip: IPv4 destination address of every packet.
        :returns: list of scapy packets in TCP, UDP, ICMP order.
        """
        l4_headers = (
            TCP(sport=self.tcp_port_in, dport=20),
            UDP(sport=self.udp_port_in, dport=20),
            ICMP(id=self.icmp_id_in, type="echo-request"),
        )
        return [
            Ether(src=src_mac, dst=dst_mac) / IP(src=src_ip, dst=dst_ip) / l4
            for l4 in l4_headers
        ]

    def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
        """Return three packets (TCP, UDP, ICMP) encrypted with ``sa``.

        :param src_mac: Ethernet source address of every packet.
        :param dst_mac: Ethernet destination address of every packet.
        :param src_ip: IPv4 source address of the inner packets.
        :param dst_ip: IPv4 destination address of the inner packets.
        :param sa: object whose ``encrypt()`` wraps each inner IP packet.
        :returns: list of scapy packets in TCP, UDP, ICMP order.
        """
        l4_headers = (
            TCP(dport=self.tcp_port_out, sport=20),
            UDP(dport=self.udp_port_out, sport=20),
            ICMP(id=self.icmp_id_out, type="echo-request"),
        )
        # Packets are encrypted one after another in list order.
        return [
            Ether(src=src_mac, dst=dst_mac)
            / sa.encrypt(IP(src=src_ip, dst=dst_ip) / l4)
            for l4 in l4_headers
        ]

    def verify_capture_plain(self, capture):
        """Check every decrypted packet captured on the private side.

        Each packet must have valid checksums, come from the tunnel
        interface's remote IPv4 address, be addressed to pg1's remote IPv4
        address, and carry the expected TCP/UDP destination port or ICMP id
        without a stray UDP header.  On failure the offending packet is
        logged and the exception is re-raised.

        :param capture: iterable of captured scapy packets.
        """
        stray_udp_msg = "unexpected UDP header in decrypted packet"
        for pkt in capture:
            try:
                self.assert_packet_checksums_valid(pkt)
                ip_hdr = pkt[IP]
                self.assert_equal(
                    ip_hdr.src,
                    self.tun_if.remote_ip4,
                    "decrypted packet source address",
                )
                self.assert_equal(
                    ip_hdr.dst,
                    self.pg1.remote_ip4,
                    "decrypted packet destination address",
                )
                if pkt.haslayer(TCP):
                    self.assertFalse(pkt.haslayer(UDP), stray_udp_msg)
                    self.assert_equal(
                        pkt[TCP].dport,
                        self.tcp_port_in,
                        "decrypted packet TCP destination port",
                    )
                elif pkt.haslayer(UDP):
                    udp_hdr = pkt[UDP]
                    if udp_hdr.payload:
                        # Only a second UDP layer below the first one would
                        # be unexpected.
                        self.assertFalse(udp_hdr[1].haslayer(UDP), stray_udp_msg)
                    self.assert_equal(
                        udp_hdr.dport,
                        self.udp_port_in,
                        "decrypted packet UDP destination port",
                    )
                else:
                    self.assertFalse(pkt.haslayer(UDP), stray_udp_msg)
                    self.assert_equal(
                        pkt[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
                    )
            except Exception:
                self.logger.error(ppp("Unexpected or invalid plain packet:", pkt))
                raise

    def verify_capture_encrypted(self, capture, sa):
        """Check every encrypted packet captured on the public side.

        Each packet must have a consistent UDP length, valid checksums and
        an ESP layer; after decryption with ``sa`` the inner packet must go
        from pg1's remote IPv4 address to the tunnel interface's remote
        IPv4 address.  On failure the offending packet is logged and the
        exception is re-raised.

        :param capture: iterable of captured scapy packets.
        :param sa: object whose ``decrypt()`` unwraps the IP layer.
        """
        for pkt in capture:
            try:
                pkt_cls = pkt.__class__
                # Re-parse the packet, drop the UDP length so that it is
                # filled in again when the packet is rebuilt, and compare
                # the result with the length that was actually received.
                rebuilt = pkt_cls(scapy.compat.raw(pkt))
                del rebuilt[UDP].len
                rebuilt = pkt_cls(scapy.compat.raw(rebuilt))
                self.assert_equal(pkt[UDP].len, rebuilt[UDP].len, "UDP header length")
                self.assert_packet_checksums_valid(pkt)
                self.assertIn(ESP, pkt[IP])
                inner = sa.decrypt(pkt[IP])
                self.assert_packet_checksums_valid(inner)
                inner_ip = inner[IP]
                self.assert_equal(
                    inner_ip.src,
                    self.pg1.remote_ip4,
                    "encrypted packet source address",
                )
                self.assert_equal(
                    inner_ip.dst,
                    self.tun_if.remote_ip4,
                    "encrypted packet destination address",
                )
            except Exception:
                self.logger.error(ppp("Unexpected or invalid encrypted packet:", pkt))
                raise

    def config_esp_tun(self, params):
        """Install the SAs and SPD entries used by the tunnel test.

        Creates two UDP-encapsulating ESP SAs (one per direction between
        pg1's remote address and the tunnel interface's remote address) and
        six SPD entries in ``self.tun_spd``: four any-to-any entries for ESP
        and for UDP port 4500, and two PROTECT entries tied to the SAs.

        :param params: parameter object providing the SA ids, SPIs,
            algorithm ids, keys and address helpers read below.
        """
        udp_encap = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        protect = VppEnum.vl_api_ipsec_spd_action_t.IPSEC_API_SPD_ACTION_PROTECT

        private_addr = self.pg1.remote_addr[params.addr_type]
        public_addr = self.tun_if.remote_addr[params.addr_type]
        scapy_sa_id = params.scapy_tun_sa_id
        vpp_sa_id = params.vpp_tun_sa_id

        # One SA per direction; only the id, SPI and the address order differ.
        sa_specs = (
            (scapy_sa_id, params.scapy_tun_spi, private_addr, public_addr),
            (vpp_sa_id, params.vpp_tun_spi, public_addr, private_addr),
        )
        for sa_id, spi, first_addr, second_addr in sa_specs:
            VppIpsecSA(
                self,
                sa_id,
                spi,
                params.auth_algo_vpp_id,
                params.auth_key,
                params.crypt_algo_vpp_id,
                params.crypt_key,
                self.vpp_esp_protocol,
                first_addr,
                second_addr,
                flags=udp_encap,
            ).add_vpp_config()

        # Any-to-any entries for ESP and for UDP/4500, each added first with
        # the default direction and then with is_outbound=0.  No policy is
        # passed, so VppIpsecSpdEntry's default action applies.
        any_to_any = (
            params.addr_any,
            params.addr_bcast,
            params.addr_any,
            params.addr_bcast,
        )
        nat_t_port = {"remote_port_start": 4500, "remote_port_stop": 4500}
        protocol_specs = (
            (socket.IPPROTO_ESP, {}),
            (socket.IPPROTO_UDP, nat_t_port),
        )
        for ip_proto, port_match in protocol_specs:
            for direction in ({}, {"is_outbound": 0}):
                VppIpsecSpdEntry(
                    self,
                    self.tun_spd,
                    scapy_sa_id,
                    *any_to_any,
                    ip_proto,
                    **port_match,
                    **direction,
                ).add_vpp_config()

        # PROTECT entry with is_outbound=0, bound to the VPP-side SA.
        VppIpsecSpdEntry(
            self,
            self.tun_spd,
            vpp_sa_id,
            public_addr,
            public_addr,
            private_addr,
            private_addr,
            socket.IPPROTO_RAW,
            priority=10,
            policy=protect,
            is_outbound=0,
        ).add_vpp_config()
        # PROTECT entry with the default direction, bound to the scapy-side SA.
        VppIpsecSpdEntry(
            self,
            self.tun_spd,
            scapy_sa_id,
            private_addr,
            private_addr,
            public_addr,
            public_addr,
            socket.IPPROTO_RAW,
            policy=protect,
            priority=10,
        ).add_vpp_config()

    def test_ipsec_nat_tun(self):
        """IPSec/NAT tunnel test case"""
        params = self.ipv4_params
        private_ip = self.pg1.remote_ip4
        public_ip = self.tun_if.remote_ip4

        def build_sa(spi, outer_src, outer_dst):
            # scapy-side SA: ESP in tunnel mode behind a UDP 4500 header.
            return SecurityAssociation(
                ESP,
                spi=spi,
                crypt_algo=params.crypt_algo,
                crypt_key=params.crypt_key,
                auth_algo=params.auth_algo,
                auth_key=params.auth_key,
                tunnel_header=IP(src=outer_src, dst=outer_dst),
                nat_t_header=UDP(sport=4500, dport=4500),
            )

        scapy_tun_sa = build_sa(params.scapy_tun_spi, private_ip, public_ip)

        # in2out - from private network to public
        plain_pkts = self.create_stream_plain(
            self.pg1.remote_mac,
            self.pg1.local_mac,
            private_ip,
            public_ip,
        )
        self.pg1.add_stream(plain_pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        encrypted_capture = self.tun_if.get_capture(len(plain_pkts))
        self.verify_capture_encrypted(encrypted_capture, scapy_tun_sa)

        vpp_tun_sa = build_sa(params.vpp_tun_spi, public_ip, private_ip)

        # out2in - from public network to private
        encrypted_pkts = self.create_stream_encrypted(
            self.tun_if.remote_mac,
            self.tun_if.local_mac,
            public_ip,
            private_ip,
            vpp_tun_sa,
        )
        self.logger.info(ppc("Sending packets:", encrypted_pkts))
        self.tun_if.add_stream(encrypted_pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        plain_capture = self.pg1.get_capture(len(encrypted_pkts))
        self.verify_capture_plain(plain_capture)