Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4 from scapy.layers.ipsec import ESP
5 from scapy.layers.l2 import Ether, Raw, GRE
6 from scapy.layers.inet import IP, UDP
7 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
8 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
9     IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
10 from vpp_ipsec_tun_interface import VppIpsecTunInterface, \
11     VppIpsecGRETunInterface
12 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
13 from vpp_ipsec import VppIpsecSA
14 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
15 from util import ppp
16
17
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    # Protocol used by the shared IPsec test templates for this fixture.
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        """ Chain explicitly to the parent's class-level setup """
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ Chain explicitly to the parent's class-level teardown """
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """ Create an IPsec tunnel interface over pg0 and route hosts via it

        The tunnel is built from ``self.ipv4_params`` and stored back on
        those params as ``tun_if``. It is brought up, given IPv4 and IPv6
        configuration, and one host route per address family is installed
        through it.
        """
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # The same crypto key and the same auth key are passed twice each.
        # NOTE(review): presumably local/remote keys - confirm against
        # VppIpsecTunInterface.
        params.tun_if = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key)
        tunnel = params.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # /32 host route to the remote IPv4 host via the tunnel.
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        # /128 host route to the remote IPv6 host via the same tunnel.
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128, [v6_path],
                   is_ip6=1).add_vpp_config()

    def tearDown(self):
        """ Dump hardware state while VPP is alive, then run parent cleanup
        """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.cli("show hardware")
        super(TemplateIpsec4TunIfEsp, self).tearDown()
61
62
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    # Node names handed to the shared tunnel test templates.
    tun4_encrypt_node_name = "esp4-encrypt"
    tun4_decrypt_node_name = "esp4-decrypt"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        # The docstring previously read "basic test" (copy-paste from
        # test_tun_basic64), which made both tests report the same name.
        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        p = self.ipv4_params

        # With the tunnel MTU set to 1500, a single 1800-byte payload is
        # expected to be received as two packets (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        # Put the MTU back to 9000 once the check has passed.
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [9000, 0, 0, 0])
86
87
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No body of its own: the test cases come from IpsecTcpTests and the
    # tunnel fixture from TemplateIpsec4TunIfEsp.
91
92
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    # Protocol used by the shared IPsec test templates for this fixture.
    encryption_type = ESP

    def setUp(self):
        """ Create an IPv6 IPsec tunnel interface over pg0 and route via it

        The tunnel is built from ``self.ipv6_params`` with ``is_ip6=True``,
        brought up, given IPv6 and IPv4 configuration, and one host route
        per address family is installed through it.
        """
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # The same crypto key and the same auth key are passed twice each.
        # NOTE(review): presumably local/remote keys - confirm against
        # VppIpsecTunInterface.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            is_ip6=True)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # /128 host route to the remote IPv6 host via the tunnel.
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128, [v6_path],
                   is_ip6=1).add_vpp_config()

        # /32 host route to the remote IPv4 host via the same tunnel.
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def tearDown(self):
        """ Dump hardware state while VPP is alive, then run parent cleanup
        """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.cli("show hardware")
        super(TemplateIpsec6TunIfEsp, self).tearDown()
127
128
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    # Node names handed to the shared tunnel test templates.
    tun6_encrypt_node_name = "esp6-encrypt"
    tun6_decrypt_node_name = "esp6-decrypt"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
