ip: Replace Semantics for Interface IP addresses
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
22 from util import ppp
23 from vpp_papi import VppEnum
24
25
def config_tun_params(p, encryption_type, tun_if):
    """Build the scapy tunnel-mode SAs for *tun_if* and store them on *p*.

    Note: this definition rebinds the ``config_tun_params`` name that is
    also imported from ``template_ipsec`` at the top of the file.

    :param p: IPsec parameter object; its SPIs, algorithms, keys, flags,
        ``addr_type`` and ``nat_header`` are read, and ``tun_dst``,
        ``tun_src``, ``scapy_tun_sa`` and ``vpp_tun_sa`` are written.
    :param encryption_type: scapy protocol class handed to
        ``SecurityAssociation`` as its first argument.
    :param tun_if: tunnel interface supplying ``remote_ip``/``local_ip``.
    """
    header_classes = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                              IPSEC_API_SAD_FLAG_USE_ESN))
    scapy_key = mk_scapy_crypt_key(p)

    # remember the tunnel endpoints on the parameter object
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # everything the two SAs have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=scapy_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=use_esn)
    outer_header = header_classes[p.addr_type]

    # SA keyed on vpp_tun_spi; outer header runs tun_dst -> tun_src
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=outer_header(src=p.tun_dst, dst=p.tun_src),
        **shared)

    # SA keyed on scapy_tun_spi; outer header runs tun_src -> tun_dst
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=outer_header(dst=p.tun_dst, src=p.tun_src),
        **shared)
53
54
def config_tra_params(p, encryption_type, tun_if):
    """Build the scapy SAs without a tunnel header and store them on *p*.

    Unlike ``config_tun_params`` the SecurityAssociation objects created
    here are given neither a ``tunnel_header`` nor a ``nat_t_header``.

    :param p: IPsec parameter object; its SPIs, algorithms, keys and flags
        are read, and ``tun_dst``, ``tun_src``, ``scapy_tun_sa`` and
        ``vpp_tun_sa`` are written.
    :param encryption_type: scapy protocol class handed to
        ``SecurityAssociation`` as its first argument.
    :param tun_if: interface supplying ``remote_ip``/``local_ip``.
    """
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # SA keyed on vpp_tun_spi
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)

    # SA keyed on scapy_tun_spi
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
74
75
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class."""
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create an IPsec tunnel interface on pg0 and route to it.

        The tunnel gets both IPv4 and IPv6 addresses, and a /32 and a /128
        host route are installed via the tunnel's remote addresses.
        """
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 host route through the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        # IPv6 host route through the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        """Delegate per-test teardown to the parent class."""
        super(TemplateIpsec4TunIfEsp, self).tearDown()
