55e85b1a4b2021bda7bbc86e0410d755e77aae85
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
22 from util import ppp
23 from vpp_papi import VppEnum
24
25
def config_tun_params(p, encryption_type, tun_if):
    """Attach the scapy-side and VPP-side tunnel SAs to ``p``.

    Records the tunnel end-points on ``p`` (``p.tun_src`` is the interface's
    local address, ``p.tun_dst`` its remote address) and then creates two
    scapy ``SecurityAssociation`` objects:

    * ``p.scapy_tun_sa`` uses ``p.vpp_tun_spi`` and an outer IP header going
      from the tunnel's remote address to its local address.
    * ``p.vpp_tun_sa`` uses ``p.scapy_tun_spi`` and an outer IP header going
      from the tunnel's local address to its remote address.

    :param p: IPsec parameter object; modified in place.
    :param encryption_type: first positional argument handed to
        ``SecurityAssociation`` (the classes in this file pass ``ESP``).
    :param tun_if: object exposing ``local_ip`` and ``remote_ip``.
    """
    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                              IPSEC_API_SAD_FLAG_USE_ESN))
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # Outer header class follows the address family stored on the params.
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    def _build_sa(spi, outer_src, outer_dst):
        # Both directions share every setting except SPI and header direction.
        return SecurityAssociation(
            encryption_type,
            spi=spi,
            crypt_algo=p.crypt_algo,
            crypt_key=scapy_key,
            auth_algo=p.auth_algo,
            auth_key=p.auth_key,
            tunnel_header=outer_ip(src=outer_src, dst=outer_dst),
            nat_t_header=p.nat_header,
            esn_en=use_esn)

    p.scapy_tun_sa = _build_sa(p.vpp_tun_spi, p.tun_dst, p.tun_src)
    p.vpp_tun_sa = _build_sa(p.scapy_tun_spi, p.tun_src, p.tun_dst)
53
54
def config_tra_params(p, encryption_type, tun_if):
    """Attach scapy SAs without an outer tunnel header to ``p``.

    Same bookkeeping as the tunnel variant - ``p.tun_src``/``p.tun_dst`` are
    taken from ``tun_if.local_ip``/``tun_if.remote_ip`` - but the two
    ``SecurityAssociation`` objects are created with neither a
    ``tunnel_header`` nor a ``nat_t_header``.
    NOTE(review): the "tra" in the name presumably stands for transport
    mode - confirm against the callers.

    :param p: IPsec parameter object; modified in place. On return
        ``p.scapy_tun_sa`` uses ``p.vpp_tun_spi`` and ``p.vpp_tun_sa`` uses
        ``p.scapy_tun_spi``.
    :param encryption_type: first positional argument handed to
        ``SecurityAssociation``.
    :param tun_if: object exposing ``local_ip`` and ``remote_ip``.
    """
    # No address-family lookup table here: without an outer header there is
    # nothing to select an IP class for.
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
74
75
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """
    # Fixture only: builds one ESP tunnel interface over pg0 with both IPv4
    # and IPv6 addresses, and host routes (v4 /32 and v6 /128) through it.

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create the tunnel interface, the scapy SAs and the host routes."""
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # The crypto key and the integrity key are each supplied twice.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 host route via the tunnel's remote v4 address.
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        host_route4 = VppIpRoute(self, params.remote_tun_if_host, 32,
                                 [v4_path])
        host_route4.add_vpp_config()

        # IPv6 host route via the tunnel's remote v6 address.
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        host_route6 = VppIpRoute(self, params.remote_tun_if_host6, 128,
                                 [v6_path])
        host_route6.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
119
120
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """
    # Fixture only: like the plain ESP tunnel fixture, but the SA flags
    # request UDP encapsulation and scapy is given a UDP 5454 -> 4500 header
    # to use as the NAT-T header.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        """Create the UDP-encapsulated tunnel, scapy SAs and host routes."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # Must be set before the scapy SAs are built: config_tun_params
        # reads both p.flags and p.nat_header.
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            udp_encap=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 host route via the tunnel's remote v4 address.
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        host_route4 = VppIpRoute(self, params.remote_tun_if_host, 32,
                                 [v4_path])
        host_route4.add_vpp_config()

        # IPv6 host route via the tunnel's remote v6 address.
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        host_route6 = VppIpRoute(self, params.remote_tun_if_host6, 128,
                                 [v6_path])
        host_route6.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
