tests: GRE over IPSec unit tests
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18 from util import ppp
19 from vpp_papi import VppEnum
20
21
22 def config_tun_params(p, encryption_type, tun_if):
23     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
24     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
25                               IPSEC_API_SAD_FLAG_USE_ESN))
26     crypt_key = mk_scapy_crypt_key(p)
27     p.scapy_tun_sa = SecurityAssociation(
28         encryption_type, spi=p.vpp_tun_spi,
29         crypt_algo=p.crypt_algo,
30         crypt_key=crypt_key,
31         auth_algo=p.auth_algo, auth_key=p.auth_key,
32         tunnel_header=ip_class_by_addr_type[p.addr_type](
33             src=tun_if.remote_ip,
34             dst=tun_if.local_ip),
35         nat_t_header=p.nat_header,
36         use_esn=use_esn)
37     p.vpp_tun_sa = SecurityAssociation(
38         encryption_type, spi=p.scapy_tun_spi,
39         crypt_algo=p.crypt_algo,
40         crypt_key=crypt_key,
41         auth_algo=p.auth_algo, auth_key=p.auth_key,
42         tunnel_header=ip_class_by_addr_type[p.addr_type](
43             dst=tun_if.remote_ip,
44             src=tun_if.local_ip),
45         nat_t_header=p.nat_header,
46         use_esn=use_esn)
47
48
49 class TemplateIpsec4TunIfEsp(TemplateIpsec):
50     """ IPsec tunnel interface tests """
51
52     encryption_type = ESP
53
54     @classmethod
55     def setUpClass(cls):
56         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
57
58     @classmethod
59     def tearDownClass(cls):
60         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
61
62     def setUp(self):
63         super(TemplateIpsec4TunIfEsp, self).setUp()
64
65         self.tun_if = self.pg0
66
67         p = self.ipv4_params
68
69         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
70                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
71                                         p.crypt_key, p.crypt_key,
72                                         p.auth_algo_vpp_id, p.auth_key,
73                                         p.auth_key)
74         p.tun_if.add_vpp_config()
75         p.tun_if.admin_up()
76         p.tun_if.config_ip4()
77         p.tun_if.config_ip6()
78         config_tun_params(p, self.encryption_type, p.tun_if)
79
80         r = VppIpRoute(self, p.remote_tun_if_host, 32,
81                        [VppRoutePath(p.tun_if.remote_ip4,
82                                      0xffffffff)])
83         r.add_vpp_config()
84         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
85                        [VppRoutePath(p.tun_if.remote_ip6,
86                                      0xffffffff,
87                                      proto=DpoProto.DPO_PROTO_IP6)])
88         r.add_vpp_config()
89
90     def tearDown(self):
91         super(TemplateIpsec4TunIfEsp, self).tearDown()
92
93
94 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
95     """ IPsec UDP tunnel interface tests """
96
97     tun4_encrypt_node_name = "esp4-encrypt-tun"
98     tun4_decrypt_node_name = "esp4-decrypt-tun"
99     encryption_type = ESP
100
101     @classmethod
102     def setUpClass(cls):
103         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
104
105     @classmethod
106     def tearDownClass(cls):
107         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
108
109     def setUp(self):
110         super(TemplateIpsec4TunIfEspUdp, self).setUp()
111
112         self.tun_if = self.pg0
113
114         p = self.ipv4_params
115         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
116                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
117         p.nat_header = UDP(sport=5454, dport=4500)
118
119         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
120                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
121                                         p.crypt_key, p.crypt_key,
122                                         p.auth_algo_vpp_id, p.auth_key,
123                                         p.auth_key, udp_encap=True)
124         p.tun_if.add_vpp_config()
125         p.tun_if.admin_up()
126         p.tun_if.config_ip4()
127         p.tun_if.config_ip6()
128         config_tun_params(p, self.encryption_type, p.tun_if)
129
130         r = VppIpRoute(self, p.remote_tun_if_host, 32,
131                        [VppRoutePath(p.tun_if.remote_ip4,
132                                      0xffffffff)])
133         r.add_vpp_config()
134         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
135                        [VppRoutePath(p.tun_if.remote_ip6,
136                                      0xffffffff,
137                                      proto=DpoProto.DPO_PROTO_IP6)])
138         r.add_vpp_config()
139
140     def tearDown(self):
141         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
142
143
144 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
145     """ Ipsec ESP - TUN tests """
146     tun4_encrypt_node_name = "esp4-encrypt-tun"
147     tun4_decrypt_node_name = "esp4-decrypt-tun"
148
149     def test_tun_basic64(self):
150         """ ipsec 6o4 tunnel basic test """
151         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
152
153         self.verify_tun_64(self.params[socket.AF_INET], count=1)
154
155     def test_tun_burst64(self):
156         """ ipsec 6o4 tunnel basic test """
157         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
158
159         self.verify_tun_64(self.params[socket.AF_INET], count=257)
160
161     def test_tun_basic_frag44(self):
162         """ ipsec 4o4 tunnel frag basic test """
163         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
164
165         p = self.ipv4_params
166
167         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
168                                        [1500, 0, 0, 0])
169         self.verify_tun_44(self.params[socket.AF_INET],
170                            count=1, payload_size=1800, n_rx=2)
171         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
172                                        [9000, 0, 0, 0])
173
174
175 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
176     """ Ipsec ESP UDP tests """
177
178     tun4_input_node = "ipsec4-tun-input"
179
180     def test_keepalive(self):
181         """ IPSEC NAT Keepalive """
182         self.verify_keepalive(self.ipv4_params)
183
184
185 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
186     """ Ipsec ESP - TCP tests """
187     pass
188
189
190 class TemplateIpsec6TunIfEsp(TemplateIpsec):
191     """ IPsec tunnel interface tests """
192
193     encryption_type = ESP
194
195     def setUp(self):
196         super(TemplateIpsec6TunIfEsp, self).setUp()
197
198         self.tun_if = self.pg0
199
200         p = self.ipv6_params
201         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
202                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
203                                         p.crypt_key, p.crypt_key,
204                                         p.auth_algo_vpp_id, p.auth_key,
205                                         p.auth_key, is_ip6=True)
206         p.tun_if.add_vpp_config()
207         p.tun_if.admin_up()
208         p.tun_if.config_ip6()
209         p.tun_if.config_ip4()
210         config_tun_params(p, self.encryption_type, p.tun_if)
211
212         r = VppIpRoute(self, p.remote_tun_if_host, 128,
213                        [VppRoutePath(p.tun_if.remote_ip6,
214                                      0xffffffff,
215                                      proto=DpoProto.DPO_PROTO_IP6)])
216         r.add_vpp_config()
217         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
218                        [VppRoutePath(p.tun_if.remote_ip4,
219                                      0xffffffff)])
220         r.add_vpp_config()
221
222     def tearDown(self):
223         super(TemplateIpsec6TunIfEsp, self).tearDown()
224
225
226 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
227     """ Ipsec ESP - TUN tests """
228     tun6_encrypt_node_name = "esp6-encrypt-tun"
229     tun6_decrypt_node_name = "esp6-decrypt-tun"
230
231     def test_tun_basic46(self):
232         """ ipsec 4o6 tunnel basic test """
233         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
234         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
235
236     def test_tun_burst46(self):
237         """ ipsec 4o6 tunnel burst test """
238         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
239         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
240
241
242 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
243     """ IPsec IPv4 Multi Tunnel interface """
244
245     encryption_type = ESP
246     tun4_encrypt_node_name = "esp4-encrypt-tun"
247     tun4_decrypt_node_name = "esp4-decrypt-tun"
248
249     def setUp(self):
250         super(TestIpsec4MultiTunIfEsp, self).setUp()
251
252         self.tun_if = self.pg0
253
254         self.multi_params = []
255         self.pg0.generate_remote_hosts(10)
256         self.pg0.configure_ipv4_neighbors()
257
258         for ii in range(10):
259             p = copy.copy(self.ipv4_params)
260
261             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
262             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
263             p.scapy_tun_spi = p.scapy_tun_spi + ii
264             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
265             p.vpp_tun_spi = p.vpp_tun_spi + ii
266
267             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
268             p.scapy_tra_spi = p.scapy_tra_spi + ii
269             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
270             p.vpp_tra_spi = p.vpp_tra_spi + ii
271
272             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
273                                             p.scapy_tun_spi,
274                                             p.crypt_algo_vpp_id,
275                                             p.crypt_key, p.crypt_key,
276                                             p.auth_algo_vpp_id, p.auth_key,
277                                             p.auth_key,
278                                             dst=self.pg0.remote_hosts[ii].ip4)
279             p.tun_if.add_vpp_config()
280             p.tun_if.admin_up()
281             p.tun_if.config_ip4()
282             config_tun_params(p, self.encryption_type, p.tun_if)
283             self.multi_params.append(p)
284
285             VppIpRoute(self, p.remote_tun_if_host, 32,
286                        [VppRoutePath(p.tun_if.remote_ip4,
287                                      0xffffffff)]).add_vpp_config()
288
289     def tearDown(self):
290         super(TestIpsec4MultiTunIfEsp, self).tearDown()
291
292     def test_tun_44(self):
293         """Multiple IPSEC tunnel interfaces """
294         for p in self.multi_params:
295             self.verify_tun_44(p, count=127)
296             c = p.tun_if.get_rx_stats()
297             self.assertEqual(c['packets'], 127)
298             c = p.tun_if.get_tx_stats()
299             self.assertEqual(c['packets'], 127)
300
301
302 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
303     """ IPsec IPv4 Tunnel interface all Algos """
304
305     encryption_type = ESP
306     tun4_encrypt_node_name = "esp4-encrypt-tun"
307     tun4_decrypt_node_name = "esp4-decrypt-tun"
308
309     def config_network(self, p):
310
311         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
312                                         p.scapy_tun_spi,
313                                         p.crypt_algo_vpp_id,
314                                         p.crypt_key, p.crypt_key,
315                                         p.auth_algo_vpp_id, p.auth_key,
316                                         p.auth_key,
317                                         salt=p.salt)
318         p.tun_if.add_vpp_config()
319         p.tun_if.admin_up()
320         p.tun_if.config_ip4()
321         config_tun_params(p, self.encryption_type, p.tun_if)
322         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
323         self.logger.info(self.vapi.cli("sh ipsec sa 1"))
324
325         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
326                              [VppRoutePath(p.tun_if.remote_ip4,
327                                            0xffffffff)])
328         p.route.add_vpp_config()
329
330     def unconfig_network(self, p):
331         p.tun_if.unconfig_ip4()
332         p.tun_if.remove_vpp_config()
333         p.route.remove_vpp_config()
334
335     def setUp(self):
336         super(TestIpsec4TunIfEspAll, self).setUp()
337
338         self.tun_if = self.pg0
339
340     def tearDown(self):
341         super(TestIpsec4TunIfEspAll, self).tearDown()
342
343     def rekey(self, p):
344         #
345         # change the key and the SPI
346         #
347         p.crypt_key = b'X' + p.crypt_key[1:]
348         p.scapy_tun_spi += 1
349         p.scapy_tun_sa_id += 1
350         p.vpp_tun_spi += 1
351         p.vpp_tun_sa_id += 1
352         p.tun_if.local_spi = p.vpp_tun_spi
353         p.tun_if.remote_spi = p.scapy_tun_spi
354
355         config_tun_params(p, self.encryption_type, p.tun_if)
356
357         p.tun_sa_in = VppIpsecSA(self,
358                                  p.scapy_tun_sa_id,
359                                  p.scapy_tun_spi,
360                                  p.auth_algo_vpp_id,
361                                  p.auth_key,
362                                  p.crypt_algo_vpp_id,
363                                  p.crypt_key,
364                                  self.vpp_esp_protocol,
365                                  flags=p.flags,
366                                  salt=p.salt)
367         p.tun_sa_out = VppIpsecSA(self,
368                                   p.vpp_tun_sa_id,
369                                   p.vpp_tun_spi,
370                                   p.auth_algo_vpp_id,
371                                   p.auth_key,
372                                   p.crypt_algo_vpp_id,
373                                   p.crypt_key,
374                                   self.vpp_esp_protocol,
375                                   flags=p.flags,
376                                   salt=p.salt)
377         p.tun_sa_in.add_vpp_config()
378         p.tun_sa_out.add_vpp_config()
379
380         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
381                                          sa_id=p.tun_sa_in.id,
382                                          is_outbound=1)
383         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
384                                          sa_id=p.tun_sa_out.id,
385                                          is_outbound=0)
386         self.logger.info(self.vapi.cli("sh ipsec sa"))
387
388     def test_tun_44(self):
389         """IPSEC tunnel all algos """
390
391         # foreach VPP crypto engine
392         engines = ["ia32", "ipsecmb", "openssl"]
393
394         # foreach crypto algorithm
395         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
396                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
397                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
398                                 IPSEC_API_INTEG_ALG_NONE),
399                   'scapy-crypto': "AES-GCM",
400                   'scapy-integ': "NULL",
401                   'key': b"JPjyOWBeVEQiMe7h",
402                   'salt': 3333},
403                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
404                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
405                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
406                                 IPSEC_API_INTEG_ALG_NONE),
407                   'scapy-crypto': "AES-GCM",
408                   'scapy-integ': "NULL",
409                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
410                   'salt': 0},
411                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
412                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
413                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
414                                 IPSEC_API_INTEG_ALG_NONE),
415                   'scapy-crypto': "AES-GCM",
416                   'scapy-integ': "NULL",
417                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
418                   'salt': 9999},
419                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
420                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
421                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
422                                 IPSEC_API_INTEG_ALG_SHA1_96),
423                   'scapy-crypto': "AES-CBC",
424                   'scapy-integ': "HMAC-SHA1-96",
425                   'salt': 0,
426                   'key': b"JPjyOWBeVEQiMe7h"},
427                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
428                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
429                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
430                                 IPSEC_API_INTEG_ALG_SHA1_96),
431                   'scapy-crypto': "AES-CBC",
432                   'scapy-integ': "HMAC-SHA1-96",
433                   'salt': 0,
434                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
435                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
436                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
437                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
438                                 IPSEC_API_INTEG_ALG_SHA1_96),
439                   'scapy-crypto': "AES-CBC",
440                   'scapy-integ': "HMAC-SHA1-96",
441                   'salt': 0,
442                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
443                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
444                                  IPSEC_API_CRYPTO_ALG_NONE),
445                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
446                                 IPSEC_API_INTEG_ALG_SHA1_96),
447                   'scapy-crypto': "NULL",
448                   'scapy-integ': "HMAC-SHA1-96",
449                   'salt': 0,
450                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
451
452         for engine in engines:
453             self.vapi.cli("set crypto handler all %s" % engine)
454
455             #
456             # loop through each of the algorithms
457             #
458             for algo in algos:
459                 # with self.subTest(algo=algo['scapy']):
460
461                 p = copy.copy(self.ipv4_params)
462                 p.auth_algo_vpp_id = algo['vpp-integ']
463                 p.crypt_algo_vpp_id = algo['vpp-crypto']
464                 p.crypt_algo = algo['scapy-crypto']
465                 p.auth_algo = algo['scapy-integ']
466                 p.crypt_key = algo['key']
467                 p.salt = algo['salt']
468
469                 self.config_network(p)
470
471                 self.verify_tun_44(p, count=127)
472                 c = p.tun_if.get_rx_stats()
473                 self.assertEqual(c['packets'], 127)
474                 c = p.tun_if.get_tx_stats()
475                 self.assertEqual(c['packets'], 127)
476
477                 #
478                 # rekey the tunnel
479                 #
480                 self.rekey(p)
481                 self.verify_tun_44(p, count=127)
482
483                 self.unconfig_network(p)
484                 p.tun_sa_out.remove_vpp_config()
485                 p.tun_sa_in.remove_vpp_config()
486
487
488 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
489     """ IPsec IPv6 Multi Tunnel interface """
490
491     encryption_type = ESP
492     tun6_encrypt_node_name = "esp6-encrypt-tun"
493     tun6_decrypt_node_name = "esp6-decrypt-tun"
494
495     def setUp(self):
496         super(TestIpsec6MultiTunIfEsp, self).setUp()
497
498         self.tun_if = self.pg0
499
500         self.multi_params = []
501         self.pg0.generate_remote_hosts(10)
502         self.pg0.configure_ipv6_neighbors()
503
504         for ii in range(10):
505             p = copy.copy(self.ipv6_params)
506
507             p.remote_tun_if_host = "1111::%d" % (ii + 1)
508             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
509             p.scapy_tun_spi = p.scapy_tun_spi + ii
510             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
511             p.vpp_tun_spi = p.vpp_tun_spi + ii
512
513             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
514             p.scapy_tra_spi = p.scapy_tra_spi + ii
515             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
516             p.vpp_tra_spi = p.vpp_tra_spi + ii
517
518             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
519                                             p.scapy_tun_spi,
520                                             p.crypt_algo_vpp_id,
521                                             p.crypt_key, p.crypt_key,
522                                             p.auth_algo_vpp_id, p.auth_key,
523                                             p.auth_key, is_ip6=True,
524                                             dst=self.pg0.remote_hosts[ii].ip6)
525             p.tun_if.add_vpp_config()
526             p.tun_if.admin_up()
527             p.tun_if.config_ip6()
528             config_tun_params(p, self.encryption_type, p.tun_if)
529             self.multi_params.append(p)
530
531             r = VppIpRoute(self, p.remote_tun_if_host, 128,
532                            [VppRoutePath(p.tun_if.remote_ip6,
533                                          0xffffffff,
534                                          proto=DpoProto.DPO_PROTO_IP6)])
535             r.add_vpp_config()
536
537     def tearDown(self):
538         super(TestIpsec6MultiTunIfEsp, self).tearDown()
539
540     def test_tun_66(self):
541         """Multiple IPSEC tunnel interfaces """
542         for p in self.multi_params:
543             self.verify_tun_66(p, count=127)
544             c = p.tun_if.get_rx_stats()
545             self.assertEqual(c['packets'], 127)
546             c = p.tun_if.get_tx_stats()
547             self.assertEqual(c['packets'], 127)
548
549
550 class TestIpsecGreTebIfEsp(TemplateIpsec,
551                            IpsecTun4Tests):
552     """ Ipsec GRE TEB ESP - TUN tests """
553     tun4_encrypt_node_name = "esp4-encrypt-tun"
554     tun4_decrypt_node_name = "esp4-decrypt-tun"
555     encryption_type = ESP
556     omac = "00:11:22:33:44:55"
557
558     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
559                          payload_size=100):
560         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
561                 sa.encrypt(IP(src=self.pg0.remote_ip4,
562                               dst=self.pg0.local_ip4) /
563                            GRE() /
564                            Ether(dst=self.omac) /
565                            IP(src="1.1.1.1", dst="1.1.1.2") /
566                            UDP(sport=1144, dport=2233) /
567                            Raw(b'X' * payload_size))
568                 for i in range(count)]
569
570     def gen_pkts(self, sw_intf, src, dst, count=1,
571                  payload_size=100):
572         return [Ether(dst=self.omac) /
573                 IP(src="1.1.1.1", dst="1.1.1.2") /
574                 UDP(sport=1144, dport=2233) /
575                 Raw(b'X' * payload_size)
576                 for i in range(count)]
577
578     def verify_decrypted(self, p, rxs):
579         for rx in rxs:
580             self.assert_equal(rx[Ether].dst, self.omac)
581             self.assert_equal(rx[IP].dst, "1.1.1.2")
582
583     def verify_encrypted(self, p, sa, rxs):
584         for rx in rxs:
585             try:
586                 pkt = sa.decrypt(rx[IP])
587                 if not pkt.haslayer(IP):
588                     pkt = IP(pkt[Raw].load)
589                 self.assert_packet_checksums_valid(pkt)
590                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
591                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
592                 self.assertTrue(pkt.haslayer(GRE))
593                 e = pkt[Ether]
594                 self.assertEqual(e[Ether].dst, self.omac)
595                 self.assertEqual(e[IP].dst, "1.1.1.2")
596             except (IndexError, AssertionError):
597                 self.logger.debug(ppp("Unexpected packet:", rx))
598                 try:
599                     self.logger.debug(ppp("Decrypted packet:", pkt))
600                 except:
601                     pass
602                 raise
603
604     def setUp(self):
605         super(TestIpsecGreTebIfEsp, self).setUp()
606
607         self.tun_if = self.pg0
608
609         p = self.ipv4_params
610
611         bd1 = VppBridgeDomain(self, 1)
612         bd1.add_vpp_config()
613
614         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
615                                   p.auth_algo_vpp_id, p.auth_key,
616                                   p.crypt_algo_vpp_id, p.crypt_key,
617                                   self.vpp_esp_protocol,
618                                   self.pg0.local_ip4,
619                                   self.pg0.remote_ip4)
620         p.tun_sa_out.add_vpp_config()
621
622         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
623                                  p.auth_algo_vpp_id, p.auth_key,
624                                  p.crypt_algo_vpp_id, p.crypt_key,
625                                  self.vpp_esp_protocol,
626                                  self.pg0.remote_ip4,
627                                  self.pg0.local_ip4)
628         p.tun_sa_in.add_vpp_config()
629
630         p.tun_if = VppGreInterface(self,
631                                    self.pg0.local_ip4,
632                                    self.pg0.remote_ip4,
633                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
634                                          GRE_API_TUNNEL_TYPE_TEB))
635         p.tun_if.add_vpp_config()
636
637         p.tun_protect = VppIpsecTunProtect(self,
638                                            p.tun_if,
639                                            p.tun_sa_out,
640                                            [p.tun_sa_in])
641
642         p.tun_protect.add_vpp_config()
643
644         p.tun_if.admin_up()
645         p.tun_if.config_ip4()
646         config_tun_params(p, self.encryption_type, p.tun_if)
647
648         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
649         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
650
651         self.vapi.cli("clear ipsec sa")
652
653     def tearDown(self):
654         p = self.ipv4_params
655         p.tun_if.unconfig_ip4()
656         super(TestIpsecGreTebIfEsp, self).tearDown()
657
658
659 class TestIpsecGreIfEsp(TemplateIpsec,
660                         IpsecTun4Tests):
661     """ Ipsec GRE ESP - TUN tests """
662     tun4_encrypt_node_name = "esp4-encrypt-tun"
663     tun4_decrypt_node_name = "esp4-decrypt-tun"
664     encryption_type = ESP
665
666     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
667                          payload_size=100):
668         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
669                 sa.encrypt(IP(src=self.pg0.remote_ip4,
670                               dst=self.pg0.local_ip4) /
671                            GRE() /
672                            IP(src=self.pg1.local_ip4,
673                               dst=self.pg1.remote_ip4) /
674                            UDP(sport=1144, dport=2233) /
675                            Raw(b'X' * payload_size))
676                 for i in range(count)]
677
678     def gen_pkts(self, sw_intf, src, dst, count=1,
679                  payload_size=100):
680         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
681                 IP(src="1.1.1.1", dst="1.1.1.2") /
682                 UDP(sport=1144, dport=2233) /
683                 Raw(b'X' * payload_size)
684                 for i in range(count)]
685
686     def verify_decrypted(self, p, rxs):
687         for rx in rxs:
688             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
689             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
690
691     def verify_encrypted(self, p, sa, rxs):
692         for rx in rxs:
693             try:
694                 pkt = sa.decrypt(rx[IP])
695                 if not pkt.haslayer(IP):
696                     pkt = IP(pkt[Raw].load)
697                 self.assert_packet_checksums_valid(pkt)
698                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
699                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
700                 self.assertTrue(pkt.haslayer(GRE))
701                 e = pkt[GRE]
702                 self.assertEqual(e[IP].dst, "1.1.1.2")
703             except (IndexError, AssertionError):
704                 self.logger.debug(ppp("Unexpected packet:", rx))
705                 try:
706                     self.logger.debug(ppp("Decrypted packet:", pkt))
707                 except:
708                     pass
709                 raise
710
711     def setUp(self):
712         super(TestIpsecGreIfEsp, self).setUp()
713
714         self.tun_if = self.pg0
715
716         p = self.ipv4_params
717
718         bd1 = VppBridgeDomain(self, 1)
719         bd1.add_vpp_config()
720
721         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
722                                   p.auth_algo_vpp_id, p.auth_key,
723                                   p.crypt_algo_vpp_id, p.crypt_key,
724                                   self.vpp_esp_protocol,
725                                   self.pg0.local_ip4,
726                                   self.pg0.remote_ip4)
727         p.tun_sa_out.add_vpp_config()
728
729         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
730                                  p.auth_algo_vpp_id, p.auth_key,
731                                  p.crypt_algo_vpp_id, p.crypt_key,
732                                  self.vpp_esp_protocol,
733                                  self.pg0.remote_ip4,
734                                  self.pg0.local_ip4)
735         p.tun_sa_in.add_vpp_config()
736
737         p.tun_if = VppGreInterface(self,
738                                    self.pg0.local_ip4,
739                                    self.pg0.remote_ip4)
740         p.tun_if.add_vpp_config()
741
742         p.tun_protect = VppIpsecTunProtect(self,
743                                            p.tun_if,
744                                            p.tun_sa_out,
745                                            [p.tun_sa_in])
746         p.tun_protect.add_vpp_config()
747
748         p.tun_if.admin_up()
749         p.tun_if.config_ip4()
750         config_tun_params(p, self.encryption_type, p.tun_if)
751
752         VppIpRoute(self, "1.1.1.2", 32,
753                    [VppRoutePath(p.tun_if.remote_ip4,
754                                  0xffffffff)]).add_vpp_config()
755
756     def tearDown(self):
757         p = self.ipv4_params
758         p.tun_if.unconfig_ip4()
759         super(TestIpsecGreIfEsp, self).tearDown()
760
761
762 class TestIpsecGreIfEspTra(TemplateIpsec,
763                            IpsecTun4Tests):
764     """ Ipsec GRE ESP - TRA tests """
765     tun4_encrypt_node_name = "esp4-encrypt-tun"
766     tun4_decrypt_node_name = "esp4-decrypt-tun"
767     encryption_type = ESP
768
769     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
770                          payload_size=100):
771         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
772                 sa.encrypt(IP(src=self.pg0.remote_ip4,
773                               dst=self.pg0.local_ip4) /
774                            GRE() /
775                            IP(src=self.pg1.local_ip4,
776                               dst=self.pg1.remote_ip4) /
777                            UDP(sport=1144, dport=2233) /
778                            Raw(b'X' * payload_size))
779                 for i in range(count)]
780
781     def gen_pkts(self, sw_intf, src, dst, count=1,
782                  payload_size=100):
783         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
784                 IP(src="1.1.1.1", dst="1.1.1.2") /
785                 UDP(sport=1144, dport=2233) /
786                 Raw(b'X' * payload_size)
787                 for i in range(count)]
788
789     def verify_decrypted(self, p, rxs):
790         for rx in rxs:
791             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
792             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
793
794     def verify_encrypted(self, p, sa, rxs):
795         for rx in rxs:
796             try:
797                 pkt = sa.decrypt(rx[IP])
798                 if not pkt.haslayer(IP):
799                     pkt = IP(pkt[Raw].load)
800                 self.assert_packet_checksums_valid(pkt)
801                 self.assertTrue(pkt.haslayer(GRE))
802                 e = pkt[GRE]
803                 self.assertEqual(e[IP].dst, "1.1.1.2")
804             except (IndexError, AssertionError):
805                 self.logger.debug(ppp("Unexpected packet:", rx))
806                 try:
807                     self.logger.debug(ppp("Decrypted packet:", pkt))
808                 except:
809                     pass
810                 raise
811
812     def setUp(self):
813         super(TestIpsecGreIfEspTra, self).setUp()
814
815         self.tun_if = self.pg0
816
817         p = self.ipv4_params
818
819         bd1 = VppBridgeDomain(self, 1)
820         bd1.add_vpp_config()
821
822         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
823                                   p.auth_algo_vpp_id, p.auth_key,
824                                   p.crypt_algo_vpp_id, p.crypt_key,
825                                   self.vpp_esp_protocol)
826         p.tun_sa_out.add_vpp_config()
827
828         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
829                                  p.auth_algo_vpp_id, p.auth_key,
830                                  p.crypt_algo_vpp_id, p.crypt_key,
831                                  self.vpp_esp_protocol)
832         p.tun_sa_in.add_vpp_config()
833
834         p.tun_if = VppGreInterface(self,
835                                    self.pg0.local_ip4,
836                                    self.pg0.remote_ip4)
837         p.tun_if.add_vpp_config()
838
839         p.tun_protect = VppIpsecTunProtect(self,
840                                            p.tun_if,
841                                            p.tun_sa_out,
842                                            [p.tun_sa_in])
843         p.tun_protect.add_vpp_config()
844
845         p.tun_if.admin_up()
846         p.tun_if.config_ip4()
847         config_tun_params(p, self.encryption_type, p.tun_if)
848
849         VppIpRoute(self, "1.1.1.2", 32,
850                    [VppRoutePath(p.tun_if.remote_ip4,
851                                  0xffffffff)]).add_vpp_config()
852
853     def tearDown(self):
854         p = self.ipv4_params
855         p.tun_if.unconfig_ip4()
856         super(TestIpsecGreIfEspTra, self).tearDown()
857
858
859 class TemplateIpsec4TunProtect(object):
860     """ IPsec IPv4 Tunnel protect """
861
862     encryption_type = ESP
863     tun4_encrypt_node_name = "esp4-encrypt-tun"
864     tun4_decrypt_node_name = "esp4-decrypt-tun"
865     tun4_input_node = "ipsec4-tun-input"
866
867     def config_sa_tra(self, p):
868         config_tun_params(p, self.encryption_type, p.tun_if)
869
870         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
871                                   p.auth_algo_vpp_id, p.auth_key,
872                                   p.crypt_algo_vpp_id, p.crypt_key,
873                                   self.vpp_esp_protocol,
874                                   flags=p.flags)
875         p.tun_sa_out.add_vpp_config()
876
877         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
878                                  p.auth_algo_vpp_id, p.auth_key,
879                                  p.crypt_algo_vpp_id, p.crypt_key,
880                                  self.vpp_esp_protocol,
881                                  flags=p.flags)
882         p.tun_sa_in.add_vpp_config()
883
884     def config_sa_tun(self, p):
885         config_tun_params(p, self.encryption_type, p.tun_if)
886
887         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
888                                   p.auth_algo_vpp_id, p.auth_key,
889                                   p.crypt_algo_vpp_id, p.crypt_key,
890                                   self.vpp_esp_protocol,
891                                   self.tun_if.remote_addr[p.addr_type],
892                                   self.tun_if.local_addr[p.addr_type],
893                                   flags=p.flags)
894         p.tun_sa_out.add_vpp_config()
895
896         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
897                                  p.auth_algo_vpp_id, p.auth_key,
898                                  p.crypt_algo_vpp_id, p.crypt_key,
899                                  self.vpp_esp_protocol,
900                                  self.tun_if.remote_addr[p.addr_type],
901                                  self.tun_if.local_addr[p.addr_type],
902                                  flags=p.flags)
903         p.tun_sa_in.add_vpp_config()
904
905     def config_protect(self, p):
906         p.tun_protect = VppIpsecTunProtect(self,
907                                            p.tun_if,
908                                            p.tun_sa_out,
909                                            [p.tun_sa_in])
910         p.tun_protect.add_vpp_config()
911
912     def config_network(self, p):
913         p.tun_if = VppIpIpTunInterface(self, self.pg0,
914                                        self.pg0.local_ip4,
915                                        self.pg0.remote_ip4)
916         p.tun_if.add_vpp_config()
917         p.tun_if.admin_up()
918         p.tun_if.config_ip4()
919         p.tun_if.config_ip6()
920
921         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
922                              [VppRoutePath(p.tun_if.remote_ip4,
923                                            0xffffffff)])
924         p.route.add_vpp_config()
925         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
926                        [VppRoutePath(p.tun_if.remote_ip6,
927                                      0xffffffff,
928                                      proto=DpoProto.DPO_PROTO_IP6)])
929         r.add_vpp_config()
930
931     def unconfig_network(self, p):
932         p.route.remove_vpp_config()
933         p.tun_if.remove_vpp_config()
934
935     def unconfig_protect(self, p):
936         p.tun_protect.remove_vpp_config()
937
938     def unconfig_sa(self, p):
939         p.tun_sa_out.remove_vpp_config()
940         p.tun_sa_in.remove_vpp_config()
941
942
943 class TestIpsec4TunProtect(TemplateIpsec,
944                            TemplateIpsec4TunProtect,
945                            IpsecTun4):
946     """ IPsec IPv4 Tunnel protect - transport mode"""
947
948     def setUp(self):
949         super(TestIpsec4TunProtect, self).setUp()
950
951         self.tun_if = self.pg0
952
953     def tearDown(self):
954         super(TestIpsec4TunProtect, self).tearDown()
955
956     def test_tun_44(self):
957         """IPSEC tunnel protect"""
958
959         p = self.ipv4_params
960
961         self.config_network(p)
962         self.config_sa_tra(p)
963         self.config_protect(p)
964
965         self.verify_tun_44(p, count=127)
966         c = p.tun_if.get_rx_stats()
967         self.assertEqual(c['packets'], 127)
968         c = p.tun_if.get_tx_stats()
969         self.assertEqual(c['packets'], 127)
970
971         self.vapi.cli("clear ipsec sa")
972         self.verify_tun_64(p, count=127)
973         c = p.tun_if.get_rx_stats()
974         self.assertEqual(c['packets'], 254)
975         c = p.tun_if.get_tx_stats()
976         self.assertEqual(c['packets'], 254)
977
978         # rekey - create new SAs and update the tunnel protection
979         np = copy.copy(p)
980         np.crypt_key = b'X' + p.crypt_key[1:]
981         np.scapy_tun_spi += 100
982         np.scapy_tun_sa_id += 1
983         np.vpp_tun_spi += 100
984         np.vpp_tun_sa_id += 1
985         np.tun_if.local_spi = p.vpp_tun_spi
986         np.tun_if.remote_spi = p.scapy_tun_spi
987
988         self.config_sa_tra(np)
989         self.config_protect(np)
990         self.unconfig_sa(p)
991
992         self.verify_tun_44(np, count=127)
993         c = p.tun_if.get_rx_stats()
994         self.assertEqual(c['packets'], 381)
995         c = p.tun_if.get_tx_stats()
996         self.assertEqual(c['packets'], 381)
997
998         # teardown
999         self.unconfig_protect(np)
1000         self.unconfig_sa(np)
1001         self.unconfig_network(p)
1002
1003
1004 class TestIpsec4TunProtectUdp(TemplateIpsec,
1005                               TemplateIpsec4TunProtect,
1006                               IpsecTun4):
1007     """ IPsec IPv4 Tunnel protect - transport mode"""
1008
1009     def setUp(self):
1010         super(TestIpsec4TunProtectUdp, self).setUp()
1011
1012         self.tun_if = self.pg0
1013
1014         p = self.ipv4_params
1015         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1016                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1017         p.nat_header = UDP(sport=5454, dport=4500)
1018         self.config_network(p)
1019         self.config_sa_tra(p)
1020         self.config_protect(p)
1021
1022     def tearDown(self):
1023         p = self.ipv4_params
1024         self.unconfig_protect(p)
1025         self.unconfig_sa(p)
1026         self.unconfig_network(p)
1027         super(TestIpsec4TunProtectUdp, self).tearDown()
1028
1029     def test_tun_44(self):
1030         """IPSEC UDP tunnel protect"""
1031
1032         p = self.ipv4_params
1033
1034         self.verify_tun_44(p, count=127)
1035         c = p.tun_if.get_rx_stats()
1036         self.assertEqual(c['packets'], 127)
1037         c = p.tun_if.get_tx_stats()
1038         self.assertEqual(c['packets'], 127)
1039
1040     def test_keepalive(self):
1041         """ IPSEC NAT Keepalive """
1042         self.verify_keepalive(self.ipv4_params)
1043
1044
1045 class TestIpsec4TunProtectTun(TemplateIpsec,
1046                               TemplateIpsec4TunProtect,
1047                               IpsecTun4):
1048     """ IPsec IPv4 Tunnel protect - tunnel mode"""
1049
1050     encryption_type = ESP
1051     tun4_encrypt_node_name = "esp4-encrypt-tun"
1052     tun4_decrypt_node_name = "esp4-decrypt-tun"
1053
1054     def setUp(self):
1055         super(TestIpsec4TunProtectTun, self).setUp()
1056
1057         self.tun_if = self.pg0
1058
1059     def tearDown(self):
1060         super(TestIpsec4TunProtectTun, self).tearDown()
1061
1062     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1063                          payload_size=100):
1064         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1065                 sa.encrypt(IP(src=sw_intf.remote_ip4,
1066                               dst=sw_intf.local_ip4) /
1067                            IP(src=src, dst=dst) /
1068                            UDP(sport=1144, dport=2233) /
1069                            Raw(b'X' * payload_size))
1070                 for i in range(count)]
1071
1072     def gen_pkts(self, sw_intf, src, dst, count=1,
1073                  payload_size=100):
1074         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1075                 IP(src=src, dst=dst) /
1076                 UDP(sport=1144, dport=2233) /
1077                 Raw(b'X' * payload_size)
1078                 for i in range(count)]
1079
1080     def verify_decrypted(self, p, rxs):
1081         for rx in rxs:
1082             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1083             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1084             self.assert_packet_checksums_valid(rx)
1085
1086     def verify_encrypted(self, p, sa, rxs):
1087         for rx in rxs:
1088             try:
1089                 pkt = sa.decrypt(rx[IP])
1090                 if not pkt.haslayer(IP):
1091                     pkt = IP(pkt[Raw].load)
1092                 self.assert_packet_checksums_valid(pkt)
1093                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1094                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1095                 inner = pkt[IP].payload
1096                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1097
1098             except (IndexError, AssertionError):
1099                 self.logger.debug(ppp("Unexpected packet:", rx))
1100                 try:
1101                     self.logger.debug(ppp("Decrypted packet:", pkt))
1102                 except:
1103                     pass
1104                 raise
1105
1106     def test_tun_44(self):
1107         """IPSEC tunnel protect """
1108
1109         p = self.ipv4_params
1110
1111         self.config_network(p)
1112         self.config_sa_tun(p)
1113         self.config_protect(p)
1114
1115         self.verify_tun_44(p, count=127)
1116
1117         c = p.tun_if.get_rx_stats()
1118         self.assertEqual(c['packets'], 127)
1119         c = p.tun_if.get_tx_stats()
1120         self.assertEqual(c['packets'], 127)
1121
1122         # rekey - create new SAs and update the tunnel protection
1123         np = copy.copy(p)
1124         np.crypt_key = b'X' + p.crypt_key[1:]
1125         np.scapy_tun_spi += 100
1126         np.scapy_tun_sa_id += 1
1127         np.vpp_tun_spi += 100
1128         np.vpp_tun_sa_id += 1
1129         np.tun_if.local_spi = p.vpp_tun_spi
1130         np.tun_if.remote_spi = p.scapy_tun_spi
1131
1132         self.config_sa_tun(np)
1133         self.config_protect(np)
1134         self.unconfig_sa(p)
1135
1136         self.verify_tun_44(np, count=127)
1137         c = p.tun_if.get_rx_stats()
1138         self.assertEqual(c['packets'], 254)
1139         c = p.tun_if.get_tx_stats()
1140         self.assertEqual(c['packets'], 254)
1141
1142         # teardown
1143         self.unconfig_protect(np)
1144         self.unconfig_sa(np)
1145         self.unconfig_network(p)
1146
1147
1148 class TemplateIpsec6TunProtect(object):
1149     """ IPsec IPv6 Tunnel protect """
1150
1151     def config_sa_tra(self, p):
1152         config_tun_params(p, self.encryption_type, p.tun_if)
1153
1154         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1155                                   p.auth_algo_vpp_id, p.auth_key,
1156                                   p.crypt_algo_vpp_id, p.crypt_key,
1157                                   self.vpp_esp_protocol)
1158         p.tun_sa_out.add_vpp_config()
1159
1160         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1161                                  p.auth_algo_vpp_id, p.auth_key,
1162                                  p.crypt_algo_vpp_id, p.crypt_key,
1163                                  self.vpp_esp_protocol)
1164         p.tun_sa_in.add_vpp_config()
1165
1166     def config_sa_tun(self, p):
1167         config_tun_params(p, self.encryption_type, p.tun_if)
1168
1169         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1170                                   p.auth_algo_vpp_id, p.auth_key,
1171                                   p.crypt_algo_vpp_id, p.crypt_key,
1172                                   self.vpp_esp_protocol,
1173                                   self.tun_if.remote_addr[p.addr_type],
1174                                   self.tun_if.local_addr[p.addr_type])
1175         p.tun_sa_out.add_vpp_config()
1176
1177         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1178                                  p.auth_algo_vpp_id, p.auth_key,
1179                                  p.crypt_algo_vpp_id, p.crypt_key,
1180                                  self.vpp_esp_protocol,
1181                                  self.tun_if.remote_addr[p.addr_type],
1182                                  self.tun_if.local_addr[p.addr_type])
1183         p.tun_sa_in.add_vpp_config()
1184
1185     def config_protect(self, p):
1186         p.tun_protect = VppIpsecTunProtect(self,
1187                                            p.tun_if,
1188                                            p.tun_sa_out,
1189                                            [p.tun_sa_in])
1190         p.tun_protect.add_vpp_config()
1191
1192     def config_network(self, p):
1193         p.tun_if = VppIpIpTunInterface(self, self.pg0,
1194                                        self.pg0.local_ip6,
1195                                        self.pg0.remote_ip6)
1196         p.tun_if.add_vpp_config()
1197         p.tun_if.admin_up()
1198         p.tun_if.config_ip6()
1199         p.tun_if.config_ip4()
1200
1201         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1202                              [VppRoutePath(p.tun_if.remote_ip6,
1203                                            0xffffffff,
1204                                            proto=DpoProto.DPO_PROTO_IP6)])
1205         p.route.add_vpp_config()
1206         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1207                        [VppRoutePath(p.tun_if.remote_ip4,
1208                                      0xffffffff)])
1209         r.add_vpp_config()
1210
1211     def unconfig_network(self, p):
1212         p.route.remove_vpp_config()
1213         p.tun_if.remove_vpp_config()
1214
1215     def unconfig_protect(self, p):
1216         p.tun_protect.remove_vpp_config()
1217
1218     def unconfig_sa(self, p):
1219         p.tun_sa_out.remove_vpp_config()
1220         p.tun_sa_in.remove_vpp_config()
1221
1222
1223 class TestIpsec6TunProtect(TemplateIpsec,
1224                            TemplateIpsec6TunProtect,
1225                            IpsecTun6):
1226     """ IPsec IPv6 Tunnel protect - transport mode"""
1227
1228     encryption_type = ESP
1229     tun6_encrypt_node_name = "esp6-encrypt-tun"
1230     tun6_decrypt_node_name = "esp6-decrypt-tun"
1231
1232     def setUp(self):
1233         super(TestIpsec6TunProtect, self).setUp()
1234
1235         self.tun_if = self.pg0
1236
1237     def tearDown(self):
1238         super(TestIpsec6TunProtect, self).tearDown()
1239
1240     def test_tun_66(self):
1241         """IPSEC tunnel protect"""
1242
1243         p = self.ipv6_params
1244
1245         self.config_network(p)
1246         self.config_sa_tra(p)
1247         self.config_protect(p)
1248
1249         self.verify_tun_66(p, count=127)
1250         c = p.tun_if.get_rx_stats()
1251         self.assertEqual(c['packets'], 127)
1252         c = p.tun_if.get_tx_stats()
1253         self.assertEqual(c['packets'], 127)
1254
1255         # rekey - create new SAs and update the tunnel protection
1256         np = copy.copy(p)
1257         np.crypt_key = b'X' + p.crypt_key[1:]
1258         np.scapy_tun_spi += 100
1259         np.scapy_tun_sa_id += 1
1260         np.vpp_tun_spi += 100
1261         np.vpp_tun_sa_id += 1
1262         np.tun_if.local_spi = p.vpp_tun_spi
1263         np.tun_if.remote_spi = p.scapy_tun_spi
1264
1265         self.config_sa_tra(np)
1266         self.config_protect(np)
1267         self.unconfig_sa(p)
1268
1269         self.verify_tun_66(np, count=127)
1270         c = p.tun_if.get_rx_stats()
1271         self.assertEqual(c['packets'], 254)
1272         c = p.tun_if.get_tx_stats()
1273         self.assertEqual(c['packets'], 254)
1274
1275         # 3 phase rekey
1276         #  1) add two input SAs [old, new]
1277         #  2) swap output SA to [new]
1278         #  3) use only [new] input SA
1279         np3 = copy.copy(np)
1280         np3.crypt_key = b'Z' + p.crypt_key[1:]
1281         np3.scapy_tun_spi += 100
1282         np3.scapy_tun_sa_id += 1
1283         np3.vpp_tun_spi += 100
1284         np3.vpp_tun_sa_id += 1
1285         np3.tun_if.local_spi = p.vpp_tun_spi
1286         np3.tun_if.remote_spi = p.scapy_tun_spi
1287
1288         self.config_sa_tra(np3)
1289
1290         # step 1;
1291         p.tun_protect.update_vpp_config(np.tun_sa_out,
1292                                         [np.tun_sa_in, np3.tun_sa_in])
1293         self.verify_tun_66(np, np, count=127)
1294         self.verify_tun_66(np3, np, count=127)
1295
1296         # step 2;
1297         p.tun_protect.update_vpp_config(np3.tun_sa_out,
1298                                         [np.tun_sa_in, np3.tun_sa_in])
1299         self.verify_tun_66(np, np3, count=127)
1300         self.verify_tun_66(np3, np3, count=127)
1301
1302         # step 1;
1303         p.tun_protect.update_vpp_config(np3.tun_sa_out,
1304                                         [np3.tun_sa_in])
1305         self.verify_tun_66(np3, np3, count=127)
1306         self.verify_drop_tun_66(np, count=127)
1307
1308         c = p.tun_if.get_rx_stats()
1309         self.assertEqual(c['packets'], 127*7)
1310         c = p.tun_if.get_tx_stats()
1311         self.assertEqual(c['packets'], 127*7)
1312         self.unconfig_sa(np)
1313
1314         # teardown
1315         self.unconfig_protect(np3)
1316         self.unconfig_sa(np3)
1317         self.unconfig_network(p)
1318
1319     def test_tun_46(self):
1320         """IPSEC tunnel protect"""
1321
1322         p = self.ipv6_params
1323
1324         self.config_network(p)
1325         self.config_sa_tra(p)
1326         self.config_protect(p)
1327
1328         self.verify_tun_46(p, count=127)
1329         c = p.tun_if.get_rx_stats()
1330         self.assertEqual(c['packets'], 127)
1331         c = p.tun_if.get_tx_stats()
1332         self.assertEqual(c['packets'], 127)
1333
1334         # teardown
1335         self.unconfig_protect(p)
1336         self.unconfig_sa(p)
1337         self.unconfig_network(p)
1338
1339
1340 class TestIpsec6TunProtectTun(TemplateIpsec,
1341                               TemplateIpsec6TunProtect,
1342                               IpsecTun6):
1343     """ IPsec IPv6 Tunnel protect - tunnel mode"""
1344
1345     encryption_type = ESP
1346     tun6_encrypt_node_name = "esp6-encrypt-tun"
1347     tun6_decrypt_node_name = "esp6-decrypt-tun"
1348
1349     def setUp(self):
1350         super(TestIpsec6TunProtectTun, self).setUp()
1351
1352         self.tun_if = self.pg0
1353
1354     def tearDown(self):
1355         super(TestIpsec6TunProtectTun, self).tearDown()
1356
1357     def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1358                           payload_size=100):
1359         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1360                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1361                                 dst=sw_intf.local_ip6) /
1362                            IPv6(src=src, dst=dst) /
1363                            UDP(sport=1166, dport=2233) /
1364                            Raw(b'X' * payload_size))
1365                 for i in range(count)]
1366
1367     def gen_pkts6(self, sw_intf, src, dst, count=1,
1368                   payload_size=100):
1369         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1370                 IPv6(src=src, dst=dst) /
1371                 UDP(sport=1166, dport=2233) /
1372                 Raw(b'X' * payload_size)
1373                 for i in range(count)]
1374
1375     def verify_decrypted6(self, p, rxs):
1376         for rx in rxs:
1377             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1378             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1379             self.assert_packet_checksums_valid(rx)
1380
1381     def verify_encrypted6(self, p, sa, rxs):
1382         for rx in rxs:
1383             try:
1384                 pkt = sa.decrypt(rx[IPv6])
1385                 if not pkt.haslayer(IPv6):
1386                     pkt = IPv6(pkt[Raw].load)
1387                 self.assert_packet_checksums_valid(pkt)
1388                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1389                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1390                 inner = pkt[IPv6].payload
1391                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1392
1393             except (IndexError, AssertionError):
1394                 self.logger.debug(ppp("Unexpected packet:", rx))
1395                 try:
1396                     self.logger.debug(ppp("Decrypted packet:", pkt))
1397                 except:
1398                     pass
1399                 raise
1400
1401     def test_tun_66(self):
1402         """IPSEC tunnel protect """
1403
1404         p = self.ipv6_params
1405
1406         self.config_network(p)
1407         self.config_sa_tun(p)
1408         self.config_protect(p)
1409
1410         self.verify_tun_66(p, count=127)
1411
1412         c = p.tun_if.get_rx_stats()
1413         self.assertEqual(c['packets'], 127)
1414         c = p.tun_if.get_tx_stats()
1415         self.assertEqual(c['packets'], 127)
1416
1417         # rekey - create new SAs and update the tunnel protection
1418         np = copy.copy(p)
1419         np.crypt_key = b'X' + p.crypt_key[1:]
1420         np.scapy_tun_spi += 100
1421         np.scapy_tun_sa_id += 1
1422         np.vpp_tun_spi += 100
1423         np.vpp_tun_sa_id += 1
1424         np.tun_if.local_spi = p.vpp_tun_spi
1425         np.tun_if.remote_spi = p.scapy_tun_spi
1426
1427         self.config_sa_tun(np)
1428         self.config_protect(np)
1429         self.unconfig_sa(p)
1430
1431         self.verify_tun_66(np, count=127)
1432         c = p.tun_if.get_rx_stats()
1433         self.assertEqual(c['packets'], 254)
1434         c = p.tun_if.get_tx_stats()
1435         self.assertEqual(c['packets'], 254)
1436
1437         # teardown
1438         self.unconfig_protect(np)
1439         self.unconfig_sa(np)
1440         self.unconfig_network(p)
1441
1442
1443 if __name__ == '__main__':
1444     unittest.main(testRunner=VppTestRunner)