119
120
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class."""
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check each received packet is UDP-encapsulated and decrypts.

        :param p: IPsec parameter object (not read here).
        :param sa: scapy SecurityAssociation used to decrypt.
        :param rxs: iterable of received packets.
        :raises IndexError, AssertionError: re-raised after the offending
            packet (and its decrypted form, when available) is logged.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the decrypted form
            # of an earlier packet, and never hits an unbound name
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, 4500)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """Mark the IPv4 parameters as UDP-encapsulated.

        The tunnel itself is not created here; see config_network().
        """
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=5454, dport=4500)

    def config_network(self):
        """Create the UDP-encap IPsec tunnel on pg0 and route to it."""

        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi, p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key, udp_encap=True)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # IPv4 host route through the tunnel
        r = VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)])
        r.add_vpp_config()
        # IPv6 host route through the tunnel
        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Delegate per-test teardown to the parent class."""
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
195
196
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        v4_params = self.params[socket.AF_INET]
        self.verify_tun_64(v4_params, count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        v4_params = self.params[socket.AF_INET]
        self.verify_tun_64(v4_params, count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        tunnel = self.ipv4_params.tun_if

        # with a 1500 MTU on the tunnel, one 1800 byte payload is
        # expected to arrive as two packets
        self.vapi.sw_interface_set_mtu(tunnel.sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        # set the MTU to 9000 once the check is done
        self.vapi.sw_interface_set_mtu(tunnel.sw_if_index,
                                       [9000, 0, 0, 0])
226
227
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the inherited setup, then create the UDP-encap tunnel."""
        # NOTE(review): super() is anchored on TemplateIpsec4TunIfEspUdp,
        # not on this class, so TemplateIpsec4TunIfEspUdp.setUp() (which
        # sets the UDP_ENCAP flag and nat_header) is skipped - confirm
        # this is intended.
        super(TemplateIpsec4TunIfEspUdp, self).setUp()
        self.config_network()

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
240
241
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP GCM tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Switch the IPv4 parameters to AES-GCM-256, then build the tunnel."""
        # NOTE(review): super() is anchored on TemplateIpsec4TunIfEspUdp,
        # not on this class, so TemplateIpsec4TunIfEspUdp.setUp() (which
        # sets the UDP_ENCAP flag and nat_header) is skipped - confirm
        # this is intended.
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        params = self.ipv4_params
        # VPP side: no integrity algorithm, AES-GCM-256 cipher
        params.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        params.crypt_algo_vpp_id = (
            crypto_algs.IPSEC_API_CRYPTO_ALG_AES_GCM_256)
        # scapy side: the matching algorithm names
        params.crypt_algo = "AES-GCM"
        params.auth_algo = "NULL"
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0

        self.config_network()
259
260
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # nothing to add: setup and tests all come from the two base classes
264
265
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        """Create an IPv6 IPsec tunnel interface on pg0 and route to it.

        The tunnel gets both IPv6 and IPv4 addresses, and a /128 and a /32
        host route are installed via the tunnel's remote addresses.
        """
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv6 host route through the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [v6_path]).add_vpp_config()

        # IPv4 host route through the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def tearDown(self):
        """Delegate per-test teardown to the parent class."""
        super(TemplateIpsec6TunIfEsp, self).tearDown()
300
301
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
317
318
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # node names made available to the inherited test code
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
324
325
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # node names made available to the inherited test code
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
331
332
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Create ten IPsec tunnels over pg0, one per generated remote host.

        Each tunnel gets its own shallow copy of the IPv4 parameters; the
        copies are collected in ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv4_neighbors()

        # SA ids and SPIs that are offset by the tunnel index
        per_tunnel_ids = ("scapy_tun_sa_id", "scapy_tun_spi",
                          "vpp_tun_sa_id", "vpp_tun_spi",
                          "scapy_tra_sa_id", "scapy_tra_spi",
                          "vpp_tra_sa_id", "vpp_tra_spi")

        for idx in range(10):
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (idx + 1)
            for attr in per_tunnel_ids:
                setattr(p, attr, getattr(p, attr) + idx)
            p.tun_dst = self.pg0.remote_hosts[idx].ip4

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                p.vpp_tun_spi, p.scapy_tun_spi,
                p.crypt_algo_vpp_id,
                p.crypt_key, p.crypt_key,
                p.auth_algo_vpp_id,
                p.auth_key, p.auth_key,
                dst=p.tun_dst)
            p.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()
            config_tun_params(p, self.encryption_type, tunnel)
            self.multi_params.append(p)

            # host route to this tunnel's remote host
            host_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [host_path]).add_vpp_config()

    def tearDown(self):
        """Delegate per-test teardown to the parent class."""
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)
            rx_stats = p.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = p.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interface """
        # one encrypted packet per tunnel, sent in on the tunnel side
        encrypted = []
        for p in self.multi_params:
            encrypted.extend(
                self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                      src=p.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4))
        rxs = self.send_and_expect(self.tun_if, encrypted, self.pg1)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # one plain packet per tunnel, sent in on pg1
        plain = []
        for p in self.multi_params:
            plain.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                       dst=p.remote_tun_if_host))
        rxs = self.send_and_expect(self.pg1, plain, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
413
414
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create the IPsec tunnel and host route described by *p*.

        The tunnel interface is stored as ``p.tun_if`` and the route as
        ``p.route`` so that unconfig_network() can remove them.
        """
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        host_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [host_path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the tunnel and route created by config_network()."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        """Run the inherited setup and use pg0 as the tunnel-side port."""
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate per-test teardown to the parent class."""
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Change the key and SPIs of *p* and bind new SAs to its tunnel.

        The new SAs are stored as ``p.tun_sa_in`` and ``p.tun_sa_out``.
        """
        #
        # change the key and the SPI
        #
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        # rebuild the scapy SAs to match the new key and SPIs
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags, salt=p.salt)
        p.tun_sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags, salt=p.salt)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # tun_sa_in is bound with is_outbound=1, tun_sa_out with
        # is_outbound=0
        tun_index = p.tun_if.sw_if_index
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_index,
                                         sa_id=p.tun_sa_in.id,
                                         is_outbound=1)
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_index,
                                         sa_id=p.tun_sa_out.id,
                                         is_outbound=0)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        key_128 = b"JPjyOWBeVEQiMe7h"
        key_192 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_256 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        def algo_entry(vpp_crypto, vpp_integ, scapy_crypto, scapy_integ,
                       key, salt):
            # one row of the algorithm table
            return {'vpp-crypto': vpp_crypto,
                    'vpp-integ': vpp_integ,
                    'scapy-crypto': scapy_crypto,
                    'scapy-integ': scapy_integ,
                    'key': key,
                    'salt': salt}

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        # foreach crypto algorithm
        algos = [
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                       integ.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL", key_128, 3333),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                       integ.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL", key_192, 0),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
                       integ.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL", key_256, 9999),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                       integ.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96", key_128, 0),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                       integ.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96", key_192, 0),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                       integ.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96", key_256, 0),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_NONE,
                       integ.IPSEC_API_INTEG_ALG_SHA1_96,
                       "NULL", "HMAC-SHA1-96", key_256, 0),
        ]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for algo in algos:
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = algo['vpp-integ']
                p.crypt_algo_vpp_id = algo['vpp-crypto']
                p.crypt_algo = algo['scapy-crypto']
                p.auth_algo = algo['scapy-integ']
                p.crypt_key = algo['key']
                p.salt = algo['salt']

                self.config_network(p)

                self.verify_tun_44(p, count=127)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], 127)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], 127)

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                # tear everything down before the next algorithm
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
599
600
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create a tunnel and host route with no integrity and no cipher.

        *p* is modified in place: both algorithms are set to NONE/'NULL'
        and both keys to an empty list. The tunnel interface is stored as
        ``p.tun_if`` and the route as ``p.route``.
        """
        # no integrity algorithm
        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.auth_algo = 'NULL'
        p.auth_key = []

        # no crypto algorithm
        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = 'NULL'
        p.crypt_key = []

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        host_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [host_path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the tunnel and route created by config_network()."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        """Run the inherited setup and use pg0 as the tunnel-side port."""
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate per-test teardown to the parent class."""
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        params = self.ipv4_params

        self.config_network(params)

        # packets routed at the tunnel must produce no replies
        pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                             dst=params.remote_tun_if_host)
        self.send_and_assert_no_replies(self.pg1, pkts)

        self.unconfig_network(params)
662
663
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Create ten IPv6 IPsec tunnels over pg0, one per remote host.

        Each tunnel gets its own shallow copy of the IPv6 parameters; the
        copies are collected in ``self.multi_params``.
        """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        # SA ids and SPIs that are offset by the tunnel index
        per_tunnel_ids = ("scapy_tun_sa_id", "scapy_tun_spi",
                          "vpp_tun_sa_id", "vpp_tun_spi",
                          "scapy_tra_sa_id", "scapy_tra_spi",
                          "vpp_tra_sa_id", "vpp_tra_spi")

        for idx in range(10):
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1111::%d" % (idx + 1)
            for attr in per_tunnel_ids:
                setattr(p, attr, getattr(p, attr) + idx)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                p.vpp_tun_spi, p.scapy_tun_spi,
                p.crypt_algo_vpp_id,
                p.crypt_key, p.crypt_key,
                p.auth_algo_vpp_id,
                p.auth_key, p.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            p.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(p, self.encryption_type, tunnel)
            self.multi_params.append(p)

            # host route to this tunnel's remote host
            host_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)
            VppIpRoute(self, p.remote_tun_if_host, 128,
                       [host_path]).add_vpp_config()

    def tearDown(self):
        """Delegate per-test teardown to the parent class."""
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_66(p, count=127)
            rx_stats = p.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = p.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)
724
725
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    # Node names and ESP type; presumably consumed by the IpsecTun4Tests
    # mixin (not visible from this class) -- confirm in template_ipsec.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner Ethernet frame carried over GRE
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted packets to inject on `sw_intf`.

        Each packet is Ether / sa.encrypt(IP(pg0 remote -> pg0 local) /
        GRE / Ether(dst=omac) / IP(1.1.1.1 -> 1.1.1.2) / UDP / payload).
        `p`, `src` and `dst` are accepted but not used here.
        """
        # sa.encrypt() is invoked once per packet rather than copying one
        # encrypted packet `count` times.
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame keeps the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet in `rxs` with `sa` and check its contents.

        Verifies checksums, the outer IP addresses (pg0 local -> pg0
        remote), the presence of GRE, and the inner frame's MAC and IP
        destinations.  On IndexError/AssertionError the offending packet
        is logged and the exception re-raised.
        """
        for rx in rxs:
            # Pre-bind so the failure handler can tell whether decryption
            # got far enough to produce a packet worth dumping.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw payload as IP when decrypt() did
                    # not yield an IP layer
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; never mask the failure
                        self.logger.debug("Could not dump decrypted packet")
                raise

    def setUp(self):
        """Create a protected GRE TEB tunnel on pg0 bridged with pg1."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        # the output of these CLI commands is not used
        self.vapi.cli("clear ipsec sa")
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
835
836
class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
                               IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    # VLAN variant: the bridge port is a Dot1Q sub-interface of pg1
    # (VLAN 11) instead of pg1 itself.
    # Node names and ESP type; presumably consumed by the IpsecTun4Tests
    # mixin (not visible from this class) -- confirm in template_ipsec.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner Ethernet frame carried over GRE
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted packets to inject on `sw_intf`.

        Each packet is Ether / sa.encrypt(IP(pg0 remote -> pg0 local) /
        GRE / Ether(dst=omac) / IP(1.1.1.1 -> 1.1.1.2) / UDP / payload);
        the inner frame carries no VLAN tag.
        `p`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain frames to `omac`, tagged with VLAN 11.

        `sw_intf`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                Dot1Q(vlan=11) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame has the inner MAC, VLAN 11, IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet in `rxs` with `sa` and check its contents.

        Verifies checksums, the outer IP addresses (pg0 local -> pg0
        remote), the presence of GRE, and that the inner frame has the
        expected MAC/IP destinations and no Dot1Q tag.  On
        IndexError/AssertionError the offending packet is logged and the
        exception re-raised.
        """
        for rx in rxs:
            # Pre-bind so the failure handler can tell whether decryption
            # got far enough to produce a packet worth dumping.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw payload as IP when decrypt() did
                    # not yield an IP layer
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                # the tag must be gone before the frame enters the tunnel
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; never mask the failure
                        self.logger.debug("Could not dump decrypted packet")
                raise

    def setUp(self):
        """Create a protected GRE TEB tunnel bridged with pg1 VLAN 11."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # Dot1Q sub-interface of pg1 for VLAN 11 with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11)
        self.pg1_11.admin_up()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the VLAN sub-interface
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Undo the tunnel IPv4 config and remove the VLAN sub-interface."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        # the sub-interface is taken down after the base tearDown has run
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
955
956
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    # Node names and ESP type; presumably consumed by the IpsecTun4Tests
    # mixin (not visible from this class) -- confirm in template_ipsec.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner Ethernet frame carried over GRE
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted packets to inject on `sw_intf`.

        Each packet is Ether / sa.encrypt(IP(pg0 remote -> pg0 local) /
        GRE / Ether(dst=omac) / IP(1.1.1.1 -> 1.1.1.2) / UDP / payload).
        `p`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame keeps the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet in `rxs` with `sa` and check its contents.

        Verifies checksums, the outer IP addresses (pg0 local -> pg0
        remote), the presence of GRE, and the inner frame's MAC and IP
        destinations.  On IndexError/AssertionError the offending packet
        is logged and the exception re-raised.
        """
        for rx in rxs:
            # Pre-bind so the failure handler can tell whether decryption
            # got far enough to produce a packet worth dumping.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw payload as IP when decrypt() did
                    # not yield an IP layer
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; never mask the failure
                        self.logger.debug("Could not dump decrypted packet")
                raise

    def setUp(self):
        """Create a protected GRE TEB tunnel on pg0 bridged with pg1.

        Unlike the TUN variant, the SAs are created without tunnel
        endpoint addresses and config_tra_params() is used.
        """
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1060
1061
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    # Node names and ESP type; presumably consumed by the IpsecTun4Tests
    # mixin (not visible from this class) -- confirm in template_ipsec.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted packets to inject on `sw_intf`.

        Each packet is Ether / sa.encrypt(IP(pg0 remote -> pg0 local) /
        GRE / IP(pg1 local -> pg1 remote) / UDP / payload).
        `p`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain IPv4/UDP packets 1.1.1.1 -> 1.1.1.2.

        `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet is addressed to pg1's remote peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet in `rxs` with `sa` and check its contents.

        Verifies checksums, the outer IP addresses (pg0 local -> pg0
        remote), the presence of GRE, and that the IP packet inside GRE
        is destined to 1.1.1.2.  On IndexError/AssertionError the
        offending packet is logged and the exception re-raised.
        """
        for rx in rxs:
            # Pre-bind so the failure handler can tell whether decryption
            # got far enough to produce a packet worth dumping.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw payload as IP when decrypt() did
                    # not yield an IP layer
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; never mask the failure
                        self.logger.debug("Could not dump decrypted packet")
                raise

    def setUp(self):
        """Create a protected GRE tunnel on pg0 and route 1.1.1.2/32 to it."""
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is ever added to this bridge domain in
        # this class; it looks like a leftover -- confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # 0xffffffff is passed as the path's interface index; presumably
        # it means "no interface specified" -- confirm in vpp_ip_route.
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1163
1164
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    # The decrypt node name is used below to build an error-counter path;
    # both names are presumably also consumed by the IpsecTun4Tests mixin.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted packets to inject on `sw_intf`.

        Each packet is Ether / sa.encrypt(IP(pg0 remote -> pg0 local) /
        GRE / IP(pg1 local -> pg1 remote) / UDP / payload).
        `p`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Build `count` encrypted packets with no IP header inside GRE.

        GRE is followed directly by UDP / payload.  `src` and `dst` are
        accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain IPv4/UDP packets 1.1.1.1 -> 1.1.1.2.

        `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet is addressed to pg1's remote peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet in `rxs` with `sa` and check its contents.

        Verifies checksums, the presence of GRE, and that the IP packet
        inside GRE is destined to 1.1.1.2.  On IndexError/AssertionError
        the offending packet is logged and the exception re-raised.
        """
        for rx in rxs:
            # Pre-bind so the failure handler can tell whether decryption
            # got far enough to produce a packet worth dumping.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw payload as IP when decrypt() did
                    # not yield an IP layer
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; never mask the failure
                        self.logger.debug("Could not dump decrypted packet")
                raise

    def setUp(self):
        """Create a protected GRE tunnel on pg0 and route 1.1.1.2/32 to it.

        The SAs are created without tunnel endpoint addresses and
        config_tra_params() is used.
        """
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # 0xffffffff is passed as the path's interface index; presumably
        # it means "no interface specified" -- confirm in vpp_ip_route.
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        # Send one encrypted GRE packet that has no inner IP header,
        # expect nothing back, and expect the decrypt node's
        # "unsupported payload" error counter to read exactly 1.
        p = self.ipv4_params
        # src/dst are ignored by gen_encrypt_non_ip_pkts()
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip6)
        self.send_and_assert_no_replies(self.tun_if, tx)
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1277
1278
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE ESP - TRA tests """
    # IPv6 variant.  Node names and ESP type are presumably consumed by
    # the IpsecTun6Tests mixin (not visible from this class).
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` encrypted packets to inject on `sw_intf`.

        Each packet is Ether / sa.encrypt(IPv6(pg0 remote -> pg0 local) /
        GRE / IPv6(pg1 local -> pg1 remote) / UDP / payload).
        `p`, `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build `count` plain IPv6/UDP packets 1::1 -> 1::2.

        `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received packet is addressed to pg1's remote peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet in `rxs` with `sa` and check its contents.

        Verifies checksums, the presence of GRE, and that the IPv6 packet
        inside GRE is destined to 1::2.  On IndexError/AssertionError the
        offending packet is logged and the exception re-raised.
        """
        for rx in rxs:
            # Pre-bind so the failure handler can tell whether decryption
            # got far enough to produce a packet worth dumping.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # re-parse the raw payload as IPv6 when decrypt() did
                    # not yield an IPv6 layer
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; never mask the failure
                        self.logger.debug("Could not dump decrypted packet")
                raise

    def setUp(self):
        """Create a protected IPv6 GRE tunnel on pg0; route 1::2/128 to it.

        The SAs are created without tunnel endpoint addresses and
        config_tra_params() is used.
        """
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): no port is ever added to this bridge domain in
        # this class; it looks like a leftover -- confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # 0xffffffff is passed as the path's interface index; presumably
        # it means "no interface specified" -- confirm in vpp_ip_route.
        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1376
1377
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """ Ipsec mGRE ESP v4 TRA tests """
    # Node names and ESP type; presumably consumed by the IpsecTun4
    # helpers (not visible from this class) -- confirm in template_ipsec.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted packets to inject on `sw_intf`.

        Each packet is Ether / sa.encrypt(IP(p.tun_dst -> pg0 local) /
        GRE / IP(pg1 local -> pg1 remote) / UDP / payload), so the outer
        source is the per-next-hop address stored on `p`.
        `src` and `dst` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=p.tun_dst,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain IPv4/UDP packets from 1.1.1.1 to `dst`.

        `src` is accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet is addressed to pg1's remote peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet in `rxs` with `sa` and check its contents.

        Verifies checksums, the presence of GRE, and that the IP packet
        inside GRE is destined to `p.remote_tun_if_host`.  On
        IndexError/AssertionError the offending packet is logged and the
        exception re-raised.
        """
        for rx in rxs:
            # Pre-bind so the failure handler can tell whether decryption
            # got far enough to produce a packet worth dumping.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw payload as IP when decrypt() did
                    # not yield an IP layer
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; never mask the failure
                        self.logger.debug("Could not dump decrypted packet")
                raise

    def setUp(self):
        """Create one multipoint GRE tunnel and N_NHS protected next-hops.

        For each next-hop a copy of ipv4_params gets its own SA ids/SPIs,
        SA pair, tunnel protection, host route and TEIB entry; the copies
        are collected in self.multi_params.
        """
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        # multipoint tunnel: destination given as 0.0.0.0, mode MP
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   "0.0.0.0",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: p.tun_if still refers to the shared tunnel
            p = copy.copy(self.ipv4_params)

            # offset every id/SPI by ii so each next-hop's are distinct
            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            # protection is per next-hop on the shared tunnel
            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            # (test_tun_44 calls methods on p.teib, so add_vpp_config()
            # is expected to return the VppTeib object)
            p.teib = VppTeib(self, p.tun_if,
                             p.tun_if.remote_hosts[ii].ip4,
                             self.pg0.remote_hosts[ii].ip4).add_vpp_config()
            # read by gen_encrypt_pkts() as the outer source address
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # without the TEIB entry traffic must be dropped ...
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            # ... and flow again once the entry is restored
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
1508
1509
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """ Ipsec mGRE ESP v6 TRA tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` frames of ESP-protected IPv6/GRE/IPv6/UDP.

        The header handed to ``sa.encrypt`` runs from ``p.tun_dst`` to pg0's
        local address; the inner packet runs from pg1's local to pg1's remote
        address.  ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=p.tun_dst,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear-text IPv6/UDP frames addressed to ``dst``.

        The IPv6 source is fixed to "1::1"; the ``src`` argument is ignored.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each frame with ``sa`` and check the GRE payload.

        On an IndexError or AssertionError the offending frame (and the
        decrypted packet, when one was obtained) is logged at debug level
        and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per frame so a failure never logs a packet left over
            # from an earlier iteration, nor hits an unbound name
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # the dump is best effort only; it must not replace
                    # the failure that is about to be re-raised
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception as err:
                        self.logger.debug(
                            "Could not dump decrypted packet: %s" % err)
                raise

    def setUp(self):
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        # GRE interface in multipoint mode: source is pg0's local address,
        # destination is left as "::"
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   "::",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every parameter set shares the same tun_if
            p = copy.copy(self.ipv6_params)

            # give each next-hop its own host address, SA ids and SPIs
            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(self, p.tun_if,
                    p.tun_if.remote_hosts[ii].ip6,
                    p.tun_dst).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 128,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
                                     p.tun_if.sw_if_index)]).add_vpp_config()
            # NOTE(review): tun_dst is assigned again here, presumably
            # because config_tra_params overwrites it (config_tun_params
            # does) - keep this after that call unless verified otherwise
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        # NOTE(review): adjacency index 41 is hard-coded; debug output only
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        # all parameter copies share the tunnel created in setUp
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
1641
1642
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create the SA pair for ``p`` without tunnel endpoint addresses.

        The outbound SA is stored in ``p.tun_sa_out`` and the inbound one in
        ``p.tun_sa_in``; add_vpp_config() is called on each in that order.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # arguments that are the same for both directions
        algos = (p.auth_algo_vpp_id, p.auth_key,
                 p.crypt_algo_vpp_id, p.crypt_key,
                 self.vpp_esp_protocol)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *algos, flags=p.flags)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *algos, flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the SA pair for ``p`` with tunnel endpoint addresses.

        Same as config_sa_tra() except that each SA also receives the
        addresses of ``self.tun_if``: local then remote for the outbound
        SA, remote then local for the inbound SA.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        local_end = self.tun_if.local_addr[p.addr_type]
        remote_end = self.tun_if.remote_addr[p.addr_type]
        algos = (p.auth_algo_vpp_id, p.auth_key,
                 p.crypt_algo_vpp_id, p.crypt_key,
                 self.vpp_esp_protocol)
        out_args = algos + (local_end, remote_end)
        in_args = algos + (remote_end, local_end)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *out_args, flags=p.flags)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *in_args, flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs already stored on ``p``."""
        protection = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out,
                                        [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route hosts through it.

        The tunnel is stored in ``p.tun_if`` and the IPv4 host route in
        ``p.route``; the IPv6 host route is added but not kept on ``p``.
        """
        pg = self.pg0
        tunnel = VppIpIpTunInterface(self, pg, pg.local_ip4, pg.remote_ip4)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        v6_route = VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path])
        v6_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then ``p.tun_if``."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored in ``p.tun_protect``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA of ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1725
1726
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtect, self).setUp()

        # pg0 is used as the test's tun_if
        self.tun_if = self.pg0

    def tearDown(self):
        # test_tun_44 undoes its own configuration; only the base classes
        # have anything to tear down here
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        n_pkts = 127
        p = self.ipv4_params

        def check_tun_counters(expected):
            # read and compare rx first, then tx
            rx = p.tun_if.get_rx_stats()
            self.assertEqual(rx['packets'], expected)
            tx = p.tun_if.get_tx_stats()
            self.assertEqual(tx['packets'], expected)

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        # first pass over the tunnel
        self.verify_tun_44(p, count=n_pkts)
        check_tun_counters(n_pkts)

        # second pass, after the "clear ipsec sa" CLI, using verify_tun_64
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(p, count=n_pkts)
        check_tun_counters(2 * n_pkts)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        # the copy is shallow, so this is the same tun_if object as p.tun_if
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_44(rekeyed, count=n_pkts)
        check_tun_counters(3 * n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)
1786
1787
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        # request UDP encapsulation on the SAs and describe the matching
        # UDP header to the scapy side
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        p = self.ipv4_params
        p.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        # network first, then the SAs, then the protection that uses both
        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(p)

    def tearDown(self):
        # undo setUp in reverse order before the base-class teardown
        p = self.ipv4_params
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(p)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        n_pkts = 127
        p = self.ipv4_params

        self.verify_tun_44(p, count=n_pkts)
        # the tunnel interface counters must match in both directions
        rx = p.tun_if.get_rx_stats()
        self.assertEqual(rx['packets'], n_pkts)
        tx = p.tun_if.get_tx_stats()
        self.assertEqual(tx['packets'], n_pkts)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
1827
1828
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` frames of ESP-protected IP-in-IP/UDP.

        The header handed to ``sa.encrypt`` runs from the interface's remote
        to its local IPv4 address; the inner packet runs from ``src`` to
        ``dst``.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text IPv4/UDP frames from src to dst."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of each decrypted packet."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame with ``sa`` and check both IP headers.

        On an IndexError or AssertionError the offending frame (and the
        decrypted packet, when one was obtained) is logged at debug level
        and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per frame so a failure never logs a packet left over
            # from an earlier iteration, nor hits an unbound name
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                # outer header: pg0 local -> pg0 remote
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # the dump is best effort only; it must not replace
                    # the failure that is about to be re-raised
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception as err:
                        self.logger.debug(
                            "Could not dump decrypted packet: %s" % err)
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        # the copy is shallow, so np.tun_if is the same object as p.tun_if
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1930
1931
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` ESP frames whose tunnel header targets 5.5.5.5.

        The header handed to ``sa.encrypt`` uses the interface's remote IPv4
        address as source and the fixed destination "5.5.5.5" instead of the
        interface's local address.
        """
        frames = []
        for _ in range(count):
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            # encrypt once per frame, exactly as often as frames are built
            protected = sa.encrypt(IP(src=sw_intf.remote_ip4,
                                      dst="5.5.5.5") /
                                   IP(src=src, dst=dst) /
                                   UDP(sport=1144, dport=2233) /
                                   Raw(b'X' * payload_size))
            frames.append(ether / protected)
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """

        p = self.ipv4_params

        for configure in (self.config_network,
                          self.config_sa_tun,
                          self.config_protect):
            configure(p)

        # none of the frames built above may produce a reply
        bogus = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                      src=p.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(p)
1978
1979
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    def config_sa_tra(self, p):
        """Create the SA pair for ``p`` without tunnel endpoint addresses.

        The outbound SA is stored in ``p.tun_sa_out`` and the inbound one in
        ``p.tun_sa_in``; add_vpp_config() is called on each in that order.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # arguments that are the same for both directions
        algos = (p.auth_algo_vpp_id, p.auth_key,
                 p.crypt_algo_vpp_id, p.crypt_key,
                 self.vpp_esp_protocol)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, *algos)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algos)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the SA pair for ``p`` with tunnel endpoint addresses.

        Same as config_sa_tra() except that each SA also receives the
        addresses of ``self.tun_if``: local then remote for the outbound
        SA, remote then local for the inbound SA.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        local_end = self.tun_if.local_addr[p.addr_type]
        remote_end = self.tun_if.remote_addr[p.addr_type]
        algos = (p.auth_algo_vpp_id, p.auth_key,
                 p.crypt_algo_vpp_id, p.crypt_key,
                 self.vpp_esp_protocol)
        out_args = algos + (local_end, remote_end)
        in_args = algos + (remote_end, local_end)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            *out_args)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           *in_args)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs already stored on ``p``."""
        protection = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out,
                                        [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route hosts through it.

        The tunnel is stored in ``p.tun_if`` and the IPv6 host route in
        ``p.route``; the IPv4 host route is added but not kept on ``p``.
        """
        pg = self.pg0
        tunnel = VppIpIpTunInterface(self, pg, pg.local_ip6, pg.remote_ip6)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        v4_route = VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path])
        v4_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then ``p.tun_if``."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored in ``p.tun_protect``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA of ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