169
170
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # 257 packets, versus the single packet of the basic test
        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # An 1800 byte payload over a 1500 byte tunnel MTU: two packets are
        # expected on the wire for the one sent (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        try:
            self.verify_tun_44(self.params[socket.AF_INET],
                               count=1, payload_size=1800, n_rx=2)
        finally:
            # put the MTU back to 9000 even when the verification fails
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                           [9000, 0, 0, 0])
200
201
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """
    # Runs the IpsecTun4Tests suite on the UDP-encapsulated fixture and adds
    # a keepalive check.

    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
210
211
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # Nothing to add: the fixture and the tests both come from the bases.
215
216
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """
    # Fixture only: one ESP tunnel interface over pg0 created with
    # is_ip6=True, addressed for both IPv6 and IPv4, plus host routes
    # (v6 /128 and v4 /32) through it.

    encryption_type = ESP

    def setUp(self):
        """Create the tunnel interface, the scapy SAs and the host routes."""
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv6 host route via the tunnel's remote v6 address.
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        host_route6 = VppIpRoute(self, params.remote_tun_if_host, 128,
                                 [v6_path])
        host_route6.add_vpp_config()

        # IPv4 host route via the tunnel's remote v4 address.
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        host_route4 = VppIpRoute(self, params.remote_tun_if_host4, 32,
                                 [v4_path])
        host_route4.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
251
252
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # one packet only
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # 257 packets
        self.verify_tun_46(v6_params, count=257)
