ipsec: Fix decap of IPSEC/GRE in transport mode
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key
13 from vpp_ipsec_tun_interface import VppIpsecTunInterface
14 from vpp_gre_interface import VppGreInterface
15 from vpp_ipip_tun_interface import VppIpIpTunInterface
16 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
17 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
18 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
19 from util import ppp
20 from vpp_papi import VppEnum
21
22
def config_tun_params(p, encryption_type, tun_if):
    """Attach the scapy security associations for a tunnel-mode test to *p*.

    Two scapy ``SecurityAssociation`` objects are stored on *p*:

    * ``p.scapy_tun_sa`` uses SPI ``p.vpp_tun_spi`` and an outer header
      going from the tunnel's remote address to its local address.
    * ``p.vpp_tun_sa`` uses SPI ``p.scapy_tun_spi`` and an outer header
      going from the tunnel's local address to its remote address.

    Both share the algorithms, keys, NAT-T header and ESN setting read
    from *p*.

    :param p: parameter object supplying SPIs, algorithms, keys, flags,
        address family (``addr_type``) and ``nat_header``
    :param encryption_type: first positional argument handed to
        ``SecurityAssociation``
    :param tun_if: object exposing ``remote_ip`` and ``local_ip``
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    header_cls_by_family = {socket.AF_INET: IP, socket.AF_INET6: IPv6}

    extended_seq = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)
    scapy_key = mk_scapy_crypt_key(p)

    # Everything except the SPI and the outer header is common to both SAs.
    shared_kwargs = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=extended_seq)
    header_cls = header_cls_by_family[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=header_cls(src=tun_if.remote_ip,
                                 dst=tun_if.local_ip),
        **shared_kwargs)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=header_cls(src=tun_if.local_ip,
                                 dst=tun_if.remote_ip),
        **shared_kwargs)
48
49
def config_tra_params(p, encryption_type, tun_if):
    """Attach the scapy security associations for a transport-mode test.

    Stores two scapy ``SecurityAssociation`` objects on *p*, neither of
    which carries a tunnel header or NAT-T header:

    * ``p.scapy_tun_sa`` uses SPI ``p.vpp_tun_spi``.
    * ``p.vpp_tun_sa`` uses SPI ``p.scapy_tun_spi``.

    :param p: parameter object supplying SPIs, algorithms, keys and flags
    :param encryption_type: first positional argument handed to
        ``SecurityAssociation``
    :param tun_if: unused here; kept so the signature matches
        ``config_tun_params``
    """
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
67
68
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; defer to the parent template.
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the parent template.
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create an IPsec tunnel interface over pg0 and route hosts to it.

        The tunnel is built from ``self.ipv4_params``, given IPv4 and IPv6
        configuration, and host routes (/32 and /128) for the remote
        tunnel hosts are installed via the tunnel's remote addresses.
        """
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 host route through the tunnel.
        v4_route = VppIpRoute(
            self, params.remote_tun_if_host, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)])
        v4_route.add_vpp_config()

        # IPv6 host route through the same tunnel.
        v6_route = VppIpRoute(
            self, params.remote_tun_if_host6, 128,
            [VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)])
        v6_route.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
112
113
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; defer to the parent template.
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the parent template.
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        """Create a UDP-encapsulated IPsec tunnel over pg0 and route to it.

        ``self.ipv4_params`` is first switched to UDP encapsulation (SA
        flag plus a scapy UDP header, sport 5454 / dport 4500); the tunnel
        interface is then created with ``udp_encap=True`` and host routes
        for the remote tunnel hosts are installed through it.
        """
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            udp_encap=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 host route through the tunnel.
        v4_route = VppIpRoute(
            self, params.remote_tun_if_host, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)])
        v4_route.add_vpp_config()

        # IPv6 host route through the same tunnel.
        v6_route = VppIpRoute(
            self, params.remote_tun_if_host6, 128,
            [VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)])
        v6_route.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