2053
2054
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtect, self).setUp()

        # pg0 is used as the test's tun_if
        self.tun_if = self.pg0

    def tearDown(self):
        # each test undoes its own configuration
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        n_pkts = 127
        p = self.ipv6_params

        def check_tun_counters(rx_expected, tx_expected):
            # read and compare rx first, then tx
            rx = p.tun_if.get_rx_stats()
            self.assertEqual(rx['packets'], rx_expected)
            tx = p.tun_if.get_tx_stats()
            self.assertEqual(tx['packets'], tx_expected)

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        check_tun_counters(n_pkts, n_pkts)

        # rekey - create new SAs and update the tunnel protection
        rekey1 = copy.copy(p)
        rekey1.crypt_key = b'X' + p.crypt_key[1:]
        rekey1.scapy_tun_spi += 100
        rekey1.scapy_tun_sa_id += 1
        rekey1.vpp_tun_spi += 100
        rekey1.vpp_tun_sa_id += 1
        # the copy is shallow, so this is the same tun_if object as p.tun_if
        rekey1.tun_if.local_spi = p.vpp_tun_spi
        rekey1.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekey1)
        self.config_protect(rekey1)
        self.unconfig_sa(p)

        self.verify_tun_66(rekey1, count=n_pkts)
        check_tun_counters(2 * n_pkts, 2 * n_pkts)

        # bounce the interface state
        p.tun_if.admin_down()
        self.verify_drop_tun_66(rekey1, count=n_pkts)
        err_node = ('/err/ipsec6-tun-input/%s' %
                    'ipsec packets received on disabled interface')
        self.assertEqual(127, self.statistics.get_err_counter(err_node))
        p.tun_if.admin_up()
        self.verify_tun_66(rekey1, count=n_pkts)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        rekey2 = copy.copy(rekey1)
        rekey2.crypt_key = b'Z' + p.crypt_key[1:]
        rekey2.scapy_tun_spi += 100
        rekey2.scapy_tun_sa_id += 1
        rekey2.vpp_tun_spi += 100
        rekey2.vpp_tun_sa_id += 1
        rekey2.tun_if.local_spi = p.vpp_tun_spi
        rekey2.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekey2)

        old_and_new_in = [rekey1.tun_sa_in, rekey2.tun_sa_in]

        # step 1; old output SA, both input SAs
        p.tun_protect.update_vpp_config(rekey1.tun_sa_out, old_and_new_in)
        self.verify_tun_66(rekey1, rekey1, count=n_pkts)
        self.verify_tun_66(rekey2, rekey1, count=n_pkts)

        # step 2; new output SA, both input SAs
        p.tun_protect.update_vpp_config(rekey2.tun_sa_out,
                                        [rekey1.tun_sa_in, rekey2.tun_sa_in])
        self.verify_tun_66(rekey1, rekey2, count=n_pkts)
        self.verify_tun_66(rekey2, rekey2, count=n_pkts)

        # step 3; new output SA, only the new input SA
        p.tun_protect.update_vpp_config(rekey2.tun_sa_out,
                                        [rekey2.tun_sa_in])
        self.verify_tun_66(rekey2, rekey2, count=n_pkts)
        self.verify_drop_tun_66(rekey1, count=n_pkts)

        check_tun_counters(n_pkts * 9, n_pkts * 8)
        self.unconfig_sa(rekey1)

        # teardown
        self.unconfig_protect(rekey2)
        self.unconfig_sa(rekey2)
        self.unconfig_network(p)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        n_pkts = 127
        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_46(p, count=n_pkts)
        rx = p.tun_if.get_rx_stats()
        self.assertEqual(rx['packets'], n_pkts)
        tx = p.tun_if.get_tx_stats()
        self.assertEqual(tx['packets'], n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2179
2180
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` frames of ESP-protected IPv6-in-IPv6/UDP.

        The header handed to ``sa.encrypt`` runs from the interface's remote
        to its local IPv6 address; the inner packet runs from ``src`` to
        ``dst``.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear-text IPv6/UDP frames from src to dst."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of each decrypted packet."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each frame with ``sa`` and check both IPv6 headers.

        On an IndexError or AssertionError the offending frame (and the
        decrypted packet, when one was obtained) is logged at debug level
        and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per frame so a failure never logs a packet left over
            # from an earlier iteration, nor hits an unbound name
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                # outer header: pg0 local -> pg0 remote
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # the dump is best effort only; it must not replace
                    # the failure that is about to be re-raised
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception as err:
                        self.logger.debug(
                            "Could not dump decrypted packet: %s" % err)
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        # the copy is shallow, so np.tun_if is the same object as p.tun_if
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2282
2283
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` ESP frames whose tunnel header targets 5::5."""
        # the IP destination of the revealed packet does not match
        # that assigned to the tunnel
        frames = []
        for _ in range(count):
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            # encrypt once per frame, exactly as often as frames are built
            protected = sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                        dst="5::5") /
                                   IPv6(src=src, dst=dst) /
                                   UDP(sport=1144, dport=2233) /
                                   Raw(b'X' * payload_size))
            frames.append(ether / protected)
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        p = self.ipv6_params

        for configure in (self.config_network,
                          self.config_sa_tun,
                          self.config_protect):
            configure(p)

        # none of the frames built above may produce a reply
        bogus = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                       src=p.remote_tun_if_host,
                                       dst=self.pg1.remote_ip6,
                                       count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(p)
2331
2332
# Allow running this test module directly; tests are executed with the
# VppTestRunner imported from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)