IPSEC-GRE; tests
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4 from scapy.layers.ipsec import ESP
5 from scapy.layers.l2 import Ether, Raw, GRE
6 from scapy.layers.inet import IP, UDP
7 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
8 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
9     IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
10 from vpp_ipsec_tun_interface import VppIpsecTunInterface, \
11     VppIpsecGRETunInterface
12 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
13 from vpp_ipsec import VppIpsecSA
14 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
15 from util import ppp
16
17
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests

    Shared fixture for the IPv4 tunnel tests: builds one IPsec tunnel
    interface over pg0 from the IPv4 parameter set and installs host
    routes (IPv4 and IPv6) that point through it.
    """

    encryption_type = ESP

    def setUp(self):
        """Create the tunnel interface and the routes that use it."""
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # SPIs, algorithms and keys all come from the parameter set; the
        # crypto key and the auth key are each passed twice.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # /32 host route via the tunnel's remote IPv4 address.
        # NOTE(review): 0xffffffff is presumably the "no interface"
        # sw_if_index - confirm against VppRoutePath.
        path4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        route4 = VppIpRoute(self, params.remote_tun_if_host, 32, [path4])
        route4.add_vpp_config()

        # /128 host route via the tunnel's remote IPv6 address.
        path6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                             proto=DpoProto.DPO_PROTO_IP6)
        route6 = VppIpRoute(self, params.remote_tun_if_host6, 128, [path6],
                            is_ip6=1)
        route6.add_vpp_config()

    def tearDown(self):
        """Run "show hardware" if VPP is still alive, then tear down."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.cli("show hardware")
        super(TemplateIpsec4TunIfEsp, self).tearDown()
53
54
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt"
    tun4_decrypt_node_name = "esp4-decrypt"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        # single packet through the IPv4 tunnel
        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        # 257 packets; the description used to be a copy of the basic
        # test's, which made the two indistinguishable in reports
        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        p = self.ipv4_params

        # Set the tunnel MTU to 1500 so that the single 1800-byte payload
        # below is expected to arrive as two packets (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        # Raise the MTU to 9000 again once the check has passed.
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [9000, 0, 0, 0])
78
79
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No body of its own: the fixture comes from TemplateIpsec4TunIfEsp
    # and the test methods from IpsecTcpTests.
83
84
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests

    Shared fixture for the IPv6 tunnel tests: builds one IPv6 IPsec
    tunnel interface over pg0 from the IPv6 parameter set and installs
    host routes (IPv6 and IPv4) that point through it.
    """

    encryption_type = ESP

    def setUp(self):
        """Create the IPv6 tunnel interface and the routes that use it."""
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # Same construction as the IPv4 template, plus is_ip6=True. The
        # tunnel is kept in a local only; it is not stored on the params.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            is_ip6=True)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # /128 host route via the tunnel's remote IPv6 address.
        # NOTE(review): 0xffffffff is presumably the "no interface"
        # sw_if_index - confirm against VppRoutePath.
        path6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                             proto=DpoProto.DPO_PROTO_IP6)
        route6 = VppIpRoute(self, params.remote_tun_if_host, 128, [path6],
                            is_ip6=1)
        route6.add_vpp_config()

        # /32 host route via the tunnel's remote IPv4 address.
        path4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        route4 = VppIpRoute(self, params.remote_tun_if_host4, 32, [path4])
        route4.add_vpp_config()

    def tearDown(self):
        """Run "show hardware" if VPP is still alive, then tear down."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.cli("show hardware")
        super(TemplateIpsec6TunIfEsp, self).tearDown()