162
163
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # Single packet through the 6-over-4 verification helper.
        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # Same check as the basic test, but with a 257-packet burst.
        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # Set the tunnel MTU to 1500 so that the 1800-byte payload is
        # expected to arrive as two packets (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        # Raise the MTU to 9000 again once the check has passed.
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [9000, 0, 0, 0])
193
194
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # Run the shared keepalive verification against the IPv4 params.
        v4_params = self.ipv4_params
        self.verify_keepalive(v4_params)
203
204
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No body needed: fixture and tests are inherited from the two bases.
208
209
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        """Create an IPv6 IPsec tunnel interface over pg0 and route to it.

        The tunnel is built from ``self.ipv6_params`` with ``is_ip6=True``,
        given IPv6 and IPv4 configuration, and host routes (/128 and /32)
        for the remote tunnel hosts are installed via the tunnel's remote
        addresses.
        """
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv6 host route through the tunnel.
        v6_route = VppIpRoute(
            self, params.remote_tun_if_host, 128,
            [VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)])
        v6_route.add_vpp_config()

        # IPv4 host route through the same tunnel.
        v4_route = VppIpRoute(
            self, params.remote_tun_if_host4, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)])
        v4_route.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
244
245
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # Single packet through the 4-over-6 verification helper.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # Same check with a 257-packet burst.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
260
261
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Create ten IPsec tunnel interfaces over pg0, one per remote host.

        Each tunnel gets its own shallow copy of ``self.ipv4_params`` with
        the SA ids and SPIs offset by the tunnel index; the copies are
        collected in ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        tunnel_count = 10
        # Attributes that must differ between tunnels.
        per_tunnel_attrs = (
            "scapy_tun_sa_id", "scapy_tun_spi",
            "vpp_tun_sa_id", "vpp_tun_spi",
            "scapy_tra_sa_id", "scapy_tra_spi",
            "vpp_tra_sa_id", "vpp_tra_spi",
        )

        self.multi_params = []
        self.pg0.generate_remote_hosts(tunnel_count)
        self.pg0.configure_ipv4_neighbors()

        for idx in range(tunnel_count):
            params = copy.copy(self.ipv4_params)

            params.remote_tun_if_host = "1.1.1.%d" % (idx + 1)
            for attr in per_tunnel_attrs:
                setattr(params, attr, getattr(params, attr) + idx)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                dst=self.pg0.remote_hosts[idx].ip4)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            # Host route for this tunnel's remote host.
            host_route = VppIpRoute(
                self, params.remote_tun_if_host, 32,
                [VppRoutePath(tunnel.remote_ip4, 0xffffffff)])
            host_route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        burst = 127
        for params in self.multi_params:
            self.verify_tun_44(params, count=burst)
            # Each tunnel must have counted exactly one burst per direction.
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], burst)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], burst)
320
321
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create the tunnel interface and host route described by *p*.

        Stores the created interface in ``p.tun_if`` and the route in
        ``p.route`` so that ``unconfig_network`` can remove them again.
        """

        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi,
                                        p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key,
                                        salt=p.salt)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)
        # Log the CLI output for SAs 0 and 1 to aid debugging.
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))
        self.logger.info(self.vapi.cli("sh ipsec sa 1"))

        # Host route for the remote tunnel host via the tunnel.
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the tunnel interface and route set up by config_network."""
        p.tun_if.unconfig_ip4()
        p.tun_if.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Install a fresh SA pair on the existing tunnel interface.

        Mutates *p* in place (new key, SPIs and SA ids), rebuilds the
        scapy SAs, creates two new VPP SAs stored in ``p.tun_sa_in`` and
        ``p.tun_sa_out``, and binds them to ``p.tun_if``.
        """
        #
        # change the key and the SPI
        #
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        # Rebuild the scapy-side SAs so they match the new key and SPIs.
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_in = VppIpsecSA(self,
                                 p.scapy_tun_sa_id,
                                 p.scapy_tun_spi,
                                 p.auth_algo_vpp_id,
                                 p.auth_key,
                                 p.crypt_algo_vpp_id,
                                 p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=p.flags,
                                 salt=p.salt)
        p.tun_sa_out = VppIpsecSA(self,
                                  p.vpp_tun_sa_id,
                                  p.vpp_tun_spi,
                                  p.auth_algo_vpp_id,
                                  p.auth_key,
                                  p.crypt_algo_vpp_id,
                                  p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags,
                                  salt=p.salt)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # NOTE(review): the SA named tun_sa_in (scapy SPI) is bound with
        # is_outbound=1 and tun_sa_out (vpp SPI) with is_outbound=0; the
        # attribute names look swapped relative to the API flag - confirm
        # before renaming anything.
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
                                         sa_id=p.tun_sa_in.id,
                                         is_outbound=1)
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
                                         sa_id=p.tun_sa_out.id,
                                         is_outbound=0)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        # foreach crypto algorithm
        # Each entry pairs the VPP enum ids with the scapy algorithm names
        # plus the key and salt to use.  The AES-GCM entries use no
        # separate integrity algorithm; the last entry uses no encryption.
        algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7h",
                  'salt': 3333},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
                  'salt': 0},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                  'salt': 9999},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7h"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_NONE),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "NULL",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]

        for engine in engines:
            # Select the crypto handler for all algorithms via the CLI.
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for algo in algos:
                # with self.subTest(algo=algo['scapy']):

                # Work on a shallow copy so the shared ipv4_params object
                # is not rebound for the next algorithm.
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = algo['vpp-integ']
                p.crypt_algo_vpp_id = algo['vpp-crypto']
                p.crypt_algo = algo['scapy-crypto']
                p.auth_algo = algo['scapy-integ']
                p.crypt_key = algo['key']
                p.salt = algo['salt']

                self.config_network(p)

                self.verify_tun_44(p, count=127)
                c = p.tun_if.get_rx_stats()
                self.assertEqual(c['packets'], 127)
                c = p.tun_if.get_tx_stats()
                self.assertEqual(c['packets'], 127)

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                # Tear down the interface and route, then the SAs that
                # rekey() created.
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
506
507
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Create ten IPv6 IPsec tunnel interfaces over pg0.

        Each tunnel gets its own shallow copy of ``self.ipv6_params`` with
        the SA ids and SPIs offset by the tunnel index; the copies are
        collected in ``self.multi_params``.
        """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        tunnel_count = 10
        # Attributes that must differ between tunnels.
        per_tunnel_attrs = (
            "scapy_tun_sa_id", "scapy_tun_spi",
            "vpp_tun_sa_id", "vpp_tun_spi",
            "scapy_tra_sa_id", "scapy_tra_spi",
            "vpp_tra_sa_id", "vpp_tra_spi",
        )

        self.multi_params = []
        self.pg0.generate_remote_hosts(tunnel_count)
        self.pg0.configure_ipv6_neighbors()

        for idx in range(tunnel_count):
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (idx + 1)
            for attr in per_tunnel_attrs:
                setattr(params, attr, getattr(params, attr) + idx)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            # Host route for this tunnel's remote host.
            host_route = VppIpRoute(
                self, params.remote_tun_if_host, 128,
                [VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                              proto=DpoProto.DPO_PROTO_IP6)])
            host_route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        burst = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=burst)
            # Each tunnel must have counted exactly one burst per direction.
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], burst)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], burst)
568
569
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC placed in the inner (bridged) Ethernet frames.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return *count* encrypted IP/GRE/Ether/IP/UDP frames.

        The inner addresses are fixed (1.1.1.1 -> 1.1.1.2); *src* and
        *dst* are accepted but not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return *count* plain Ether/IP/UDP frames addressed to ``omac``.

        *sw_intf*, *src* and *dst* are accepted but not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that every received frame is the expected inner frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check the GRE/Ether payload.

        On failure the offending packet (and its decrypted form, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs a stale packet
            # left over from a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only; never mask the
                        # original failure with a logging problem.
                        pass
                raise

    def setUp(self):
        """Build a GRE TEB tunnel protected by tunnel-mode ESP SAs.

        Creates bridge domain 1, an SA pair whose tunnel endpoints are
        pg0's local/remote IPv4 addresses, a GRE TEB interface between the
        same addresses protected by those SAs, and finally bridges the GRE
        interface with pg1.
        """
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Bridge the protected GRE interface with pg1.
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        # Remove the tunnel's IPv4 config before the parent teardown runs.
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
677
678
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC placed in the inner (bridged) Ethernet frames.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return *count* encrypted IP/GRE/Ether/IP/UDP frames.

        The inner addresses are fixed (1.1.1.1 -> 1.1.1.2); *src* and
        *dst* are accepted but not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return *count* plain Ether/IP/UDP frames addressed to ``omac``.

        *sw_intf*, *src* and *dst* are accepted but not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that every received frame is the expected inner frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check the GRE/Ether payload.

        On failure the offending packet (and its decrypted form, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs a stale packet
            # left over from a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only; never mask the
                        # original failure with a logging problem.
                        pass
                raise

    def setUp(self):
        """Build a GRE TEB tunnel protected by transport-mode ESP SAs.

        Creates bridge domain 1, an SA pair without tunnel endpoints, a
        GRE TEB interface between pg0's local/remote IPv4 addresses
        protected by those SAs, and finally bridges the GRE interface
        with pg1.  The scapy SAs are built with ``config_tra_params``.
        """
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # Bridge the protected GRE interface with pg1.
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        # Remove the tunnel's IPv4 config before the parent teardown runs.
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
782
783
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted GRE frames to send on ``sw_intf``.

        The plaintext handed to ``sa.encrypt()`` is IP(pg0 remote -> pg0
        local) / GRE / IP(pg1 local -> pg1 remote) / UDP / payload.

        :param sa: object whose ``encrypt()`` wraps the plaintext packet.
        :param sw_intf: interface supplying the Ethernet addresses.
        :param src: not used by this method.
        :param dst: not used by this method.
        :param count: number of frames to build.
        :param payload_size: number of ``X`` bytes in the UDP payload.
        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain UDP frames from 1.1.1.1 to 1.1.1.2.

        :param sw_intf: interface supplying the Ethernet addresses.
        :param src: not used by this method.
        :param dst: not used by this method.
        :param count: number of frames to build.
        :param payload_size: number of ``X`` bytes in the UDP payload.
        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each packet in ``rxs`` is addressed to pg1's remote host.

        :param p: parameter object; not used by this method.
        :param rxs: iterable of received packets.
        """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt and validate every packet in ``rxs``.

        After decryption with ``sa`` each packet must have valid
        checksums, go from pg0's local to pg0's remote IPv4 address and
        carry GRE with an inner IP destination of 1.1.1.2.

        :param p: parameter object; not used by this method.
        :param sa: object whose ``decrypt()`` is applied to ``rx[IP]``.
        :param rxs: iterable of received packets.
        :raises IndexError: re-raised after logging when a layer is missing.
        :raises AssertionError: re-raised after logging when a check fails.
        """
        for rx in rxs:
            # Reset per packet: a failure before decryption must not log
            # the plaintext of a previous iteration as this packet's.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt() did not yield an IP layer; re-parse the
                    # raw payload as IPv4
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure below is what must propagate
                        pass
                raise

    def setUp(self):
        """Create a GRE tunnel over pg0 protected by two SAs.

        Both SAs are given tunnel endpoint addresses: the outbound SA as
        (pg0 local, pg0 remote), the inbound SA as (pg0 remote, pg0 local).
        A host route for 1.1.1.2 is added via the tunnel.
        """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE: the bridge domain is created but no port is added to it
        # in this method.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
885
886
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted GRE frames to send on ``sw_intf``.

        The plaintext handed to ``sa.encrypt()`` is IP(pg0 remote -> pg0
        local) / GRE / IP(pg1 local -> pg1 remote) / UDP / payload.

        :param sa: object whose ``encrypt()`` wraps the plaintext packet.
        :param sw_intf: interface supplying the Ethernet addresses.
        :param src: not used by this method.
        :param dst: not used by this method.
        :param count: number of frames to build.
        :param payload_size: number of ``X`` bytes in the UDP payload.
        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain UDP frames from 1.1.1.1 to 1.1.1.2.

        :param sw_intf: interface supplying the Ethernet addresses.
        :param src: not used by this method.
        :param dst: not used by this method.
        :param count: number of frames to build.
        :param payload_size: number of ``X`` bytes in the UDP payload.
        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each packet in ``rxs`` is addressed to pg1's remote host.

        :param p: parameter object; not used by this method.
        :param rxs: iterable of received packets.
        """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt and validate every packet in ``rxs``.

        After decryption with ``sa`` each packet must have valid
        checksums and carry GRE with an inner IP destination of 1.1.1.2.

        :param p: parameter object; not used by this method.
        :param sa: object whose ``decrypt()`` is applied to ``rx[IP]``.
        :param rxs: iterable of received packets.
        :raises IndexError: re-raised after logging when a layer is missing.
        :raises AssertionError: re-raised after logging when a check fails.
        """
        for rx in rxs:
            # Reset per packet: a failure before decryption must not log
            # the plaintext of a previous iteration as this packet's.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt() did not yield an IP layer; re-parse the
                    # raw payload as IPv4
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure below is what must propagate
                        pass
                raise

    def setUp(self):
        """Create a GRE tunnel over pg0 protected by two SAs.

        Both SAs are created without tunnel endpoint addresses. A host
        route for 1.1.1.2 is added via the tunnel.
        """
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE: the bridge domain is created but no port is added to it
        # in this method.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()
982
983
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create and install both SAs without tunnel endpoint addresses.

        Stores the SAs on ``p`` as ``tun_sa_out`` and ``tun_sa_in``.

        :param p: parameter object; ``p.tun_if`` must already be set.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install both SAs with tunnel endpoint addresses.

        The endpoints are taken from ``self.tun_if``. Stores the SAs on
        ``p`` as ``tun_sa_out`` and ``tun_sa_in``.

        :param p: parameter object; ``p.tun_if`` must already be set.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Outbound endpoints are (local, remote), the mirror image of the
        # inbound SA below - the same ordering the tunnel-mode GRE test
        # in this file uses. Previously both SAs were given
        # (remote, local).
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.tun_if.local_addr[p.addr_type],
                                  self.tun_if.remote_addr[p.addr_type],
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.tun_if.remote_addr[p.addr_type],
                                 self.tun_if.local_addr[p.addr_type],
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p.tun_sa_out`` and ``p.tun_sa_in``.

        The protection object is stored as ``p.tun_protect``.
        """
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route hosts via it.

        The tunnel uses pg0's IPv4 addresses as endpoints and is given
        both IPv4 and IPv6 config. Only the IPv4 host route is kept on
        ``p`` (as ``p.route``); the IPv6 one is not stored.
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip4,
                                       self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()
        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound and then the inbound SA stored on ``p``."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1066
1067
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtect, self).setUp()

        # pg0 is the interface the tunnel endpoints live on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        params = self.ipv4_params

        def check_counters(expected):
            # rx is fetched and checked first, then tx
            for fetch in (params.tun_if.get_rx_stats,
                          params.tun_if.get_tx_stats):
                self.assertEqual(fetch()['packets'], expected)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # IPv4 payload through the tunnel
        self.verify_tun_44(params, count=127)
        check_counters(127)

        # IPv6 payload through the same tunnel; counters accumulate
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=127)
        check_counters(254)

        # rekey: install a fresh SA pair, re-protect the tunnel with it
        # and only then drop the old pair
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_44(rekeyed, count=127)
        check_counters(381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
1127
1128
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        # SAs are created with the UDP-encap flag; the matching scapy
        # header uses ports 5454 -> 4500
        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

    def tearDown(self):
        # undo setUp in reverse order before the base teardown runs
        params = self.ipv4_params
        for undo in (self.unconfig_protect,
                     self.unconfig_sa,
                     self.unconfig_network):
            undo(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        tunnel = self.ipv4_params.tun_if

        self.verify_tun_44(self.ipv4_params, count=127)

        rx_stats = tunnel.get_rx_stats()
        self.assertEqual(rx_stats['packets'], 127)
        tx_stats = tunnel.get_tx_stats()
        self.assertEqual(tx_stats['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
1168
1169
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTun, self).setUp()

        # pg0 is the interface the tunnel endpoints live on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted IP-in-IP frames to send on ``sw_intf``.

        The plaintext handed to ``sa.encrypt()`` is IP(``sw_intf`` remote
        -> local) / IP(``src`` -> ``dst``) / UDP / payload.

        :param sa: object whose ``encrypt()`` wraps the plaintext packet.
        :param sw_intf: interface supplying Ethernet and outer IP addresses.
        :param src: inner IPv4 source address.
        :param dst: inner IPv4 destination address.
        :param count: number of frames to build.
        :param payload_size: number of ``X`` bytes in the UDP payload.
        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain UDP frames from ``src`` to ``dst``.

        :param sw_intf: interface supplying the Ethernet addresses.
        :param src: IPv4 source address.
        :param dst: IPv4 destination address.
        :param count: number of frames to build.
        :param payload_size: number of ``X`` bytes in the UDP payload.
        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of the packets in ``rxs``.

        Each packet must go from ``p.remote_tun_if_host`` to pg1's remote
        IPv4 address and have valid checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt and validate every packet in ``rxs``.

        After decryption with ``sa`` each packet must have valid
        checksums, go from pg0's local to pg0's remote IPv4 address and
        carry a nested IP packet destined to ``p.remote_tun_if_host``.

        :param p: parameter object supplying ``remote_tun_if_host``.
        :param sa: object whose ``decrypt()`` is applied to ``rx[IP]``.
        :param rxs: iterable of received packets.
        :raises IndexError: re-raised after logging when a layer is missing.
        :raises AssertionError: re-raised after logging when a check fails.
        """
        for rx in rxs:
            # Reset per packet: a failure before decryption must not log
            # the plaintext of a previous iteration as this packet's.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt() did not yield an IP layer; re-parse the
                    # raw payload as IPv4
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure below is what must propagate
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # counters accumulate across the rekey
        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1271
1272
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    def config_sa_tra(self, p):
        """Create and install both SAs without tunnel endpoint addresses.

        Stores the SAs on ``p`` as ``tun_sa_out`` and ``tun_sa_in``.

        :param p: parameter object; ``p.tun_if`` must already be set.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install both SAs with tunnel endpoint addresses.

        The endpoints are taken from ``self.tun_if``. Stores the SAs on
        ``p`` as ``tun_sa_out`` and ``tun_sa_in``.

        :param p: parameter object; ``p.tun_if`` must already be set.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Outbound endpoints are (local, remote), the mirror image of the
        # inbound SA below - the same ordering the tunnel-mode GRE test
        # in this file uses. Previously both SAs were given
        # (remote, local).
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.tun_if.local_addr[p.addr_type],
                                  self.tun_if.remote_addr[p.addr_type])
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.tun_if.remote_addr[p.addr_type],
                                 self.tun_if.local_addr[p.addr_type])
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p.tun_sa_out`` and ``p.tun_sa_in``.

        The protection object is stored as ``p.tun_protect``.
        """
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route hosts via it.

        The tunnel uses pg0's IPv6 addresses as endpoints and is given
        both IPv6 and IPv4 config. Only the IPv6 host route is kept on
        ``p`` (as ``p.route``); the IPv4 one is not stored.
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip6,
                                       self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.config_ip4()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
                             [VppRoutePath(p.tun_if.remote_ip6,
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        p.route.add_vpp_config()
        r = VppIpRoute(self, p.remote_tun_if_host4, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)])
        r.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound and then the inbound SA stored on ``p``."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1346
1347
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtect, self).setUp()

        # pg0 is the interface the tunnel endpoints live on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect"""

        params = self.ipv6_params

        def check_counters(expected):
            # rx is fetched and checked first, then tx
            for fetch in (params.tun_if.get_rx_stats,
                          params.tun_if.get_tx_stats):
                self.assertEqual(fetch()['packets'], expected)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_66(params, count=127)
        check_counters(127)

        # simple rekey: install a fresh SA pair, re-protect the tunnel
        # with it and only then drop the old pair
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_66(rekeyed, count=127)
        check_counters(254)

        # three-phase rekey:
        #   step 1 - accept both the old and the new input SA
        #   step 2 - switch the output SA to the new one
        #   step 3 - accept only the new input SA
        latest = copy.copy(rekeyed)
        latest.crypt_key = b'Z' + params.crypt_key[1:]
        latest.scapy_tun_spi += 100
        latest.scapy_tun_sa_id += 1
        latest.vpp_tun_spi += 100
        latest.vpp_tun_sa_id += 1
        latest.tun_if.local_spi = params.vpp_tun_spi
        latest.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(latest)

        protect = params.tun_protect

        # step 1
        protect.update_vpp_config(rekeyed.tun_sa_out,
                                  [rekeyed.tun_sa_in, latest.tun_sa_in])
        self.verify_tun_66(rekeyed, rekeyed, count=127)
        self.verify_tun_66(latest, rekeyed, count=127)

        # step 2
        protect.update_vpp_config(latest.tun_sa_out,
                                  [rekeyed.tun_sa_in, latest.tun_sa_in])
        self.verify_tun_66(rekeyed, latest, count=127)
        self.verify_tun_66(latest, latest, count=127)

        # step 3
        protect.update_vpp_config(latest.tun_sa_out,
                                  [latest.tun_sa_in])
        self.verify_tun_66(latest, latest, count=127)
        self.verify_drop_tun_66(rekeyed, count=127)

        check_counters(127 * 7)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(latest)
        self.unconfig_sa(latest)
        self.unconfig_network(params)

    def test_tun_46(self):
        """IPSEC tunnel protect"""

        params = self.ipv6_params

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

        self.verify_tun_46(params, count=127)
        rx_stats = params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], 127)
        tx_stats = params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], 127)

        # teardown, in reverse order of configuration
        for undo in (self.unconfig_protect,
                     self.unconfig_sa,
                     self.unconfig_network):
            undo(params)
