tests: Re-enable ipsec tests on ARM
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11     IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18 from util import ppp
19 from vpp_papi import VppEnum
20
21
22 class TemplateIpsec4TunIfEsp(TemplateIpsec):
23     """ IPsec tunnel interface tests """
24
25     encryption_type = ESP
26
27     @classmethod
28     def setUpClass(cls):
29         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
30
31     @classmethod
32     def tearDownClass(cls):
33         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
34
35     def setUp(self):
36         super(TemplateIpsec4TunIfEsp, self).setUp()
37
38         self.tun_if = self.pg0
39
40         p = self.ipv4_params
41
42         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
43                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
44                                         p.crypt_key, p.crypt_key,
45                                         p.auth_algo_vpp_id, p.auth_key,
46                                         p.auth_key)
47         p.tun_if.add_vpp_config()
48         p.tun_if.admin_up()
49         p.tun_if.config_ip4()
50         p.tun_if.config_ip6()
51
52         r = VppIpRoute(self, p.remote_tun_if_host, 32,
53                        [VppRoutePath(p.tun_if.remote_ip4,
54                                      0xffffffff)])
55         r.add_vpp_config()
56         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
57                        [VppRoutePath(p.tun_if.remote_ip6,
58                                      0xffffffff,
59                                      proto=DpoProto.DPO_PROTO_IP6)])
60         r.add_vpp_config()
61
62     def tearDown(self):
63         super(TemplateIpsec4TunIfEsp, self).tearDown()
64
65
66 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
67     """ Ipsec ESP - TUN tests """
68     tun4_encrypt_node_name = "esp4-encrypt-tun"
69     tun4_decrypt_node_name = "esp4-decrypt"
70
71     def test_tun_basic64(self):
72         """ ipsec 6o4 tunnel basic test """
73         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
74
75         self.verify_tun_64(self.params[socket.AF_INET], count=1)
76
77     def test_tun_burst64(self):
78         """ ipsec 6o4 tunnel basic test """
79         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
80
81         self.verify_tun_64(self.params[socket.AF_INET], count=257)
82
83     def test_tun_basic_frag44(self):
84         """ ipsec 4o4 tunnel frag basic test """
85         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
86
87         p = self.ipv4_params
88
89         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
90                                        [1500, 0, 0, 0])
91         self.verify_tun_44(self.params[socket.AF_INET],
92                            count=1, payload_size=1800, n_rx=2)
93         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
94                                        [9000, 0, 0, 0])
95
96
97 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
98     """ Ipsec ESP - TCP tests """
99     pass
100
101
102 class TemplateIpsec6TunIfEsp(TemplateIpsec):
103     """ IPsec tunnel interface tests """
104
105     encryption_type = ESP
106
107     def setUp(self):
108         super(TemplateIpsec6TunIfEsp, self).setUp()
109
110         self.tun_if = self.pg0
111
112         p = self.ipv6_params
113         tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
114                                       p.scapy_tun_spi, p.crypt_algo_vpp_id,
115                                       p.crypt_key, p.crypt_key,
116                                       p.auth_algo_vpp_id, p.auth_key,
117                                       p.auth_key, is_ip6=True)
118         tun_if.add_vpp_config()
119         tun_if.admin_up()
120         tun_if.config_ip6()
121         tun_if.config_ip4()
122
123         r = VppIpRoute(self, p.remote_tun_if_host, 128,
124                        [VppRoutePath(tun_if.remote_ip6,
125                                      0xffffffff,
126                                      proto=DpoProto.DPO_PROTO_IP6)])
127         r.add_vpp_config()
128         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
129                        [VppRoutePath(tun_if.remote_ip4,
130                                      0xffffffff)])
131         r.add_vpp_config()
132
133     def tearDown(self):
134         super(TemplateIpsec6TunIfEsp, self).tearDown()
135
136
137 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
138     """ Ipsec ESP - TUN tests """
139     tun6_encrypt_node_name = "esp6-encrypt-tun"
140     tun6_decrypt_node_name = "esp6-decrypt"
141
142     def test_tun_basic46(self):
143         """ ipsec 4o6 tunnel basic test """
144         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
145         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
146
147     def test_tun_burst46(self):
148         """ ipsec 4o6 tunnel burst test """
149         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
150         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
151
152
153 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
154     """ IPsec IPv4 Multi Tunnel interface """
155
156     encryption_type = ESP
157     tun4_encrypt_node_name = "esp4-encrypt-tun"
158     tun4_decrypt_node_name = "esp4-decrypt"
159
160     def setUp(self):
161         super(TestIpsec4MultiTunIfEsp, self).setUp()
162
163         self.tun_if = self.pg0
164
165         self.multi_params = []
166
167         for ii in range(10):
168             p = copy.copy(self.ipv4_params)
169
170             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
171             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
172             p.scapy_tun_spi = p.scapy_tun_spi + ii
173             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
174             p.vpp_tun_spi = p.vpp_tun_spi + ii
175
176             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
177             p.scapy_tra_spi = p.scapy_tra_spi + ii
178             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
179             p.vpp_tra_spi = p.vpp_tra_spi + ii
180
181             config_tun_params(p, self.encryption_type, self.tun_if)
182             self.multi_params.append(p)
183
184             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
185                                             p.scapy_tun_spi,
186                                             p.crypt_algo_vpp_id,
187                                             p.crypt_key, p.crypt_key,
188                                             p.auth_algo_vpp_id, p.auth_key,
189                                             p.auth_key)
190             p.tun_if.add_vpp_config()
191             p.tun_if.admin_up()
192             p.tun_if.config_ip4()
193
194             VppIpRoute(self, p.remote_tun_if_host, 32,
195                        [VppRoutePath(p.tun_if.remote_ip4,
196                                      0xffffffff)]).add_vpp_config()
197
198     def tearDown(self):
199         super(TestIpsec4MultiTunIfEsp, self).tearDown()
200
201     def test_tun_44(self):
202         """Multiple IPSEC tunnel interfaces """
203         for p in self.multi_params:
204             self.verify_tun_44(p, count=127)
205             c = p.tun_if.get_rx_stats()
206             self.assertEqual(c['packets'], 127)
207             c = p.tun_if.get_tx_stats()
208             self.assertEqual(c['packets'], 127)
209
210
211 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
212     """ IPsec IPv4 Tunnel interface all Algos """
213
214     encryption_type = ESP
215     tun4_encrypt_node_name = "esp4-encrypt-tun"
216     tun4_decrypt_node_name = "esp4-decrypt"
217
218     def config_network(self, p):
219         config_tun_params(p, self.encryption_type, self.tun_if)
220
221         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
222                                         p.scapy_tun_spi,
223                                         p.crypt_algo_vpp_id,
224                                         p.crypt_key, p.crypt_key,
225                                         p.auth_algo_vpp_id, p.auth_key,
226                                         p.auth_key,
227                                         salt=p.salt)
228         p.tun_if.add_vpp_config()
229         p.tun_if.admin_up()
230         p.tun_if.config_ip4()
231         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
232         self.logger.info(self.vapi.cli("sh ipsec sa 1"))
233
234         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
235                              [VppRoutePath(p.tun_if.remote_ip4,
236                                            0xffffffff)])
237         p.route.add_vpp_config()
238
239     def unconfig_network(self, p):
240         p.tun_if.unconfig_ip4()
241         p.tun_if.remove_vpp_config()
242         p.route.remove_vpp_config()
243
244     def setUp(self):
245         super(TestIpsec4TunIfEspAll, self).setUp()
246
247         self.tun_if = self.pg0
248
249     def tearDown(self):
250         super(TestIpsec4TunIfEspAll, self).tearDown()
251
252     def rekey(self, p):
253         #
254         # change the key and the SPI
255         #
256         p.crypt_key = 'X' + p.crypt_key[1:]
257         p.scapy_tun_spi += 1
258         p.scapy_tun_sa_id += 1
259         p.vpp_tun_spi += 1
260         p.vpp_tun_sa_id += 1
261         p.tun_if.local_spi = p.vpp_tun_spi
262         p.tun_if.remote_spi = p.scapy_tun_spi
263
264         config_tun_params(p, self.encryption_type, self.tun_if)
265
266         p.tun_sa_in = VppIpsecSA(self,
267                                  p.scapy_tun_sa_id,
268                                  p.scapy_tun_spi,
269                                  p.auth_algo_vpp_id,
270                                  p.auth_key,
271                                  p.crypt_algo_vpp_id,
272                                  p.crypt_key,
273                                  self.vpp_esp_protocol,
274                                  self.tun_if.local_addr[p.addr_type],
275                                  self.tun_if.remote_addr[p.addr_type],
276                                  flags=p.flags,
277                                  salt=p.salt)
278         p.tun_sa_out = VppIpsecSA(self,
279                                   p.vpp_tun_sa_id,
280                                   p.vpp_tun_spi,
281                                   p.auth_algo_vpp_id,
282                                   p.auth_key,
283                                   p.crypt_algo_vpp_id,
284                                   p.crypt_key,
285                                   self.vpp_esp_protocol,
286                                   self.tun_if.remote_addr[p.addr_type],
287                                   self.tun_if.local_addr[p.addr_type],
288                                   flags=p.flags,
289                                   salt=p.salt)
290         p.tun_sa_in.add_vpp_config()
291         p.tun_sa_out.add_vpp_config()
292
293         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
294                                          sa_id=p.tun_sa_in.id,
295                                          is_outbound=1)
296         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
297                                          sa_id=p.tun_sa_out.id,
298                                          is_outbound=0)
299         self.logger.info(self.vapi.cli("sh ipsec sa"))
300
301     def test_tun_44(self):
302         """IPSEC tunnel all algos """
303
304         # foreach VPP crypto engine
305         engines = ["ia32", "ipsecmb", "openssl"]
306
307         # foreach crypto algorithm
308         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
309                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
310                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
311                                 IPSEC_API_INTEG_ALG_NONE),
312                   'scapy-crypto': "AES-GCM",
313                   'scapy-integ': "NULL",
314                   'key': "JPjyOWBeVEQiMe7h",
315                   'salt': 3333},
316                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
317                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
318                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
319                                 IPSEC_API_INTEG_ALG_NONE),
320                   'scapy-crypto': "AES-GCM",
321                   'scapy-integ': "NULL",
322                   'key': "JPjyOWBeVEQiMe7hJPjyOWBe",
323                   'salt': 0},
324                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
325                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
326                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
327                                 IPSEC_API_INTEG_ALG_NONE),
328                   'scapy-crypto': "AES-GCM",
329                   'scapy-integ': "NULL",
330                   'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
331                   'salt': 9999},
332                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
333                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
334                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
335                                 IPSEC_API_INTEG_ALG_SHA1_96),
336                   'scapy-crypto': "AES-CBC",
337                   'scapy-integ': "HMAC-SHA1-96",
338                   'salt': 0,
339                   'key': "JPjyOWBeVEQiMe7h"},
340                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
341                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
342                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
343                                 IPSEC_API_INTEG_ALG_SHA1_96),
344                   'scapy-crypto': "AES-CBC",
345                   'scapy-integ': "HMAC-SHA1-96",
346                   'salt': 0,
347                   'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
348                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
349                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
350                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
351                                 IPSEC_API_INTEG_ALG_SHA1_96),
352                   'scapy-crypto': "AES-CBC",
353                   'scapy-integ': "HMAC-SHA1-96",
354                   'salt': 0,
355                   'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
356
357         for engine in engines:
358             self.vapi.cli("set crypto handler all %s" % engine)
359
360             #
361             # loop through each of the algorithms
362             #
363             for algo in algos:
364                 # with self.subTest(algo=algo['scapy']):
365
366                 p = copy.copy(self.ipv4_params)
367                 p.auth_algo_vpp_id = algo['vpp-integ']
368                 p.crypt_algo_vpp_id = algo['vpp-crypto']
369                 p.crypt_algo = algo['scapy-crypto']
370                 p.auth_algo = algo['scapy-integ']
371                 p.crypt_key = algo['key']
372                 p.salt = algo['salt']
373
374                 self.config_network(p)
375
376                 self.verify_tun_44(p, count=127)
377                 c = p.tun_if.get_rx_stats()
378                 self.assertEqual(c['packets'], 127)
379                 c = p.tun_if.get_tx_stats()
380                 self.assertEqual(c['packets'], 127)
381
382                 #
383                 # rekey the tunnel
384                 #
385                 self.rekey(p)
386                 self.verify_tun_44(p, count=127)
387
388                 self.unconfig_network(p)
389                 p.tun_sa_out.remove_vpp_config()
390                 p.tun_sa_in.remove_vpp_config()
391
392
393 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
394     """ IPsec IPv6 Multi Tunnel interface """
395
396     encryption_type = ESP
397     tun6_encrypt_node_name = "esp6-encrypt-tun"
398     tun6_decrypt_node_name = "esp6-decrypt"
399
400     def setUp(self):
401         super(TestIpsec6MultiTunIfEsp, self).setUp()
402
403         self.tun_if = self.pg0
404
405         self.multi_params = []
406
407         for ii in range(10):
408             p = copy.copy(self.ipv6_params)
409
410             p.remote_tun_if_host = "1111::%d" % (ii + 1)
411             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
412             p.scapy_tun_spi = p.scapy_tun_spi + ii
413             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
414             p.vpp_tun_spi = p.vpp_tun_spi + ii
415
416             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
417             p.scapy_tra_spi = p.scapy_tra_spi + ii
418             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
419             p.vpp_tra_spi = p.vpp_tra_spi + ii
420
421             config_tun_params(p, self.encryption_type, self.tun_if)
422             self.multi_params.append(p)
423
424             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
425                                             p.scapy_tun_spi,
426                                             p.crypt_algo_vpp_id,
427                                             p.crypt_key, p.crypt_key,
428                                             p.auth_algo_vpp_id, p.auth_key,
429                                             p.auth_key, is_ip6=True)
430             p.tun_if.add_vpp_config()
431             p.tun_if.admin_up()
432             p.tun_if.config_ip6()
433
434             r = VppIpRoute(self, p.remote_tun_if_host, 128,
435                            [VppRoutePath(p.tun_if.remote_ip6,
436                                          0xffffffff,
437                                          proto=DpoProto.DPO_PROTO_IP6)])
438             r.add_vpp_config()
439
440     def tearDown(self):
441         super(TestIpsec6MultiTunIfEsp, self).tearDown()
442
443     def test_tun_66(self):
444         """Multiple IPSEC tunnel interfaces """
445         for p in self.multi_params:
446             self.verify_tun_66(p, count=127)
447             c = p.tun_if.get_rx_stats()
448             self.assertEqual(c['packets'], 127)
449             c = p.tun_if.get_tx_stats()
450             self.assertEqual(c['packets'], 127)
451
452
453 class TestIpsecGreTebIfEsp(TemplateIpsec,
454                            IpsecTun4Tests):
455     """ Ipsec GRE TEB ESP - TUN tests """
456     tun4_encrypt_node_name = "esp4-encrypt-tun"
457     tun4_decrypt_node_name = "esp4-decrypt-tun"
458     encryption_type = ESP
459     omac = "00:11:22:33:44:55"
460
461     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
462                          payload_size=100):
463         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
464                 sa.encrypt(IP(src=self.pg0.remote_ip4,
465                               dst=self.pg0.local_ip4) /
466                            GRE() /
467                            Ether(dst=self.omac) /
468                            IP(src="1.1.1.1", dst="1.1.1.2") /
469                            UDP(sport=1144, dport=2233) /
470                            Raw('X' * payload_size))
471                 for i in range(count)]
472
473     def gen_pkts(self, sw_intf, src, dst, count=1,
474                  payload_size=100):
475         return [Ether(dst=self.omac) /
476                 IP(src="1.1.1.1", dst="1.1.1.2") /
477                 UDP(sport=1144, dport=2233) /
478                 Raw('X' * payload_size)
479                 for i in range(count)]
480
481     def verify_decrypted(self, p, rxs):
482         for rx in rxs:
483             self.assert_equal(rx[Ether].dst, self.omac)
484             self.assert_equal(rx[IP].dst, "1.1.1.2")
485
486     def verify_encrypted(self, p, sa, rxs):
487         for rx in rxs:
488             try:
489                 pkt = sa.decrypt(rx[IP])
490                 if not pkt.haslayer(IP):
491                     pkt = IP(pkt[Raw].load)
492                 self.assert_packet_checksums_valid(pkt)
493                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
494                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
495                 self.assertTrue(pkt.haslayer(GRE))
496                 e = pkt[Ether]
497                 self.assertEqual(e[Ether].dst, self.omac)
498                 self.assertEqual(e[IP].dst, "1.1.1.2")
499             except (IndexError, AssertionError):
500                 self.logger.debug(ppp("Unexpected packet:", rx))
501                 try:
502                     self.logger.debug(ppp("Decrypted packet:", pkt))
503                 except:
504                     pass
505                 raise
506
507     def setUp(self):
508         super(TestIpsecGreTebIfEsp, self).setUp()
509
510         self.tun_if = self.pg0
511
512         p = self.ipv4_params
513
514         bd1 = VppBridgeDomain(self, 1)
515         bd1.add_vpp_config()
516
517         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
518                                   p.auth_algo_vpp_id, p.auth_key,
519                                   p.crypt_algo_vpp_id, p.crypt_key,
520                                   self.vpp_esp_protocol,
521                                   self.pg0.local_ip4,
522                                   self.pg0.remote_ip4)
523         p.tun_sa_out.add_vpp_config()
524
525         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
526                                  p.auth_algo_vpp_id, p.auth_key,
527                                  p.crypt_algo_vpp_id, p.crypt_key,
528                                  self.vpp_esp_protocol,
529                                  self.pg0.remote_ip4,
530                                  self.pg0.local_ip4)
531         p.tun_sa_in.add_vpp_config()
532
533         self.tun = VppGreInterface(self,
534                                    self.pg0.local_ip4,
535                                    self.pg0.remote_ip4,
536                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
537                                          GRE_API_TUNNEL_TYPE_TEB))
538         self.tun.add_vpp_config()
539
540         p.tun_protect = VppIpsecTunProtect(self,
541                                            self.tun,
542                                            p.tun_sa_out,
543                                            [p.tun_sa_in])
544
545         p.tun_protect.add_vpp_config()
546
547         self.tun.admin_up()
548         self.tun.config_ip4()
549
550         VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
551         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
552
553         self.vapi.cli("clear ipsec sa")
554
555     def tearDown(self):
556         self.tun.unconfig_ip4()
557         super(TestIpsecGreTebIfEsp, self).tearDown()
558
559
560 class TestIpsecGreIfEsp(TemplateIpsec,
561                         IpsecTun4Tests):
562     """ Ipsec GRE ESP - TUN tests """
563     tun4_encrypt_node_name = "esp4-encrypt-tun"
564     tun4_decrypt_node_name = "esp4-decrypt-tun"
565     encryption_type = ESP
566
567     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
568                          payload_size=100):
569         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
570                 sa.encrypt(IP(src=self.pg0.remote_ip4,
571                               dst=self.pg0.local_ip4) /
572                            GRE() /
573                            IP(src=self.pg1.local_ip4,
574                               dst=self.pg1.remote_ip4) /
575                            UDP(sport=1144, dport=2233) /
576                            Raw('X' * payload_size))
577                 for i in range(count)]
578
579     def gen_pkts(self, sw_intf, src, dst, count=1,
580                  payload_size=100):
581         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
582                 IP(src="1.1.1.1", dst="1.1.1.2") /
583                 UDP(sport=1144, dport=2233) /
584                 Raw('X' * payload_size)
585                 for i in range(count)]
586
587     def verify_decrypted(self, p, rxs):
588         for rx in rxs:
589             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
590             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
591
592     def verify_encrypted(self, p, sa, rxs):
593         for rx in rxs:
594             try:
595                 pkt = sa.decrypt(rx[IP])
596                 if not pkt.haslayer(IP):
597                     pkt = IP(pkt[Raw].load)
598                 self.assert_packet_checksums_valid(pkt)
599                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
600                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
601                 self.assertTrue(pkt.haslayer(GRE))
602                 e = pkt[GRE]
603                 self.assertEqual(e[IP].dst, "1.1.1.2")
604             except (IndexError, AssertionError):
605                 self.logger.debug(ppp("Unexpected packet:", rx))
606                 try:
607                     self.logger.debug(ppp("Decrypted packet:", pkt))
608                 except:
609                     pass
610                 raise
611
612     def setUp(self):
613         super(TestIpsecGreIfEsp, self).setUp()
614
615         self.tun_if = self.pg0
616
617         p = self.ipv4_params
618
619         bd1 = VppBridgeDomain(self, 1)
620         bd1.add_vpp_config()
621
622         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
623                                   p.auth_algo_vpp_id, p.auth_key,
624                                   p.crypt_algo_vpp_id, p.crypt_key,
625                                   self.vpp_esp_protocol,
626                                   self.pg0.local_ip4,
627                                   self.pg0.remote_ip4)
628         p.tun_sa_out.add_vpp_config()
629
630         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
631                                  p.auth_algo_vpp_id, p.auth_key,
632                                  p.crypt_algo_vpp_id, p.crypt_key,
633                                  self.vpp_esp_protocol,
634                                  self.pg0.remote_ip4,
635                                  self.pg0.local_ip4)
636         p.tun_sa_in.add_vpp_config()
637
638         self.tun = VppGreInterface(self,
639                                    self.pg0.local_ip4,
640                                    self.pg0.remote_ip4)
641         self.tun.add_vpp_config()
642
643         p.tun_protect = VppIpsecTunProtect(self,
644                                            self.tun,
645                                            p.tun_sa_out,
646                                            [p.tun_sa_in])
647         p.tun_protect.add_vpp_config()
648
649         self.tun.admin_up()
650         self.tun.config_ip4()
651
652         VppIpRoute(self, "1.1.1.2", 32,
653                    [VppRoutePath(self.tun.remote_ip4,
654                                  0xffffffff)]).add_vpp_config()
655
656     def tearDown(self):
657         self.tun.unconfig_ip4()
658         super(TestIpsecGreIfEsp, self).tearDown()
659
660
661 class TemplateIpsec4TunProtect(object):
662     """ IPsec IPv4 Tunnel protect """
663
664     def config_sa_tra(self, p):
665         config_tun_params(p, self.encryption_type, self.tun_if)
666
667         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
668                                   p.auth_algo_vpp_id, p.auth_key,
669                                   p.crypt_algo_vpp_id, p.crypt_key,
670                                   self.vpp_esp_protocol)
671         p.tun_sa_out.add_vpp_config()
672
673         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
674                                  p.auth_algo_vpp_id, p.auth_key,
675                                  p.crypt_algo_vpp_id, p.crypt_key,
676                                  self.vpp_esp_protocol)
677         p.tun_sa_in.add_vpp_config()
678
679     def config_sa_tun(self, p):
680         config_tun_params(p, self.encryption_type, self.tun_if)
681
682         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
683                                   p.auth_algo_vpp_id, p.auth_key,
684                                   p.crypt_algo_vpp_id, p.crypt_key,
685                                   self.vpp_esp_protocol,
686                                   self.tun_if.remote_addr[p.addr_type],
687                                   self.tun_if.local_addr[p.addr_type])
688         p.tun_sa_out.add_vpp_config()
689
690         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
691                                  p.auth_algo_vpp_id, p.auth_key,
692                                  p.crypt_algo_vpp_id, p.crypt_key,
693                                  self.vpp_esp_protocol,
694                                  self.tun_if.remote_addr[p.addr_type],
695                                  self.tun_if.local_addr[p.addr_type])
696         p.tun_sa_in.add_vpp_config()
697
698     def config_protect(self, p):
699         p.tun_protect = VppIpsecTunProtect(self,
700                                            p.tun_if,
701                                            p.tun_sa_out,
702                                            [p.tun_sa_in])
703         p.tun_protect.add_vpp_config()
704
705     def config_network(self, p):
706         p.tun_if = VppIpIpTunInterface(self, self.pg0,
707                                        self.pg0.local_ip4,
708                                        self.pg0.remote_ip4)
709         p.tun_if.add_vpp_config()
710         p.tun_if.admin_up()
711         p.tun_if.config_ip4()
712
713         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
714                              [VppRoutePath(p.tun_if.remote_ip4,
715                                            0xffffffff)])
716         p.route.add_vpp_config()
717
718     def unconfig_network(self, p):
719         p.route.remove_vpp_config()
720         p.tun_if.remove_vpp_config()
721
722     def unconfig_protect(self, p):
723         p.tun_protect.remove_vpp_config()
724
725     def unconfig_sa(self, p):
726         p.tun_sa_out.remove_vpp_config()
727         p.tun_sa_in.remove_vpp_config()
728
729
730 class TestIpsec4TunProtect(TemplateIpsec,
731                            TemplateIpsec4TunProtect,
732                            IpsecTun4):
733     """ IPsec IPv4 Tunnel protect - transport mode"""
734
735     encryption_type = ESP
736     tun4_encrypt_node_name = "esp4-encrypt-tun"
737     tun4_decrypt_node_name = "esp4-decrypt-tun"
738
739     def setUp(self):
740         super(TestIpsec4TunProtect, self).setUp()
741
742         self.tun_if = self.pg0
743
744     def tearDown(self):
745         super(TestIpsec4TunProtect, self).tearDown()
746
747     def test_tun_44(self):
748         """IPSEC tunnel protect"""
749
750         p = self.ipv4_params
751
752         self.config_network(p)
753         self.config_sa_tra(p)
754         self.config_protect(p)
755
756         self.verify_tun_44(p, count=127)
757         c = p.tun_if.get_rx_stats()
758         self.assertEqual(c['packets'], 127)
759         c = p.tun_if.get_tx_stats()
760         self.assertEqual(c['packets'], 127)
761
762         # rekey - create new SAs and update the tunnel protection
763         np = copy.copy(p)
764         np.crypt_key = 'X' + p.crypt_key[1:]
765         np.scapy_tun_spi += 100
766         np.scapy_tun_sa_id += 1
767         np.vpp_tun_spi += 100
768         np.vpp_tun_sa_id += 1
769         np.tun_if.local_spi = p.vpp_tun_spi
770         np.tun_if.remote_spi = p.scapy_tun_spi
771
772         self.config_sa_tra(np)
773         self.config_protect(np)
774         self.unconfig_sa(p)
775
776         self.verify_tun_44(np, count=127)
777         c = p.tun_if.get_rx_stats()
778         self.assertEqual(c['packets'], 254)
779         c = p.tun_if.get_tx_stats()
780         self.assertEqual(c['packets'], 254)
781
782         # teardown
783         self.unconfig_protect(np)
784         self.unconfig_sa(np)
785         self.unconfig_network(p)
786
787
788 class TestIpsec4TunProtectTun(TemplateIpsec,
789                               TemplateIpsec4TunProtect,
790                               IpsecTun4):
791     """ IPsec IPv4 Tunnel protect - tunnel mode"""
792
793     encryption_type = ESP
794     tun4_encrypt_node_name = "esp4-encrypt-tun"
795     tun4_decrypt_node_name = "esp4-decrypt-tun"
796
797     def setUp(self):
798         super(TestIpsec4TunProtectTun, self).setUp()
799
800         self.tun_if = self.pg0
801
802     def tearDown(self):
803         super(TestIpsec4TunProtectTun, self).tearDown()
804
805     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
806                          payload_size=100):
807         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
808                 sa.encrypt(IP(src=sw_intf.remote_ip4,
809                               dst=sw_intf.local_ip4) /
810                            IP(src=src, dst=dst) /
811                            UDP(sport=1144, dport=2233) /
812                            Raw('X' * payload_size))
813                 for i in range(count)]
814
815     def gen_pkts(self, sw_intf, src, dst, count=1,
816                  payload_size=100):
817         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
818                 IP(src=src, dst=dst) /
819                 UDP(sport=1144, dport=2233) /
820                 Raw('X' * payload_size)
821                 for i in range(count)]
822
823     def verify_decrypted(self, p, rxs):
824         for rx in rxs:
825             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
826             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
827             self.assert_packet_checksums_valid(rx)
828
829     def verify_encrypted(self, p, sa, rxs):
830         for rx in rxs:
831             try:
832                 pkt = sa.decrypt(rx[IP])
833                 if not pkt.haslayer(IP):
834                     pkt = IP(pkt[Raw].load)
835                 self.assert_packet_checksums_valid(pkt)
836                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
837                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
838                 inner = pkt[IP].payload
839                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
840
841             except (IndexError, AssertionError):
842                 self.logger.debug(ppp("Unexpected packet:", rx))
843                 try:
844                     self.logger.debug(ppp("Decrypted packet:", pkt))
845                 except:
846                     pass
847                 raise
848
849     def test_tun_44(self):
850         """IPSEC tunnel protect """
851
852         p = self.ipv4_params
853
854         self.config_network(p)
855         self.config_sa_tun(p)
856         self.config_protect(p)
857
858         self.verify_tun_44(p, count=127)
859
860         c = p.tun_if.get_rx_stats()
861         self.assertEqual(c['packets'], 127)
862         c = p.tun_if.get_tx_stats()
863         self.assertEqual(c['packets'], 127)
864
865         # rekey - create new SAs and update the tunnel protection
866         np = copy.copy(p)
867         np.crypt_key = 'X' + p.crypt_key[1:]
868         np.scapy_tun_spi += 100
869         np.scapy_tun_sa_id += 1
870         np.vpp_tun_spi += 100
871         np.vpp_tun_sa_id += 1
872         np.tun_if.local_spi = p.vpp_tun_spi
873         np.tun_if.remote_spi = p.scapy_tun_spi
874
875         self.config_sa_tun(np)
876         self.config_protect(np)
877         self.unconfig_sa(p)
878
879         self.verify_tun_44(np, count=127)
880         c = p.tun_if.get_rx_stats()
881         self.assertEqual(c['packets'], 254)
882         c = p.tun_if.get_tx_stats()
883         self.assertEqual(c['packets'], 254)
884
885         # teardown
886         self.unconfig_protect(np)
887         self.unconfig_sa(np)
888         self.unconfig_network(p)
889
890
891 class TemplateIpsec6TunProtect(object):
892     """ IPsec IPv6 Tunnel protect """
893
894     def config_sa_tra(self, p):
895         config_tun_params(p, self.encryption_type, self.tun_if)
896
897         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
898                                   p.auth_algo_vpp_id, p.auth_key,
899                                   p.crypt_algo_vpp_id, p.crypt_key,
900                                   self.vpp_esp_protocol)
901         p.tun_sa_out.add_vpp_config()
902
903         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
904                                  p.auth_algo_vpp_id, p.auth_key,
905                                  p.crypt_algo_vpp_id, p.crypt_key,
906                                  self.vpp_esp_protocol)
907         p.tun_sa_in.add_vpp_config()
908
909     def config_sa_tun(self, p):
910         config_tun_params(p, self.encryption_type, self.tun_if)
911
912         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
913                                   p.auth_algo_vpp_id, p.auth_key,
914                                   p.crypt_algo_vpp_id, p.crypt_key,
915                                   self.vpp_esp_protocol,
916                                   self.tun_if.remote_addr[p.addr_type],
917                                   self.tun_if.local_addr[p.addr_type])
918         p.tun_sa_out.add_vpp_config()
919
920         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
921                                  p.auth_algo_vpp_id, p.auth_key,
922                                  p.crypt_algo_vpp_id, p.crypt_key,
923                                  self.vpp_esp_protocol,
924                                  self.tun_if.remote_addr[p.addr_type],
925                                  self.tun_if.local_addr[p.addr_type])
926         p.tun_sa_in.add_vpp_config()
927
928     def config_protect(self, p):
929         p.tun_protect = VppIpsecTunProtect(self,
930                                            p.tun_if,
931                                            p.tun_sa_out,
932                                            [p.tun_sa_in])
933         p.tun_protect.add_vpp_config()
934
935     def config_network(self, p):
936         p.tun_if = VppIpIpTunInterface(self, self.pg0,
937                                        self.pg0.local_ip6,
938                                        self.pg0.remote_ip6)
939         p.tun_if.add_vpp_config()
940         p.tun_if.admin_up()
941         p.tun_if.config_ip6()
942
943         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
944                              [VppRoutePath(p.tun_if.remote_ip6,
945                                            0xffffffff,
946                                            proto=DpoProto.DPO_PROTO_IP6)])
947         p.route.add_vpp_config()
948
949     def unconfig_network(self, p):
950         p.route.remove_vpp_config()
951         p.tun_if.remove_vpp_config()
952
953     def unconfig_protect(self, p):
954         p.tun_protect.remove_vpp_config()
955
956     def unconfig_sa(self, p):
957         p.tun_sa_out.remove_vpp_config()
958         p.tun_sa_in.remove_vpp_config()
959
960
961 class TestIpsec6TunProtect(TemplateIpsec,
962                            TemplateIpsec6TunProtect,
963                            IpsecTun6):
964     """ IPsec IPv6 Tunnel protect - transport mode"""
965
966     encryption_type = ESP
967     tun6_encrypt_node_name = "esp6-encrypt-tun"
968     tun6_decrypt_node_name = "esp6-decrypt-tun"
969
970     def setUp(self):
971         super(TestIpsec6TunProtect, self).setUp()
972
973         self.tun_if = self.pg0
974
975     def tearDown(self):
976         super(TestIpsec6TunProtect, self).tearDown()
977
978     def test_tun_66(self):
979         """IPSEC tunnel protect"""
980
981         p = self.ipv6_params
982
983         self.config_network(p)
984         self.config_sa_tra(p)
985         self.config_protect(p)
986
987         self.verify_tun_66(p, count=127)
988         c = p.tun_if.get_rx_stats()
989         self.assertEqual(c['packets'], 127)
990         c = p.tun_if.get_tx_stats()
991         self.assertEqual(c['packets'], 127)
992
993         # rekey - create new SAs and update the tunnel protection
994         np = copy.copy(p)
995         np.crypt_key = 'X' + p.crypt_key[1:]
996         np.scapy_tun_spi += 100
997         np.scapy_tun_sa_id += 1
998         np.vpp_tun_spi += 100
999         np.vpp_tun_sa_id += 1
1000         np.tun_if.local_spi = p.vpp_tun_spi
1001         np.tun_if.remote_spi = p.scapy_tun_spi
1002
1003         self.config_sa_tra(np)
1004         self.config_protect(np)
1005         self.unconfig_sa(p)
1006
1007         self.verify_tun_66(np, count=127)
1008         c = p.tun_if.get_rx_stats()
1009         self.assertEqual(c['packets'], 254)
1010         c = p.tun_if.get_tx_stats()
1011         self.assertEqual(c['packets'], 254)
1012
1013         # 3 phase rekey
1014         #  1) add two input SAs [old, new]
1015         #  2) swap output SA to [new]
1016         #  3) use only [new] input SA
1017         np3 = copy.copy(np)
1018         np3.crypt_key = 'Z' + p.crypt_key[1:]
1019         np3.scapy_tun_spi += 100
1020         np3.scapy_tun_sa_id += 1
1021         np3.vpp_tun_spi += 100
1022         np3.vpp_tun_sa_id += 1
1023         np3.tun_if.local_spi = p.vpp_tun_spi
1024         np3.tun_if.remote_spi = p.scapy_tun_spi
1025
1026         self.config_sa_tra(np3)
1027
1028         # step 1;
1029         p.tun_protect.update_vpp_config(np.tun_sa_out,
1030                                         [np.tun_sa_in, np3.tun_sa_in])
1031         self.verify_tun_66(np, np, count=127)
1032         self.verify_tun_66(np3, np, count=127)
1033
1034         # step 2;
1035         p.tun_protect.update_vpp_config(np3.tun_sa_out,
1036                                         [np.tun_sa_in, np3.tun_sa_in])
1037         self.verify_tun_66(np, np3, count=127)
1038         self.verify_tun_66(np3, np3, count=127)
1039
1040         # step 1;
1041         p.tun_protect.update_vpp_config(np3.tun_sa_out,
1042                                         [np3.tun_sa_in])
1043         self.verify_tun_66(np3, np3, count=127)
1044         self.verify_drop_tun_66(np, count=127)
1045
1046         c = p.tun_if.get_rx_stats()
1047         self.assertEqual(c['packets'], 127*7)
1048         c = p.tun_if.get_tx_stats()
1049         self.assertEqual(c['packets'], 127*7)
1050         self.unconfig_sa(np)
1051
1052         # teardown
1053         self.unconfig_protect(np3)
1054         self.unconfig_sa(np3)
1055         self.unconfig_network(p)
1056
1057
1058 class TestIpsec6TunProtectTun(TemplateIpsec,
1059                               TemplateIpsec6TunProtect,
1060                               IpsecTun6):
1061     """ IPsec IPv6 Tunnel protect - tunnel mode"""
1062
1063     encryption_type = ESP
1064     tun6_encrypt_node_name = "esp6-encrypt-tun"
1065     tun6_decrypt_node_name = "esp6-decrypt-tun"
1066
1067     def setUp(self):
1068         super(TestIpsec6TunProtectTun, self).setUp()
1069
1070         self.tun_if = self.pg0
1071
1072     def tearDown(self):
1073         super(TestIpsec6TunProtectTun, self).tearDown()
1074
1075     def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1076                           payload_size=100):
1077         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1078                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1079                                 dst=sw_intf.local_ip6) /
1080                            IPv6(src=src, dst=dst) /
1081                            UDP(sport=1166, dport=2233) /
1082                            Raw('X' * payload_size))
1083                 for i in range(count)]
1084
1085     def gen_pkts6(self, sw_intf, src, dst, count=1,
1086                   payload_size=100):
1087         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1088                 IPv6(src=src, dst=dst) /
1089                 UDP(sport=1166, dport=2233) /
1090                 Raw('X' * payload_size)
1091                 for i in range(count)]
1092
1093     def verify_decrypted6(self, p, rxs):
1094         for rx in rxs:
1095             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1096             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1097             self.assert_packet_checksums_valid(rx)
1098
1099     def verify_encrypted6(self, p, sa, rxs):
1100         for rx in rxs:
1101             try:
1102                 pkt = sa.decrypt(rx[IPv6])
1103                 if not pkt.haslayer(IPv6):
1104                     pkt = IPv6(pkt[Raw].load)
1105                 self.assert_packet_checksums_valid(pkt)
1106                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1107                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1108                 inner = pkt[IPv6].payload
1109                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1110
1111             except (IndexError, AssertionError):
1112                 self.logger.debug(ppp("Unexpected packet:", rx))
1113                 try:
1114                     self.logger.debug(ppp("Decrypted packet:", pkt))
1115                 except:
1116                     pass
1117                 raise
1118
1119     def test_tun_66(self):
1120         """IPSEC tunnel protect """
1121
1122         p = self.ipv6_params
1123
1124         self.config_network(p)
1125         self.config_sa_tun(p)
1126         self.config_protect(p)
1127
1128         self.verify_tun_66(p, count=127)
1129
1130         c = p.tun_if.get_rx_stats()
1131         self.assertEqual(c['packets'], 127)
1132         c = p.tun_if.get_tx_stats()
1133         self.assertEqual(c['packets'], 127)
1134
1135         # rekey - create new SAs and update the tunnel protection
1136         np = copy.copy(p)
1137         np.crypt_key = 'X' + p.crypt_key[1:]
1138         np.scapy_tun_spi += 100
1139         np.scapy_tun_sa_id += 1
1140         np.vpp_tun_spi += 100
1141         np.vpp_tun_sa_id += 1
1142         np.tun_if.local_spi = p.vpp_tun_spi
1143         np.tun_if.remote_spi = p.scapy_tun_spi
1144
1145         self.config_sa_tun(np)
1146         self.config_protect(np)
1147         self.unconfig_sa(p)
1148
1149         self.verify_tun_66(np, count=127)
1150         c = p.tun_if.get_rx_stats()
1151         self.assertEqual(c['packets'], 254)
1152         c = p.tun_if.get_tx_stats()
1153         self.assertEqual(c['packets'], 254)
1154
1155         # teardown
1156         self.unconfig_protect(np)
1157         self.unconfig_sa(np)
1158         self.unconfig_network(p)
1159
1160
1161 if __name__ == '__main__':
1162     unittest.main(testRunner=VppTestRunner)