ipsec: remove dedicated IPSec tunnels
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18 from util import ppp
19 from vpp_papi import VppEnum
20
21
def config_tun_params(p, encryption_type, tun_if):
    """Create the two scapy SAs for a tunnel and store them on p.

    :param p: IPsec parameter object. Its SPIs, algorithms, keys, flags,
        address type and NAT header are read; ``scapy_tun_sa`` and
        ``vpp_tun_sa`` are written.
    :param encryption_type: protocol class handed to SecurityAssociation
        (the callers in this file pass ESP).
    :param tun_if: tunnel interface whose ``local_ip`` / ``remote_ip``
        become the addresses of the outer tunnel header.
    """
    esn_bit = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN

    # everything the two SAs have in common; only the SPI and the
    # direction of the outer header differ
    common = {
        'use_esn': bool(p.flags & esn_bit),
        'crypt_key': mk_scapy_crypt_key(p),
        'crypt_algo': p.crypt_algo,
        'auth_algo': p.auth_algo,
        'auth_key': p.auth_key,
        'nat_t_header': p.nat_header,
    }
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    # outer header runs remote -> local and uses the VPP-side SPI
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=tun_if.remote_ip,
                               dst=tun_if.local_ip),
        **common)

    # outer header runs local -> remote and uses the scapy-side SPI
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=tun_if.remote_ip,
                               src=tun_if.local_ip),
        **common)
47
48
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """ create an IPsec tunnel interface over pg0 plus host routes """
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # the crypto key and the auth key are each passed twice
        # (presumably local and remote - confirm against
        # VppIpsecTunInterface)
        tunnel = VppIpsecTunInterface(
            self, self.pg0, params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 host route through the tunnel
        via_tun4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [via_tun4]).add_vpp_config()

        # IPv6 host route through the same tunnel
        via_tun6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [via_tun6]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
92
93
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        """ create a UDP-encapsulated IPsec tunnel interface over pg0 """
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # switch the shared parameters to UDP encapsulation before the
        # tunnel and the scapy SAs are built from them
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        tunnel = VppIpsecTunInterface(
            self, self.pg0, params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            udp_encap=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 host route through the tunnel
        via_tun4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [via_tun4]).add_vpp_config()

        # IPv6 host route through the same tunnel
        via_tun6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [via_tun6]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
142
143
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # shrink the tunnel MTU so the 1800 byte payload has to be split;
        # two packets are expected back (n_rx=2)
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        try:
            self.verify_tun_44(self.params[socket.AF_INET],
                               count=1, payload_size=1800, n_rx=2)
        finally:
            # put the MTU back even when the verification fails, so a
            # failure here does not leave the tunnel with the small MTU
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                           [9000, 0, 0, 0])
173
174
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # the keepalive check runs against the UDP-encap IPv4 parameters
        # prepared by the template's setUp
        params = self.ipv4_params
        self.verify_keepalive(params)
183
184
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # no body of its own: the fixture comes from TemplateIpsec4TunIfEsp
    # and the test methods from IpsecTcpTests
188
189
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        """ create an IPv6 IPsec tunnel interface over pg0 plus routes """
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0, params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv6 host route through the tunnel
        via_tun6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [via_tun6]).add_vpp_config()

        # IPv4 host route through the same tunnel
        via_tun4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [via_tun4]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
224
225
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # a single packet
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # a burst of 257 packets
        self.verify_tun_46(v6_params, count=257)