141
142
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    # Node names handed to the shared tunnel test templates.
    tun4_encrypt_node_name = "esp4-encrypt"
    tun4_decrypt_node_name = "esp4-decrypt"

    def setUp(self):
        """ Create ten IPv4 IPsec tunnel interfaces over pg0

        Each tunnel gets its own shallow copy of ``self.ipv4_params`` with
        a distinct remote host (1.1.1.1 .. 1.1.1.10) and SA ids / SPIs
        shifted by the tunnel index. The copies are collected in
        ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        for offset in range(10):
            params = copy.copy(self.ipv4_params)

            params.remote_tun_if_host = "1.1.1.%d" % (offset + 1)

            # Shift every tunnel-mode identifier by the tunnel index ...
            params.scapy_tun_sa_id += offset
            params.scapy_tun_spi += offset
            params.vpp_tun_sa_id += offset
            params.vpp_tun_spi += offset

            # ... and every transport-mode identifier likewise.
            params.scapy_tra_sa_id += offset
            params.scapy_tra_spi += offset
            params.vpp_tra_sa_id += offset
            params.vpp_tra_spi += offset

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            params.tun_if = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key)
            tunnel = params.tun_if
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()

            # /32 host route to this tunnel's remote host.
            path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
            VppIpRoute(self, params.remote_tun_if_host, 32,
                       [path]).add_vpp_config()

    def tearDown(self):
        """ Dump hardware state while VPP is alive, then run parent cleanup
        """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.cli("show hardware")
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_44(params, count=n_pkts)
            # Each tunnel must account for exactly the packets sent on it.
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
201
202
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    # Node names handed to the shared tunnel test templates.
    tun6_encrypt_node_name = "esp6-encrypt"
    tun6_decrypt_node_name = "esp6-decrypt"

    def setUp(self):
        """ Create ten IPv6 IPsec tunnel interfaces over pg0

        Each tunnel gets its own shallow copy of ``self.ipv6_params`` with
        a distinct remote host (1111::1 .. 1111::10) and SA ids / SPIs
        shifted by the tunnel index. The copies are collected in
        ``self.multi_params``.
        """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        for offset in range(10):
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (offset + 1)

            # Shift every tunnel-mode identifier by the tunnel index ...
            params.scapy_tun_sa_id += offset
            params.scapy_tun_spi += offset
            params.vpp_tun_sa_id += offset
            params.vpp_tun_spi += offset

            # ... and every transport-mode identifier likewise.
            params.scapy_tra_sa_id += offset
            params.scapy_tra_spi += offset
            params.vpp_tra_sa_id += offset
            params.vpp_tra_spi += offset

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            params.tun_if = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True)
            tunnel = params.tun_if
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()

            # /128 host route to this tunnel's remote host.
            path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                proto=DpoProto.DPO_PROTO_IP6)
            VppIpRoute(self, params.remote_tun_if_host, 128, [path],
                       is_ip6=1).add_vpp_config()

    def tearDown(self):
        """ Dump hardware state while VPP is alive, then run parent cleanup
        """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.cli("show hardware")
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=n_pkts)
            # Each tunnel must account for exactly the packets sent on it.
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
263
264
class TemplateIpsecGRETunIfEsp(TemplateIpsec):
    """ IPsec GRE tunnel interface tests """

    encryption_type = ESP
    # Destination MAC used for the inner Ethernet frame in generated
    # packets and checked on received ones.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """ Build ``count`` encrypted GRE-encapsulated L2 frames

        The inner frame is Ether/IP(1.1.1.1 -> 1.1.1.2)/UDP/Raw wrapped in
        GRE and an outer IP header from pg0's remote to local address; the
        outer IP packet is encrypted with ``sa``.

        :param sa: security association whose ``encrypt`` is applied to
            each packet
        :param sw_intf: interface supplying the outer Ethernet MACs
        :param src: unused here; kept for signature compatibility
        :param dst: unused here; kept for signature compatibility
        :param count: number of packets to build
        :param payload_size: number of 'X' characters in the payload
        :returns: list of packets
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """ Build ``count`` plain L2 frames addressed to ``omac``

        :param sw_intf: unused here; kept for signature compatibility
        :param src: unused here; kept for signature compatibility
        :param dst: unused here; kept for signature compatibility
        :param count: number of packets to build
        :param payload_size: number of 'X' characters in the payload
        :returns: list of packets
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """ Check each received frame carries the expected inner addresses

        :param p: unused here; kept for signature compatibility
        :param rxs: received packets to check
        """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """ Decrypt each received packet and check outer and inner headers

        On IndexError or AssertionError the offending packet (and the
        decrypted form, when available) is logged at debug level and the
        error is re-raised.

        :param p: unused here; kept for signature compatibility
        :param sa: security association whose ``decrypt`` is applied
        :param rxs: received packets to check
        """
        for rx in rxs:
            # Reset per packet so a failed decrypt never logs the previous
            # iteration's packet as this one's decrypted form.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Logging is best-effort; never let it replace the
                        # real failure. Unlike a bare except, this does
                        # not swallow KeyboardInterrupt / SystemExit.
                        pass
                raise

    def setUp(self):
        """ Build a GRE-over-IPsec tunnel and bridge it with pg1

        Creates bridge domain 1, an outbound and an inbound SA between
        pg0's local and remote addresses, a GRE tunnel interface bound to
        those SAs, a /32 host route through the tunnel, and adds the tunnel
        and pg1 as ports of the bridge domain.
        """
        super(TemplateIpsecGRETunIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # Outbound SA: local -> remote.
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # Inbound SA: remote -> local.
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        self.tun = VppIpsecGRETunInterface(self, self.pg0,
                                           p.tun_sa_out.id,
                                           p.tun_sa_in.id)

        self.tun.add_vpp_config()
        self.tun.admin_up()
        self.tun.config_ip4()

        VppIpRoute(self, p.remote_tun_if_host, 32,
                   [VppRoutePath(self.tun.remote_ip4,
                                 0xffffffff)]).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

    def tearDown(self):
        """ Dump state and unconfigure the tunnel, then run parent cleanup
        """
        if not self.vpp_dead:
            self.vapi.cli("show hardware")
            # NOTE(review): unconfig_ip4 presumably talks to VPP, so it is
            # kept under the same liveness guard as the CLI call; otherwise
            # a dead VPP could abort tearDown before the parent cleanup.
            self.tun.unconfig_ip4()
        super(TemplateIpsecGRETunIfEsp, self).tearDown()
362
363
# Skipped only when both the aarch64-skip flag and the aarch64 platform
# flag imported from the framework are truthy.
@unittest.skipIf(
    is_skip_aarch64_set and is_platform_aarch64,
    "test doesn't work on aarch64")
class TestIpsecGRETunIfEsp1(TemplateIpsecGRETunIfEsp, IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    # Node names handed to the shared tunnel test templates.
    tun4_encrypt_node_name = "esp4-encrypt"
    tun4_decrypt_node_name = "esp4-decrypt"

370
# Running this file directly executes its tests with VppTestRunner as the
# unittest runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)