fib: midchain adjacency optimisations
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
22 from util import ppp
23 from vpp_papi import VppEnum
24
25
26 def config_tun_params(p, encryption_type, tun_if):
27     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
28     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
29                              IPSEC_API_SAD_FLAG_USE_ESN))
30     crypt_key = mk_scapy_crypt_key(p)
31     p.tun_dst = tun_if.remote_ip
32     p.tun_src = tun_if.local_ip
33     p.scapy_tun_sa = SecurityAssociation(
34         encryption_type, spi=p.vpp_tun_spi,
35         crypt_algo=p.crypt_algo,
36         crypt_key=crypt_key,
37         auth_algo=p.auth_algo, auth_key=p.auth_key,
38         tunnel_header=ip_class_by_addr_type[p.addr_type](
39             src=p.tun_dst,
40             dst=p.tun_src),
41         nat_t_header=p.nat_header,
42         esn_en=esn_en)
43     p.vpp_tun_sa = SecurityAssociation(
44         encryption_type, spi=p.scapy_tun_spi,
45         crypt_algo=p.crypt_algo,
46         crypt_key=crypt_key,
47         auth_algo=p.auth_algo, auth_key=p.auth_key,
48         tunnel_header=ip_class_by_addr_type[p.addr_type](
49             dst=p.tun_dst,
50             src=p.tun_src),
51         nat_t_header=p.nat_header,
52         esn_en=esn_en)
53
54
55 def config_tra_params(p, encryption_type, tun_if):
56     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
57     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
58                              IPSEC_API_SAD_FLAG_USE_ESN))
59     crypt_key = mk_scapy_crypt_key(p)
60     p.tun_dst = tun_if.remote_ip
61     p.tun_src = tun_if.local_ip
62     p.scapy_tun_sa = SecurityAssociation(
63         encryption_type, spi=p.vpp_tun_spi,
64         crypt_algo=p.crypt_algo,
65         crypt_key=crypt_key,
66         auth_algo=p.auth_algo, auth_key=p.auth_key,
67         esn_en=esn_en)
68     p.vpp_tun_sa = SecurityAssociation(
69         encryption_type, spi=p.scapy_tun_spi,
70         crypt_algo=p.crypt_algo,
71         crypt_key=crypt_key,
72         auth_algo=p.auth_algo, auth_key=p.auth_key,
73         esn_en=esn_en)
74
75
76 class TemplateIpsec4TunIfEsp(TemplateIpsec):
77     """ IPsec tunnel interface tests """
78
79     encryption_type = ESP
80
81     @classmethod
82     def setUpClass(cls):
83         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
84
85     @classmethod
86     def tearDownClass(cls):
87         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
88
89     def setUp(self):
90         super(TemplateIpsec4TunIfEsp, self).setUp()
91
92         self.tun_if = self.pg0
93
94         p = self.ipv4_params
95
96         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
97                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
98                                         p.crypt_key, p.crypt_key,
99                                         p.auth_algo_vpp_id, p.auth_key,
100                                         p.auth_key)
101         p.tun_if.add_vpp_config()
102         p.tun_if.admin_up()
103         p.tun_if.config_ip4()
104         p.tun_if.config_ip6()
105         config_tun_params(p, self.encryption_type, p.tun_if)
106
107         r = VppIpRoute(self, p.remote_tun_if_host, 32,
108                        [VppRoutePath(p.tun_if.remote_ip4,
109                                      0xffffffff)])
110         r.add_vpp_config()
111         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
112                        [VppRoutePath(p.tun_if.remote_ip6,
113                                      0xffffffff,
114                                      proto=DpoProto.DPO_PROTO_IP6)])
115         r.add_vpp_config()
116
117     def tearDown(self):
118         super(TemplateIpsec4TunIfEsp, self).tearDown()
119
120
121 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
122     """ IPsec UDP tunnel interface tests """
123
124     tun4_encrypt_node_name = "esp4-encrypt-tun"
125     tun4_decrypt_node_name = "esp4-decrypt-tun"
126     encryption_type = ESP
127
128     @classmethod
129     def setUpClass(cls):
130         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
131
132     @classmethod
133     def tearDownClass(cls):
134         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
135
136     def verify_encrypted(self, p, sa, rxs):
137         for rx in rxs:
138             try:
139                 # ensure the UDP ports are correct before we decrypt
140                 # which strips them
141                 self.assertTrue(rx.haslayer(UDP))
142                 self.assert_equal(rx[UDP].sport, 4500)
143                 self.assert_equal(rx[UDP].dport, 4500)
144
145                 pkt = sa.decrypt(rx[IP])
146                 if not pkt.haslayer(IP):
147                     pkt = IP(pkt[Raw].load)
148
149                 self.assert_packet_checksums_valid(pkt)
150                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
151                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
152             except (IndexError, AssertionError):
153                 self.logger.debug(ppp("Unexpected packet:", rx))
154                 try:
155                     self.logger.debug(ppp("Decrypted packet:", pkt))
156                 except:
157                     pass
158                 raise
159
160     def setUp(self):
161         super(TemplateIpsec4TunIfEspUdp, self).setUp()
162
163         p = self.ipv4_params
164         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
165                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
166         p.nat_header = UDP(sport=5454, dport=4500)
167
168     def config_network(self):
169
170         self.tun_if = self.pg0
171         p = self.ipv4_params
172         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
173                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
174                                         p.crypt_key, p.crypt_key,
175                                         p.auth_algo_vpp_id, p.auth_key,
176                                         p.auth_key, udp_encap=True)
177         p.tun_if.add_vpp_config()
178         p.tun_if.admin_up()
179         p.tun_if.config_ip4()
180         p.tun_if.config_ip6()
181         config_tun_params(p, self.encryption_type, p.tun_if)
182
183         r = VppIpRoute(self, p.remote_tun_if_host, 32,
184                        [VppRoutePath(p.tun_if.remote_ip4,
185                                      0xffffffff)])
186         r.add_vpp_config()
187         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
188                        [VppRoutePath(p.tun_if.remote_ip6,
189                                      0xffffffff,
190                                      proto=DpoProto.DPO_PROTO_IP6)])
191         r.add_vpp_config()
192
193     def tearDown(self):
194         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
195
196
197 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
198     """ Ipsec ESP - TUN tests """
199     tun4_encrypt_node_name = "esp4-encrypt-tun"
200     tun4_decrypt_node_name = "esp4-decrypt-tun"
201
202     def test_tun_basic64(self):
203         """ ipsec 6o4 tunnel basic test """
204         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
205
206         self.verify_tun_64(self.params[socket.AF_INET], count=1)
207
208     def test_tun_burst64(self):
209         """ ipsec 6o4 tunnel basic test """
210         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
211
212         self.verify_tun_64(self.params[socket.AF_INET], count=257)
213
214     def test_tun_basic_frag44(self):
215         """ ipsec 4o4 tunnel frag basic test """
216         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
217
218         p = self.ipv4_params
219
220         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
221                                        [1500, 0, 0, 0])
222         self.verify_tun_44(self.params[socket.AF_INET],
223                            count=1, payload_size=1800, n_rx=2)
224         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
225                                        [9000, 0, 0, 0])
226
227
228 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
229     """ Ipsec ESP UDP tests """
230
231     tun4_input_node = "ipsec4-tun-input"
232
233     def setUp(self):
234         super(TemplateIpsec4TunIfEspUdp, self).setUp()
235         self.config_network()
236
237     def test_keepalive(self):
238         """ IPSEC NAT Keepalive """
239         self.verify_keepalive(self.ipv4_params)
240
241
242 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
243     """ Ipsec ESP UDP GCM tests """
244
245     tun4_input_node = "ipsec4-tun-input"
246
247     def setUp(self):
248         super(TemplateIpsec4TunIfEspUdp, self).setUp()
249         p = self.ipv4_params
250         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
251                               IPSEC_API_INTEG_ALG_NONE)
252         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
253                                IPSEC_API_CRYPTO_ALG_AES_GCM_256)
254         p.crypt_algo = "AES-GCM"
255         p.auth_algo = "NULL"
256         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
257         p.salt = 0
258         self.config_network()
259
260
261 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
262     """ Ipsec ESP - TCP tests """
263     pass
264
265
266 class TemplateIpsec6TunIfEsp(TemplateIpsec):
267     """ IPsec tunnel interface tests """
268
269     encryption_type = ESP
270
271     def setUp(self):
272         super(TemplateIpsec6TunIfEsp, self).setUp()
273
274         self.tun_if = self.pg0
275
276         p = self.ipv6_params
277         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
278                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
279                                         p.crypt_key, p.crypt_key,
280                                         p.auth_algo_vpp_id, p.auth_key,
281                                         p.auth_key, is_ip6=True)
282         p.tun_if.add_vpp_config()
283         p.tun_if.admin_up()
284         p.tun_if.config_ip6()
285         p.tun_if.config_ip4()
286         config_tun_params(p, self.encryption_type, p.tun_if)
287
288         r = VppIpRoute(self, p.remote_tun_if_host, 128,
289                        [VppRoutePath(p.tun_if.remote_ip6,
290                                      0xffffffff,
291                                      proto=DpoProto.DPO_PROTO_IP6)])
292         r.add_vpp_config()
293         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
294                        [VppRoutePath(p.tun_if.remote_ip4,
295                                      0xffffffff)])
296         r.add_vpp_config()
297
298     def tearDown(self):
299         super(TemplateIpsec6TunIfEsp, self).tearDown()
300
301
302 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
303                           IpsecTun6Tests):
304     """ Ipsec ESP - TUN tests """
305     tun6_encrypt_node_name = "esp6-encrypt-tun"
306     tun6_decrypt_node_name = "esp6-decrypt-tun"
307
308     def test_tun_basic46(self):
309         """ ipsec 4o6 tunnel basic test """
310         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
311         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
312
313     def test_tun_burst46(self):
314         """ ipsec 4o6 tunnel burst test """
315         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
316         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
317
318
319 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
320                                 IpsecTun6HandoffTests):
321     """ Ipsec ESP 6 Handoff tests """
322     tun6_encrypt_node_name = "esp6-encrypt-tun"
323     tun6_decrypt_node_name = "esp6-decrypt-tun"
324
325
326 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
327                                 IpsecTun4HandoffTests):
328     """ Ipsec ESP 4 Handoff tests """
329     tun4_encrypt_node_name = "esp4-encrypt-tun"
330     tun4_decrypt_node_name = "esp4-decrypt-tun"
331
332
333 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
334     """ IPsec IPv4 Multi Tunnel interface """
335
336     encryption_type = ESP
337     tun4_encrypt_node_name = "esp4-encrypt-tun"
338     tun4_decrypt_node_name = "esp4-decrypt-tun"
339
340     def setUp(self):
341         super(TestIpsec4MultiTunIfEsp, self).setUp()
342
343         self.tun_if = self.pg0
344
345         self.multi_params = []
346         self.pg0.generate_remote_hosts(10)
347         self.pg0.configure_ipv4_neighbors()
348
349         for ii in range(10):
350             p = copy.copy(self.ipv4_params)
351
352             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
353             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
354             p.scapy_tun_spi = p.scapy_tun_spi + ii
355             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
356             p.vpp_tun_spi = p.vpp_tun_spi + ii
357
358             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
359             p.scapy_tra_spi = p.scapy_tra_spi + ii
360             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
361             p.vpp_tra_spi = p.vpp_tra_spi + ii
362             p.tun_dst = self.pg0.remote_hosts[ii].ip4
363
364             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
365                                             p.scapy_tun_spi,
366                                             p.crypt_algo_vpp_id,
367                                             p.crypt_key, p.crypt_key,
368                                             p.auth_algo_vpp_id, p.auth_key,
369                                             p.auth_key,
370                                             dst=p.tun_dst)
371             p.tun_if.add_vpp_config()
372             p.tun_if.admin_up()
373             p.tun_if.config_ip4()
374             config_tun_params(p, self.encryption_type, p.tun_if)
375             self.multi_params.append(p)
376
377             VppIpRoute(self, p.remote_tun_if_host, 32,
378                        [VppRoutePath(p.tun_if.remote_ip4,
379                                      0xffffffff)]).add_vpp_config()
380
381     def tearDown(self):
382         super(TestIpsec4MultiTunIfEsp, self).tearDown()
383
384     def test_tun_44(self):
385         """Multiple IPSEC tunnel interfaces """
386         for p in self.multi_params:
387             self.verify_tun_44(p, count=127)
388             c = p.tun_if.get_rx_stats()
389             self.assertEqual(c['packets'], 127)
390             c = p.tun_if.get_tx_stats()
391             self.assertEqual(c['packets'], 127)
392
393     def test_tun_rr_44(self):
394         """ Round-robin packets acrros multiple interface """
395         tx = []
396         for p in self.multi_params:
397             tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
398                                             src=p.remote_tun_if_host,
399                                             dst=self.pg1.remote_ip4)
400         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
401
402         for rx, p in zip(rxs, self.multi_params):
403             self.verify_decrypted(p, [rx])
404
405         tx = []
406         for p in self.multi_params:
407             tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
408                                     dst=p.remote_tun_if_host)
409         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
410
411         for rx, p in zip(rxs, self.multi_params):
412             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
413
414
415 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
416     """ IPsec IPv4 Tunnel interface all Algos """
417
418     encryption_type = ESP
419     tun4_encrypt_node_name = "esp4-encrypt-tun"
420     tun4_decrypt_node_name = "esp4-decrypt-tun"
421
422     def config_network(self, p):
423
424         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
425                                         p.scapy_tun_spi,
426                                         p.crypt_algo_vpp_id,
427                                         p.crypt_key, p.crypt_key,
428                                         p.auth_algo_vpp_id, p.auth_key,
429                                         p.auth_key,
430                                         salt=p.salt)
431         p.tun_if.add_vpp_config()
432         p.tun_if.admin_up()
433         p.tun_if.config_ip4()
434         config_tun_params(p, self.encryption_type, p.tun_if)
435         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
436         self.logger.info(self.vapi.cli("sh ipsec sa 1"))
437
438         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
439                              [VppRoutePath(p.tun_if.remote_ip4,
440                                            0xffffffff)])
441         p.route.add_vpp_config()
442
443     def unconfig_network(self, p):
444         p.tun_if.unconfig_ip4()
445         p.tun_if.remove_vpp_config()
446         p.route.remove_vpp_config()
447
448     def setUp(self):
449         super(TestIpsec4TunIfEspAll, self).setUp()
450
451         self.tun_if = self.pg0
452
453     def tearDown(self):
454         super(TestIpsec4TunIfEspAll, self).tearDown()
455
456     def rekey(self, p):
457         #
458         # change the key and the SPI
459         #
460         p.crypt_key = b'X' + p.crypt_key[1:]
461         p.scapy_tun_spi += 1
462         p.scapy_tun_sa_id += 1
463         p.vpp_tun_spi += 1
464         p.vpp_tun_sa_id += 1
465         p.tun_if.local_spi = p.vpp_tun_spi
466         p.tun_if.remote_spi = p.scapy_tun_spi
467
468         config_tun_params(p, self.encryption_type, p.tun_if)
469
470         p.tun_sa_in = VppIpsecSA(self,
471                                  p.scapy_tun_sa_id,
472                                  p.scapy_tun_spi,
473                                  p.auth_algo_vpp_id,
474                                  p.auth_key,
475                                  p.crypt_algo_vpp_id,
476                                  p.crypt_key,
477                                  self.vpp_esp_protocol,
478                                  flags=p.flags,
479                                  salt=p.salt)
480         p.tun_sa_out = VppIpsecSA(self,
481                                   p.vpp_tun_sa_id,
482                                   p.vpp_tun_spi,
483                                   p.auth_algo_vpp_id,
484                                   p.auth_key,
485                                   p.crypt_algo_vpp_id,
486                                   p.crypt_key,
487                                   self.vpp_esp_protocol,
488                                   flags=p.flags,
489                                   salt=p.salt)
490         p.tun_sa_in.add_vpp_config()
491         p.tun_sa_out.add_vpp_config()
492
493         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
494                                          sa_id=p.tun_sa_in.id,
495                                          is_outbound=1)
496         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
497                                          sa_id=p.tun_sa_out.id,
498                                          is_outbound=0)
499         self.logger.info(self.vapi.cli("sh ipsec sa"))
500
501     def test_tun_44(self):
502         """IPSEC tunnel all algos """
503
504         # foreach VPP crypto engine
505         engines = ["ia32", "ipsecmb", "openssl"]
506
507         # foreach crypto algorithm
508         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
509                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
510                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
511                                 IPSEC_API_INTEG_ALG_NONE),
512                   'scapy-crypto': "AES-GCM",
513                   'scapy-integ': "NULL",
514                   'key': b"JPjyOWBeVEQiMe7h",
515                   'salt': 3333},
516                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
517                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
518                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
519                                 IPSEC_API_INTEG_ALG_NONE),
520                   'scapy-crypto': "AES-GCM",
521                   'scapy-integ': "NULL",
522                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
523                   'salt': 0},
524                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
525                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
526                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
527                                 IPSEC_API_INTEG_ALG_NONE),
528                   'scapy-crypto': "AES-GCM",
529                   'scapy-integ': "NULL",
530                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
531                   'salt': 9999},
532                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
533                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
534                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
535                                 IPSEC_API_INTEG_ALG_SHA1_96),
536                   'scapy-crypto': "AES-CBC",
537                   'scapy-integ': "HMAC-SHA1-96",
538                   'salt': 0,
539                   'key': b"JPjyOWBeVEQiMe7h"},
540                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
541                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
542                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
543                                 IPSEC_API_INTEG_ALG_SHA1_96),
544                   'scapy-crypto': "AES-CBC",
545                   'scapy-integ': "HMAC-SHA1-96",
546                   'salt': 0,
547                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
548                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
549                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
550                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
551                                 IPSEC_API_INTEG_ALG_SHA1_96),
552                   'scapy-crypto': "AES-CBC",
553                   'scapy-integ': "HMAC-SHA1-96",
554                   'salt': 0,
555                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
556                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
557                                  IPSEC_API_CRYPTO_ALG_NONE),
558                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
559                                 IPSEC_API_INTEG_ALG_SHA1_96),
560                   'scapy-crypto': "NULL",
561                   'scapy-integ': "HMAC-SHA1-96",
562                   'salt': 0,
563                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
564
565         for engine in engines:
566             self.vapi.cli("set crypto handler all %s" % engine)
567
568             #
569             # loop through each of the algorithms
570             #
571             for algo in algos:
572                 # with self.subTest(algo=algo['scapy']):
573
574                 p = copy.copy(self.ipv4_params)
575                 p.auth_algo_vpp_id = algo['vpp-integ']
576                 p.crypt_algo_vpp_id = algo['vpp-crypto']
577                 p.crypt_algo = algo['scapy-crypto']
578                 p.auth_algo = algo['scapy-integ']
579                 p.crypt_key = algo['key']
580                 p.salt = algo['salt']
581
582                 self.config_network(p)
583
584                 self.verify_tun_44(p, count=127)
585                 c = p.tun_if.get_rx_stats()
586                 self.assertEqual(c['packets'], 127)
587                 c = p.tun_if.get_tx_stats()
588                 self.assertEqual(c['packets'], 127)
589
590                 #
591                 # rekey the tunnel
592                 #
593                 self.rekey(p)
594                 self.verify_tun_44(p, count=127)
595
596                 self.unconfig_network(p)
597                 p.tun_sa_out.remove_vpp_config()
598                 p.tun_sa_in.remove_vpp_config()
599
600
601 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
602     """ IPsec IPv4 Tunnel interface no Algos """
603
604     encryption_type = ESP
605     tun4_encrypt_node_name = "esp4-encrypt-tun"
606     tun4_decrypt_node_name = "esp4-decrypt-tun"
607
608     def config_network(self, p):
609
610         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
611                               IPSEC_API_INTEG_ALG_NONE)
612         p.auth_algo = 'NULL'
613         p.auth_key = []
614
615         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
616                                IPSEC_API_CRYPTO_ALG_NONE)
617         p.crypt_algo = 'NULL'
618         p.crypt_key = []
619
620         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
621                                         p.scapy_tun_spi,
622                                         p.crypt_algo_vpp_id,
623                                         p.crypt_key, p.crypt_key,
624                                         p.auth_algo_vpp_id, p.auth_key,
625                                         p.auth_key,
626                                         salt=p.salt)
627         p.tun_if.add_vpp_config()
628         p.tun_if.admin_up()
629         p.tun_if.config_ip4()
630         config_tun_params(p, self.encryption_type, p.tun_if)
631         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
632         self.logger.info(self.vapi.cli("sh ipsec sa 1"))
633
634         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
635                              [VppRoutePath(p.tun_if.remote_ip4,
636                                            0xffffffff)])
637         p.route.add_vpp_config()
638
639     def unconfig_network(self, p):
640         p.tun_if.unconfig_ip4()
641         p.tun_if.remove_vpp_config()
642         p.route.remove_vpp_config()
643
644     def setUp(self):
645         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
646
647         self.tun_if = self.pg0
648
649     def tearDown(self):
650         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
651
652     def test_tun_44(self):
653         """ IPSec SA with NULL algos """
654         p = self.ipv4_params
655
656         self.config_network(p)
657
658         tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
659                            dst=p.remote_tun_if_host)
660         self.send_and_assert_no_replies(self.pg1, tx)
661
662         self.unconfig_network(p)
663
664
665 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
666     """ IPsec IPv6 Multi Tunnel interface """
667
668     encryption_type = ESP
669     tun6_encrypt_node_name = "esp6-encrypt-tun"
670     tun6_decrypt_node_name = "esp6-decrypt-tun"
671
672     def setUp(self):
673         super(TestIpsec6MultiTunIfEsp, self).setUp()
674
675         self.tun_if = self.pg0
676
677         self.multi_params = []
678         self.pg0.generate_remote_hosts(10)
679         self.pg0.configure_ipv6_neighbors()
680
681         for ii in range(10):
682             p = copy.copy(self.ipv6_params)
683
684             p.remote_tun_if_host = "1111::%d" % (ii + 1)
685             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
686             p.scapy_tun_spi = p.scapy_tun_spi + ii
687             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
688             p.vpp_tun_spi = p.vpp_tun_spi + ii
689
690             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
691             p.scapy_tra_spi = p.scapy_tra_spi + ii
692             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
693             p.vpp_tra_spi = p.vpp_tra_spi + ii
694
695             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
696                                             p.scapy_tun_spi,
697                                             p.crypt_algo_vpp_id,
698                                             p.crypt_key, p.crypt_key,
699                                             p.auth_algo_vpp_id, p.auth_key,
700                                             p.auth_key, is_ip6=True,
701                                             dst=self.pg0.remote_hosts[ii].ip6)
702             p.tun_if.add_vpp_config()
703             p.tun_if.admin_up()
704             p.tun_if.config_ip6()
705             config_tun_params(p, self.encryption_type, p.tun_if)
706             self.multi_params.append(p)
707
708             r = VppIpRoute(self, p.remote_tun_if_host, 128,
709                            [VppRoutePath(p.tun_if.remote_ip6,
710                                          0xffffffff,
711                                          proto=DpoProto.DPO_PROTO_IP6)])
712             r.add_vpp_config()
713
714     def tearDown(self):
715         super(TestIpsec6MultiTunIfEsp, self).tearDown()
716
717     def test_tun_66(self):
718         """Multiple IPSEC tunnel interfaces """
719         for p in self.multi_params:
720             self.verify_tun_66(p, count=127)
721             c = p.tun_if.get_rx_stats()
722             self.assertEqual(c['packets'], 127)
723             c = p.tun_if.get_tx_stats()
724             self.assertEqual(c['packets'], 127)
725
726
727 class TestIpsecGreTebIfEsp(TemplateIpsec,
728                            IpsecTun4Tests):
729     """ Ipsec GRE TEB ESP - TUN tests """
730     tun4_encrypt_node_name = "esp4-encrypt-tun"
731     tun4_decrypt_node_name = "esp4-decrypt-tun"
732     encryption_type = ESP
733     omac = "00:11:22:33:44:55"
734
735     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
736                          payload_size=100):
737         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
738                 sa.encrypt(IP(src=self.pg0.remote_ip4,
739                               dst=self.pg0.local_ip4) /
740                            GRE() /
741                            Ether(dst=self.omac) /
742                            IP(src="1.1.1.1", dst="1.1.1.2") /
743                            UDP(sport=1144, dport=2233) /
744                            Raw(b'X' * payload_size))
745                 for i in range(count)]
746
747     def gen_pkts(self, sw_intf, src, dst, count=1,
748                  payload_size=100):
749         return [Ether(dst=self.omac) /
750                 IP(src="1.1.1.1", dst="1.1.1.2") /
751                 UDP(sport=1144, dport=2233) /
752                 Raw(b'X' * payload_size)
753                 for i in range(count)]
754
755     def verify_decrypted(self, p, rxs):
756         for rx in rxs:
757             self.assert_equal(rx[Ether].dst, self.omac)
758             self.assert_equal(rx[IP].dst, "1.1.1.2")
759
760     def verify_encrypted(self, p, sa, rxs):
761         for rx in rxs:
762             try:
763                 pkt = sa.decrypt(rx[IP])
764                 if not pkt.haslayer(IP):
765                     pkt = IP(pkt[Raw].load)
766                 self.assert_packet_checksums_valid(pkt)
767                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
768                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
769                 self.assertTrue(pkt.haslayer(GRE))
770                 e = pkt[Ether]
771                 self.assertEqual(e[Ether].dst, self.omac)
772                 self.assertEqual(e[IP].dst, "1.1.1.2")
773             except (IndexError, AssertionError):
774                 self.logger.debug(ppp("Unexpected packet:", rx))
775                 try:
776                     self.logger.debug(ppp("Decrypted packet:", pkt))
777                 except:
778                     pass
779                 raise
780
781     def setUp(self):
782         super(TestIpsecGreTebIfEsp, self).setUp()
783
784         self.tun_if = self.pg0
785
786         p = self.ipv4_params
787
788         bd1 = VppBridgeDomain(self, 1)
789         bd1.add_vpp_config()
790
791         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
792                                   p.auth_algo_vpp_id, p.auth_key,
793                                   p.crypt_algo_vpp_id, p.crypt_key,
794                                   self.vpp_esp_protocol,
795                                   self.pg0.local_ip4,
796                                   self.pg0.remote_ip4)
797         p.tun_sa_out.add_vpp_config()
798
799         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
800                                  p.auth_algo_vpp_id, p.auth_key,
801                                  p.crypt_algo_vpp_id, p.crypt_key,
802                                  self.vpp_esp_protocol,
803                                  self.pg0.remote_ip4,
804                                  self.pg0.local_ip4)
805         p.tun_sa_in.add_vpp_config()
806
807         p.tun_if = VppGreInterface(self,
808                                    self.pg0.local_ip4,
809                                    self.pg0.remote_ip4,
810                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
811                                          GRE_API_TUNNEL_TYPE_TEB))
812         p.tun_if.add_vpp_config()
813
814         p.tun_protect = VppIpsecTunProtect(self,
815                                            p.tun_if,
816                                            p.tun_sa_out,
817                                            [p.tun_sa_in])
818
819         p.tun_protect.add_vpp_config()
820
821         p.tun_if.admin_up()
822         p.tun_if.config_ip4()
823         config_tun_params(p, self.encryption_type, p.tun_if)
824
825         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
826         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
827
828         self.vapi.cli("clear ipsec sa")
829         self.vapi.cli("sh adj")
830         self.vapi.cli("sh ipsec tun")
831
832     def tearDown(self):
833         p = self.ipv4_params
834         p.tun_if.unconfig_ip4()
835         super(TestIpsecGreTebIfEsp, self).tearDown()
836
837
838 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
839                                IpsecTun4Tests):
840     """ Ipsec GRE TEB ESP - TUN tests """
841     tun4_encrypt_node_name = "esp4-encrypt-tun"
842     tun4_decrypt_node_name = "esp4-decrypt-tun"
843     encryption_type = ESP
844     omac = "00:11:22:33:44:55"
845
846     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
847                          payload_size=100):
848         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
849                 sa.encrypt(IP(src=self.pg0.remote_ip4,
850                               dst=self.pg0.local_ip4) /
851                            GRE() /
852                            Ether(dst=self.omac) /
853                            IP(src="1.1.1.1", dst="1.1.1.2") /
854                            UDP(sport=1144, dport=2233) /
855                            Raw(b'X' * payload_size))
856                 for i in range(count)]
857
858     def gen_pkts(self, sw_intf, src, dst, count=1,
859                  payload_size=100):
860         return [Ether(dst=self.omac) /
861                 Dot1Q(vlan=11) /
862                 IP(src="1.1.1.1", dst="1.1.1.2") /
863                 UDP(sport=1144, dport=2233) /
864                 Raw(b'X' * payload_size)
865                 for i in range(count)]
866
867     def verify_decrypted(self, p, rxs):
868         for rx in rxs:
869             self.assert_equal(rx[Ether].dst, self.omac)
870             self.assert_equal(rx[Dot1Q].vlan, 11)
871             self.assert_equal(rx[IP].dst, "1.1.1.2")
872
873     def verify_encrypted(self, p, sa, rxs):
874         for rx in rxs:
875             try:
876                 pkt = sa.decrypt(rx[IP])
877                 if not pkt.haslayer(IP):
878                     pkt = IP(pkt[Raw].load)
879                 self.assert_packet_checksums_valid(pkt)
880                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
881                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
882                 self.assertTrue(pkt.haslayer(GRE))
883                 e = pkt[Ether]
884                 self.assertEqual(e[Ether].dst, self.omac)
885                 self.assertFalse(e.haslayer(Dot1Q))
886                 self.assertEqual(e[IP].dst, "1.1.1.2")
887             except (IndexError, AssertionError):
888                 self.logger.debug(ppp("Unexpected packet:", rx))
889                 try:
890                     self.logger.debug(ppp("Decrypted packet:", pkt))
891                 except:
892                     pass
893                 raise
894
895     def setUp(self):
896         super(TestIpsecGreTebVlanIfEsp, self).setUp()
897
898         self.tun_if = self.pg0
899
900         p = self.ipv4_params
901
902         bd1 = VppBridgeDomain(self, 1)
903         bd1.add_vpp_config()
904
905         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
906         self.vapi.l2_interface_vlan_tag_rewrite(
907             sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
908             push_dot1q=11)
909         self.pg1_11.admin_up()
910
911         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
912                                   p.auth_algo_vpp_id, p.auth_key,
913                                   p.crypt_algo_vpp_id, p.crypt_key,
914                                   self.vpp_esp_protocol,
915                                   self.pg0.local_ip4,
916                                   self.pg0.remote_ip4)
917         p.tun_sa_out.add_vpp_config()
918
919         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
920                                  p.auth_algo_vpp_id, p.auth_key,
921                                  p.crypt_algo_vpp_id, p.crypt_key,
922                                  self.vpp_esp_protocol,
923                                  self.pg0.remote_ip4,
924                                  self.pg0.local_ip4)
925         p.tun_sa_in.add_vpp_config()
926
927         p.tun_if = VppGreInterface(self,
928                                    self.pg0.local_ip4,
929                                    self.pg0.remote_ip4,
930                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
931                                          GRE_API_TUNNEL_TYPE_TEB))
932         p.tun_if.add_vpp_config()
933
934         p.tun_protect = VppIpsecTunProtect(self,
935                                            p.tun_if,
936                                            p.tun_sa_out,
937                                            [p.tun_sa_in])
938
939         p.tun_protect.add_vpp_config()
940
941         p.tun_if.admin_up()
942         p.tun_if.config_ip4()
943         config_tun_params(p, self.encryption_type, p.tun_if)
944
945         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
946         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
947
948         self.vapi.cli("clear ipsec sa")
949
950     def tearDown(self):
951         p = self.ipv4_params
952         p.tun_if.unconfig_ip4()
953         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
954         self.pg1_11.admin_down()
955         self.pg1_11.remove_vpp_config()
956
957
958 class TestIpsecGreTebIfEspTra(TemplateIpsec,
959                               IpsecTun4Tests):
960     """ Ipsec GRE TEB ESP - Tra tests """
961     tun4_encrypt_node_name = "esp4-encrypt-tun"
962     tun4_decrypt_node_name = "esp4-decrypt-tun"
963     encryption_type = ESP
964     omac = "00:11:22:33:44:55"
965
966     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
967                          payload_size=100):
968         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
969                 sa.encrypt(IP(src=self.pg0.remote_ip4,
970                               dst=self.pg0.local_ip4) /
971                            GRE() /
972                            Ether(dst=self.omac) /
973                            IP(src="1.1.1.1", dst="1.1.1.2") /
974                            UDP(sport=1144, dport=2233) /
975                            Raw(b'X' * payload_size))
976                 for i in range(count)]
977
978     def gen_pkts(self, sw_intf, src, dst, count=1,
979                  payload_size=100):
980         return [Ether(dst=self.omac) /
981                 IP(src="1.1.1.1", dst="1.1.1.2") /
982                 UDP(sport=1144, dport=2233) /
983                 Raw(b'X' * payload_size)
984                 for i in range(count)]
985
986     def verify_decrypted(self, p, rxs):
987         for rx in rxs:
988             self.assert_equal(rx[Ether].dst, self.omac)
989             self.assert_equal(rx[IP].dst, "1.1.1.2")
990
991     def verify_encrypted(self, p, sa, rxs):
992         for rx in rxs:
993             try:
994                 pkt = sa.decrypt(rx[IP])
995                 if not pkt.haslayer(IP):
996                     pkt = IP(pkt[Raw].load)
997                 self.assert_packet_checksums_valid(pkt)
998                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
999                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1000                 self.assertTrue(pkt.haslayer(GRE))
1001                 e = pkt[Ether]
1002                 self.assertEqual(e[Ether].dst, self.omac)
1003                 self.assertEqual(e[IP].dst, "1.1.1.2")
1004             except (IndexError, AssertionError):
1005                 self.logger.debug(ppp("Unexpected packet:", rx))
1006                 try:
1007                     self.logger.debug(ppp("Decrypted packet:", pkt))
1008                 except:
1009                     pass
1010                 raise
1011
1012     def setUp(self):
1013         super(TestIpsecGreTebIfEspTra, self).setUp()
1014
1015         self.tun_if = self.pg0
1016
1017         p = self.ipv4_params
1018
1019         bd1 = VppBridgeDomain(self, 1)
1020         bd1.add_vpp_config()
1021
1022         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1023                                   p.auth_algo_vpp_id, p.auth_key,
1024                                   p.crypt_algo_vpp_id, p.crypt_key,
1025                                   self.vpp_esp_protocol)
1026         p.tun_sa_out.add_vpp_config()
1027
1028         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1029                                  p.auth_algo_vpp_id, p.auth_key,
1030                                  p.crypt_algo_vpp_id, p.crypt_key,
1031                                  self.vpp_esp_protocol)
1032         p.tun_sa_in.add_vpp_config()
1033
1034         p.tun_if = VppGreInterface(self,
1035                                    self.pg0.local_ip4,
1036                                    self.pg0.remote_ip4,
1037                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1038                                          GRE_API_TUNNEL_TYPE_TEB))
1039         p.tun_if.add_vpp_config()
1040
1041         p.tun_protect = VppIpsecTunProtect(self,
1042                                            p.tun_if,
1043                                            p.tun_sa_out,
1044                                            [p.tun_sa_in])
1045
1046         p.tun_protect.add_vpp_config()
1047
1048         p.tun_if.admin_up()
1049         p.tun_if.config_ip4()
1050         config_tra_params(p, self.encryption_type, p.tun_if)
1051
1052         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1053         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1054
1055         self.vapi.cli("clear ipsec sa")
1056
1057     def tearDown(self):
1058         p = self.ipv4_params
1059         p.tun_if.unconfig_ip4()
1060         super(TestIpsecGreTebIfEspTra, self).tearDown()
1061
1062
1063 class TestIpsecGreIfEsp(TemplateIpsec,
1064                         IpsecTun4Tests):
1065     """ Ipsec GRE ESP - TUN tests """
1066     tun4_encrypt_node_name = "esp4-encrypt-tun"
1067     tun4_decrypt_node_name = "esp4-decrypt-tun"
1068     encryption_type = ESP
1069
1070     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1071                          payload_size=100):
1072         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1073                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1074                               dst=self.pg0.local_ip4) /
1075                            GRE() /
1076                            IP(src=self.pg1.local_ip4,
1077                               dst=self.pg1.remote_ip4) /
1078                            UDP(sport=1144, dport=2233) /
1079                            Raw(b'X' * payload_size))
1080                 for i in range(count)]
1081
1082     def gen_pkts(self, sw_intf, src, dst, count=1,
1083                  payload_size=100):
1084         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1085                 IP(src="1.1.1.1", dst="1.1.1.2") /
1086                 UDP(sport=1144, dport=2233) /
1087                 Raw(b'X' * payload_size)
1088                 for i in range(count)]
1089
1090     def verify_decrypted(self, p, rxs):
1091         for rx in rxs:
1092             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1093             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1094
1095     def verify_encrypted(self, p, sa, rxs):
1096         for rx in rxs:
1097             try:
1098                 pkt = sa.decrypt(rx[IP])
1099                 if not pkt.haslayer(IP):
1100                     pkt = IP(pkt[Raw].load)
1101                 self.assert_packet_checksums_valid(pkt)
1102                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1103                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1104                 self.assertTrue(pkt.haslayer(GRE))
1105                 e = pkt[GRE]
1106                 self.assertEqual(e[IP].dst, "1.1.1.2")
1107             except (IndexError, AssertionError):
1108                 self.logger.debug(ppp("Unexpected packet:", rx))
1109                 try:
1110                     self.logger.debug(ppp("Decrypted packet:", pkt))
1111                 except:
1112                     pass
1113                 raise
1114
1115     def setUp(self):
1116         super(TestIpsecGreIfEsp, self).setUp()
1117
1118         self.tun_if = self.pg0
1119
1120         p = self.ipv4_params
1121
1122         bd1 = VppBridgeDomain(self, 1)
1123         bd1.add_vpp_config()
1124
1125         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1126                                   p.auth_algo_vpp_id, p.auth_key,
1127                                   p.crypt_algo_vpp_id, p.crypt_key,
1128                                   self.vpp_esp_protocol,
1129                                   self.pg0.local_ip4,
1130                                   self.pg0.remote_ip4)
1131         p.tun_sa_out.add_vpp_config()
1132
1133         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1134                                  p.auth_algo_vpp_id, p.auth_key,
1135                                  p.crypt_algo_vpp_id, p.crypt_key,
1136                                  self.vpp_esp_protocol,
1137                                  self.pg0.remote_ip4,
1138                                  self.pg0.local_ip4)
1139         p.tun_sa_in.add_vpp_config()
1140
1141         p.tun_if = VppGreInterface(self,
1142                                    self.pg0.local_ip4,
1143                                    self.pg0.remote_ip4)
1144         p.tun_if.add_vpp_config()
1145
1146         p.tun_protect = VppIpsecTunProtect(self,
1147                                            p.tun_if,
1148                                            p.tun_sa_out,
1149                                            [p.tun_sa_in])
1150         p.tun_protect.add_vpp_config()
1151
1152         p.tun_if.admin_up()
1153         p.tun_if.config_ip4()
1154         config_tun_params(p, self.encryption_type, p.tun_if)
1155
1156         VppIpRoute(self, "1.1.1.2", 32,
1157                    [VppRoutePath(p.tun_if.remote_ip4,
1158                                  0xffffffff)]).add_vpp_config()
1159
1160     def tearDown(self):
1161         p = self.ipv4_params
1162         p.tun_if.unconfig_ip4()
1163         super(TestIpsecGreIfEsp, self).tearDown()
1164
1165
1166 class TestIpsecGreIfEspTra(TemplateIpsec,
1167                            IpsecTun4Tests):
1168     """ Ipsec GRE ESP - TRA tests """
1169     tun4_encrypt_node_name = "esp4-encrypt-tun"
1170     tun4_decrypt_node_name = "esp4-decrypt-tun"
1171     encryption_type = ESP
1172
1173     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1174                          payload_size=100):
1175         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1176                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1177                               dst=self.pg0.local_ip4) /
1178                            GRE() /
1179                            IP(src=self.pg1.local_ip4,
1180                               dst=self.pg1.remote_ip4) /
1181                            UDP(sport=1144, dport=2233) /
1182                            Raw(b'X' * payload_size))
1183                 for i in range(count)]
1184
1185     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1186                                 payload_size=100):
1187         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1188                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1189                               dst=self.pg0.local_ip4) /
1190                            GRE() /
1191                            UDP(sport=1144, dport=2233) /
1192                            Raw(b'X' * payload_size))
1193                 for i in range(count)]
1194
1195     def gen_pkts(self, sw_intf, src, dst, count=1,
1196                  payload_size=100):
1197         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1198                 IP(src="1.1.1.1", dst="1.1.1.2") /
1199                 UDP(sport=1144, dport=2233) /
1200                 Raw(b'X' * payload_size)
1201                 for i in range(count)]
1202
1203     def verify_decrypted(self, p, rxs):
1204         for rx in rxs:
1205             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1206             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1207
1208     def verify_encrypted(self, p, sa, rxs):
1209         for rx in rxs:
1210             try:
1211                 pkt = sa.decrypt(rx[IP])
1212                 if not pkt.haslayer(IP):
1213                     pkt = IP(pkt[Raw].load)
1214                 self.assert_packet_checksums_valid(pkt)
1215                 self.assertTrue(pkt.haslayer(GRE))
1216                 e = pkt[GRE]
1217                 self.assertEqual(e[IP].dst, "1.1.1.2")
1218             except (IndexError, AssertionError):
1219                 self.logger.debug(ppp("Unexpected packet:", rx))
1220                 try:
1221                     self.logger.debug(ppp("Decrypted packet:", pkt))
1222                 except:
1223                     pass
1224                 raise
1225
1226     def setUp(self):
1227         super(TestIpsecGreIfEspTra, self).setUp()
1228
1229         self.tun_if = self.pg0
1230
1231         p = self.ipv4_params
1232
1233         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1234                                   p.auth_algo_vpp_id, p.auth_key,
1235                                   p.crypt_algo_vpp_id, p.crypt_key,
1236                                   self.vpp_esp_protocol)
1237         p.tun_sa_out.add_vpp_config()
1238
1239         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1240                                  p.auth_algo_vpp_id, p.auth_key,
1241                                  p.crypt_algo_vpp_id, p.crypt_key,
1242                                  self.vpp_esp_protocol)
1243         p.tun_sa_in.add_vpp_config()
1244
1245         p.tun_if = VppGreInterface(self,
1246                                    self.pg0.local_ip4,
1247                                    self.pg0.remote_ip4)
1248         p.tun_if.add_vpp_config()
1249
1250         p.tun_protect = VppIpsecTunProtect(self,
1251                                            p.tun_if,
1252                                            p.tun_sa_out,
1253                                            [p.tun_sa_in])
1254         p.tun_protect.add_vpp_config()
1255
1256         p.tun_if.admin_up()
1257         p.tun_if.config_ip4()
1258         config_tra_params(p, self.encryption_type, p.tun_if)
1259
1260         VppIpRoute(self, "1.1.1.2", 32,
1261                    [VppRoutePath(p.tun_if.remote_ip4,
1262                                  0xffffffff)]).add_vpp_config()
1263
1264     def tearDown(self):
1265         p = self.ipv4_params
1266         p.tun_if.unconfig_ip4()
1267         super(TestIpsecGreIfEspTra, self).tearDown()
1268
1269     def test_gre_non_ip(self):
1270         p = self.ipv4_params
1271         tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1272                                           src=p.remote_tun_if_host,
1273                                           dst=self.pg1.remote_ip6)
1274         self.send_and_assert_no_replies(self.tun_if, tx)
1275         node_name = ('/err/%s/unsupported payload' %
1276                      self.tun4_decrypt_node_name)
1277         self.assertEqual(1, self.statistics.get_err_counter(node_name))
1278
1279
1280 class TestIpsecGre6IfEspTra(TemplateIpsec,
1281                             IpsecTun6Tests):
1282     """ Ipsec GRE ESP - TRA tests """
1283     tun6_encrypt_node_name = "esp6-encrypt-tun"
1284     tun6_decrypt_node_name = "esp6-decrypt-tun"
1285     encryption_type = ESP
1286
1287     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1288                           payload_size=100):
1289         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1290                 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1291                                 dst=self.pg0.local_ip6) /
1292                            GRE() /
1293                            IPv6(src=self.pg1.local_ip6,
1294                                 dst=self.pg1.remote_ip6) /
1295                            UDP(sport=1144, dport=2233) /
1296                            Raw(b'X' * payload_size))
1297                 for i in range(count)]
1298
1299     def gen_pkts6(self, sw_intf, src, dst, count=1,
1300                   payload_size=100):
1301         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1302                 IPv6(src="1::1", dst="1::2") /
1303                 UDP(sport=1144, dport=2233) /
1304                 Raw(b'X' * payload_size)
1305                 for i in range(count)]
1306
1307     def verify_decrypted6(self, p, rxs):
1308         for rx in rxs:
1309             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1310             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1311
1312     def verify_encrypted6(self, p, sa, rxs):
1313         for rx in rxs:
1314             try:
1315                 pkt = sa.decrypt(rx[IPv6])
1316                 if not pkt.haslayer(IPv6):
1317                     pkt = IPv6(pkt[Raw].load)
1318                 self.assert_packet_checksums_valid(pkt)
1319                 self.assertTrue(pkt.haslayer(GRE))
1320                 e = pkt[GRE]
1321                 self.assertEqual(e[IPv6].dst, "1::2")
1322             except (IndexError, AssertionError):
1323                 self.logger.debug(ppp("Unexpected packet:", rx))
1324                 try:
1325                     self.logger.debug(ppp("Decrypted packet:", pkt))
1326                 except:
1327                     pass
1328                 raise
1329
1330     def setUp(self):
1331         super(TestIpsecGre6IfEspTra, self).setUp()
1332
1333         self.tun_if = self.pg0
1334
1335         p = self.ipv6_params
1336
1337         bd1 = VppBridgeDomain(self, 1)
1338         bd1.add_vpp_config()
1339
1340         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1341                                   p.auth_algo_vpp_id, p.auth_key,
1342                                   p.crypt_algo_vpp_id, p.crypt_key,
1343                                   self.vpp_esp_protocol)
1344         p.tun_sa_out.add_vpp_config()
1345
1346         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1347                                  p.auth_algo_vpp_id, p.auth_key,
1348                                  p.crypt_algo_vpp_id, p.crypt_key,
1349                                  self.vpp_esp_protocol)
1350         p.tun_sa_in.add_vpp_config()
1351
1352         p.tun_if = VppGreInterface(self,
1353                                    self.pg0.local_ip6,
1354                                    self.pg0.remote_ip6)
1355         p.tun_if.add_vpp_config()
1356
1357         p.tun_protect = VppIpsecTunProtect(self,
1358                                            p.tun_if,
1359                                            p.tun_sa_out,
1360                                            [p.tun_sa_in])
1361         p.tun_protect.add_vpp_config()
1362
1363         p.tun_if.admin_up()
1364         p.tun_if.config_ip6()
1365         config_tra_params(p, self.encryption_type, p.tun_if)
1366
1367         r = VppIpRoute(self, "1::2", 128,
1368                        [VppRoutePath(p.tun_if.remote_ip6,
1369                                      0xffffffff,
1370                                      proto=DpoProto.DPO_PROTO_IP6)])
1371         r.add_vpp_config()
1372
1373     def tearDown(self):
1374         p = self.ipv6_params
1375         p.tun_if.unconfig_ip6()
1376         super(TestIpsecGre6IfEspTra, self).tearDown()
1377
1378
1379 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1380     """ Ipsec mGRE ESP v4 TRA tests """
1381     tun4_encrypt_node_name = "esp4-encrypt-tun"
1382     tun4_decrypt_node_name = "esp4-decrypt-tun"
1383     encryption_type = ESP
1384
1385     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1386                          payload_size=100):
1387         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1388                 sa.encrypt(IP(src=p.tun_dst,
1389                               dst=self.pg0.local_ip4) /
1390                            GRE() /
1391                            IP(src=self.pg1.local_ip4,
1392                               dst=self.pg1.remote_ip4) /
1393                            UDP(sport=1144, dport=2233) /
1394                            Raw(b'X' * payload_size))
1395                 for i in range(count)]
1396
1397     def gen_pkts(self, sw_intf, src, dst, count=1,
1398                  payload_size=100):
1399         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1400                 IP(src="1.1.1.1", dst=dst) /
1401                 UDP(sport=1144, dport=2233) /
1402                 Raw(b'X' * payload_size)
1403                 for i in range(count)]
1404
1405     def verify_decrypted(self, p, rxs):
1406         for rx in rxs:
1407             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1408             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1409
1410     def verify_encrypted(self, p, sa, rxs):
1411         for rx in rxs:
1412             try:
1413                 pkt = sa.decrypt(rx[IP])
1414                 if not pkt.haslayer(IP):
1415                     pkt = IP(pkt[Raw].load)
1416                 self.assert_packet_checksums_valid(pkt)
1417                 self.assertTrue(pkt.haslayer(GRE))
1418                 e = pkt[GRE]
1419                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1420             except (IndexError, AssertionError):
1421                 self.logger.debug(ppp("Unexpected packet:", rx))
1422                 try:
1423                     self.logger.debug(ppp("Decrypted packet:", pkt))
1424                 except:
1425                     pass
1426                 raise
1427
1428     def setUp(self):
1429         super(TestIpsecMGreIfEspTra4, self).setUp()
1430
1431         N_NHS = 16
1432         self.tun_if = self.pg0
1433         p = self.ipv4_params
1434         p.tun_if = VppGreInterface(self,
1435                                    self.pg0.local_ip4,
1436                                    "0.0.0.0",
1437                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1438                                          TUNNEL_API_MODE_MP))
1439         p.tun_if.add_vpp_config()
1440         p.tun_if.admin_up()
1441         p.tun_if.config_ip4()
1442         p.tun_if.generate_remote_hosts(N_NHS)
1443         self.pg0.generate_remote_hosts(N_NHS)
1444         self.pg0.configure_ipv4_neighbors()
1445
1446         # setup some SAs for several next-hops on the interface
1447         self.multi_params = []
1448
1449         for ii in range(1):
1450             p = copy.copy(self.ipv4_params)
1451
1452             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1453             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1454             p.scapy_tun_spi = p.scapy_tun_spi + ii
1455             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1456             p.vpp_tun_spi = p.vpp_tun_spi + ii
1457
1458             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1459             p.scapy_tra_spi = p.scapy_tra_spi + ii
1460             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1461             p.vpp_tra_spi = p.vpp_tra_spi + ii
1462             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1463                                       p.auth_algo_vpp_id, p.auth_key,
1464                                       p.crypt_algo_vpp_id, p.crypt_key,
1465                                       self.vpp_esp_protocol)
1466             p.tun_sa_out.add_vpp_config()
1467
1468             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1469                                      p.auth_algo_vpp_id, p.auth_key,
1470                                      p.crypt_algo_vpp_id, p.crypt_key,
1471                                      self.vpp_esp_protocol)
1472             p.tun_sa_in.add_vpp_config()
1473
1474             p.tun_protect = VppIpsecTunProtect(
1475                 self,
1476                 p.tun_if,
1477                 p.tun_sa_out,
1478                 [p.tun_sa_in],
1479                 nh=p.tun_if.remote_hosts[ii].ip4)
1480             p.tun_protect.add_vpp_config()
1481             config_tra_params(p, self.encryption_type, p.tun_if)
1482             self.multi_params.append(p)
1483
1484             VppIpRoute(self, p.remote_tun_if_host, 32,
1485                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1486                                      p.tun_if.sw_if_index)]).add_vpp_config()
1487
1488             # in this v4 variant add the teibs after the protect
1489             p.teib = VppTeib(self, p.tun_if,
1490                              p.tun_if.remote_hosts[ii].ip4,
1491                              self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1492             p.tun_dst = self.pg0.remote_hosts[ii].ip4
1493         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1494
1495     def tearDown(self):
1496         p = self.ipv4_params
1497         p.tun_if.unconfig_ip4()
1498         super(TestIpsecMGreIfEspTra4, self).tearDown()
1499
1500     def test_tun_44(self):
1501         """mGRE IPSEC 44"""
1502         N_PKTS = 63
1503         for p in self.multi_params:
1504             self.verify_tun_44(p, count=N_PKTS)
1505             p.teib.remove_vpp_config()
1506             self.verify_tun_dropped_44(p, count=N_PKTS)
1507             p.teib.add_vpp_config()
1508             self.verify_tun_44(p, count=N_PKTS)
1509
1510
1511 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1512     """ Ipsec mGRE ESP v6 TRA tests """
1513     tun6_encrypt_node_name = "esp6-encrypt-tun"
1514     tun6_decrypt_node_name = "esp6-decrypt-tun"
1515     encryption_type = ESP
1516
1517     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1518                           payload_size=100):
1519         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1520                 sa.encrypt(IPv6(src=p.tun_dst,
1521                                 dst=self.pg0.local_ip6) /
1522                            GRE() /
1523                            IPv6(src=self.pg1.local_ip6,
1524                                 dst=self.pg1.remote_ip6) /
1525                            UDP(sport=1144, dport=2233) /
1526                            Raw(b'X' * payload_size))
1527                 for i in range(count)]
1528
1529     def gen_pkts6(self, sw_intf, src, dst, count=1,
1530                   payload_size=100):
1531         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1532                 IPv6(src="1::1", dst=dst) /
1533                 UDP(sport=1144, dport=2233) /
1534                 Raw(b'X' * payload_size)
1535                 for i in range(count)]
1536
1537     def verify_decrypted6(self, p, rxs):
1538         for rx in rxs:
1539             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1540             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1541
1542     def verify_encrypted6(self, p, sa, rxs):
1543         for rx in rxs:
1544             try:
1545                 pkt = sa.decrypt(rx[IPv6])
1546                 if not pkt.haslayer(IPv6):
1547                     pkt = IPv6(pkt[Raw].load)
1548                 self.assert_packet_checksums_valid(pkt)
1549                 self.assertTrue(pkt.haslayer(GRE))
1550                 e = pkt[GRE]
1551                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1552             except (IndexError, AssertionError):
1553                 self.logger.debug(ppp("Unexpected packet:", rx))
1554                 try:
1555                     self.logger.debug(ppp("Decrypted packet:", pkt))
1556                 except:
1557                     pass
1558                 raise
1559
1560     def setUp(self):
1561         super(TestIpsecMGreIfEspTra6, self).setUp()
1562
1563         self.vapi.cli("set logging class ipsec level debug")
1564
1565         N_NHS = 16
1566         self.tun_if = self.pg0
1567         p = self.ipv6_params
1568         p.tun_if = VppGreInterface(self,
1569                                    self.pg0.local_ip6,
1570                                    "::",
1571                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1572                                          TUNNEL_API_MODE_MP))
1573         p.tun_if.add_vpp_config()
1574         p.tun_if.admin_up()
1575         p.tun_if.config_ip6()
1576         p.tun_if.generate_remote_hosts(N_NHS)
1577         self.pg0.generate_remote_hosts(N_NHS)
1578         self.pg0.configure_ipv6_neighbors()
1579
1580         # setup some SAs for several next-hops on the interface
1581         self.multi_params = []
1582
1583         for ii in range(N_NHS):
1584             p = copy.copy(self.ipv6_params)
1585
1586             p.remote_tun_if_host = "1::%d" % (ii + 1)
1587             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1588             p.scapy_tun_spi = p.scapy_tun_spi + ii
1589             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1590             p.vpp_tun_spi = p.vpp_tun_spi + ii
1591
1592             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1593             p.scapy_tra_spi = p.scapy_tra_spi + ii
1594             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1595             p.vpp_tra_spi = p.vpp_tra_spi + ii
1596             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1597                                       p.auth_algo_vpp_id, p.auth_key,
1598                                       p.crypt_algo_vpp_id, p.crypt_key,
1599                                       self.vpp_esp_protocol)
1600             p.tun_sa_out.add_vpp_config()
1601
1602             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1603                                      p.auth_algo_vpp_id, p.auth_key,
1604                                      p.crypt_algo_vpp_id, p.crypt_key,
1605                                      self.vpp_esp_protocol)
1606             p.tun_sa_in.add_vpp_config()
1607
1608             # in this v6 variant add the teibs first then the protection
1609             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1610             VppTeib(self, p.tun_if,
1611                     p.tun_if.remote_hosts[ii].ip6,
1612                     p.tun_dst).add_vpp_config()
1613
1614             p.tun_protect = VppIpsecTunProtect(
1615                 self,
1616                 p.tun_if,
1617                 p.tun_sa_out,
1618                 [p.tun_sa_in],
1619                 nh=p.tun_if.remote_hosts[ii].ip6)
1620             p.tun_protect.add_vpp_config()
1621             config_tra_params(p, self.encryption_type, p.tun_if)
1622             self.multi_params.append(p)
1623
1624             VppIpRoute(self, p.remote_tun_if_host, 128,
1625                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1626                                      p.tun_if.sw_if_index)]).add_vpp_config()
1627             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1628
1629         self.logger.info(self.vapi.cli("sh log"))
1630         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1631         self.logger.info(self.vapi.cli("sh adj 41"))
1632
1633     def tearDown(self):
1634         p = self.ipv6_params
1635         p.tun_if.unconfig_ip6()
1636         super(TestIpsecMGreIfEspTra6, self).tearDown()
1637
1638     def test_tun_66(self):
1639         """mGRE IPSec 66"""
1640         for p in self.multi_params:
1641             self.verify_tun_66(p, count=63)
1642
1643
1644 class TemplateIpsec4TunProtect(object):
1645     """ IPsec IPv4 Tunnel protect """
1646
1647     encryption_type = ESP
1648     tun4_encrypt_node_name = "esp4-encrypt-tun"
1649     tun4_decrypt_node_name = "esp4-decrypt-tun"
1650     tun4_input_node = "ipsec4-tun-input"
1651
1652     def config_sa_tra(self, p):
1653         config_tun_params(p, self.encryption_type, p.tun_if)
1654
1655         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1656                                   p.auth_algo_vpp_id, p.auth_key,
1657                                   p.crypt_algo_vpp_id, p.crypt_key,
1658                                   self.vpp_esp_protocol,
1659                                   flags=p.flags)
1660         p.tun_sa_out.add_vpp_config()
1661
1662         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1663                                  p.auth_algo_vpp_id, p.auth_key,
1664                                  p.crypt_algo_vpp_id, p.crypt_key,
1665                                  self.vpp_esp_protocol,
1666                                  flags=p.flags)
1667         p.tun_sa_in.add_vpp_config()
1668
1669     def config_sa_tun(self, p):
1670         config_tun_params(p, self.encryption_type, p.tun_if)
1671
1672         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1673                                   p.auth_algo_vpp_id, p.auth_key,
1674                                   p.crypt_algo_vpp_id, p.crypt_key,
1675                                   self.vpp_esp_protocol,
1676                                   self.tun_if.local_addr[p.addr_type],
1677                                   self.tun_if.remote_addr[p.addr_type],
1678                                   flags=p.flags)
1679         p.tun_sa_out.add_vpp_config()
1680
1681         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1682                                  p.auth_algo_vpp_id, p.auth_key,
1683                                  p.crypt_algo_vpp_id, p.crypt_key,
1684                                  self.vpp_esp_protocol,
1685                                  self.tun_if.remote_addr[p.addr_type],
1686                                  self.tun_if.local_addr[p.addr_type],
1687                                  flags=p.flags)
1688         p.tun_sa_in.add_vpp_config()
1689
1690     def config_protect(self, p):
1691         p.tun_protect = VppIpsecTunProtect(self,
1692                                            p.tun_if,
1693                                            p.tun_sa_out,
1694                                            [p.tun_sa_in])
1695         p.tun_protect.add_vpp_config()
1696
1697     def config_network(self, p):
1698         p.tun_if = VppIpIpTunInterface(self, self.pg0,
1699                                        self.pg0.local_ip4,
1700                                        self.pg0.remote_ip4)
1701         p.tun_if.add_vpp_config()
1702         p.tun_if.admin_up()
1703         p.tun_if.config_ip4()
1704         p.tun_if.config_ip6()
1705
1706         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1707                              [VppRoutePath(p.tun_if.remote_ip4,
1708                                            0xffffffff)])
1709         p.route.add_vpp_config()
1710         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1711                        [VppRoutePath(p.tun_if.remote_ip6,
1712                                      0xffffffff,
1713                                      proto=DpoProto.DPO_PROTO_IP6)])
1714         r.add_vpp_config()
1715
1716     def unconfig_network(self, p):
1717         p.route.remove_vpp_config()
1718         p.tun_if.remove_vpp_config()
1719
1720     def unconfig_protect(self, p):
1721         p.tun_protect.remove_vpp_config()
1722
1723     def unconfig_sa(self, p):
1724         p.tun_sa_out.remove_vpp_config()
1725         p.tun_sa_in.remove_vpp_config()
1726
1727
1728 class TestIpsec4TunProtect(TemplateIpsec,
1729                            TemplateIpsec4TunProtect,
1730                            IpsecTun4):
1731     """ IPsec IPv4 Tunnel protect - transport mode"""
1732
1733     def setUp(self):
1734         super(TestIpsec4TunProtect, self).setUp()
1735
1736         self.tun_if = self.pg0
1737
1738     def tearDown(self):
1739         super(TestIpsec4TunProtect, self).tearDown()
1740
1741     def test_tun_44(self):
1742         """IPSEC tunnel protect"""
1743
1744         p = self.ipv4_params
1745
1746         self.config_network(p)
1747         self.config_sa_tra(p)
1748         self.config_protect(p)
1749
1750         self.verify_tun_44(p, count=127)
1751         c = p.tun_if.get_rx_stats()
1752         self.assertEqual(c['packets'], 127)
1753         c = p.tun_if.get_tx_stats()
1754         self.assertEqual(c['packets'], 127)
1755
1756         self.vapi.cli("clear ipsec sa")
1757         self.verify_tun_64(p, count=127)
1758         c = p.tun_if.get_rx_stats()
1759         self.assertEqual(c['packets'], 254)
1760         c = p.tun_if.get_tx_stats()
1761         self.assertEqual(c['packets'], 254)
1762
1763         # rekey - create new SAs and update the tunnel protection
1764         np = copy.copy(p)
1765         np.crypt_key = b'X' + p.crypt_key[1:]
1766         np.scapy_tun_spi += 100
1767         np.scapy_tun_sa_id += 1
1768         np.vpp_tun_spi += 100
1769         np.vpp_tun_sa_id += 1
1770         np.tun_if.local_spi = p.vpp_tun_spi
1771         np.tun_if.remote_spi = p.scapy_tun_spi
1772
1773         self.config_sa_tra(np)
1774         self.config_protect(np)
1775         self.unconfig_sa(p)
1776
1777         self.verify_tun_44(np, count=127)
1778         c = p.tun_if.get_rx_stats()
1779         self.assertEqual(c['packets'], 381)
1780         c = p.tun_if.get_tx_stats()
1781         self.assertEqual(c['packets'], 381)
1782
1783         # teardown
1784         self.unconfig_protect(np)
1785         self.unconfig_sa(np)
1786         self.unconfig_network(p)
1787
1788
1789 class TestIpsec4TunProtectUdp(TemplateIpsec,
1790                               TemplateIpsec4TunProtect,
1791                               IpsecTun4):
1792     """ IPsec IPv4 Tunnel protect - transport mode"""
1793
1794     def setUp(self):
1795         super(TestIpsec4TunProtectUdp, self).setUp()
1796
1797         self.tun_if = self.pg0
1798
1799         p = self.ipv4_params
1800         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1801                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1802         p.nat_header = UDP(sport=5454, dport=4500)
1803         self.config_network(p)
1804         self.config_sa_tra(p)
1805         self.config_protect(p)
1806
1807     def tearDown(self):
1808         p = self.ipv4_params
1809         self.unconfig_protect(p)
1810         self.unconfig_sa(p)
1811         self.unconfig_network(p)
1812         super(TestIpsec4TunProtectUdp, self).tearDown()
1813
1814     def test_tun_44(self):
1815         """IPSEC UDP tunnel protect"""
1816
1817         p = self.ipv4_params
1818
1819         self.verify_tun_44(p, count=127)
1820         c = p.tun_if.get_rx_stats()
1821         self.assertEqual(c['packets'], 127)
1822         c = p.tun_if.get_tx_stats()
1823         self.assertEqual(c['packets'], 127)
1824
1825     def test_keepalive(self):
1826         """ IPSEC NAT Keepalive """
1827         self.verify_keepalive(self.ipv4_params)
1828
1829
1830 class TestIpsec4TunProtectTun(TemplateIpsec,
1831                               TemplateIpsec4TunProtect,
1832                               IpsecTun4):
1833     """ IPsec IPv4 Tunnel protect - tunnel mode"""
1834
1835     encryption_type = ESP
1836     tun4_encrypt_node_name = "esp4-encrypt-tun"
1837     tun4_decrypt_node_name = "esp4-decrypt-tun"
1838
1839     def setUp(self):
1840         super(TestIpsec4TunProtectTun, self).setUp()
1841
1842         self.tun_if = self.pg0
1843
1844     def tearDown(self):
1845         super(TestIpsec4TunProtectTun, self).tearDown()
1846
1847     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1848                          payload_size=100):
1849         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1850                 sa.encrypt(IP(src=sw_intf.remote_ip4,
1851                               dst=sw_intf.local_ip4) /
1852                            IP(src=src, dst=dst) /
1853                            UDP(sport=1144, dport=2233) /
1854                            Raw(b'X' * payload_size))
1855                 for i in range(count)]
1856
1857     def gen_pkts(self, sw_intf, src, dst, count=1,
1858                  payload_size=100):
1859         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1860                 IP(src=src, dst=dst) /
1861                 UDP(sport=1144, dport=2233) /
1862                 Raw(b'X' * payload_size)
1863                 for i in range(count)]
1864
1865     def verify_decrypted(self, p, rxs):
1866         for rx in rxs:
1867             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1868             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1869             self.assert_packet_checksums_valid(rx)
1870
1871     def verify_encrypted(self, p, sa, rxs):
1872         for rx in rxs:
1873             try:
1874                 pkt = sa.decrypt(rx[IP])
1875                 if not pkt.haslayer(IP):
1876                     pkt = IP(pkt[Raw].load)
1877                 self.assert_packet_checksums_valid(pkt)
1878                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1879                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1880                 inner = pkt[IP].payload
1881                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1882
1883             except (IndexError, AssertionError):
1884                 self.logger.debug(ppp("Unexpected packet:", rx))
1885                 try:
1886                     self.logger.debug(ppp("Decrypted packet:", pkt))
1887                 except:
1888                     pass
1889                 raise
1890
1891     def test_tun_44(self):
1892         """IPSEC tunnel protect """
1893
1894         p = self.ipv4_params
1895
1896         self.config_network(p)
1897         self.config_sa_tun(p)
1898         self.config_protect(p)
1899
1900         self.verify_tun_44(p, count=127)
1901
1902         c = p.tun_if.get_rx_stats()
1903         self.assertEqual(c['packets'], 127)
1904         c = p.tun_if.get_tx_stats()
1905         self.assertEqual(c['packets'], 127)
1906
1907         # rekey - create new SAs and update the tunnel protection
1908         np = copy.copy(p)
1909         np.crypt_key = b'X' + p.crypt_key[1:]
1910         np.scapy_tun_spi += 100
1911         np.scapy_tun_sa_id += 1
1912         np.vpp_tun_spi += 100
1913         np.vpp_tun_sa_id += 1
1914         np.tun_if.local_spi = p.vpp_tun_spi
1915         np.tun_if.remote_spi = p.scapy_tun_spi
1916
1917         self.config_sa_tun(np)
1918         self.config_protect(np)
1919         self.unconfig_sa(p)
1920
1921         self.verify_tun_44(np, count=127)
1922         c = p.tun_if.get_rx_stats()
1923         self.assertEqual(c['packets'], 254)
1924         c = p.tun_if.get_tx_stats()
1925         self.assertEqual(c['packets'], 254)
1926
1927         # teardown
1928         self.unconfig_protect(np)
1929         self.unconfig_sa(np)
1930         self.unconfig_network(p)
1931
1932
1933 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
1934                                   TemplateIpsec4TunProtect,
1935                                   IpsecTun4):
1936     """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
1937
1938     encryption_type = ESP
1939     tun4_encrypt_node_name = "esp4-encrypt-tun"
1940     tun4_decrypt_node_name = "esp4-decrypt-tun"
1941
1942     def setUp(self):
1943         super(TestIpsec4TunProtectTunDrop, self).setUp()
1944
1945         self.tun_if = self.pg0
1946
1947     def tearDown(self):
1948         super(TestIpsec4TunProtectTunDrop, self).tearDown()
1949
1950     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1951                          payload_size=100):
1952         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1953                 sa.encrypt(IP(src=sw_intf.remote_ip4,
1954                               dst="5.5.5.5") /
1955                            IP(src=src, dst=dst) /
1956                            UDP(sport=1144, dport=2233) /
1957                            Raw(b'X' * payload_size))
1958                 for i in range(count)]
1959
1960     def test_tun_drop_44(self):
1961         """IPSEC tunnel protect bogus tunnel header """
1962
1963         p = self.ipv4_params
1964
1965         self.config_network(p)
1966         self.config_sa_tun(p)
1967         self.config_protect(p)
1968
1969         tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1970                                    src=p.remote_tun_if_host,
1971                                    dst=self.pg1.remote_ip4,
1972                                    count=63)
1973         self.send_and_assert_no_replies(self.tun_if, tx)
1974
1975         # teardown
1976         self.unconfig_protect(p)
1977         self.unconfig_sa(p)
1978         self.unconfig_network(p)
1979
1980
1981 class TemplateIpsec6TunProtect(object):
1982     """ IPsec IPv6 Tunnel protect """
1983
1984     def config_sa_tra(self, p):
1985         config_tun_params(p, self.encryption_type, p.tun_if)
1986
1987         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1988                                   p.auth_algo_vpp_id, p.auth_key,
1989                                   p.crypt_algo_vpp_id, p.crypt_key,
1990                                   self.vpp_esp_protocol)
1991         p.tun_sa_out.add_vpp_config()
1992
1993         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1994                                  p.auth_algo_vpp_id, p.auth_key,
1995                                  p.crypt_algo_vpp_id, p.crypt_key,
1996                                  self.vpp_esp_protocol)
1997         p.tun_sa_in.add_vpp_config()
1998
1999     def config_sa_tun(self, p):
2000         config_tun_params(p, self.encryption_type, p.tun_if)
2001
2002         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2003                                   p.auth_algo_vpp_id, p.auth_key,
2004                                   p.crypt_algo_vpp_id, p.crypt_key,
2005                                   self.vpp_esp_protocol,
2006                                   self.tun_if.local_addr[p.addr_type],
2007                                   self.tun_if.remote_addr[p.addr_type])
2008         p.tun_sa_out.add_vpp_config()
2009
2010         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2011                                  p.auth_algo_vpp_id, p.auth_key,
2012                                  p.crypt_algo_vpp_id, p.crypt_key,
2013                                  self.vpp_esp_protocol,
2014                                  self.tun_if.remote_addr[p.addr_type],
2015                                  self.tun_if.local_addr[p.addr_type])
2016         p.tun_sa_in.add_vpp_config()
2017
2018     def config_protect(self, p):
2019         p.tun_protect = VppIpsecTunProtect(self,
2020                                            p.tun_if,
2021                                            p.tun_sa_out,
2022                                            [p.tun_sa_in])
2023         p.tun_protect.add_vpp_config()
2024
2025     def config_network(self, p):
2026         p.tun_if = VppIpIpTunInterface(self, self.pg0,
2027                                        self.pg0.local_ip6,
2028                                        self.pg0.remote_ip6)
2029         p.tun_if.add_vpp_config()
2030         p.tun_if.admin_up()
2031         p.tun_if.config_ip6()
2032         p.tun_if.config_ip4()
2033
2034         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2035                              [VppRoutePath(p.tun_if.remote_ip6,
2036                                            0xffffffff,
2037                                            proto=DpoProto.DPO_PROTO_IP6)])
2038         p.route.add_vpp_config()
2039         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2040                        [VppRoutePath(p.tun_if.remote_ip4,
2041                                      0xffffffff)])
2042         r.add_vpp_config()
2043
2044     def unconfig_network(self, p):
2045         p.route.remove_vpp_config()
2046         p.tun_if.remove_vpp_config()
2047
2048     def unconfig_protect(self, p):
2049         p.tun_protect.remove_vpp_config()
2050
2051     def unconfig_sa(self, p):
2052         p.tun_sa_out.remove_vpp_config()
2053         p.tun_sa_in.remove_vpp_config()
2054
2055
2056 class TestIpsec6TunProtect(TemplateIpsec,
2057                            TemplateIpsec6TunProtect,
2058                            IpsecTun6):
2059     """ IPsec IPv6 Tunnel protect - transport mode"""
2060
2061     encryption_type = ESP
2062     tun6_encrypt_node_name = "esp6-encrypt-tun"
2063     tun6_decrypt_node_name = "esp6-decrypt-tun"
2064
2065     def setUp(self):
2066         super(TestIpsec6TunProtect, self).setUp()
2067
2068         self.tun_if = self.pg0
2069
2070     def tearDown(self):
2071         super(TestIpsec6TunProtect, self).tearDown()
2072
2073     def test_tun_66(self):
2074         """IPSEC tunnel protect 6o6"""
2075
2076         p = self.ipv6_params
2077
2078         self.config_network(p)
2079         self.config_sa_tra(p)
2080         self.config_protect(p)
2081
2082         self.verify_tun_66(p, count=127)
2083         c = p.tun_if.get_rx_stats()
2084         self.assertEqual(c['packets'], 127)
2085         c = p.tun_if.get_tx_stats()
2086         self.assertEqual(c['packets'], 127)
2087
2088         # rekey - create new SAs and update the tunnel protection
2089         np = copy.copy(p)
2090         np.crypt_key = b'X' + p.crypt_key[1:]
2091         np.scapy_tun_spi += 100
2092         np.scapy_tun_sa_id += 1
2093         np.vpp_tun_spi += 100
2094         np.vpp_tun_sa_id += 1
2095         np.tun_if.local_spi = p.vpp_tun_spi
2096         np.tun_if.remote_spi = p.scapy_tun_spi
2097
2098         self.config_sa_tra(np)
2099         self.config_protect(np)
2100         self.unconfig_sa(p)
2101
2102         self.verify_tun_66(np, count=127)
2103         c = p.tun_if.get_rx_stats()
2104         self.assertEqual(c['packets'], 254)
2105         c = p.tun_if.get_tx_stats()
2106         self.assertEqual(c['packets'], 254)
2107
2108         # bounce the interface state
2109         p.tun_if.admin_down()
2110         self.verify_drop_tun_66(np, count=127)
2111         node = ('/err/ipsec6-tun-input/%s' %
2112                 'ipsec packets received on disabled interface')
2113         self.assertEqual(127, self.statistics.get_err_counter(node))
2114         p.tun_if.admin_up()
2115         self.verify_tun_66(np, count=127)
2116
2117         # 3 phase rekey
2118         #  1) add two input SAs [old, new]
2119         #  2) swap output SA to [new]
2120         #  3) use only [new] input SA
2121         np3 = copy.copy(np)
2122         np3.crypt_key = b'Z' + p.crypt_key[1:]
2123         np3.scapy_tun_spi += 100
2124         np3.scapy_tun_sa_id += 1
2125         np3.vpp_tun_spi += 100
2126         np3.vpp_tun_sa_id += 1
2127         np3.tun_if.local_spi = p.vpp_tun_spi
2128         np3.tun_if.remote_spi = p.scapy_tun_spi
2129
2130         self.config_sa_tra(np3)
2131
2132         # step 1;
2133         p.tun_protect.update_vpp_config(np.tun_sa_out,
2134                                         [np.tun_sa_in, np3.tun_sa_in])
2135         self.verify_tun_66(np, np, count=127)
2136         self.verify_tun_66(np3, np, count=127)
2137
2138         # step 2;
2139         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2140                                         [np.tun_sa_in, np3.tun_sa_in])
2141         self.verify_tun_66(np, np3, count=127)
2142         self.verify_tun_66(np3, np3, count=127)
2143
2144         # step 1;
2145         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2146                                         [np3.tun_sa_in])
2147         self.verify_tun_66(np3, np3, count=127)
2148         self.verify_drop_tun_66(np, count=127)
2149
2150         c = p.tun_if.get_rx_stats()
2151         self.assertEqual(c['packets'], 127*9)
2152         c = p.tun_if.get_tx_stats()
2153         self.assertEqual(c['packets'], 127*8)
2154         self.unconfig_sa(np)
2155
2156         # teardown
2157         self.unconfig_protect(np3)
2158         self.unconfig_sa(np3)
2159         self.unconfig_network(p)
2160
2161     def test_tun_46(self):
2162         """IPSEC tunnel protect 4o6"""
2163
2164         p = self.ipv6_params
2165
2166         self.config_network(p)
2167         self.config_sa_tra(p)
2168         self.config_protect(p)
2169
2170         self.verify_tun_46(p, count=127)
2171         c = p.tun_if.get_rx_stats()
2172         self.assertEqual(c['packets'], 127)
2173         c = p.tun_if.get_tx_stats()
2174         self.assertEqual(c['packets'], 127)
2175
2176         # teardown
2177         self.unconfig_protect(p)
2178         self.unconfig_sa(p)
2179         self.unconfig_network(p)
2180
2181
2182 class TestIpsec6TunProtectTun(TemplateIpsec,
2183                               TemplateIpsec6TunProtect,
2184                               IpsecTun6):
2185     """ IPsec IPv6 Tunnel protect - tunnel mode"""
2186
2187     encryption_type = ESP
2188     tun6_encrypt_node_name = "esp6-encrypt-tun"
2189     tun6_decrypt_node_name = "esp6-decrypt-tun"
2190
2191     def setUp(self):
2192         super(TestIpsec6TunProtectTun, self).setUp()
2193
2194         self.tun_if = self.pg0
2195
2196     def tearDown(self):
2197         super(TestIpsec6TunProtectTun, self).tearDown()
2198
2199     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2200                           payload_size=100):
2201         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2202                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2203                                 dst=sw_intf.local_ip6) /
2204                            IPv6(src=src, dst=dst) /
2205                            UDP(sport=1166, dport=2233) /
2206                            Raw(b'X' * payload_size))
2207                 for i in range(count)]
2208
2209     def gen_pkts6(self, sw_intf, src, dst, count=1,
2210                   payload_size=100):
2211         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2212                 IPv6(src=src, dst=dst) /
2213                 UDP(sport=1166, dport=2233) /
2214                 Raw(b'X' * payload_size)
2215                 for i in range(count)]
2216
2217     def verify_decrypted6(self, p, rxs):
2218         for rx in rxs:
2219             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2220             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2221             self.assert_packet_checksums_valid(rx)
2222
2223     def verify_encrypted6(self, p, sa, rxs):
2224         for rx in rxs:
2225             try:
2226                 pkt = sa.decrypt(rx[IPv6])
2227                 if not pkt.haslayer(IPv6):
2228                     pkt = IPv6(pkt[Raw].load)
2229                 self.assert_packet_checksums_valid(pkt)
2230                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2231                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2232                 inner = pkt[IPv6].payload
2233                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2234
2235             except (IndexError, AssertionError):
2236                 self.logger.debug(ppp("Unexpected packet:", rx))
2237                 try:
2238                     self.logger.debug(ppp("Decrypted packet:", pkt))
2239                 except:
2240                     pass
2241                 raise
2242
2243     def test_tun_66(self):
2244         """IPSEC tunnel protect """
2245
2246         p = self.ipv6_params
2247
2248         self.config_network(p)
2249         self.config_sa_tun(p)
2250         self.config_protect(p)
2251
2252         self.verify_tun_66(p, count=127)
2253
2254         c = p.tun_if.get_rx_stats()
2255         self.assertEqual(c['packets'], 127)
2256         c = p.tun_if.get_tx_stats()
2257         self.assertEqual(c['packets'], 127)
2258
2259         # rekey - create new SAs and update the tunnel protection
2260         np = copy.copy(p)
2261         np.crypt_key = b'X' + p.crypt_key[1:]
2262         np.scapy_tun_spi += 100
2263         np.scapy_tun_sa_id += 1
2264         np.vpp_tun_spi += 100
2265         np.vpp_tun_sa_id += 1
2266         np.tun_if.local_spi = p.vpp_tun_spi
2267         np.tun_if.remote_spi = p.scapy_tun_spi
2268
2269         self.config_sa_tun(np)
2270         self.config_protect(np)
2271         self.unconfig_sa(p)
2272
2273         self.verify_tun_66(np, count=127)
2274         c = p.tun_if.get_rx_stats()
2275         self.assertEqual(c['packets'], 254)
2276         c = p.tun_if.get_tx_stats()
2277         self.assertEqual(c['packets'], 254)
2278
2279         # teardown
2280         self.unconfig_protect(np)
2281         self.unconfig_sa(np)
2282         self.unconfig_network(p)
2283
2284
2285 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2286                                   TemplateIpsec6TunProtect,
2287                                   IpsecTun6):
2288     """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2289
2290     encryption_type = ESP
2291     tun6_encrypt_node_name = "esp6-encrypt-tun"
2292     tun6_decrypt_node_name = "esp6-decrypt-tun"
2293
2294     def setUp(self):
2295         super(TestIpsec6TunProtectTunDrop, self).setUp()
2296
2297         self.tun_if = self.pg0
2298
2299     def tearDown(self):
2300         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2301
2302     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2303                           payload_size=100):
2304         # the IP destination of the revelaed packet does not match
2305         # that assigned to the tunnel
2306         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2307                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2308                                 dst="5::5") /
2309                            IPv6(src=src, dst=dst) /
2310                            UDP(sport=1144, dport=2233) /
2311                            Raw(b'X' * payload_size))
2312                 for i in range(count)]
2313
2314     def test_tun_drop_66(self):
2315         """IPSEC 6 tunnel protect bogus tunnel header """
2316
2317         p = self.ipv6_params
2318
2319         self.config_network(p)
2320         self.config_sa_tun(p)
2321         self.config_protect(p)
2322
2323         tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2324                                     src=p.remote_tun_if_host,
2325                                     dst=self.pg1.remote_ip6,
2326                                     count=63)
2327         self.send_and_assert_no_replies(self.tun_if, tx)
2328
2329         self.unconfig_protect(p)
2330         self.unconfig_sa(p)
2331         self.unconfig_network(p)
2332
2333
2334 if __name__ == '__main__':
2335     unittest.main(testRunner=VppTestRunner)