268
269
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # The tests come from IpsecTun6HandoffTests; only the graph-node names
    # are set here.
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
275
276
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # The tests come from IpsecTun4HandoffTests; only the graph-node names
    # are set here.
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
282
283
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """
    # Ten tunnel interfaces over pg0, one per generated remote host. Each
    # has its own copy of the IPv4 parameters, kept in self.multi_params.

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Create the ten tunnels, their scapy SAs and one route each."""
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(10):
            # shallow copy, so the per-tunnel changes below do not touch
            # self.ipv4_params
            p = copy.copy(self.ipv4_params)

            # offset every SA id and SPI by the tunnel index so that no two
            # tunnels share one
            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                            p.scapy_tun_spi,
                                            p.crypt_algo_vpp_id,
                                            p.crypt_key, p.crypt_key,
                                            p.auth_algo_vpp_id, p.auth_key,
                                            p.auth_key,
                                            dst=p.tun_dst)
            p.tun_if.add_vpp_config()
            p.tun_if.admin_up()
            p.tun_if.config_ip4()
            config_tun_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)
            # each tunnel's own counters must show exactly the 127 packets
            c = p.tun_if.get_rx_stats()
            self.assertEqual(c['packets'], 127)
            c = p.tun_if.get_tx_stats()
            self.assertEqual(c['packets'], 127)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interface """
        # decrypt direction: one encrypted packet per tunnel in one batch
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                            src=p.remote_tun_if_host,
                                            dst=self.pg1.remote_ip4))
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        # the i-th received packet is checked against the i-th tunnel
        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # encrypt direction: one plain packet per tunnel in one batch
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                    dst=p.remote_tun_if_host))
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
364
365
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """
    # Runs the 4o4 tunnel check for every crypto engine / algorithm pair,
    # re-keying the tunnel once per pair.

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create the tunnel interface, scapy SAs and host route for ``p``."""
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id, p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id, p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)

        # record the SA state in the test log
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        via_tunnel = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_tunnel])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Undo config_network: address, tunnel interface, then route."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Give the tunnel a new key and new SPIs, then install fresh SAs.

        The first byte of ``p.crypt_key`` is replaced with ``X`` and every
        tunnel SPI and SA id on ``p`` is incremented by one. The scapy SAs
        are rebuilt from the new values, two new VPP SAs are created
        (``p.tun_sa_in`` and ``p.tun_sa_out``) and both are bound to the
        existing tunnel interface.
        """
        #
        # change the key and the SPI
        #
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            salt=p.salt)
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            salt=p.salt)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # tun_sa_in (created with the scapy_* id/SPI) is installed with
        # is_outbound=1; tun_sa_out (vpp_* id/SPI) with is_outbound=0.
        tunnel_index = p.tun_if.sw_if_index
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tunnel_index,
                                         sa_id=p.tun_sa_in.id,
                                         is_outbound=1)
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tunnel_index,
                                         sa_id=p.tun_sa_out.id,
                                         is_outbound=0)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        crypto_alg = VppEnum.vl_api_ipsec_crypto_alg_t
        integ_alg = VppEnum.vl_api_ipsec_integ_alg_t

        # 16, 24 and 32 byte keys
        key_128 = b"JPjyOWBeVEQiMe7h"
        key_192 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_256 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # foreach crypto algorithm
        algos = [
            {'vpp-crypto': crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
             'vpp-integ': integ_alg.IPSEC_API_INTEG_ALG_NONE,
             'scapy-crypto': "AES-GCM",
             'scapy-integ': "NULL",
             'key': key_128,
             'salt': 3333},
            {'vpp-crypto': crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
             'vpp-integ': integ_alg.IPSEC_API_INTEG_ALG_NONE,
             'scapy-crypto': "AES-GCM",
             'scapy-integ': "NULL",
             'key': key_192,
             'salt': 0},
            {'vpp-crypto': crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
             'vpp-integ': integ_alg.IPSEC_API_INTEG_ALG_NONE,
             'scapy-crypto': "AES-GCM",
             'scapy-integ': "NULL",
             'key': key_256,
             'salt': 9999},
            {'vpp-crypto': crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
             'vpp-integ': integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
             'scapy-crypto': "AES-CBC",
             'scapy-integ': "HMAC-SHA1-96",
             'salt': 0,
             'key': key_128},
            {'vpp-crypto': crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
             'vpp-integ': integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
             'scapy-crypto': "AES-CBC",
             'scapy-integ': "HMAC-SHA1-96",
             'salt': 0,
             'key': key_192},
            {'vpp-crypto': crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
             'vpp-integ': integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
             'scapy-crypto': "AES-CBC",
             'scapy-integ': "HMAC-SHA1-96",
             'salt': 0,
             'key': key_256},
            {'vpp-crypto': crypto_alg.IPSEC_API_CRYPTO_ALG_NONE,
             'vpp-integ': integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
             'scapy-crypto': "NULL",
             'scapy-integ': "HMAC-SHA1-96",
             'salt': 0,
             'key': key_256},
        ]

        for engine_name in engines:
            self.vapi.cli("set crypto handler all %s" % engine_name)

            #
            # loop through each of the algorithms
            #
            for alg in algos:
                # work on a shallow copy so the shared params stay untouched
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = alg['vpp-integ']
                p.crypt_algo_vpp_id = alg['vpp-crypto']
                p.crypt_algo = alg['scapy-crypto']
                p.auth_algo = alg['scapy-integ']
                p.crypt_key = alg['key']
                p.salt = alg['salt']

                self.config_network(p)

                self.verify_tun_44(p, count=127)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], 127)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], 127)

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                # tear everything down, including the SAs rekey() created
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
550
551
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """
    # Tunnel configured with neither a crypto nor an integrity algorithm;
    # the single test expects nothing to come out of it.

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Set ``p`` to the NONE/NULL algorithms and build the tunnel.

        Overwrites the algorithm ids, scapy algorithm names and keys on
        ``p`` (keys become empty lists), then creates the tunnel interface,
        the scapy SAs and a /32 host route, stored as ``p.tun_if`` and
        ``p.route``.
        """
        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.auth_algo = 'NULL'
        p.auth_key = []

        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = 'NULL'
        p.crypt_key = []

        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi,
                                        p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key,
                                        salt=p.salt)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))
        self.logger.info(self.vapi.cli("sh ipsec sa 1"))

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Undo config_network: address, tunnel interface, then route."""
        p.tun_if.unconfig_ip4()
        p.tun_if.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """ IPSEC tunnel no algos """
        p = self.ipv4_params

        # Configure outside the try block: if this fails part-way there is
        # no complete network for unconfig_network to remove.
        self.config_network(p)

        try:
            tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                               dst=p.remote_tun_if_host)
            self.send_and_assert_no_replies(self.pg1, tx)
        finally:
            # remove the tunnel and route even when the assertion fails
            self.unconfig_network(p)
613
614
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """
    # Ten IPv6 tunnel interfaces over pg0, one per generated remote host.
    # Each has its own copy of the IPv6 parameters, kept in
    # self.multi_params.

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Create the ten tunnels, their scapy SAs and one route each."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        for idx in range(10):
            # shallow copy, so the per-tunnel changes below do not touch
            # self.ipv6_params
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (idx + 1)

            # offset every SA id and SPI by the tunnel index
            params.scapy_tun_sa_id += idx
            params.scapy_tun_spi += idx
            params.vpp_tun_sa_id += idx
            params.vpp_tun_spi += idx

            params.scapy_tra_sa_id += idx
            params.scapy_tra_spi += idx
            params.vpp_tra_sa_id += idx
            params.vpp_tra_spi += idx

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            via_tunnel = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                      proto=DpoProto.DPO_PROTO_IP6)
            host_route = VppIpRoute(self, params.remote_tun_if_host, 128,
                                    [via_tunnel])
            host_route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_66(params, count=127)
            # each tunnel's own counters must show exactly the 127 packets
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)
675
676
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    # A TEB-type GRE tunnel between pg0's local and remote addresses,
    # protected by a pair of ESP SAs and placed in bridge domain 1 together
    # with pg1. The packet generators and verifiers below replace the ones
    # inherited from the bases so that the payload is an Ethernet frame
    # addressed to ``omac``.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted IP/GRE/Ether/IP/UDP packets.

        ``p``, ``src`` and ``dst`` are accepted but not used: the addresses
        are fixed (pg0 remote -> pg0 local outside, 1.1.1.1 -> 1.1.1.2
        inside).
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ether/IP/UDP frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check the MAC and IP destination of every received frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet with ``sa`` and check its layers.

        On an ``IndexError`` or ``AssertionError`` the received packet, and
        the decrypted packet if decryption got that far, are written to the
        debug log before the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so that a failure inside decrypt() cannot log
            # a packet left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure to format
                        # the packet must not hide the real test failure.
                        pass
                raise

    def setUp(self):
        """Build the SAs, the GRE tunnel, its protection and the bridge."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # SA built from the scapy_* id/SPI, addressed pg0 local -> remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # SA built from the vpp_* id/SPI, addressed pg0 remote -> local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the GRE tunnel and pg1 together
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        """Remove the tunnel's IPv4 address, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
786
787
class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
                               IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC placed in the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP frames carrying IP/GRE/Ether/IP/UDP.

        The inner frame is untagged.  `p`, `src` and `dst` are accepted
        but not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain frames tagged with VLAN 11.

        `sw_intf`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                Dot1Q(vlan=11) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame has omac, VLAN 11 and dst 1.1.1.2."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame and check the GRE payload is untagged.

        On IndexError/AssertionError the received and (when available)
        decrypted packets are dumped to the debug log and the error is
        re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                # the VLAN tag must not be present inside the tunnel
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort dump only: pkt is unbound when decrypt
                    # itself failed.  Exception (not a bare except) so
                    # KeyboardInterrupt/SystemExit are not swallowed.
                    pass
                raise

    def setUp(self):
        """Configure a protected GRE TEB tunnel bridged with pg1.11.

        Besides the SAs, GRE TEB interface and tunnel-protect binding,
        this creates dot1q sub-interface 11 on pg1 with an L2_POP_1 tag
        rewrite and puts it (rather than pg1) into bridge domain 1.
        """
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11)
        self.pg1_11.admin_up()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Undo the tunnel IPv4 config and remove the sub-interface."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        # the sub-interface is taken down explicitly after the base
        # tearDown has run
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
906
907
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC placed in the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP frames carrying IP/GRE/Ether/IP/UDP.

        `p`, `src` and `dst` are accepted but not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain Ether/IP/UDP frames addressed to omac.

        `sw_intf`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame has dst MAC omac and dst 1.1.1.2."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame and check the outer IP and GRE payload.

        On IndexError/AssertionError the received and (when available)
        decrypted packets are dumped to the debug log and the error is
        re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort dump only: pkt is unbound when decrypt
                    # itself failed.  Exception (not a bare except) so
                    # KeyboardInterrupt/SystemExit are not swallowed.
                    pass
                raise

    def setUp(self):
        """Configure a protected GRE TEB tunnel bridged with pg1.

        Unlike the TUN variants, the SAs are created without endpoint
        addresses and the scapy side is set up via config_tra_params().
        """
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1011
1012
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP frames carrying IP/GRE/IP/UDP.

        The inner IP packet goes from pg1's local to pg1's remote
        address.  `p`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain frames from 1.1.1.1 to 1.1.1.2.

        `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame and check the outer IP and GRE payload.

        On IndexError/AssertionError the received and (when available)
        decrypted packets are dumped to the debug log and the error is
        re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort dump only: pkt is unbound when decrypt
                    # itself failed.  Exception (not a bare except) so
                    # KeyboardInterrupt/SystemExit are not swallowed.
                    pass
                raise

    def setUp(self):
        """Configure a protected GRE tunnel and a route for 1.1.1.2/32.

        Creates the two SAs (given pg0's local/remote IPv4 addresses), a
        GRE interface between those addresses, the tunnel-protect
        binding, and a host route for 1.1.1.2 via the tunnel's remote
        address.
        """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is ever added to this bridge domain in
        # this class - looks like a leftover from the TEB variants;
        # confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # 0xffffffff is passed as the path's second argument; presumably
        # "no specific interface" - TODO confirm against VppRoutePath
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1114
1115
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP frames carrying IP/GRE/IP/UDP.

        `p`, `src` and `dst` are accepted but not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Return `count` ESP frames whose GRE payload is not IP.

        UDP is stacked directly on GRE, with no inner IP header.  `src`
        and `dst` are accepted but not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain frames from 1.1.1.1 to 1.1.1.2.

        `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame and check it carries GRE/IP to 1.1.1.2.

        On IndexError/AssertionError the received and (when available)
        decrypted packets are dumped to the debug log and the error is
        re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort dump only: pkt is unbound when decrypt
                    # itself failed.  Exception (not a bare except) so
                    # KeyboardInterrupt/SystemExit are not swallowed.
                    pass
                raise

    def setUp(self):
        """Configure a protected GRE tunnel and a route for 1.1.1.2/32.

        The SAs are created without endpoint addresses and the scapy
        side is set up via config_tra_params().
        """
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        """GRE IPSEC non-IP payload is dropped"""
        p = self.ipv4_params
        # src/dst are ignored by the generator; an IPv4 address is passed
        # for dst to match the address family of this test.
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip4)
        self.send_and_assert_no_replies(self.tun_if, tx)
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1228
1229
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE v6 ESP - TRA tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return `count` ESP frames carrying IPv6/GRE/IPv6/UDP.

        `p`, `src` and `dst` are accepted but not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return `count` plain frames from 1::1 to 1::2.

        `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each frame and check it carries GRE/IPv6 to 1::2.

        On IndexError/AssertionError the received and (when available)
        decrypted packets are dumped to the debug log and the error is
        re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort dump only: pkt is unbound when decrypt
                    # itself failed.  Exception (not a bare except) so
                    # KeyboardInterrupt/SystemExit are not swallowed.
                    pass
                raise

    def setUp(self):
        """Configure a protected IPv6 GRE tunnel and a route for 1::2/128.

        The SAs are created without endpoint addresses and the scapy
        side is set up via config_tra_params().
        """
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): no port is ever added to this bridge domain in
        # this class - looks like a leftover from the TEB variants;
        # confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1327
1328
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """ Ipsec mGRE ESP v4 TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP frames carrying IP/GRE/IP/UDP.

        The outer source is the per-peer `p.tun_dst`.  `src` and `dst`
        are accepted but not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=p.tun_dst,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain frames from 1.1.1.1 to `dst`.

        `src` is accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame and check the inner dst is the peer host.

        On IndexError/AssertionError the received and (when available)
        decrypted packets are dumped to the debug log and the error is
        re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort dump only: pkt is unbound when decrypt
                    # itself failed.  Exception (not a bare except) so
                    # KeyboardInterrupt/SystemExit are not swallowed.
                    pass
                raise

    def setUp(self):
        """Configure one multipoint GRE interface with 16 protected peers.

        For every peer a shallow copy of the IPv4 parameters is made
        with its own SA ids/SPIs and remote host address; an SA pair, a
        tunnel-protect entry keyed on the peer's next-hop, a /32 route
        and a TEIB entry are created.  The per-peer parameter objects
        are collected in self.multi_params.
        """
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   "0.0.0.0",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: p.tun_if still refers to the shared interface
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            # (p.teib holds whatever add_vpp_config() returns; test_tun_44
            # later calls remove_vpp_config()/add_vpp_config() on it)
            p.teib = VppTeib(self, p.tun_if,
                             p.tun_if.remote_hosts[ii].ip4,
                             self.pg0.remote_hosts[ii].ip4).add_vpp_config()
            # set after config_tra_params(); read by gen_encrypt_pkts()
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # without the TEIB entry traffic must be dropped ...
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            # ... and flow again once it is restored
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
1459
1460
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """ Ipsec mGRE ESP v6 TRA tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` ESP-protected frames to inject on ``sw_intf``.

        Each frame is Ether / sa.encrypt(IPv6 / GRE / IPv6 / UDP / Raw).
        The outer IPv6 header runs from ``p.tun_dst`` to ``pg0.local_ip6``
        and the inner one from ``pg1.local_ip6`` to ``pg1.remote_ip6``.
        ``src`` and ``dst`` are not used by this override.

        :param p: parameter set supplying the outer source (``p.tun_dst``)
        :param sa: object whose ``encrypt()`` wraps the GRE packet
        :param sw_intf: interface supplying the Ethernet addresses
        :param count: number of frames to build
        :param payload_size: number of b'X' payload bytes per frame
        :returns: list of ``count`` frames
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=p.tun_dst,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear-text IPv6/UDP frames towards ``dst``.

        The IPv6 source is fixed at "1::1"; ``src`` is not used by this
        override.

        :returns: list of ``count`` frames
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check every received packet is addressed (MAC and IPv6) to the
        pg1 remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        The decrypted packet must have valid checksums, carry a GRE layer,
        and the IPv6 header inside GRE must be destined to
        ``p.remote_tun_if_host``.  On failure the offending packets are
        logged at debug level and the original exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # decrypt() returned the payload undissected;
                    # re-parse it as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: 'pkt' is unbound when
                    # decrypt() itself failed.  Catch Exception rather than
                    # everything so KeyboardInterrupt/SystemExit propagate.
                    pass
                raise

    def setUp(self):
        """Create one multipoint GRE tunnel over pg0 and, for each of
        N_NHS next-hops, a pair of SAs, a TEIB entry, a tunnel protection
        and a /128 route.  The per-next-hop parameter sets are collected
        in ``self.multi_params``."""
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        # "::" as destination together with TUNNEL_API_MODE_MP: the tunnel
        # is multipoint, peers are supplied by the TEIB entries added below
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   "::",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the one tunnel interface
            p = copy.copy(self.ipv6_params)

            # offset the host address, SA ids and SPIs by the next-hop index
            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(self, p.tun_if,
                    p.tun_if.remote_hosts[ii].ip6,
                    p.tun_dst).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 128,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
                                     p.tun_if.sw_if_index)]).add_vpp_config()
            # NOTE(review): not redundant - config_tra_params() presumably
            # overwrites p.tun_dst (config_tun_params() at the top of this
            # file does), so the per-next-hop value is set again here.
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        # NOTE(review): adjacency index 41 is hard-coded; looks like a
        # debugging aid - confirm it is still wanted
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 configuration, then run the parent
        tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