240
241
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """ create ten tunnels, one towards each generated pg0 host """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        n_tunnels = 10
        # ids and SPIs that must differ from one tunnel to the next
        per_tunnel_ids = ('scapy_tun_sa_id', 'scapy_tun_spi',
                          'vpp_tun_sa_id', 'vpp_tun_spi',
                          'scapy_tra_sa_id', 'scapy_tra_spi',
                          'vpp_tra_sa_id', 'vpp_tra_spi')

        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        for idx in range(n_tunnels):
            # shallow copy of the shared parameters, then offset every
            # id/SPI by the tunnel index
            params = copy.copy(self.ipv4_params)
            params.remote_tun_if_host = "1.1.1.%d" % (idx + 1)
            for attr in per_tunnel_ids:
                setattr(params, attr, getattr(params, attr) + idx)

            tunnel = VppIpsecTunInterface(
                self, self.pg0, params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                dst=self.pg0.remote_hosts[idx].ip4)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            # host route for this tunnel's remote host
            via_tunnel = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
            host_route = VppIpRoute(self, params.remote_tun_if_host, 32,
                                    [via_tunnel])
            host_route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_44(params, count=n_pkts)
            # each tunnel must have counted exactly its own traffic
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
300
301
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """ create the tunnel interface and the host route described by p

        Stores the interface as ``p.tun_if`` and the route as ``p.route``.
        """
        tunnel = VppIpsecTunInterface(
            self, self.pg0, p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id, p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id, p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        via_tunnel = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_tunnel])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """ remove what config_network created """
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """ give the tunnel in p a new key, new SPIs and new SAs

        The new SAs are stored as ``p.tun_sa_in`` and ``p.tun_sa_out``.
        """
        # change the key: first byte replaced, length unchanged
        p.crypt_key = b'X' + p.crypt_key[1:]

        # move every SPI and SA id on by one
        for attr in ('scapy_tun_spi', 'scapy_tun_sa_id',
                     'vpp_tun_spi', 'vpp_tun_sa_id'):
            setattr(p, attr, getattr(p, attr) + 1)
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        # rebuild the scapy SAs from the new key and SPIs
        config_tun_params(p, self.encryption_type, p.tun_if)

        def make_sa(sa_id, spi):
            # both SAs share everything except their id and SPI
            return VppIpsecSA(self, sa_id, spi,
                              p.auth_algo_vpp_id, p.auth_key,
                              p.crypt_algo_vpp_id, p.crypt_key,
                              self.vpp_esp_protocol,
                              flags=p.flags,
                              salt=p.salt)

        p.tun_sa_in = make_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
        p.tun_sa_out = make_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # tun_sa_in (scapy-side SPI) is installed with is_outbound=1,
        # tun_sa_out (VPP-side SPI) with is_outbound=0
        tunnel_index = p.tun_if.sw_if_index
        for sa, outbound in ((p.tun_sa_in, 1), (p.tun_sa_out, 0)):
            self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tunnel_index,
                                             sa_id=sa.id,
                                             is_outbound=outbound)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        key_128 = b"JPjyOWBeVEQiMe7h"
        key_192 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_256 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # one row per algorithm combination:
        # (vpp crypto, vpp integ, scapy crypto, scapy integ, key, salt)
        algos = [
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_128, 3333),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_192, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_256, 9999),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_128, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_192, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_256, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_NONE,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "NULL", "HMAC-SHA1-96", key_256, 0),
        ]

        n_pkts = 127

        # every algorithm is exercised on every VPP crypto engine
        for engine in ["ia32", "ipsecmb", "openssl"]:
            self.vapi.cli("set crypto handler all %s" % engine)

            for (vpp_crypto, vpp_integ,
                 scapy_crypto, scapy_integ, key, salt) in algos:
                # work on a copy so the shared parameters stay untouched
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = vpp_integ
                p.crypt_algo_vpp_id = vpp_crypto
                p.crypt_algo = scapy_crypto
                p.auth_algo = scapy_integ
                p.crypt_key = key
                p.salt = salt

                self.config_network(p)

                self.verify_tun_44(p, count=n_pkts)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], n_pkts)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], n_pkts)

                # rekey the tunnel and check that traffic still passes
                self.rekey(p)
                self.verify_tun_44(p, count=n_pkts)

                # tear everything down before the next combination
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
486
487
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """ create ten IPv6 tunnels, one towards each generated pg0 host """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        n_tunnels = 10
        # ids and SPIs that must differ from one tunnel to the next
        per_tunnel_ids = ('scapy_tun_sa_id', 'scapy_tun_spi',
                          'vpp_tun_sa_id', 'vpp_tun_spi',
                          'scapy_tra_sa_id', 'scapy_tra_spi',
                          'vpp_tra_sa_id', 'vpp_tra_spi')

        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv6_neighbors()

        for idx in range(n_tunnels):
            # shallow copy of the shared parameters, then offset every
            # id/SPI by the tunnel index
            params = copy.copy(self.ipv6_params)
            params.remote_tun_if_host = "1111::%d" % (idx + 1)
            for attr in per_tunnel_ids:
                setattr(params, attr, getattr(params, attr) + idx)

            tunnel = VppIpsecTunInterface(
                self, self.pg0, params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            # host route for this tunnel's remote host
            via_tunnel = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                      proto=DpoProto.DPO_PROTO_IP6)
            host_route = VppIpRoute(self, params.remote_tun_if_host, 128,
                                    [via_tunnel])
            host_route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=n_pkts)
            # each tunnel must have counted exactly its own traffic
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
548
549
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner (bridged) ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """ build count encrypted IP/GRE/Ether/IP/UDP packets

        src and dst are accepted but not used; the addresses are fixed
        to pg0 for the outer header and 1.1.1.1 -> 1.1.1.2 inside.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """ build count plain ethernet frames addressed to omac

        sw_intf, src and dst are accepted but not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """ check the inner frame's MAC and IP destination on each rx """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """ decrypt each rx with sa and check the IP/GRE/Ether/IP layers

        On failure both the received and, if there is one, the decrypted
        packet are logged before the error is re-raised.
        """
        for rx in rxs:
            # reset for every packet: if decrypt() itself fails there is
            # nothing decrypted to log, and a packet left over from the
            # previous iteration must not be reported instead
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort only; the original
                        # failure re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """ protect a GRE TEB tunnel over pg0 and bridge it with pg1 """
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # tun_sa_out: scapy-side id/SPI, pg0 local -> remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # tun_sa_in: VPP-side id/SPI, pg0 remote -> local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        # bind the two SAs to the GRE tunnel
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and pg1 are the two ports of bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        # take the address off the tunnel before the base class teardown
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
657
658
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """ build count encrypted IP/GRE/IP/UDP packets

        src and dst are accepted but not used; the outer header uses the
        pg0 addresses and the inner header the pg1 addresses.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """ build count plain packets from 1.1.1.1 to 1.1.1.2

        src and dst are accepted but not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """ check each rx is addressed to the pg1 remote MAC and IP """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """ decrypt each rx with sa and check the IP/GRE/IP layers

        On failure both the received and, if there is one, the decrypted
        packet are logged before the error is re-raised.
        """
        for rx in rxs:
            # reset for every packet: if decrypt() itself fails there is
            # nothing decrypted to log, and a packet left over from the
            # previous iteration must not be reported instead
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort only; the original
                        # failure re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """ protect a GRE tunnel over pg0 and route 1.1.1.2 through it """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): nothing in this class adds a port to bd1 -
        # confirm whether the bridge domain is still needed here
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # tun_sa_out: scapy-side id/SPI, pg0 local -> remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # tun_sa_in: VPP-side id/SPI, pg0 remote -> local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        # bind the two SAs to the GRE tunnel
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # host route for the destination used by gen_pkts
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        # take the address off the tunnel before the base class teardown
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
760
761
class TemplateIpsec4TunProtect(object):
    """Shared config steps for ESP protection of an IPv4 IPIP tunnel.

    Each ``config_*`` method takes a parameter object ``p`` and stores
    what it creates on it (``tun_if``, ``route``, ``tun_sa_out``,
    ``tun_sa_in``, ``tun_protect``) so that the matching ``unconfig_*``
    method can remove it again.  The class using this mixin must supply
    ``pg0``, ``tun_if`` and ``vpp_esp_protocol``.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create and add the out/in SAs without any tunnel addresses."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments that are the same for both directions.
        shared = [p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol]

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *shared, flags=p.flags)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *shared, flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and add the out/in SAs with tunnel addresses supplied.

        The addresses come from ``self.tun_if`` (not ``p.tun_if``),
        looked up by ``p.addr_type``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # NOTE(review): both SAs receive the addresses in the same
        # (remote, local) order - confirm that this is intended.
        endpoints = [self.tun_if.remote_addr[p.addr_type],
                     self.tun_if.local_addr[p.addr_type]]
        sa_args = [p.auth_algo_vpp_id, p.auth_key,
                   p.crypt_algo_vpp_id, p.crypt_key,
                   self.vpp_esp_protocol] + endpoints

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *sa_args, flags=p.flags)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *sa_args, flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind ``p.tun_sa_out`` and ``[p.tun_sa_in]`` to ``p.tun_if``."""
        protection = VppIpsecTunProtect(self, p.tun_if,
                                        p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the IPIP tunnel over pg0 and route both hosts into it.

        Only the IPv4 host route is kept (as ``p.route``); the IPv6 host
        route is added but not stored on ``p``.
        """
        underlay = self.pg0
        tunnel = VppIpIpTunInterface(self, underlay,
                                     underlay.local_ip4,
                                     underlay.remote_ip4)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # NOTE(review): 0xffffffff presumably means "no specific
        # interface" for the path - confirm against VppRoutePath.
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, p.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        for vpp_obj in (p.route, p.tun_if):
            vpp_obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
844
845
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        # pg0 is the interface the template helpers read as self.tun_if.
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        # Nothing extra to undo: the test removes its own configuration.
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        batch = 127
        params = self.ipv4_params

        def expect_tun_if_packets(total):
            # The tunnel interface's rx and tx counters must both match.
            for read_stats in (params.tun_if.get_rx_stats,
                               params.tun_if.get_tx_stats):
                self.assertEqual(read_stats()['packets'], total)

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

        self.verify_tun_44(params, count=batch)
        expect_tun_if_packets(batch)

        # Second batch goes through verify_tun_64 after the CLI clear;
        # the interface counters keep accumulating.
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=batch)
        expect_tun_if_packets(2 * batch)

        # Rekey: a shallow copy of the parameters gets a different first
        # key byte and bumped SA ids / SPIs.  The tunnel interface object
        # is shared between both parameter sets.
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.scapy_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        # New SAs go in and protect the tunnel before the old ones leave.
        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_44(rekeyed, count=batch)
        expect_tun_if_packets(3 * batch)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
905
906
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        # Unlike the non-UDP variant, the whole configuration is built
        # here so that every test method starts with it in place.
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

    def tearDown(self):
        # Undo setUp in reverse order before the parent teardown runs.
        params = self.ipv4_params
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        batch = 127
        params = self.ipv4_params

        self.verify_tun_44(params, count=batch)

        # Both directions of the tunnel interface must have seen the batch.
        rx_stats = params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], batch)
        tx_stats = params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], batch)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
946
947
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        # pg0 is the interface the template helpers read as self.tun_if.
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        # Nothing extra to undo: the test removes its own configuration.
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted frames addressed to ``sw_intf``.

        Each frame is an Ethernet header followed by ``sa.encrypt()`` of
        an IP header between the interface's remote and local addresses,
        wrapping an inner IP(src, dst) / UDP / Raw payload of
        ``payload_size`` bytes.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ether / IP(src, dst) / UDP / Raw frames."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of every packet in ``rxs``."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check it.

        The decrypted packet must run from pg0's local to its remote
        address and carry an inner IP header destined to
        ``p.remote_tun_if_host``.  On IndexError or AssertionError the
        offending packet is logged at debug level and the exception is
        re-raised.
        """
        for rx in rxs:
            # Reset per packet so that a failure inside decrypt() can
            # never log the previous iteration's packet as this one's.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Best-effort logging only; it must not hide the
                    # original failure, but interrupts still propagate.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection.
        # np is a shallow copy, so np.tun_if is the same object as p.tun_if.
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        # the new SAs protect the tunnel before the old ones are removed
        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1049
1050
class TemplateIpsec6TunProtect(object):
    """Shared config steps for ESP protection of an IPv6 IPIP tunnel.

    Each ``config_*`` method takes a parameter object ``p`` and stores
    what it creates on it (``tun_if``, ``route``, ``tun_sa_out``,
    ``tun_sa_in``, ``tun_protect``) so that the matching ``unconfig_*``
    method can remove it again.  The class using this mixin must supply
    ``pg0``, ``tun_if``, ``encryption_type`` and ``vpp_esp_protocol``.
    """

    def config_sa_tra(self, p):
        """Create and add the out/in SAs without any tunnel addresses."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments that are the same for both directions.
        shared = [p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol]

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *shared)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *shared)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and add the out/in SAs with tunnel addresses supplied.

        The addresses come from ``self.tun_if`` (not ``p.tun_if``),
        looked up by ``p.addr_type``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # NOTE(review): both SAs receive the addresses in the same
        # (remote, local) order - confirm that this is intended.
        endpoints = [self.tun_if.remote_addr[p.addr_type],
                     self.tun_if.local_addr[p.addr_type]]
        sa_args = [p.auth_algo_vpp_id, p.auth_key,
                   p.crypt_algo_vpp_id, p.crypt_key,
                   self.vpp_esp_protocol] + endpoints

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *sa_args)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *sa_args)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind ``p.tun_sa_out`` and ``[p.tun_sa_in]`` to ``p.tun_if``."""
        protection = VppIpsecTunProtect(self, p.tun_if,
                                        p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the IPIP tunnel over pg0 and route both hosts into it.

        Only the IPv6 host route is kept (as ``p.route``); the IPv4 host
        route is added but not stored on ``p``.
        """
        underlay = self.pg0
        tunnel = VppIpIpTunInterface(self, underlay,
                                     underlay.local_ip6,
                                     underlay.remote_ip6)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # NOTE(review): 0xffffffff presumably means "no specific
        # interface" for the path - confirm against VppRoutePath.
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, p.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        for vpp_obj in (p.route, p.tun_if):
            vpp_obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1124
1125
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        # pg0 is the interface the template helpers read as self.tun_if.
        super(TestIpsec6TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        # Nothing extra to undo: each test removes its own configuration.
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect"""
        batch = 127
        base = self.ipv6_params

        def expect_tun_if_packets(total):
            # The tunnel interface's rx and tx counters must both match.
            for read_stats in (base.tun_if.get_rx_stats,
                               base.tun_if.get_tx_stats):
                self.assertEqual(read_stats()['packets'], total)

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(base)

        self.verify_tun_66(base, count=batch)
        expect_tun_if_packets(batch)

        # Simple rekey: a shallow copy of the parameters gets a different
        # first key byte and bumped SA ids / SPIs.  The tunnel interface
        # object is shared by every parameter set below.
        second = copy.copy(base)
        second.crypt_key = b'X' + base.crypt_key[1:]
        second.scapy_tun_sa_id += 1
        second.scapy_tun_spi += 100
        second.vpp_tun_sa_id += 1
        second.vpp_tun_spi += 100
        second.tun_if.local_spi = base.vpp_tun_spi
        second.tun_if.remote_spi = base.scapy_tun_spi

        # New SAs go in and protect the tunnel before the old ones leave.
        self.config_sa_tra(second)
        self.config_protect(second)
        self.unconfig_sa(base)

        self.verify_tun_66(second, count=batch)
        expect_tun_if_packets(2 * batch)

        # Three-phase rekey towards a third generation of SAs:
        #  1) accept two input SAs [old, new]
        #  2) switch the output SA to [new]
        #  3) accept only the [new] input SA
        third = copy.copy(second)
        third.crypt_key = b'Z' + base.crypt_key[1:]
        third.scapy_tun_sa_id += 1
        third.scapy_tun_spi += 100
        third.vpp_tun_sa_id += 1
        third.vpp_tun_spi += 100
        third.tun_if.local_spi = base.vpp_tun_spi
        third.tun_if.remote_spi = base.scapy_tun_spi

        self.config_sa_tra(third)

        # All updates go through the protection object created first.
        protection = base.tun_protect

        # step 1: old output SA, both input SAs
        protection.update_vpp_config(second.tun_sa_out,
                                     [second.tun_sa_in, third.tun_sa_in])
        self.verify_tun_66(second, second, count=batch)
        self.verify_tun_66(third, second, count=batch)

        # step 2: new output SA, both input SAs
        protection.update_vpp_config(third.tun_sa_out,
                                     [second.tun_sa_in, third.tun_sa_in])
        self.verify_tun_66(second, third, count=batch)
        self.verify_tun_66(third, third, count=batch)

        # step 3: new output SA, only the new input SA; traffic built
        # from the previous generation must now be dropped
        protection.update_vpp_config(third.tun_sa_out,
                                     [third.tun_sa_in])
        self.verify_tun_66(third, third, count=batch)
        self.verify_drop_tun_66(second, count=batch)

        # Seven verified batches in total; the dropped one is not counted.
        expect_tun_if_packets(batch * 7)
        self.unconfig_sa(second)

        # teardown
        self.unconfig_protect(third)
        self.unconfig_sa(third)
        self.unconfig_network(base)

    def test_tun_46(self):
        """IPSEC tunnel protect"""
        batch = 127
        params = self.ipv6_params

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

        self.verify_tun_46(params, count=batch)

        rx_stats = params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], batch)
        tx_stats = params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], batch)

        # teardown
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
1241
1242
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        # pg0 is the interface the template helpers read as self.tun_if.
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        # Nothing extra to undo: the test removes its own configuration.
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return ``count`` encrypted frames addressed to ``sw_intf``.

        Each frame is an Ethernet header followed by ``sa.encrypt()`` of
        an IPv6 header between the interface's remote and local
        addresses, wrapping an inner IPv6(src, dst) / UDP / Raw payload
        of ``payload_size`` bytes.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return ``count`` plain Ether / IPv6(src, dst) / UDP / Raw frames."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of every packet in ``rxs``."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check it.

        The decrypted packet must run from pg0's local to its remote
        address and carry an inner IPv6 header destined to
        ``p.remote_tun_if_host``.  On IndexError or AssertionError the
        offending packet is logged at debug level and the exception is
        re-raised.
        """
        for rx in rxs:
            # Reset per packet so that a failure inside decrypt() can
            # never log the previous iteration's packet as this one's.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Best-effort logging only; it must not hide the
                    # original failure, but interrupts still propagate.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection.
        # np is a shallow copy, so np.tun_if is the same object as p.tun_if.
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        # the new SAs protect the tunnel before the old ones are removed
        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1344
1345
# Running this file directly hands the tests to the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)