ipsec: AH copy destination and source address from template
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from util import ppp
21 from vpp_papi import VppEnum
22
23
def config_tun_params(p, encryption_type, tun_if):
    """Build the scapy tunnel-mode SAs for ``tun_if`` and store them on ``p``.

    Sets ``p.tun_dst``/``p.tun_src`` from the interface's remote/local
    addresses, then creates two scapy ``SecurityAssociation`` objects:

    * ``p.scapy_tun_sa`` uses ``p.vpp_tun_spi`` and an outer IP header going
      from the tunnel's remote address to its local address;
    * ``p.vpp_tun_sa`` uses ``p.scapy_tun_spi`` and an outer IP header in
      the opposite direction.

    Both share the algorithms, keys, NAT-T header and ESN setting of ``p``.

    NOTE: this definition replaces the ``config_tun_params`` name that the
    module also imports from ``template_ipsec``.

    :param p: parameter object; read for SPIs, algorithms, keys, flags,
        ``addr_type`` and ``nat_header``, and updated in place.
    :param encryption_type: protocol class passed as the first argument to
        ``SecurityAssociation`` (the classes in this file pass ESP).
    :param tun_if: interface object providing ``remote_ip`` and ``local_ip``.
    """
    outer_ip_by_family = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip
    outer_ip = outer_ip_by_family[p.addr_type]

    def make_sa(spi, src, dst):
        # Everything except the SPI and the outer header direction is
        # identical for the two SAs.
        return SecurityAssociation(
            encryption_type, spi=spi,
            crypt_algo=p.crypt_algo,
            crypt_key=scapy_key,
            auth_algo=p.auth_algo, auth_key=p.auth_key,
            tunnel_header=outer_ip(src=src, dst=dst),
            nat_t_header=p.nat_header,
            esn_en=use_esn)

    p.scapy_tun_sa = make_sa(p.vpp_tun_spi, src=p.tun_dst, dst=p.tun_src)
    p.vpp_tun_sa = make_sa(p.scapy_tun_spi, src=p.tun_src, dst=p.tun_dst)