1592
1593
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create and install the outbound and inbound SAs for ``p``
        without tunnel endpoint addresses, storing them as
        ``p.tun_sa_out`` and ``p.tun_sa_in``."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm, key and protocol arguments common to both directions
        crypto_args = (p.auth_algo_vpp_id, p.auth_key,
                       p.crypt_algo_vpp_id, p.crypt_key,
                       self.vpp_esp_protocol)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *crypto_args, flags=p.flags)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *crypto_args, flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install the outbound and inbound SAs for ``p`` with
        the addresses of ``self.tun_if`` as tunnel endpoints (swapped for
        the inbound direction)."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto_args = (p.auth_algo_vpp_id, p.auth_key,
                       p.crypt_algo_vpp_id, p.crypt_key,
                       self.vpp_esp_protocol)
        local_addr = self.tun_if.local_addr[p.addr_type]
        remote_addr = self.tun_if.remote_addr[p.addr_type]
        out_args = crypto_args + (local_addr, remote_addr)
        in_args = crypto_args + (remote_addr, local_addr)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *out_args, flags=p.flags)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *in_args, flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs currently held in ``p`` and
        remember the protection object as ``p.tun_protect``."""
        protection = VppIpsecTunProtect(self, p.tun_if,
                                        p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create an IP-in-IP tunnel between the pg0 IPv4 addresses, bring
        it up with IPv4 and IPv6 configured, and route the remote v4 and
        v6 hosts through it."""
        tunnel = VppIpIpTunInterface(self, self.pg0,
                                     self.pg0.local_ip4,
                                     self.pg0.remote_ip4)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # only the IPv4 route is kept on p (it is the one that
        # unconfig_network() removes)
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        v6_route = VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path])
        v6_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the stored route first, then the tunnel interface."""
        for item in (p.route, p.tun_if):
            item.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA, stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1676
1677
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the parent setUp and use pg0 as the tunnel's underlay."""
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo: the test removes its own config."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)
        tunnel = params.tun_if

        def check_pkt_counters(expected):
            # rx is read and checked before tx
            for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
                self.assertEqual(read_stats()['packets'], expected)

        # v4 over the v4 tunnel
        self.verify_tun_44(params, count=127)
        check_pkt_counters(127)

        # v6 over the same tunnel; interface counters keep accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=127)
        check_pkt_counters(254)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.scapy_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        # the copy is shallow, so this is the very same tunnel object
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        # the old SAs are only removed once the new ones are in use
        self.unconfig_sa(params)

        self.verify_tun_44(rekeyed, count=127)
        check_pkt_counters(381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
1737
1738
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""
    # Same as the plain transport-mode test, but the SAs are created with
    # the UDP-encap flag and a UDP header (5454 -> 4500) is set as the
    # NAT header on the parameters.

    def setUp(self):
        """Configure tunnel, UDP-encapsulated SAs and protection once per
        test, on the shared IPv4 parameter set."""
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

    def tearDown(self):
        """Undo setUp in reverse order, then run the parent tearDown."""
        params = self.ipv4_params
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        tunnel = self.ipv4_params.tun_if

        self.verify_tun_44(self.ipv4_params, count=127)
        # rx is read and checked before tx
        for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
            self.assertEqual(read_stats()['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
1778
1779
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the parent setUp and use pg0 as the tunnel's underlay."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo: the test removes its own config."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` ESP-protected IP-in-IP frames to inject.

        The packet handed to ``sa.encrypt()`` has an outer IPv4 header
        from ``sw_intf.remote_ip4`` to ``sw_intf.local_ip4`` wrapped
        around an inner IPv4/UDP packet from ``src`` to ``dst``.

        :param p: parameter set (not used by this override)
        :param sa: object whose ``encrypt()`` wraps the packet
        :param sw_intf: interface supplying MAC and outer IP addresses
        :param src: inner IPv4 source
        :param dst: inner IPv4 destination
        :param count: number of frames to build
        :param payload_size: number of b'X' payload bytes per frame
        :returns: list of ``count`` frames
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text IPv4/UDP frames from ``src`` to
        ``dst`` with the Ethernet addresses of ``sw_intf``."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet goes from ``p.remote_tun_if_host``
        to the pg1 remote host and has valid checksums."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its headers.

        The outer header must run from pg0's local to pg0's remote IPv4
        address and the encapsulated header must be destined to
        ``p.remote_tun_if_host``.  On failure the offending packets are
        logged at debug level and the original exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt() returned the payload undissected;
                    # re-parse it as IPv4
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: 'pkt' is unbound when
                    # decrypt() itself failed.  Catch Exception rather than
                    # everything so KeyboardInterrupt/SystemExit propagate.
                    pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (np is a shallow copy, so np.tun_if is the same tunnel as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        # the old SAs are only removed once the new ones are in use
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        # interface counters accumulate across the rekey
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1881
1882
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the parent setUp and use pg0 as the tunnel's underlay."""
        super(TestIpsec4TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo: the test removes its own config."""
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` ESP-protected IP-in-IP frames whose outer IPv4
        destination is the fixed address 5.5.5.5 instead of an address
        taken from ``sw_intf``."""
        frames = []
        for _ in range(count):
            tunnelled = (IP(src=sw_intf.remote_ip4, dst="5.5.5.5") /
                         IP(src=src, dst=dst) /
                         UDP(sport=1144, dport=2233) /
                         Raw(b'X' * payload_size))
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2_hdr / sa.encrypt(tunnelled))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """
        params = self.ipv4_params

        for configure in (self.config_network,
                          self.config_sa_tun,
                          self.config_protect):
            configure(params)

        # packets carrying the bogus outer destination must produce no
        # replies at all
        bogus = self.gen_encrypt_pkts(params, params.scapy_tun_sa,
                                      self.tun_if,
                                      src=params.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
1929
1930
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    def config_sa_tra(self, p):
        """Create and install the outbound and inbound SAs for ``p``
        without tunnel endpoint addresses, storing them as
        ``p.tun_sa_out`` and ``p.tun_sa_in``."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm, key and protocol arguments common to both directions
        crypto_args = (p.auth_algo_vpp_id, p.auth_key,
                       p.crypt_algo_vpp_id, p.crypt_key,
                       self.vpp_esp_protocol)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *crypto_args)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *crypto_args)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install the outbound and inbound SAs for ``p`` with
        the addresses of ``self.tun_if`` as tunnel endpoints (swapped for
        the inbound direction)."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto_args = (p.auth_algo_vpp_id, p.auth_key,
                       p.crypt_algo_vpp_id, p.crypt_key,
                       self.vpp_esp_protocol)
        local_addr = self.tun_if.local_addr[p.addr_type]
        remote_addr = self.tun_if.remote_addr[p.addr_type]
        out_args = crypto_args + (local_addr, remote_addr)
        in_args = crypto_args + (remote_addr, local_addr)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *out_args)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *in_args)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs currently held in ``p`` and
        remember the protection object as ``p.tun_protect``."""
        protection = VppIpsecTunProtect(self, p.tun_if,
                                        p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create an IP-in-IP tunnel between the pg0 IPv6 addresses, bring
        it up with IPv6 and IPv4 configured, and route the remote v6 and
        v4 hosts through it."""
        tunnel = VppIpIpTunInterface(self, self.pg0,
                                     self.pg0.local_ip6,
                                     self.pg0.remote_ip6)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # only the IPv6 route is kept on p (it is the one that
        # unconfig_network() removes)
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        v4_route = VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path])
        v4_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the stored route first, then the tunnel interface."""
        for item in (p.route, p.tun_if):
            item.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA, stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
2004
2005
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the parent setUp and use pg0 as the tunnel's underlay."""
        super(TestIpsec6TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo: each test removes its own config."""
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)
        tunnel = params.tun_if
        # protection object created for the initial SAs; the 3 phase rekey
        # below updates this object in place
        first_protect = params.tun_protect

        def check_pkt_counters(expected):
            # rx is read and checked before tx
            for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
                self.assertEqual(read_stats()['packets'], expected)

        self.verify_tun_66(params, count=127)
        check_pkt_counters(127)

        # rekey - create new SAs and update the tunnel protection
        rekey1 = copy.copy(params)
        rekey1.crypt_key = b'X' + params.crypt_key[1:]
        rekey1.scapy_tun_sa_id += 1
        rekey1.scapy_tun_spi += 100
        rekey1.vpp_tun_sa_id += 1
        rekey1.vpp_tun_spi += 100
        # the copy is shallow, so this is the very same tunnel object
        rekey1.tun_if.local_spi = params.vpp_tun_spi
        rekey1.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekey1)
        self.config_protect(rekey1)
        # the old SAs are only removed once the new ones are in use
        self.unconfig_sa(params)

        self.verify_tun_66(rekey1, count=127)
        check_pkt_counters(254)

        # bounce the interface state: while the tunnel is down every
        # packet must be dropped and counted against the input node
        tunnel.admin_down()
        self.verify_drop_tun_66(rekey1, count=127)
        node = ('/err/ipsec6-tun-input/%s' %
                'ipsec packets received on disabled interface')
        self.assertEqual(127, self.statistics.get_err_counter(node))
        tunnel.admin_up()
        self.verify_tun_66(rekey1, count=127)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        rekey2 = copy.copy(rekey1)
        rekey2.crypt_key = b'Z' + params.crypt_key[1:]
        rekey2.scapy_tun_sa_id += 1
        rekey2.scapy_tun_spi += 100
        rekey2.vpp_tun_sa_id += 1
        rekey2.vpp_tun_spi += 100
        rekey2.tun_if.local_spi = params.vpp_tun_spi
        rekey2.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekey2)

        # step 1; output stays on the old SA, input accepts old and new
        first_protect.update_vpp_config(
            rekey1.tun_sa_out,
            [rekey1.tun_sa_in, rekey2.tun_sa_in])
        self.verify_tun_66(rekey1, rekey1, count=127)
        self.verify_tun_66(rekey2, rekey1, count=127)

        # step 2; output moves to the new SA, input still accepts both
        first_protect.update_vpp_config(
            rekey2.tun_sa_out,
            [rekey1.tun_sa_in, rekey2.tun_sa_in])
        self.verify_tun_66(rekey1, rekey2, count=127)
        self.verify_tun_66(rekey2, rekey2, count=127)

        # step 3; only the new input SA remains, the old one now drops
        first_protect.update_vpp_config(
            rekey2.tun_sa_out,
            [rekey2.tun_sa_in])
        self.verify_tun_66(rekey2, rekey2, count=127)
        self.verify_drop_tun_66(rekey1, count=127)

        # totals over the whole test, in batches of 127 packets
        self.assertEqual(tunnel.get_rx_stats()['packets'], 127*9)
        self.assertEqual(tunnel.get_tx_stats()['packets'], 127*8)
        self.unconfig_sa(rekey1)

        # teardown
        self.unconfig_protect(rekey2)
        self.unconfig_sa(rekey2)
        self.unconfig_network(params)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)
        tunnel = params.tun_if

        self.verify_tun_46(params, count=127)
        # rx is read and checked before tx
        for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
            self.assertEqual(read_stats()['packets'], 127)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2130
2131
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the parent setUp and use pg0 as the tunnel's underlay."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo: the test removes its own config."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` ESP-protected IPv6-in-IPv6 frames to inject.

        The packet handed to ``sa.encrypt()`` has an outer IPv6 header
        from ``sw_intf.remote_ip6`` to ``sw_intf.local_ip6`` wrapped
        around an inner IPv6/UDP packet from ``src`` to ``dst``.

        :param p: parameter set (not used by this override)
        :param sa: object whose ``encrypt()`` wraps the packet
        :param sw_intf: interface supplying MAC and outer IP addresses
        :param src: inner IPv6 source
        :param dst: inner IPv6 destination
        :param count: number of frames to build
        :param payload_size: number of b'X' payload bytes per frame
        :returns: list of ``count`` frames
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear-text IPv6/UDP frames from ``src`` to
        ``dst`` with the Ethernet addresses of ``sw_intf``."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received packet goes from ``p.remote_tun_if_host``
        to the pg1 remote host and has valid checksums."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its headers.

        The outer header must run from pg0's local to pg0's remote IPv6
        address and the encapsulated header must be destined to
        ``p.remote_tun_if_host``.  On failure the offending packets are
        logged at debug level and the original exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # decrypt() returned the payload undissected;
                    # re-parse it as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: 'pkt' is unbound when
                    # decrypt() itself failed.  Catch Exception rather than
                    # everything so KeyboardInterrupt/SystemExit propagate.
                    pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (np is a shallow copy, so np.tun_if is the same tunnel as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        # the old SAs are only removed once the new ones are in use
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        # interface counters accumulate across the rekey
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2233
2234
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the parent setUp and use pg0 as the tunnel's underlay."""
        super(TestIpsec6TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo: the test removes its own config."""
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` ESP-protected IPv6-in-IPv6 frames with a bogus
        outer destination."""
        # the IP destination of the revealed packet ("5::5") does not
        # match that assigned to the tunnel
        frames = []
        for _ in range(count):
            tunnelled = (IPv6(src=sw_intf.remote_ip6, dst="5::5") /
                         IPv6(src=src, dst=dst) /
                         UDP(sport=1144, dport=2233) /
                         Raw(b'X' * payload_size))
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2_hdr / sa.encrypt(tunnelled))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """
        params = self.ipv6_params

        for configure in (self.config_network,
                          self.config_sa_tun,
                          self.config_protect):
            configure(params)

        # packets carrying the bogus outer destination must produce no
        # replies at all
        bogus = self.gen_encrypt_pkts6(params, params.scapy_tun_sa,
                                       self.tun_if,
                                       src=params.remote_tun_if_host,
                                       dst=self.pg1.remote_ip6,
                                       count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
2282
2283
# When executed directly (rather than imported), run this module's tests
# with the VPP test runner instead of unittest's default runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)