1463
1464
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        # pg0 is the interface the tunnel endpoints live on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted IPv6-in-IPv6 frames for ``sw_intf``.

        The plaintext handed to ``sa.encrypt()`` is IPv6(``sw_intf``
        remote -> local) / IPv6(``src`` -> ``dst``) / UDP / payload.

        :param sa: object whose ``encrypt()`` wraps the plaintext packet.
        :param sw_intf: interface supplying Ethernet and outer IP addresses.
        :param src: inner IPv6 source address.
        :param dst: inner IPv6 destination address.
        :param count: number of frames to build.
        :param payload_size: number of ``X`` bytes in the UDP payload.
        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` plain IPv6 UDP frames from ``src`` to ``dst``.

        :param sw_intf: interface supplying the Ethernet addresses.
        :param src: IPv6 source address.
        :param dst: IPv6 destination address.
        :param count: number of frames to build.
        :param payload_size: number of ``X`` bytes in the UDP payload.
        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of the packets in ``rxs``.

        Each packet must go from ``p.remote_tun_if_host`` to pg1's remote
        IPv6 address and have valid checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt and validate every packet in ``rxs``.

        After decryption with ``sa`` each packet must have valid
        checksums, go from pg0's local to pg0's remote IPv6 address and
        carry a nested IPv6 packet destined to ``p.remote_tun_if_host``.

        :param p: parameter object supplying ``remote_tun_if_host``.
        :param sa: object whose ``decrypt()`` is applied to ``rx[IPv6]``.
        :param rxs: iterable of received packets.
        :raises IndexError: re-raised after logging when a layer is missing.
        :raises AssertionError: re-raised after logging when a check fails.
        """
        for rx in rxs:
            # Reset per packet: a failure before decryption must not log
            # the plaintext of a previous iteration as this packet's.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # decrypt() did not yield an IPv6 layer; re-parse the
                    # raw payload as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure below is what must propagate
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # counters accumulate across the rekey
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1566
1567
# Run the tests in this module with the VPP test runner when the file is
# executed directly.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)