119
120
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt"
    tun6_decrypt_node_name = "esp6-decrypt"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        # single packet through the IPv6 tunnel
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        # 257-packet burst through the IPv6 tunnel
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
133
134
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface

    Builds ten IPsec tunnel interfaces over pg0, each with its own SA
    ids, SPIs and remote host address, then sends traffic through every
    one of them and checks the per-tunnel packet counters.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt"
    tun4_decrypt_node_name = "esp4-decrypt"

    def setUp(self):
        """Create the ten tunnels and one /32 host route per tunnel."""
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        for idx in range(10):
            # Shallow copy of the IPv4 parameter set; the fields below
            # are then overridden per tunnel.
            params = copy.copy(self.ipv4_params)
            params.remote_tun_if_host = "1.1.1.%d" % (idx + 1)

            # Offset the tunnel SA ids and SPIs by the tunnel index.
            params.scapy_tun_sa_id += idx
            params.scapy_tun_spi += idx
            params.vpp_tun_sa_id += idx
            params.vpp_tun_spi += idx

            # Same offset for the "tra" SA ids and SPIs.
            params.scapy_tra_sa_id += idx
            params.scapy_tra_spi += idx
            params.vpp_tra_sa_id += idx
            params.vpp_tra_spi += idx

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()

            # /32 host route via this tunnel's remote IPv4 address.
            path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
            route = VppIpRoute(self, params.remote_tun_if_host, 32, [path])
            route.add_vpp_config()

    def tearDown(self):
        """Run "show hardware" if VPP is still alive, then tear down."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.cli("show hardware")
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_44(params, count=n_pkts)
            # every tunnel must have counted exactly the packets sent
            # through it, in both directions
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
193
194
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface

    Builds ten IPv6 IPsec tunnel interfaces over pg0, each with its own
    SA ids, SPIs and remote host address, then sends traffic through
    every one of them and checks the per-tunnel packet counters.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt"
    tun6_decrypt_node_name = "esp6-decrypt"

    def setUp(self):
        """Create the ten tunnels and one /128 host route per tunnel."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        for idx in range(10):
            # Shallow copy of the IPv6 parameter set; the fields below
            # are then overridden per tunnel.
            params = copy.copy(self.ipv6_params)
            params.remote_tun_if_host = "1111::%d" % (idx + 1)

            # Offset the tunnel SA ids and SPIs by the tunnel index.
            params.scapy_tun_sa_id += idx
            params.scapy_tun_spi += idx
            params.vpp_tun_sa_id += idx
            params.vpp_tun_spi += idx

            # Same offset for the "tra" SA ids and SPIs.
            params.scapy_tra_sa_id += idx
            params.scapy_tra_spi += idx
            params.vpp_tra_sa_id += idx
            params.vpp_tra_spi += idx

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()

            # /128 host route via this tunnel's remote IPv6 address.
            path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                proto=DpoProto.DPO_PROTO_IP6)
            route = VppIpRoute(self, params.remote_tun_if_host, 128, [path],
                               is_ip6=1)
            route.add_vpp_config()

    def tearDown(self):
        """Run "show hardware" if VPP is still alive, then tear down."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.cli("show hardware")
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=n_pkts)
            # every tunnel must have counted exactly the packets sent
            # through it, in both directions
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
255
256
class TemplateIpsecGRETunIfEsp(TemplateIpsec):
    """ IPsec GRE tunnel interface tests

    Fixture for an IPsec-protected GRE tunnel over pg0 whose interface
    is a port of bridge domain 1 together with pg1. The packet
    generators and verifiers below carry an inner Ethernet frame
    addressed to `omac`.
    """

    encryption_type = ESP
    # destination MAC of the inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / GRE / Ether / IP / UDP /
        Raw), with the outer IP going from pg0's remote to pg0's local
        address. `src` and `dst` are accepted but not used; the inner
        addresses are fixed at 1.1.1.1 -> 1.1.1.2.

        :param sa: object whose encrypt() wraps the outer IP packet
        :param sw_intf: interface supplying the outer Ethernet MACs
        :param count: number of frames to build
        :param payload_size: number of 'X' characters in the payload
        :returns: list of frames
        """
        # sa.encrypt() is invoked once per generated frame rather than
        # once overall, exactly as before.
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are accepted but not used; the IP
        addresses are fixed at 1.1.1.1 -> 1.1.1.2.

        :param count: number of frames to build
        :param payload_size: number of 'X' characters in the payload
        :returns: list of frames
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame's Ethernet and IP destination."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check its GRE payload.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level and the original exception is re-raised.

        :param sa: object whose decrypt() unwraps the received IP packet
        :param rxs: received packets
        """
        for rx in rxs:
            # Reset per packet so the failure log never shows a packet
            # decrypted in an earlier iteration, and so the handler does
            # not have to touch an unbound name when decrypt() itself
            # fails.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt() returned no IP layer; parse the raw
                    # payload as IP instead
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel, its route and bridge ports."""
        super(TemplateIpsecGRETunIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # SA with addresses in the order pg0 local, pg0 remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # SA with addresses in the order pg0 remote, pg0 local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        self.tun = VppIpsecGRETunInterface(self, self.pg0,
                                           p.tun_sa_out.id,
                                           p.tun_sa_in.id)

        self.tun.add_vpp_config()
        self.tun.admin_up()
        self.tun.config_ip4()

        VppIpRoute(self, p.remote_tun_if_host, 32,
                   [VppRoutePath(self.tun.remote_ip4,
                                 0xffffffff)]).add_vpp_config()
        # the tunnel and pg1 both become ports of bridge domain 1
        VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

    def tearDown(self):
        """Unconfigure the tunnel's IPv4 address and tear down.

        Both the "show hardware" CLI and the address removal talk to
        VPP, so both are skipped once VPP is known to be dead.
        """
        if not self.vpp_dead:
            self.vapi.cli("show hardware")
            self.tun.unconfig_ip4()
        super(TemplateIpsecGRETunIfEsp, self).tearDown()
354
355
# NOTE(review): the skip condition uses both names as plain values, not
# calls - confirm framework exports them as booleans, not functions.
@unittest.skipIf(
    is_skip_aarch64_set and is_platform_aarch64,
    "test doesn't work on aarch64")
class TestIpsecGRETunIfEsp1(TemplateIpsecGRETunIfEsp, IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """

    # graph node names, same values as the plain IPv4 tunnel tests
    tun4_encrypt_node_name = "esp4-encrypt"
    tun4_decrypt_node_name = "esp4-decrypt"
362
# Run the tests with the VPP test runner when executed as a script.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)