51
52
def config_tra_params(p, encryption_type, tun_if):
    """Build scapy SAs without a tunnel header and store them on ``p``.

    Sets ``p.tun_dst``/``p.tun_src`` from the interface's remote/local
    addresses, then creates ``p.scapy_tun_sa`` (SPI ``p.vpp_tun_spi``) and
    ``p.vpp_tun_sa`` (SPI ``p.scapy_tun_spi``). Unlike ``config_tun_params``
    no ``tunnel_header`` or ``nat_t_header`` is passed to the SAs.

    :param p: parameter object; read for SPIs, algorithms, keys and flags,
        and updated in place.
    :param encryption_type: protocol class passed as the first argument to
        ``SecurityAssociation``.
    :param tun_if: interface object providing ``remote_ip`` and ``local_ip``.
    """
    # ESN is enabled on the scapy side only when the SA flags request it.
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
72
73
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    # Protocol class handed to config_tun_params() when building scapy SAs.
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        # Plain delegation to the parent class-level fixture.
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Plain delegation to the parent class-level teardown.
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create an IPsec tunnel interface on pg0 and route hosts via it.

        The tunnel is stored in ``self.ipv4_params.tun_if``, brought up,
        given IPv4 and IPv6 configuration, and the matching scapy SAs are
        built. A /32 and a /128 host route then point at the tunnel.
        """
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # The same crypto key and the same auth key are passed twice each.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        # Nothing extra to undo here; the parent handles teardown.
        super(TemplateIpsec4TunIfEsp, self).tearDown()
117
118
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    # Protocol class handed to config_tun_params() when building scapy SAs.
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        # Plain delegation to the parent class-level fixture.
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Plain delegation to the parent class-level teardown.
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        """Create a UDP-encapsulated IPsec tunnel on pg0 and route via it.

        Before the tunnel is created the IPv4 parameters are switched to
        UDP encapsulation: the SA flags are set to UDP_ENCAP and a UDP
        header (sport 5454, dport 4500) is stored as the NAT header used by
        the scapy SAs.
        """
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        # The same crypto key and the same auth key are passed twice each.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            udp_encap=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        # Nothing extra to undo here; the parent handles teardown.
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
167
168
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        # Same check as the basic test, but with a 257-packet burst.
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # Lower the tunnel MTU so one 1800-byte payload is expected to
        # arrive as two packets (n_rx=2), then set the MTU to 9000.
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [9000, 0, 0, 0])
198
199
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # Delegates to the inherited keepalive check with the IPv4 params.
        params = self.ipv4_params
        self.verify_keepalive(params)
208
209
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No members of its own: the fixture and the tests both come from the
    # two base classes.
213
214
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    # Protocol class handed to config_tun_params() when building scapy SAs.
    encryption_type = ESP

    def setUp(self):
        """Create an IPv6 IPsec tunnel interface on pg0 and route via it.

        The tunnel is stored in ``self.ipv6_params.tun_if``, brought up,
        given IPv6 and IPv4 configuration, and the matching scapy SAs are
        built. A /128 and a /32 host route then point at the tunnel.
        """
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # The same crypto key and the same auth key are passed twice each.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [v6_path]).add_vpp_config()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def tearDown(self):
        # Nothing extra to undo here; the parent handles teardown.
        super(TemplateIpsec6TunIfEsp, self).tearDown()
249
250
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # Single packet through the IPv6 tunnel parameters.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # Same check with a 257-packet burst.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
266
267
class TestIpsec6TunIfEspHandoff(
        TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # Node names used by the inherited tests; the fixture comes from the
    # IPv6 tunnel template.
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
273
274
class TestIpsec4TunIfEspHandoff(
        TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # Node names used by the inherited tests; the fixture comes from the
    # IPv4 tunnel template.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
280
281
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Create ten IPsec tunnels on pg0, one per generated remote host.

        Each tunnel gets its own shallow copy of ``self.ipv4_params`` with
        the SA ids and SPIs offset by the tunnel index, so they do not
        collide. The copies are collected in ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        # One count drives both the remote-host generation and the tunnel
        # loop, so the two can never drift apart.
        n_tunnels = 10

        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(n_tunnels):
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                            p.scapy_tun_spi,
                                            p.crypt_algo_vpp_id,
                                            p.crypt_key, p.crypt_key,
                                            p.auth_algo_vpp_id, p.auth_key,
                                            p.auth_key,
                                            dst=p.tun_dst)
            p.tun_if.add_vpp_config()
            p.tun_if.admin_up()
            p.tun_if.config_ip4()
            config_tun_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)
            # Each tunnel must have counted exactly its own 127 packets in
            # both directions.
            c = p.tun_if.get_rx_stats()
            self.assertEqual(c['packets'], 127)
            c = p.tun_if.get_tx_stats()
            self.assertEqual(c['packets'], 127)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interfaces """
        # Decrypt direction: one encrypted packet per tunnel, sent as a
        # single batch; each received packet is checked against the
        # parameters of the tunnel at the same position.
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                            src=p.remote_tun_if_host,
                                            dst=self.pg1.remote_ip4))
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # Encrypt direction: one plain packet per tunnel destination.
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                    dst=p.remote_tun_if_host))
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
362
363
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create the tunnel interface, scapy SAs and host route for ``p``.

        The interface is stored in ``p.tun_if`` and the route in
        ``p.route`` so that ``unconfig_network`` can remove them again.
        """

        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi,
                                        p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key,
                                        salt=p.salt)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)
        # Dump the two SAs into the test log for debugging.
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))
        self.logger.info(self.vapi.cli("sh ipsec sa 1"))

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the tunnel interface and route made by config_network."""
        p.tun_if.unconfig_ip4()
        p.tun_if.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        """Run the parent fixture; tunnels are created per algorithm."""
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the parent teardown."""
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Give the tunnel in ``p`` a new key and new SPIs.

        The first byte of the crypto key is replaced, all tunnel SPIs and
        SA ids are incremented, the scapy SAs are rebuilt, and two new VPP
        SAs (``p.tun_sa_in``/``p.tun_sa_out``) are created and bound to the
        existing tunnel interface.
        """
        #
        # change the key and the SPI
        #
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        # Rebuild the scapy-side SAs so they match the new key and SPIs.
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_in = VppIpsecSA(self,
                                 p.scapy_tun_sa_id,
                                 p.scapy_tun_spi,
                                 p.auth_algo_vpp_id,
                                 p.auth_key,
                                 p.crypt_algo_vpp_id,
                                 p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=p.flags,
                                 salt=p.salt)
        p.tun_sa_out = VppIpsecSA(self,
                                  p.vpp_tun_sa_id,
                                  p.vpp_tun_spi,
                                  p.auth_algo_vpp_id,
                                  p.auth_key,
                                  p.crypt_algo_vpp_id,
                                  p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags,
                                  salt=p.salt)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # NOTE(review): tun_sa_in is bound with is_outbound=1 and tun_sa_out
        # with is_outbound=0, so the attribute names look inverted relative
        # to the flag - confirm against the API before renaming anything.
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
                                         sa_id=p.tun_sa_in.id,
                                         is_outbound=1)
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
                                         sa_id=p.tun_sa_out.id,
                                         is_outbound=0)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        # foreach crypto algorithm
        # Each entry pairs the VPP enum values with the scapy algorithm
        # names for the same crypto/integrity combination, plus the key and
        # salt to use.
        algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7h",
                  'salt': 3333},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
                  'salt': 0},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                  'salt': 9999},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7h"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_NONE),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "NULL",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for algo in algos:
                # with self.subTest(algo=algo['scapy']):

                # Work on a shallow copy so the shared ipv4_params object
                # is not modified by the per-algorithm overrides.
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = algo['vpp-integ']
                p.crypt_algo_vpp_id = algo['vpp-crypto']
                p.crypt_algo = algo['scapy-crypto']
                p.auth_algo = algo['scapy-integ']
                p.crypt_key = algo['key']
                p.salt = algo['salt']

                self.config_network(p)

                self.verify_tun_44(p, count=127)
                c = p.tun_if.get_rx_stats()
                self.assertEqual(c['packets'], 127)
                c = p.tun_if.get_tx_stats()
                self.assertEqual(c['packets'], 127)

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                # Remove the tunnel and route first, then the SAs that
                # rekey() created.
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
548
549
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create a tunnel with neither crypto nor integrity algorithm.

        ``p`` is modified in place: both algorithms are set to NONE/NULL
        and both keys to empty lists before the tunnel interface, scapy SAs
        and host route are created. The interface is stored in ``p.tun_if``
        and the route in ``p.route``.
        """

        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.auth_algo = 'NULL'
        p.auth_key = []

        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = 'NULL'
        p.crypt_key = []

        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi,
                                        p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key,
                                        salt=p.salt)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)
        # Dump the two SAs into the test log for debugging.
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))
        self.logger.info(self.vapi.cli("sh ipsec sa 1"))

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the tunnel interface and route made by config_network."""
        p.tun_if.unconfig_ip4()
        p.tun_if.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        """Run the parent fixture; the tunnel is created inside the test."""
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the parent teardown."""
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """ IPSEC tunnel with NULL crypto and integrity sends nothing """
        p = self.ipv4_params

        self.config_network(p)

        # Plain packets routed towards the tunnel host must not produce any
        # output.
        tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                           dst=p.remote_tun_if_host)
        self.send_and_assert_no_replies(self.pg1, tx)

        self.unconfig_network(p)
610         self.unconfig_network(p)
611
612
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Create ten IPv6 IPsec tunnels on pg0, one per remote host.

        Each tunnel gets its own shallow copy of ``self.ipv6_params`` with
        the SA ids and SPIs offset by the tunnel index. The copies are
        collected in ``self.multi_params``.
        """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        for idx in range(10):
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (idx + 1)

            # Shift every id/SPI by the tunnel index so they stay unique.
            params.scapy_tun_sa_id += idx
            params.scapy_tun_spi += idx
            params.vpp_tun_sa_id += idx
            params.vpp_tun_spi += idx

            params.scapy_tra_sa_id += idx
            params.scapy_tra_spi += idx
            params.vpp_tra_sa_id += idx
            params.vpp_tra_spi += idx

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                   proto=DpoProto.DPO_PROTO_IP6)
            VppIpRoute(self, params.remote_tun_if_host, 128,
                       [v6_path]).add_vpp_config()

    def tearDown(self):
        # Nothing extra to undo here; the parent handles teardown.
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_66(params, count=127)
            # Each tunnel must have counted exactly its own 127 packets in
            # both directions.
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)
671             c = p.tun_if.get_tx_stats()
672             self.assertEqual(c['packets'], 127)
673
674
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the inner Ethernet frame carried over the tunnel.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted GRE packets carrying an inner frame.

        ``src`` and ``dst`` are accepted for signature compatibility but
        not used: the outer addresses are always pg0's and the inner ones
        are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ethernet frames to be bridged.

        ``sw_intf``, ``src`` and ``dst`` are accepted for signature
        compatibility but not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that every received frame is the expected inner frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check the GRE-in-ESP contents.

        On failure the received packet, and the decrypted one if decryption
        got that far, are written to the debug log and the original
        exception is re-raised.
        """
        for rx in rxs:
            # Stays None when decryption itself fails, so the error path
            # knows there is no decrypted packet to log.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Logging is only a debugging aid: a formatting
                    # problem must not replace the failure being reported.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug(
                            "Decrypted packet could not be formatted")
                raise

    def setUp(self):
        """Create a GRE TEB tunnel protected by IPsec SAs, bridged to pg1.

        Two SAs are created between pg0's local and remote addresses, a GRE
        TEB interface is created over the same addresses and protected with
        them, and the GRE interface and pg1 are put into bridge domain 1.
        """
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
782
783
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC placed on the inner (bridged) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted GRE frames carrying an L2 payload.

        Each frame is Ether / sa.encrypt(IP / GRE / Ether / IP / UDP / Raw)
        with the outer IP going from pg0's remote to pg0's local address.
        ``src`` and ``dst`` are not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ether / IP / UDP / Raw frames.

        ``sw_intf``, ``src`` and ``dst`` are not used by this override;
        the frames are addressed to ``omac`` and 1.1.1.2.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check the Ethernet and IP destination of every received frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check the GRE/L2 content.

        :param p: test parameters (not used by this override)
        :param sa: object whose ``decrypt`` is applied to ``rx[IP]``
        :param rxs: received packets to verify
        :raises IndexError, AssertionError: re-raised, after the offending
            packet has been logged, when a layer is missing or a check
            fails
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale decrypt
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort dump; must not mask the real failure
                        pass
                raise

    def setUp(self):
        """Configure an ESP-protected GRE TEB tunnel bridged with pg1.

        The two SAs are created without tunnel endpoint addresses. A
        TEB-type GRE interface between the pg0 addresses is protected with
        them, and both the GRE interface and pg1 are added to bridge
        domain 1.
        """
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # NOTE(review): config_tra_params is defined outside this class;
        # presumably it prepares the scapy-side SAs - confirm
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the GRE tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
887
888
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted GRE frames carrying an IPv4 payload.

        Each frame is Ether / sa.encrypt(IP / GRE / IP / UDP / Raw). The
        outer IP goes from pg0's remote to pg0's local address and the
        inner IP from pg1's local to pg1's remote address. ``src`` and
        ``dst`` are not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ether / IP / UDP / Raw frames.

        ``src`` and ``dst`` are not used by this override; the IP header
        is always 1.1.1.1 -> 1.1.1.2.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received frame is addressed to pg1's remote peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check the GRE encapsulation.

        :param p: test parameters (not used by this override)
        :param sa: object whose ``decrypt`` is applied to ``rx[IP]``
        :param rxs: received packets to verify
        :raises IndexError, AssertionError: re-raised, after the offending
            packet has been logged, when a layer is missing or a check
            fails
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale decrypt
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort dump; must not mask the real failure
                        pass
                raise

    def setUp(self):
        """Configure an ESP-protected GRE tunnel and a route through it.

        Both SAs are given the pg0 addresses. A GRE interface between the
        same addresses is protected with them, and a /32 route for 1.1.1.2
        is installed via the tunnel.
        """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): bridge domain 1 is created here, but nothing in
        # this setUp attaches a port to it
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # send the gen_pkts() destination through the tunnel
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the GRE tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
990
991
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted GRE frames carrying an IPv4 payload.

        Each frame is Ether / sa.encrypt(IP / GRE / IP / UDP / Raw). The
        outer IP goes from pg0's remote to pg0's local address and the
        inner IP from pg1's local to pg1's remote address. ``src`` and
        ``dst`` are not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Return ``count`` encrypted GRE frames with no inner IP header.

        The GRE header is followed directly by UDP / Raw. ``src`` and
        ``dst`` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ether / IP / UDP / Raw frames.

        ``src`` and ``dst`` are not used by this override; the IP header
        is always 1.1.1.1 -> 1.1.1.2.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received frame is addressed to pg1's remote peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check the GRE encapsulation.

        :param p: test parameters (not used by this override)
        :param sa: object whose ``decrypt`` is applied to ``rx[IP]``
        :param rxs: received packets to verify
        :raises IndexError, AssertionError: re-raised, after the offending
            packet has been logged, when a layer is missing or a check
            fails
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale decrypt
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort dump; must not mask the real failure
                        pass
                raise

    def setUp(self):
        """Configure an ESP-protected GRE tunnel and a route through it.

        The two SAs are created without tunnel endpoint addresses. A GRE
        interface between the pg0 addresses is protected with them, and a
        /32 route for 1.1.1.2 is installed via the tunnel.
        """
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): bridge domain 1 is created here, but nothing in
        # this setUp attaches a port to it
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # NOTE(review): config_tra_params is defined outside this class;
        # presumably it prepares the scapy-side SAs - confirm
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send the gen_pkts() destination through the tunnel
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the GRE tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        """IPSEC GRE tunnel protect - non-IP payload is dropped"""
        p = self.ipv4_params
        # src/dst are ignored by gen_encrypt_non_ip_pkts
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip6)
        self.send_and_assert_no_replies(self.tun_if, tx)
        # exactly one 'unsupported payload' error is expected on the
        # decrypt node
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1107
1108
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE ESP - TRA tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return ``count`` encrypted GRE frames carrying an IPv6 payload.

        Each frame is Ether / sa.encrypt(IPv6 / GRE / IPv6 / UDP / Raw).
        The outer header goes from pg0's remote to pg0's local address and
        the inner one from pg1's local to pg1's remote address. ``src``
        and ``dst`` are not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return ``count`` plain Ether / IPv6 / UDP / Raw frames.

        ``src`` and ``dst`` are not used by this override; the IPv6
        header is always 1::1 -> 1::2.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check every received frame is addressed to pg1's remote peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every received packet and check the GRE encapsulation.

        :param p: test parameters (not used by this override)
        :param sa: object whose ``decrypt`` is applied to ``rx[IPv6]``
        :param rxs: received packets to verify
        :raises IndexError, AssertionError: re-raised, after the offending
            packet has been logged, when a layer is missing or a check
            fails
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale decrypt
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort dump; must not mask the real failure
                        pass
                raise

    def setUp(self):
        """Configure an ESP-protected IPv6 GRE tunnel and a route via it.

        The two SAs are created without tunnel endpoint addresses. A GRE
        interface between the pg0 IPv6 addresses is protected with them,
        and a /128 route for 1::2 is installed via the tunnel.
        """
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): bridge domain 1 is created here, but nothing in
        # this setUp attaches a port to it
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        # NOTE(review): config_tra_params is defined outside this class;
        # presumably it prepares the scapy-side SAs - confirm
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send the gen_pkts6() destination through the tunnel
        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Remove the GRE tunnel's IPv6 config, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1206
1207
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create the out/in SA pair without tunnel endpoint addresses.

        The SAs are stored on ``p`` as ``tun_sa_out`` and ``tun_sa_in``
        and added to VPP.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm/key/protocol arguments shared by both directions
        algo_args = (p.auth_algo_vpp_id, p.auth_key,
                     p.crypt_algo_vpp_id, p.crypt_key,
                     self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  *algo_args, flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 *algo_args, flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the out/in SA pair with tunnel endpoint addresses.

        The endpoints are taken from ``self.tun_if`` for ``p.addr_type``;
        the inbound SA receives them in the opposite order to the
        outbound one.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        near = self.tun_if.local_addr[p.addr_type]
        far = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  near, far,
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 far, near,
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind the SAs stored on ``p`` to ``p.tun_if``."""
        p.tun_protect = VppIpsecTunProtect(
            self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel on pg0 and the routes through it.

        The tunnel is brought up and given IPv4 and IPv6 configuration.
        Only the IPv4 host route is stored on ``p`` (as ``p.route``).
        """
        tunnel = p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                                self.pg0.local_ip4,
                                                self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        via4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via4])
        p.route.add_vpp_config()

        via6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                            proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, p.remote_tun_if_host6, 128,
                   [via6]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove the stored IPv4 route, then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel-protect binding stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1290
1291
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-side interface."""
        super(TestIpsec4TunProtect, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base tearDown."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        params = self.ipv4_params
        burst = 127

        def check_counters(expected):
            # rx is fetched and checked before tx
            self.assertEqual(params.tun_if.get_rx_stats()['packets'],
                             expected)
            self.assertEqual(params.tun_if.get_tx_stats()['packets'],
                             expected)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_44(params, count=burst)
        check_counters(burst)

        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=burst)
        check_counters(2 * burst)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        # shallow copy: rekeyed.tun_if is the same object as params.tun_if
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_44(rekeyed, count=burst)
        check_counters(3 * burst)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
1351
1352
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Configure the tunnel, SAs and protection with UDP encap enabled.

        The UDP-encap SA flag and a UDP 5454 -> 4500 header are stored on
        the IPv4 parameters before the network, SAs and protection are
        configured.
        """
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Remove protection, SAs and network, then run the base tearDown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        params = self.ipv4_params

        self.verify_tun_44(params, count=127)
        # both directions must have counted exactly the packets sent
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
1392
1393
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-side interface."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base tearDown."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted IP-in-IP frames.

        Each frame is Ether / sa.encrypt(IP / IP / UDP / Raw). The outer
        IP goes from ``sw_intf``'s remote to its local address and the
        inner IP from ``src`` to ``dst``.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ether / IP / UDP / Raw frames."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of every received packet."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check both IP headers.

        :param p: test parameters; ``remote_tun_if_host`` is the expected
            inner destination
        :param sa: object whose ``decrypt`` is applied to ``rx[IP]``
        :param rxs: received packets to verify
        :raises IndexError, AssertionError: re-raised, after the offending
            packet has been logged, when a layer is missing or a check
            fails
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale decrypt
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort dump; must not mask the real failure
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        # shallow copy: np.tun_if is the same object as p.tun_if
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1495
1496
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-side interface."""
        super(TestIpsec4TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base tearDown."""
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted frames with a bogus outer header.

        The outer IP destination is fixed at 5.5.5.5 rather than the
        interface's local address; the inner IP goes from ``src`` to
        ``dst``.
        """
        frames = []
        for _ in range(count):
            clear = (IP(src=sw_intf.remote_ip4,
                        dst="5.5.5.5") /
                     IP(src=src, dst=dst) /
                     UDP(sport=1144, dport=2233) /
                     Raw(b'X' * payload_size))
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2_hdr / sa.encrypt(clear))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """

        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # none of the bogus-header packets may produce a reply
        bogus = self.gen_encrypt_pkts(params.scapy_tun_sa, self.tun_if,
                                      src=params.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1543
1544
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect

    Mixin holding the configuration helpers shared by the IPv6
    tunnel-protect test classes.  The class it is mixed into must
    provide `encryption_type`, `vpp_esp_protocol`, `pg0` and (for
    config_sa_tun) `tun_if`.  All helpers store the objects they create
    on the parameter object `p` so the matching unconfig_* helper can
    remove them.
    """

    def config_sa_tra(self, p):
        """Create the outbound and inbound SAs without tunnel addresses.

        Sets `p.tun_sa_out` and `p.tun_sa_in`.

        :param p: IPsec parameter object; `p.tun_if` must already exist
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the outbound and inbound SAs with tunnel addresses.

        Same as config_sa_tra() except that each SA is also given a
        pair of addresses taken from `self.tun_if`: local then remote
        for the outbound SA, remote then local for the inbound SA.
        Sets `p.tun_sa_out` and `p.tun_sa_in`.

        :param p: IPsec parameter object; `p.tun_if` must already exist
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.tun_if.local_addr[p.addr_type],
                                  self.tun_if.remote_addr[p.addr_type])
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.tun_if.remote_addr[p.addr_type],
                                 self.tun_if.local_addr[p.addr_type])
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect `p.tun_if` with `p.tun_sa_out` and `[p.tun_sa_in]`.

        Sets `p.tun_protect`.
        """
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and the routes through it.

        Sets `p.tun_if`, `p.route` (IPv6 /128 to `p.remote_tun_if_host`)
        and `p.route4` (IPv4 /32 to `p.remote_tun_if_host4`).
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip6,
                                       self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.config_ip4()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
                             [VppRoutePath(p.tun_if.remote_ip6,
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        p.route.add_vpp_config()
        # keep a handle on the IPv4 route as well so unconfig_network()
        # can remove it; it used to be held only in a local variable
        p.route4 = VppIpRoute(self, p.remote_tun_if_host4, 32,
                              [VppRoutePath(p.tun_if.remote_ip4,
                                            0xffffffff)])
        p.route4.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the routes and the tunnel created by config_network()."""
        p.route.remove_vpp_config()
        # tolerate parameter objects that carry no IPv4 route
        route4 = getattr(p, 'route4', None)
        if route4 is not None:
            route4.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs created by config_sa_tra()/config_sa_tun()."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1618
1619
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtect, self).setUp()

        # pg0 is the interface the encrypted traffic is exchanged on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtect, self).tearDown()

    def _assert_tun_if_packets(self, tun_if, rx, tx):
        # check the interface's rx and then tx 'packets' counters
        self.assertEqual(tun_if.get_rx_stats()['packets'], rx)
        self.assertEqual(tun_if.get_tx_stats()['packets'], tx)

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)
        self._assert_tun_if_packets(p.tun_if, rx=127, tx=127)

        # rekey - create new SAs and update the tunnel protection
        # (the copy is shallow: rekeyed.tun_if is the same object as
        # p.tun_if)
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=127)
        self._assert_tun_if_packets(p.tun_if, rx=254, tx=254)

        # bounce the interface state; while down every packet must be
        # dropped and counted against the tunnel input node
        p.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=127)
        node = ('/err/ipsec6-tun-input/%s' %
                'ipsec packets received on disabled interface')
        self.assertEqual(127, self.statistics.get_err_counter(node))
        p.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=127)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        rekeyed3 = copy.copy(rekeyed)
        rekeyed3.crypt_key = b'Z' + p.crypt_key[1:]
        rekeyed3.scapy_tun_spi += 100
        rekeyed3.scapy_tun_sa_id += 1
        rekeyed3.vpp_tun_spi += 100
        rekeyed3.vpp_tun_sa_id += 1
        rekeyed3.tun_if.local_spi = p.vpp_tun_spi
        rekeyed3.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed3)

        # step 1: both old and new input SAs accepted, old output SA
        both_in = [rekeyed.tun_sa_in, rekeyed3.tun_sa_in]
        p.tun_protect.update_vpp_config(rekeyed.tun_sa_out, both_in)
        self.verify_tun_66(rekeyed, rekeyed, count=127)
        self.verify_tun_66(rekeyed3, rekeyed, count=127)

        # step 2: output switches to the new SA
        both_in = [rekeyed.tun_sa_in, rekeyed3.tun_sa_in]
        p.tun_protect.update_vpp_config(rekeyed3.tun_sa_out, both_in)
        self.verify_tun_66(rekeyed, rekeyed3, count=127)
        self.verify_tun_66(rekeyed3, rekeyed3, count=127)

        # step 3: only the new input SA remains, the old one must drop
        p.tun_protect.update_vpp_config(rekeyed3.tun_sa_out,
                                        [rekeyed3.tun_sa_in])
        self.verify_tun_66(rekeyed3, rekeyed3, count=127)
        self.verify_drop_tun_66(rekeyed, count=127)

        self._assert_tun_if_packets(p.tun_if, rx=127*9, tx=127*8)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(rekeyed3)
        self.unconfig_sa(rekeyed3)
        self.unconfig_network(p)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_46(p, count=127)
        self._assert_tun_if_packets(p.tun_if, rx=127, tx=127)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
1744
1745
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        # pg0 is the interface the encrypted traffic is exchanged on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` encrypted IPv6 / IPv6 / UDP / Raw packets.

        The first IPv6 header goes from `sw_intf`'s remote to its local
        address, the second one from `src` to `dst`.  The packet is
        encrypted with `sa` and framed in an Ethernet header from
        `sw_intf`'s remote MAC to its local MAC.

        :returns: list of encrypted packets
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build `count` clear-text IPv6 / UDP / Raw packets.

        :returns: list of packets from `src` to `dst`, framed in an
                  Ethernet header from `sw_intf`'s remote MAC to its
                  local MAC
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of the decrypted packets `rxs`."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet of `rxs` with `sa` and check its headers.

        The decrypted packet must be addressed from pg0's local to
        pg0's remote IPv6 address and carry a nested IPv6 header whose
        destination is `p.remote_tun_if_host`.

        :raises IndexError, AssertionError: re-raised after the
            offending packet(s) have been logged at debug level
        """
        for rx in rxs:
            # reset per packet so the failure path can neither log a
            # packet decrypted on an earlier iteration nor hit an
            # unbound name when decrypt() itself fails
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # decrypt() gave no IPv6 layer: re-parse the raw
                    # payload as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # logging is best effort and must never mask
                        # the failure being reported
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (the copy is shallow: np.tun_if is the same object as
        # p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1847
1848
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        # pg0 is the interface the encrypted traffic is injected on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts5(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` encrypted packets carrying a bogus tunnel header.

        Each clear-text packet is IPv6 / IPv6 / UDP / Raw where the
        first IPv6 header is addressed to 5::5; the second one uses
        `src` and `dst`.  The packet is encrypted with `sa` and framed
        in an Ethernet header from `sw_intf`'s remote MAC to its local
        MAC.

        :returns: list of encrypted packets
        """
        # the IP destination of the revealed packet does not match
        # that assigned to the tunnel
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst="5::5") /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # use the generator defined above: it is the one that builds the
        # bogus (5::5) tunnel header this test is about.  Calling
        # gen_encrypt_pkts6 here left it unused.
        tx = self.gen_encrypt_pkts5(p.scapy_tun_sa, self.tun_if,
                                    src=p.remote_tun_if_host,
                                    dst=self.pg1.remote_ip6,
                                    count=63)
        self.send_and_assert_no_replies(self.tun_if, tx)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
1896
1897
# run the tests of this module with the VPP runner when executed directly
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)