ipsec: Support MPLS over IPSec[46] interface
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import VppTestRunner
12 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
13     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
14     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
18     VppMplsTable, VppMplsRoute, FibPathProto
19 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
20 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
21 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
22 from vpp_teib import VppTeib
23 from util import ppp
24 from vpp_papi import VppEnum
25 from vpp_papi_provider import CliFailedCommandError
26 from vpp_acl import AclRule, VppAcl, VppAclInterface
27
28
29 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
30     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
31     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
32                              IPSEC_API_SAD_FLAG_USE_ESN))
33     crypt_key = mk_scapy_crypt_key(p)
34     if tun_if:
35         p.tun_dst = tun_if.remote_ip
36         p.tun_src = tun_if.local_ip
37     else:
38         p.tun_dst = dst
39         p.tun_src = src
40
41     p.scapy_tun_sa = SecurityAssociation(
42         encryption_type, spi=p.vpp_tun_spi,
43         crypt_algo=p.crypt_algo,
44         crypt_key=crypt_key,
45         auth_algo=p.auth_algo, auth_key=p.auth_key,
46         tunnel_header=ip_class_by_addr_type[p.addr_type](
47             src=p.tun_dst,
48             dst=p.tun_src),
49         nat_t_header=p.nat_header,
50         esn_en=esn_en)
51     p.vpp_tun_sa = SecurityAssociation(
52         encryption_type, spi=p.scapy_tun_spi,
53         crypt_algo=p.crypt_algo,
54         crypt_key=crypt_key,
55         auth_algo=p.auth_algo, auth_key=p.auth_key,
56         tunnel_header=ip_class_by_addr_type[p.addr_type](
57             dst=p.tun_dst,
58             src=p.tun_src),
59         nat_t_header=p.nat_header,
60         esn_en=esn_en)
61
62
63 def config_tra_params(p, encryption_type, tun_if):
64     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
65     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
66                              IPSEC_API_SAD_FLAG_USE_ESN))
67     crypt_key = mk_scapy_crypt_key(p)
68     p.tun_dst = tun_if.remote_ip
69     p.tun_src = tun_if.local_ip
70     p.scapy_tun_sa = SecurityAssociation(
71         encryption_type, spi=p.vpp_tun_spi,
72         crypt_algo=p.crypt_algo,
73         crypt_key=crypt_key,
74         auth_algo=p.auth_algo, auth_key=p.auth_key,
75         esn_en=esn_en,
76         nat_t_header=p.nat_header)
77     p.vpp_tun_sa = SecurityAssociation(
78         encryption_type, spi=p.scapy_tun_spi,
79         crypt_algo=p.crypt_algo,
80         crypt_key=crypt_key,
81         auth_algo=p.auth_algo, auth_key=p.auth_key,
82         esn_en=esn_en,
83         nat_t_header=p.nat_header)
84
85
86 class TemplateIpsec4TunProtect(object):
87     """ IPsec IPv4 Tunnel protect """
88
89     encryption_type = ESP
90     tun4_encrypt_node_name = "esp4-encrypt-tun"
91     tun4_decrypt_node_name = "esp4-decrypt-tun"
92     tun4_input_node = "ipsec4-tun-input"
93
94     def config_sa_tra(self, p):
95         config_tun_params(p, self.encryption_type, p.tun_if)
96
97         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
98                                   p.auth_algo_vpp_id, p.auth_key,
99                                   p.crypt_algo_vpp_id, p.crypt_key,
100                                   self.vpp_esp_protocol,
101                                   flags=p.flags)
102         p.tun_sa_out.add_vpp_config()
103
104         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
105                                  p.auth_algo_vpp_id, p.auth_key,
106                                  p.crypt_algo_vpp_id, p.crypt_key,
107                                  self.vpp_esp_protocol,
108                                  flags=p.flags)
109         p.tun_sa_in.add_vpp_config()
110
111     def config_sa_tun(self, p):
112         config_tun_params(p, self.encryption_type, p.tun_if)
113
114         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
115                                   p.auth_algo_vpp_id, p.auth_key,
116                                   p.crypt_algo_vpp_id, p.crypt_key,
117                                   self.vpp_esp_protocol,
118                                   self.tun_if.local_addr[p.addr_type],
119                                   self.tun_if.remote_addr[p.addr_type],
120                                   flags=p.flags)
121         p.tun_sa_out.add_vpp_config()
122
123         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
124                                  p.auth_algo_vpp_id, p.auth_key,
125                                  p.crypt_algo_vpp_id, p.crypt_key,
126                                  self.vpp_esp_protocol,
127                                  self.tun_if.remote_addr[p.addr_type],
128                                  self.tun_if.local_addr[p.addr_type],
129                                  flags=p.flags)
130         p.tun_sa_in.add_vpp_config()
131
132     def config_protect(self, p):
133         p.tun_protect = VppIpsecTunProtect(self,
134                                            p.tun_if,
135                                            p.tun_sa_out,
136                                            [p.tun_sa_in])
137         p.tun_protect.add_vpp_config()
138
139     def config_network(self, p):
140         if hasattr(p, 'tun_dst'):
141             tun_dst = p.tun_dst
142         else:
143             tun_dst = self.pg0.remote_ip4
144         p.tun_if = VppIpIpTunInterface(self, self.pg0,
145                                        self.pg0.local_ip4,
146                                        tun_dst)
147         p.tun_if.add_vpp_config()
148         p.tun_if.admin_up()
149         p.tun_if.config_ip4()
150         p.tun_if.config_ip6()
151
152         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
153                              [VppRoutePath(p.tun_if.remote_ip4,
154                                            0xffffffff)])
155         p.route.add_vpp_config()
156         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
157                        [VppRoutePath(p.tun_if.remote_ip6,
158                                      0xffffffff,
159                                      proto=DpoProto.DPO_PROTO_IP6)])
160         r.add_vpp_config()
161
162     def unconfig_network(self, p):
163         p.route.remove_vpp_config()
164         p.tun_if.remove_vpp_config()
165
166     def unconfig_protect(self, p):
167         p.tun_protect.remove_vpp_config()
168
169     def unconfig_sa(self, p):
170         p.tun_sa_out.remove_vpp_config()
171         p.tun_sa_in.remove_vpp_config()
172
173
174 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
175                              TemplateIpsec):
176     """ IPsec tunnel interface tests """
177
178     encryption_type = ESP
179
180     @classmethod
181     def setUpClass(cls):
182         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
183
184     @classmethod
185     def tearDownClass(cls):
186         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
187
188     def setUp(self):
189         super(TemplateIpsec4TunIfEsp, self).setUp()
190
191         self.tun_if = self.pg0
192
193         p = self.ipv4_params
194
195         self.config_network(p)
196         self.config_sa_tra(p)
197         self.config_protect(p)
198
199     def tearDown(self):
200         super(TemplateIpsec4TunIfEsp, self).tearDown()
201
202
203 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
204                                 TemplateIpsec):
205     """ IPsec UDP tunnel interface tests """
206
207     tun4_encrypt_node_name = "esp4-encrypt-tun"
208     tun4_decrypt_node_name = "esp4-decrypt-tun"
209     encryption_type = ESP
210
211     @classmethod
212     def setUpClass(cls):
213         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
214
215     @classmethod
216     def tearDownClass(cls):
217         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
218
219     def verify_encrypted(self, p, sa, rxs):
220         for rx in rxs:
221             try:
222                 # ensure the UDP ports are correct before we decrypt
223                 # which strips them
224                 self.assertTrue(rx.haslayer(UDP))
225                 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
226                 self.assert_equal(rx[UDP].dport, 4500)
227
228                 pkt = sa.decrypt(rx[IP])
229                 if not pkt.haslayer(IP):
230                     pkt = IP(pkt[Raw].load)
231
232                 self.assert_packet_checksums_valid(pkt)
233                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
234                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
235             except (IndexError, AssertionError):
236                 self.logger.debug(ppp("Unexpected packet:", rx))
237                 try:
238                     self.logger.debug(ppp("Decrypted packet:", pkt))
239                 except:
240                     pass
241                 raise
242
243     def config_sa_tra(self, p):
244         config_tun_params(p, self.encryption_type, p.tun_if)
245
246         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
247                                   p.auth_algo_vpp_id, p.auth_key,
248                                   p.crypt_algo_vpp_id, p.crypt_key,
249                                   self.vpp_esp_protocol,
250                                   flags=p.flags,
251                                   udp_src=p.nat_header.sport,
252                                   udp_dst=p.nat_header.dport)
253         p.tun_sa_out.add_vpp_config()
254
255         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
256                                  p.auth_algo_vpp_id, p.auth_key,
257                                  p.crypt_algo_vpp_id, p.crypt_key,
258                                  self.vpp_esp_protocol,
259                                  flags=p.flags,
260                                  udp_src=p.nat_header.sport,
261                                  udp_dst=p.nat_header.dport)
262         p.tun_sa_in.add_vpp_config()
263
264     def setUp(self):
265         super(TemplateIpsec4TunIfEspUdp, self).setUp()
266
267         p = self.ipv4_params
268         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
269                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
270         p.nat_header = UDP(sport=5454, dport=4500)
271
272         self.tun_if = self.pg0
273
274         self.config_network(p)
275         self.config_sa_tra(p)
276         self.config_protect(p)
277
278     def tearDown(self):
279         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
280
281
282 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
283     """ Ipsec ESP - TUN tests """
284     tun4_encrypt_node_name = "esp4-encrypt-tun"
285     tun4_decrypt_node_name = "esp4-decrypt-tun"
286
287     def test_tun_basic64(self):
288         """ ipsec 6o4 tunnel basic test """
289         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
290
291         self.verify_tun_64(self.params[socket.AF_INET], count=1)
292
293     def test_tun_burst64(self):
294         """ ipsec 6o4 tunnel basic test """
295         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
296
297         self.verify_tun_64(self.params[socket.AF_INET], count=257)
298
299     def test_tun_basic_frag44(self):
300         """ ipsec 4o4 tunnel frag basic test """
301         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
302
303         p = self.ipv4_params
304
305         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
306                                        [1500, 0, 0, 0])
307         self.verify_tun_44(self.params[socket.AF_INET],
308                            count=1, payload_size=1800, n_rx=2)
309         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
310                                        [9000, 0, 0, 0])
311
312
313 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
314     """ Ipsec ESP UDP tests """
315
316     tun4_input_node = "ipsec4-tun-input"
317
318     def setUp(self):
319         super(TestIpsec4TunIfEspUdp, self).setUp()
320
321     def test_keepalive(self):
322         """ IPSEC NAT Keepalive """
323         self.verify_keepalive(self.ipv4_params)
324
325
326 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
327     """ Ipsec ESP UDP GCM tests """
328
329     tun4_input_node = "ipsec4-tun-input"
330
331     def setUp(self):
332         super(TestIpsec4TunIfEspUdpGCM, self).setUp()
333         p = self.ipv4_params
334         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
335                               IPSEC_API_INTEG_ALG_NONE)
336         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
337                                IPSEC_API_CRYPTO_ALG_AES_GCM_256)
338         p.crypt_algo = "AES-GCM"
339         p.auth_algo = "NULL"
340         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
341         p.salt = 0
342
343
344 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
345     """ Ipsec ESP - TCP tests """
346     pass
347
348
349 class TemplateIpsec6TunProtect(object):
350     """ IPsec IPv6 Tunnel protect """
351
352     def config_sa_tra(self, p):
353         config_tun_params(p, self.encryption_type, p.tun_if)
354
355         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
356                                   p.auth_algo_vpp_id, p.auth_key,
357                                   p.crypt_algo_vpp_id, p.crypt_key,
358                                   self.vpp_esp_protocol)
359         p.tun_sa_out.add_vpp_config()
360
361         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
362                                  p.auth_algo_vpp_id, p.auth_key,
363                                  p.crypt_algo_vpp_id, p.crypt_key,
364                                  self.vpp_esp_protocol)
365         p.tun_sa_in.add_vpp_config()
366
367     def config_sa_tun(self, p):
368         config_tun_params(p, self.encryption_type, p.tun_if)
369
370         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
371                                   p.auth_algo_vpp_id, p.auth_key,
372                                   p.crypt_algo_vpp_id, p.crypt_key,
373                                   self.vpp_esp_protocol,
374                                   self.tun_if.local_addr[p.addr_type],
375                                   self.tun_if.remote_addr[p.addr_type])
376         p.tun_sa_out.add_vpp_config()
377
378         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
379                                  p.auth_algo_vpp_id, p.auth_key,
380                                  p.crypt_algo_vpp_id, p.crypt_key,
381                                  self.vpp_esp_protocol,
382                                  self.tun_if.remote_addr[p.addr_type],
383                                  self.tun_if.local_addr[p.addr_type])
384         p.tun_sa_in.add_vpp_config()
385
386     def config_protect(self, p):
387         p.tun_protect = VppIpsecTunProtect(self,
388                                            p.tun_if,
389                                            p.tun_sa_out,
390                                            [p.tun_sa_in])
391         p.tun_protect.add_vpp_config()
392
393     def config_network(self, p):
394         if hasattr(p, 'tun_dst'):
395             tun_dst = p.tun_dst
396         else:
397             tun_dst = self.pg0.remote_ip6
398         p.tun_if = VppIpIpTunInterface(self, self.pg0,
399                                        self.pg0.local_ip6,
400                                        tun_dst)
401         p.tun_if.add_vpp_config()
402         p.tun_if.admin_up()
403         p.tun_if.config_ip6()
404         p.tun_if.config_ip4()
405
406         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
407                              [VppRoutePath(p.tun_if.remote_ip6,
408                                            0xffffffff,
409                                            proto=DpoProto.DPO_PROTO_IP6)])
410         p.route.add_vpp_config()
411         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
412                        [VppRoutePath(p.tun_if.remote_ip4,
413                                      0xffffffff)])
414         r.add_vpp_config()
415
416     def unconfig_network(self, p):
417         p.route.remove_vpp_config()
418         p.tun_if.remove_vpp_config()
419
420     def unconfig_protect(self, p):
421         p.tun_protect.remove_vpp_config()
422
423     def unconfig_sa(self, p):
424         p.tun_sa_out.remove_vpp_config()
425         p.tun_sa_in.remove_vpp_config()
426
427
428 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
429                              TemplateIpsec):
430     """ IPsec tunnel interface tests """
431
432     encryption_type = ESP
433
434     def setUp(self):
435         super(TemplateIpsec6TunIfEsp, self).setUp()
436
437         self.tun_if = self.pg0
438
439         p = self.ipv6_params
440         self.config_network(p)
441         self.config_sa_tra(p)
442         self.config_protect(p)
443
444     def tearDown(self):
445         super(TemplateIpsec6TunIfEsp, self).tearDown()
446
447
448 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
449                           IpsecTun6Tests):
450     """ Ipsec ESP - TUN tests """
451     tun6_encrypt_node_name = "esp6-encrypt-tun"
452     tun6_decrypt_node_name = "esp6-decrypt-tun"
453
454     def test_tun_basic46(self):
455         """ ipsec 4o6 tunnel basic test """
456         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
457         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
458
459     def test_tun_burst46(self):
460         """ ipsec 4o6 tunnel burst test """
461         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
462         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
463
464
465 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
466                                 IpsecTun6HandoffTests):
467     """ Ipsec ESP 6 Handoff tests """
468     tun6_encrypt_node_name = "esp6-encrypt-tun"
469     tun6_decrypt_node_name = "esp6-decrypt-tun"
470
471
472 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
473                                 IpsecTun4HandoffTests):
474     """ Ipsec ESP 4 Handoff tests """
475     tun4_encrypt_node_name = "esp4-encrypt-tun"
476     tun4_decrypt_node_name = "esp4-decrypt-tun"
477
478
479 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
480                               TemplateIpsec,
481                               IpsecTun4):
482     """ IPsec IPv4 Multi Tunnel interface """
483
484     encryption_type = ESP
485     tun4_encrypt_node_name = "esp4-encrypt-tun"
486     tun4_decrypt_node_name = "esp4-decrypt-tun"
487
488     def setUp(self):
489         super(TestIpsec4MultiTunIfEsp, self).setUp()
490
491         self.tun_if = self.pg0
492
493         self.multi_params = []
494         self.pg0.generate_remote_hosts(10)
495         self.pg0.configure_ipv4_neighbors()
496
497         for ii in range(10):
498             p = copy.copy(self.ipv4_params)
499
500             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
501             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
502             p.scapy_tun_spi = p.scapy_tun_spi + ii
503             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
504             p.vpp_tun_spi = p.vpp_tun_spi + ii
505
506             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
507             p.scapy_tra_spi = p.scapy_tra_spi + ii
508             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
509             p.vpp_tra_spi = p.vpp_tra_spi + ii
510             p.tun_dst = self.pg0.remote_hosts[ii].ip4
511
512             self.multi_params.append(p)
513             self.config_network(p)
514             self.config_sa_tra(p)
515             self.config_protect(p)
516
517     def tearDown(self):
518         super(TestIpsec4MultiTunIfEsp, self).tearDown()
519
520     def test_tun_44(self):
521         """Multiple IPSEC tunnel interfaces """
522         for p in self.multi_params:
523             self.verify_tun_44(p, count=127)
524             c = p.tun_if.get_rx_stats()
525             self.assertEqual(c['packets'], 127)
526             c = p.tun_if.get_tx_stats()
527             self.assertEqual(c['packets'], 127)
528
529     def test_tun_rr_44(self):
530         """ Round-robin packets acrros multiple interface """
531         tx = []
532         for p in self.multi_params:
533             tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
534                                             src=p.remote_tun_if_host,
535                                             dst=self.pg1.remote_ip4)
536         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
537
538         for rx, p in zip(rxs, self.multi_params):
539             self.verify_decrypted(p, [rx])
540
541         tx = []
542         for p in self.multi_params:
543             tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
544                                     dst=p.remote_tun_if_host)
545         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
546
547         for rx, p in zip(rxs, self.multi_params):
548             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
549
550
551 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
552                             TemplateIpsec,
553                             IpsecTun4):
554     """ IPsec IPv4 Tunnel interface all Algos """
555
556     encryption_type = ESP
557     tun4_encrypt_node_name = "esp4-encrypt-tun"
558     tun4_decrypt_node_name = "esp4-decrypt-tun"
559
560     def setUp(self):
561         super(TestIpsec4TunIfEspAll, self).setUp()
562
563         self.tun_if = self.pg0
564         p = self.ipv4_params
565
566         self.config_network(p)
567         self.config_sa_tra(p)
568         self.config_protect(p)
569
570     def tearDown(self):
571         p = self.ipv4_params
572         self.unconfig_protect(p)
573         self.unconfig_network(p)
574         self.unconfig_sa(p)
575
576         super(TestIpsec4TunIfEspAll, self).tearDown()
577
578     def rekey(self, p):
579         #
580         # change the key and the SPI
581         #
582         np = copy.copy(p)
583         p.crypt_key = b'X' + p.crypt_key[1:]
584         p.scapy_tun_spi += 1
585         p.scapy_tun_sa_id += 1
586         p.vpp_tun_spi += 1
587         p.vpp_tun_sa_id += 1
588         p.tun_if.local_spi = p.vpp_tun_spi
589         p.tun_if.remote_spi = p.scapy_tun_spi
590
591         config_tun_params(p, self.encryption_type, p.tun_if)
592
593         p.tun_sa_out = VppIpsecSA(self,
594                                   p.scapy_tun_sa_id,
595                                   p.scapy_tun_spi,
596                                   p.auth_algo_vpp_id,
597                                   p.auth_key,
598                                   p.crypt_algo_vpp_id,
599                                   p.crypt_key,
600                                   self.vpp_esp_protocol,
601                                   flags=p.flags,
602                                   salt=p.salt)
603         p.tun_sa_in = VppIpsecSA(self,
604                                  p.vpp_tun_sa_id,
605                                  p.vpp_tun_spi,
606                                  p.auth_algo_vpp_id,
607                                  p.auth_key,
608                                  p.crypt_algo_vpp_id,
609                                  p.crypt_key,
610                                  self.vpp_esp_protocol,
611                                  flags=p.flags,
612                                  salt=p.salt)
613         p.tun_sa_in.add_vpp_config()
614         p.tun_sa_out.add_vpp_config()
615
616         self.config_protect(p)
617         np.tun_sa_out.remove_vpp_config()
618         np.tun_sa_in.remove_vpp_config()
619         self.logger.info(self.vapi.cli("sh ipsec sa"))
620
621     def test_tun_44(self):
622         """IPSEC tunnel all algos """
623
624         # foreach VPP crypto engine
625         engines = ["ia32", "ipsecmb", "openssl"]
626
627         # foreach crypto algorithm
628         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
629                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
630                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
631                                 IPSEC_API_INTEG_ALG_NONE),
632                   'scapy-crypto': "AES-GCM",
633                   'scapy-integ': "NULL",
634                   'key': b"JPjyOWBeVEQiMe7h",
635                   'salt': 3333},
636                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
637                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
638                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
639                                 IPSEC_API_INTEG_ALG_NONE),
640                   'scapy-crypto': "AES-GCM",
641                   'scapy-integ': "NULL",
642                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
643                   'salt': 0},
644                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
645                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
646                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
647                                 IPSEC_API_INTEG_ALG_NONE),
648                   'scapy-crypto': "AES-GCM",
649                   'scapy-integ': "NULL",
650                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
651                   'salt': 9999},
652                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
653                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
654                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
655                                 IPSEC_API_INTEG_ALG_SHA1_96),
656                   'scapy-crypto': "AES-CBC",
657                   'scapy-integ': "HMAC-SHA1-96",
658                   'salt': 0,
659                   'key': b"JPjyOWBeVEQiMe7h"},
660                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
661                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
662                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
663                                 IPSEC_API_INTEG_ALG_SHA_512_256),
664                   'scapy-crypto': "AES-CBC",
665                   'scapy-integ': "SHA2-512-256",
666                   'salt': 0,
667                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
668                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
669                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
670                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
671                                 IPSEC_API_INTEG_ALG_SHA_256_128),
672                   'scapy-crypto': "AES-CBC",
673                   'scapy-integ': "SHA2-256-128",
674                   'salt': 0,
675                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
676                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
677                                  IPSEC_API_CRYPTO_ALG_NONE),
678                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
679                                 IPSEC_API_INTEG_ALG_SHA1_96),
680                   'scapy-crypto': "NULL",
681                   'scapy-integ': "HMAC-SHA1-96",
682                   'salt': 0,
683                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
684
685         for engine in engines:
686             self.vapi.cli("set crypto handler all %s" % engine)
687
688             #
689             # loop through each of the algorithms
690             #
691             for algo in algos:
692                 # with self.subTest(algo=algo['scapy']):
693
694                 p = self.ipv4_params
695                 p.auth_algo_vpp_id = algo['vpp-integ']
696                 p.crypt_algo_vpp_id = algo['vpp-crypto']
697                 p.crypt_algo = algo['scapy-crypto']
698                 p.auth_algo = algo['scapy-integ']
699                 p.crypt_key = algo['key']
700                 p.salt = algo['salt']
701
702                 #
703                 # rekey the tunnel
704                 #
705                 self.rekey(p)
706                 self.verify_tun_44(p, count=127)
707
708
709 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
710                                TemplateIpsec,
711                                IpsecTun4):
712     """ IPsec IPv4 Tunnel interface no Algos """
713
714     encryption_type = ESP
715     tun4_encrypt_node_name = "esp4-encrypt-tun"
716     tun4_decrypt_node_name = "esp4-decrypt-tun"
717
718     def setUp(self):
719         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
720
721         self.tun_if = self.pg0
722         p = self.ipv4_params
723         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
724                               IPSEC_API_INTEG_ALG_NONE)
725         p.auth_algo = 'NULL'
726         p.auth_key = []
727
728         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
729                                IPSEC_API_CRYPTO_ALG_NONE)
730         p.crypt_algo = 'NULL'
731         p.crypt_key = []
732
733     def tearDown(self):
734         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
735
736     def test_tun_44(self):
737         """ IPSec SA with NULL algos """
738         p = self.ipv4_params
739
740         self.config_network(p)
741         self.config_sa_tra(p)
742         self.config_protect(p)
743
744         tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
745                            dst=p.remote_tun_if_host)
746         self.send_and_assert_no_replies(self.pg1, tx)
747
748         self.unconfig_protect(p)
749         self.unconfig_sa(p)
750         self.unconfig_network(p)
751
752
753 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
754                               TemplateIpsec,
755                               IpsecTun6):
756     """ IPsec IPv6 Multi Tunnel interface """
757
758     encryption_type = ESP
759     tun6_encrypt_node_name = "esp6-encrypt-tun"
760     tun6_decrypt_node_name = "esp6-decrypt-tun"
761
762     def setUp(self):
763         super(TestIpsec6MultiTunIfEsp, self).setUp()
764
765         self.tun_if = self.pg0
766
767         self.multi_params = []
768         self.pg0.generate_remote_hosts(10)
769         self.pg0.configure_ipv6_neighbors()
770
771         for ii in range(10):
772             p = copy.copy(self.ipv6_params)
773
774             p.remote_tun_if_host = "1111::%d" % (ii + 1)
775             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
776             p.scapy_tun_spi = p.scapy_tun_spi + ii
777             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
778             p.vpp_tun_spi = p.vpp_tun_spi + ii
779
780             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
781             p.scapy_tra_spi = p.scapy_tra_spi + ii
782             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
783             p.vpp_tra_spi = p.vpp_tra_spi + ii
784             p.tun_dst = self.pg0.remote_hosts[ii].ip6
785
786             self.multi_params.append(p)
787             self.config_network(p)
788             self.config_sa_tra(p)
789             self.config_protect(p)
790
791     def tearDown(self):
792         super(TestIpsec6MultiTunIfEsp, self).tearDown()
793
794     def test_tun_66(self):
795         """Multiple IPSEC tunnel interfaces """
796         for p in self.multi_params:
797             self.verify_tun_66(p, count=127)
798             c = p.tun_if.get_rx_stats()
799             self.assertEqual(c['packets'], 127)
800             c = p.tun_if.get_tx_stats()
801             self.assertEqual(c['packets'], 127)
802
803
804 class TestIpsecGreTebIfEsp(TemplateIpsec,
805                            IpsecTun4Tests):
806     """ Ipsec GRE TEB ESP - TUN tests """
807     tun4_encrypt_node_name = "esp4-encrypt-tun"
808     tun4_decrypt_node_name = "esp4-decrypt-tun"
809     encryption_type = ESP
810     omac = "00:11:22:33:44:55"
811
812     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
813                          payload_size=100):
814         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
815                 sa.encrypt(IP(src=self.pg0.remote_ip4,
816                               dst=self.pg0.local_ip4) /
817                            GRE() /
818                            Ether(dst=self.omac) /
819                            IP(src="1.1.1.1", dst="1.1.1.2") /
820                            UDP(sport=1144, dport=2233) /
821                            Raw(b'X' * payload_size))
822                 for i in range(count)]
823
824     def gen_pkts(self, sw_intf, src, dst, count=1,
825                  payload_size=100):
826         return [Ether(dst=self.omac) /
827                 IP(src="1.1.1.1", dst="1.1.1.2") /
828                 UDP(sport=1144, dport=2233) /
829                 Raw(b'X' * payload_size)
830                 for i in range(count)]
831
832     def verify_decrypted(self, p, rxs):
833         for rx in rxs:
834             self.assert_equal(rx[Ether].dst, self.omac)
835             self.assert_equal(rx[IP].dst, "1.1.1.2")
836
837     def verify_encrypted(self, p, sa, rxs):
838         for rx in rxs:
839             try:
840                 pkt = sa.decrypt(rx[IP])
841                 if not pkt.haslayer(IP):
842                     pkt = IP(pkt[Raw].load)
843                 self.assert_packet_checksums_valid(pkt)
844                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
845                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
846                 self.assertTrue(pkt.haslayer(GRE))
847                 e = pkt[Ether]
848                 self.assertEqual(e[Ether].dst, self.omac)
849                 self.assertEqual(e[IP].dst, "1.1.1.2")
850             except (IndexError, AssertionError):
851                 self.logger.debug(ppp("Unexpected packet:", rx))
852                 try:
853                     self.logger.debug(ppp("Decrypted packet:", pkt))
854                 except:
855                     pass
856                 raise
857
858     def setUp(self):
859         super(TestIpsecGreTebIfEsp, self).setUp()
860
861         self.tun_if = self.pg0
862
863         p = self.ipv4_params
864
865         bd1 = VppBridgeDomain(self, 1)
866         bd1.add_vpp_config()
867
868         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
869                                   p.auth_algo_vpp_id, p.auth_key,
870                                   p.crypt_algo_vpp_id, p.crypt_key,
871                                   self.vpp_esp_protocol,
872                                   self.pg0.local_ip4,
873                                   self.pg0.remote_ip4)
874         p.tun_sa_out.add_vpp_config()
875
876         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
877                                  p.auth_algo_vpp_id, p.auth_key,
878                                  p.crypt_algo_vpp_id, p.crypt_key,
879                                  self.vpp_esp_protocol,
880                                  self.pg0.remote_ip4,
881                                  self.pg0.local_ip4)
882         p.tun_sa_in.add_vpp_config()
883
884         p.tun_if = VppGreInterface(self,
885                                    self.pg0.local_ip4,
886                                    self.pg0.remote_ip4,
887                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
888                                          GRE_API_TUNNEL_TYPE_TEB))
889         p.tun_if.add_vpp_config()
890
891         p.tun_protect = VppIpsecTunProtect(self,
892                                            p.tun_if,
893                                            p.tun_sa_out,
894                                            [p.tun_sa_in])
895
896         p.tun_protect.add_vpp_config()
897
898         p.tun_if.admin_up()
899         p.tun_if.config_ip4()
900         config_tun_params(p, self.encryption_type, p.tun_if)
901
902         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
903         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
904
905         self.vapi.cli("clear ipsec sa")
906         self.vapi.cli("sh adj")
907         self.vapi.cli("sh ipsec tun")
908
909     def tearDown(self):
910         p = self.ipv4_params
911         p.tun_if.unconfig_ip4()
912         super(TestIpsecGreTebIfEsp, self).tearDown()
913
914
915 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
916                                IpsecTun4Tests):
917     """ Ipsec GRE TEB ESP - TUN tests """
918     tun4_encrypt_node_name = "esp4-encrypt-tun"
919     tun4_decrypt_node_name = "esp4-decrypt-tun"
920     encryption_type = ESP
921     omac = "00:11:22:33:44:55"
922
923     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
924                          payload_size=100):
925         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
926                 sa.encrypt(IP(src=self.pg0.remote_ip4,
927                               dst=self.pg0.local_ip4) /
928                            GRE() /
929                            Ether(dst=self.omac) /
930                            IP(src="1.1.1.1", dst="1.1.1.2") /
931                            UDP(sport=1144, dport=2233) /
932                            Raw(b'X' * payload_size))
933                 for i in range(count)]
934
935     def gen_pkts(self, sw_intf, src, dst, count=1,
936                  payload_size=100):
937         return [Ether(dst=self.omac) /
938                 Dot1Q(vlan=11) /
939                 IP(src="1.1.1.1", dst="1.1.1.2") /
940                 UDP(sport=1144, dport=2233) /
941                 Raw(b'X' * payload_size)
942                 for i in range(count)]
943
944     def verify_decrypted(self, p, rxs):
945         for rx in rxs:
946             self.assert_equal(rx[Ether].dst, self.omac)
947             self.assert_equal(rx[Dot1Q].vlan, 11)
948             self.assert_equal(rx[IP].dst, "1.1.1.2")
949
950     def verify_encrypted(self, p, sa, rxs):
951         for rx in rxs:
952             try:
953                 pkt = sa.decrypt(rx[IP])
954                 if not pkt.haslayer(IP):
955                     pkt = IP(pkt[Raw].load)
956                 self.assert_packet_checksums_valid(pkt)
957                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
958                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
959                 self.assertTrue(pkt.haslayer(GRE))
960                 e = pkt[Ether]
961                 self.assertEqual(e[Ether].dst, self.omac)
962                 self.assertFalse(e.haslayer(Dot1Q))
963                 self.assertEqual(e[IP].dst, "1.1.1.2")
964             except (IndexError, AssertionError):
965                 self.logger.debug(ppp("Unexpected packet:", rx))
966                 try:
967                     self.logger.debug(ppp("Decrypted packet:", pkt))
968                 except:
969                     pass
970                 raise
971
972     def setUp(self):
973         super(TestIpsecGreTebVlanIfEsp, self).setUp()
974
975         self.tun_if = self.pg0
976
977         p = self.ipv4_params
978
979         bd1 = VppBridgeDomain(self, 1)
980         bd1.add_vpp_config()
981
982         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
983         self.vapi.l2_interface_vlan_tag_rewrite(
984             sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
985             push_dot1q=11)
986         self.pg1_11.admin_up()
987
988         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
989                                   p.auth_algo_vpp_id, p.auth_key,
990                                   p.crypt_algo_vpp_id, p.crypt_key,
991                                   self.vpp_esp_protocol,
992                                   self.pg0.local_ip4,
993                                   self.pg0.remote_ip4)
994         p.tun_sa_out.add_vpp_config()
995
996         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
997                                  p.auth_algo_vpp_id, p.auth_key,
998                                  p.crypt_algo_vpp_id, p.crypt_key,
999                                  self.vpp_esp_protocol,
1000                                  self.pg0.remote_ip4,
1001                                  self.pg0.local_ip4)
1002         p.tun_sa_in.add_vpp_config()
1003
1004         p.tun_if = VppGreInterface(self,
1005                                    self.pg0.local_ip4,
1006                                    self.pg0.remote_ip4,
1007                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1008                                          GRE_API_TUNNEL_TYPE_TEB))
1009         p.tun_if.add_vpp_config()
1010
1011         p.tun_protect = VppIpsecTunProtect(self,
1012                                            p.tun_if,
1013                                            p.tun_sa_out,
1014                                            [p.tun_sa_in])
1015
1016         p.tun_protect.add_vpp_config()
1017
1018         p.tun_if.admin_up()
1019         p.tun_if.config_ip4()
1020         config_tun_params(p, self.encryption_type, p.tun_if)
1021
1022         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1023         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1024
1025         self.vapi.cli("clear ipsec sa")
1026
1027     def tearDown(self):
1028         p = self.ipv4_params
1029         p.tun_if.unconfig_ip4()
1030         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1031         self.pg1_11.admin_down()
1032         self.pg1_11.remove_vpp_config()
1033
1034
1035 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1036                               IpsecTun4Tests):
1037     """ Ipsec GRE TEB ESP - Tra tests """
1038     tun4_encrypt_node_name = "esp4-encrypt-tun"
1039     tun4_decrypt_node_name = "esp4-decrypt-tun"
1040     encryption_type = ESP
1041     omac = "00:11:22:33:44:55"
1042
1043     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1044                          payload_size=100):
1045         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1046                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1047                               dst=self.pg0.local_ip4) /
1048                            GRE() /
1049                            Ether(dst=self.omac) /
1050                            IP(src="1.1.1.1", dst="1.1.1.2") /
1051                            UDP(sport=1144, dport=2233) /
1052                            Raw(b'X' * payload_size))
1053                 for i in range(count)]
1054
1055     def gen_pkts(self, sw_intf, src, dst, count=1,
1056                  payload_size=100):
1057         return [Ether(dst=self.omac) /
1058                 IP(src="1.1.1.1", dst="1.1.1.2") /
1059                 UDP(sport=1144, dport=2233) /
1060                 Raw(b'X' * payload_size)
1061                 for i in range(count)]
1062
1063     def verify_decrypted(self, p, rxs):
1064         for rx in rxs:
1065             self.assert_equal(rx[Ether].dst, self.omac)
1066             self.assert_equal(rx[IP].dst, "1.1.1.2")
1067
1068     def verify_encrypted(self, p, sa, rxs):
1069         for rx in rxs:
1070             try:
1071                 pkt = sa.decrypt(rx[IP])
1072                 if not pkt.haslayer(IP):
1073                     pkt = IP(pkt[Raw].load)
1074                 self.assert_packet_checksums_valid(pkt)
1075                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1076                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1077                 self.assertTrue(pkt.haslayer(GRE))
1078                 e = pkt[Ether]
1079                 self.assertEqual(e[Ether].dst, self.omac)
1080                 self.assertEqual(e[IP].dst, "1.1.1.2")
1081             except (IndexError, AssertionError):
1082                 self.logger.debug(ppp("Unexpected packet:", rx))
1083                 try:
1084                     self.logger.debug(ppp("Decrypted packet:", pkt))
1085                 except:
1086                     pass
1087                 raise
1088
1089     def setUp(self):
1090         super(TestIpsecGreTebIfEspTra, self).setUp()
1091
1092         self.tun_if = self.pg0
1093
1094         p = self.ipv4_params
1095
1096         bd1 = VppBridgeDomain(self, 1)
1097         bd1.add_vpp_config()
1098
1099         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1100                                   p.auth_algo_vpp_id, p.auth_key,
1101                                   p.crypt_algo_vpp_id, p.crypt_key,
1102                                   self.vpp_esp_protocol)
1103         p.tun_sa_out.add_vpp_config()
1104
1105         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1106                                  p.auth_algo_vpp_id, p.auth_key,
1107                                  p.crypt_algo_vpp_id, p.crypt_key,
1108                                  self.vpp_esp_protocol)
1109         p.tun_sa_in.add_vpp_config()
1110
1111         p.tun_if = VppGreInterface(self,
1112                                    self.pg0.local_ip4,
1113                                    self.pg0.remote_ip4,
1114                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1115                                          GRE_API_TUNNEL_TYPE_TEB))
1116         p.tun_if.add_vpp_config()
1117
1118         p.tun_protect = VppIpsecTunProtect(self,
1119                                            p.tun_if,
1120                                            p.tun_sa_out,
1121                                            [p.tun_sa_in])
1122
1123         p.tun_protect.add_vpp_config()
1124
1125         p.tun_if.admin_up()
1126         p.tun_if.config_ip4()
1127         config_tra_params(p, self.encryption_type, p.tun_if)
1128
1129         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1130         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1131
1132         self.vapi.cli("clear ipsec sa")
1133
1134     def tearDown(self):
1135         p = self.ipv4_params
1136         p.tun_if.unconfig_ip4()
1137         super(TestIpsecGreTebIfEspTra, self).tearDown()
1138
1139
1140 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1141                                  IpsecTun4Tests):
1142     """ Ipsec GRE TEB UDP ESP - Tra tests """
1143     tun4_encrypt_node_name = "esp4-encrypt-tun"
1144     tun4_decrypt_node_name = "esp4-decrypt-tun"
1145     encryption_type = ESP
1146     omac = "00:11:22:33:44:55"
1147
1148     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1149                          payload_size=100):
1150         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1151                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1152                               dst=self.pg0.local_ip4) /
1153                            GRE() /
1154                            Ether(dst=self.omac) /
1155                            IP(src="1.1.1.1", dst="1.1.1.2") /
1156                            UDP(sport=1144, dport=2233) /
1157                            Raw(b'X' * payload_size))
1158                 for i in range(count)]
1159
1160     def gen_pkts(self, sw_intf, src, dst, count=1,
1161                  payload_size=100):
1162         return [Ether(dst=self.omac) /
1163                 IP(src="1.1.1.1", dst="1.1.1.2") /
1164                 UDP(sport=1144, dport=2233) /
1165                 Raw(b'X' * payload_size)
1166                 for i in range(count)]
1167
1168     def verify_decrypted(self, p, rxs):
1169         for rx in rxs:
1170             self.assert_equal(rx[Ether].dst, self.omac)
1171             self.assert_equal(rx[IP].dst, "1.1.1.2")
1172
1173     def verify_encrypted(self, p, sa, rxs):
1174         for rx in rxs:
1175             self.assertTrue(rx.haslayer(UDP))
1176             self.assertEqual(rx[UDP].dport, 4545)
1177             self.assertEqual(rx[UDP].sport, 5454)
1178             try:
1179                 pkt = sa.decrypt(rx[IP])
1180                 if not pkt.haslayer(IP):
1181                     pkt = IP(pkt[Raw].load)
1182                 self.assert_packet_checksums_valid(pkt)
1183                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1184                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1185                 self.assertTrue(pkt.haslayer(GRE))
1186                 e = pkt[Ether]
1187                 self.assertEqual(e[Ether].dst, self.omac)
1188                 self.assertEqual(e[IP].dst, "1.1.1.2")
1189             except (IndexError, AssertionError):
1190                 self.logger.debug(ppp("Unexpected packet:", rx))
1191                 try:
1192                     self.logger.debug(ppp("Decrypted packet:", pkt))
1193                 except:
1194                     pass
1195                 raise
1196
1197     def setUp(self):
1198         super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1199
1200         self.tun_if = self.pg0
1201
1202         p = self.ipv4_params
1203         p = self.ipv4_params
1204         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1205                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1206         p.nat_header = UDP(sport=5454, dport=4545)
1207
1208         bd1 = VppBridgeDomain(self, 1)
1209         bd1.add_vpp_config()
1210
1211         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1212                                   p.auth_algo_vpp_id, p.auth_key,
1213                                   p.crypt_algo_vpp_id, p.crypt_key,
1214                                   self.vpp_esp_protocol,
1215                                   flags=p.flags,
1216                                   udp_src=5454,
1217                                   udp_dst=4545)
1218         p.tun_sa_out.add_vpp_config()
1219
1220         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1221                                  p.auth_algo_vpp_id, p.auth_key,
1222                                  p.crypt_algo_vpp_id, p.crypt_key,
1223                                  self.vpp_esp_protocol,
1224                                  flags=(p.flags |
1225                                         VppEnum.vl_api_ipsec_sad_flags_t.
1226                                         IPSEC_API_SAD_FLAG_IS_INBOUND),
1227                                  udp_src=5454,
1228                                  udp_dst=4545)
1229         p.tun_sa_in.add_vpp_config()
1230
1231         p.tun_if = VppGreInterface(self,
1232                                    self.pg0.local_ip4,
1233                                    self.pg0.remote_ip4,
1234                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1235                                          GRE_API_TUNNEL_TYPE_TEB))
1236         p.tun_if.add_vpp_config()
1237
1238         p.tun_protect = VppIpsecTunProtect(self,
1239                                            p.tun_if,
1240                                            p.tun_sa_out,
1241                                            [p.tun_sa_in])
1242
1243         p.tun_protect.add_vpp_config()
1244
1245         p.tun_if.admin_up()
1246         p.tun_if.config_ip4()
1247         config_tra_params(p, self.encryption_type, p.tun_if)
1248
1249         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1250         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1251
1252         self.vapi.cli("clear ipsec sa")
1253         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1254
1255     def tearDown(self):
1256         p = self.ipv4_params
1257         p.tun_if.unconfig_ip4()
1258         super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1259
1260
1261 class TestIpsecGreIfEsp(TemplateIpsec,
1262                         IpsecTun4Tests):
1263     """ Ipsec GRE ESP - TUN tests """
1264     tun4_encrypt_node_name = "esp4-encrypt-tun"
1265     tun4_decrypt_node_name = "esp4-decrypt-tun"
1266     encryption_type = ESP
1267
1268     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1269                          payload_size=100):
1270         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1271                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1272                               dst=self.pg0.local_ip4) /
1273                            GRE() /
1274                            IP(src=self.pg1.local_ip4,
1275                               dst=self.pg1.remote_ip4) /
1276                            UDP(sport=1144, dport=2233) /
1277                            Raw(b'X' * payload_size))
1278                 for i in range(count)]
1279
1280     def gen_pkts(self, sw_intf, src, dst, count=1,
1281                  payload_size=100):
1282         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1283                 IP(src="1.1.1.1", dst="1.1.1.2") /
1284                 UDP(sport=1144, dport=2233) /
1285                 Raw(b'X' * payload_size)
1286                 for i in range(count)]
1287
1288     def verify_decrypted(self, p, rxs):
1289         for rx in rxs:
1290             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1291             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1292
1293     def verify_encrypted(self, p, sa, rxs):
1294         for rx in rxs:
1295             try:
1296                 pkt = sa.decrypt(rx[IP])
1297                 if not pkt.haslayer(IP):
1298                     pkt = IP(pkt[Raw].load)
1299                 self.assert_packet_checksums_valid(pkt)
1300                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1301                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1302                 self.assertTrue(pkt.haslayer(GRE))
1303                 e = pkt[GRE]
1304                 self.assertEqual(e[IP].dst, "1.1.1.2")
1305             except (IndexError, AssertionError):
1306                 self.logger.debug(ppp("Unexpected packet:", rx))
1307                 try:
1308                     self.logger.debug(ppp("Decrypted packet:", pkt))
1309                 except:
1310                     pass
1311                 raise
1312
1313     def setUp(self):
1314         super(TestIpsecGreIfEsp, self).setUp()
1315
1316         self.tun_if = self.pg0
1317
1318         p = self.ipv4_params
1319
1320         bd1 = VppBridgeDomain(self, 1)
1321         bd1.add_vpp_config()
1322
1323         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1324                                   p.auth_algo_vpp_id, p.auth_key,
1325                                   p.crypt_algo_vpp_id, p.crypt_key,
1326                                   self.vpp_esp_protocol,
1327                                   self.pg0.local_ip4,
1328                                   self.pg0.remote_ip4)
1329         p.tun_sa_out.add_vpp_config()
1330
1331         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1332                                  p.auth_algo_vpp_id, p.auth_key,
1333                                  p.crypt_algo_vpp_id, p.crypt_key,
1334                                  self.vpp_esp_protocol,
1335                                  self.pg0.remote_ip4,
1336                                  self.pg0.local_ip4)
1337         p.tun_sa_in.add_vpp_config()
1338
1339         p.tun_if = VppGreInterface(self,
1340                                    self.pg0.local_ip4,
1341                                    self.pg0.remote_ip4)
1342         p.tun_if.add_vpp_config()
1343
1344         p.tun_protect = VppIpsecTunProtect(self,
1345                                            p.tun_if,
1346                                            p.tun_sa_out,
1347                                            [p.tun_sa_in])
1348         p.tun_protect.add_vpp_config()
1349
1350         p.tun_if.admin_up()
1351         p.tun_if.config_ip4()
1352         config_tun_params(p, self.encryption_type, p.tun_if)
1353
1354         VppIpRoute(self, "1.1.1.2", 32,
1355                    [VppRoutePath(p.tun_if.remote_ip4,
1356                                  0xffffffff)]).add_vpp_config()
1357
1358     def tearDown(self):
1359         p = self.ipv4_params
1360         p.tun_if.unconfig_ip4()
1361         super(TestIpsecGreIfEsp, self).tearDown()
1362
1363
1364 class TestIpsecGreIfEspTra(TemplateIpsec,
1365                            IpsecTun4Tests):
1366     """ Ipsec GRE ESP - TRA tests """
1367     tun4_encrypt_node_name = "esp4-encrypt-tun"
1368     tun4_decrypt_node_name = "esp4-decrypt-tun"
1369     encryption_type = ESP
1370
1371     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1372                          payload_size=100):
1373         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1374                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1375                               dst=self.pg0.local_ip4) /
1376                            GRE() /
1377                            IP(src=self.pg1.local_ip4,
1378                               dst=self.pg1.remote_ip4) /
1379                            UDP(sport=1144, dport=2233) /
1380                            Raw(b'X' * payload_size))
1381                 for i in range(count)]
1382
1383     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1384                                 payload_size=100):
1385         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1386                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1387                               dst=self.pg0.local_ip4) /
1388                            GRE() /
1389                            UDP(sport=1144, dport=2233) /
1390                            Raw(b'X' * payload_size))
1391                 for i in range(count)]
1392
1393     def gen_pkts(self, sw_intf, src, dst, count=1,
1394                  payload_size=100):
1395         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1396                 IP(src="1.1.1.1", dst="1.1.1.2") /
1397                 UDP(sport=1144, dport=2233) /
1398                 Raw(b'X' * payload_size)
1399                 for i in range(count)]
1400
1401     def verify_decrypted(self, p, rxs):
1402         for rx in rxs:
1403             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1404             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1405
1406     def verify_encrypted(self, p, sa, rxs):
1407         for rx in rxs:
1408             try:
1409                 pkt = sa.decrypt(rx[IP])
1410                 if not pkt.haslayer(IP):
1411                     pkt = IP(pkt[Raw].load)
1412                 self.assert_packet_checksums_valid(pkt)
1413                 self.assertTrue(pkt.haslayer(GRE))
1414                 e = pkt[GRE]
1415                 self.assertEqual(e[IP].dst, "1.1.1.2")
1416             except (IndexError, AssertionError):
1417                 self.logger.debug(ppp("Unexpected packet:", rx))
1418                 try:
1419                     self.logger.debug(ppp("Decrypted packet:", pkt))
1420                 except:
1421                     pass
1422                 raise
1423
1424     def setUp(self):
1425         super(TestIpsecGreIfEspTra, self).setUp()
1426
1427         self.tun_if = self.pg0
1428
1429         p = self.ipv4_params
1430
1431         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1432                                   p.auth_algo_vpp_id, p.auth_key,
1433                                   p.crypt_algo_vpp_id, p.crypt_key,
1434                                   self.vpp_esp_protocol)
1435         p.tun_sa_out.add_vpp_config()
1436
1437         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1438                                  p.auth_algo_vpp_id, p.auth_key,
1439                                  p.crypt_algo_vpp_id, p.crypt_key,
1440                                  self.vpp_esp_protocol)
1441         p.tun_sa_in.add_vpp_config()
1442
1443         p.tun_if = VppGreInterface(self,
1444                                    self.pg0.local_ip4,
1445                                    self.pg0.remote_ip4)
1446         p.tun_if.add_vpp_config()
1447
1448         p.tun_protect = VppIpsecTunProtect(self,
1449                                            p.tun_if,
1450                                            p.tun_sa_out,
1451                                            [p.tun_sa_in])
1452         p.tun_protect.add_vpp_config()
1453
1454         p.tun_if.admin_up()
1455         p.tun_if.config_ip4()
1456         config_tra_params(p, self.encryption_type, p.tun_if)
1457
1458         VppIpRoute(self, "1.1.1.2", 32,
1459                    [VppRoutePath(p.tun_if.remote_ip4,
1460                                  0xffffffff)]).add_vpp_config()
1461
1462     def tearDown(self):
1463         p = self.ipv4_params
1464         p.tun_if.unconfig_ip4()
1465         super(TestIpsecGreIfEspTra, self).tearDown()
1466
1467     def test_gre_non_ip(self):
1468         p = self.ipv4_params
1469         tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1470                                           src=p.remote_tun_if_host,
1471                                           dst=self.pg1.remote_ip6)
1472         self.send_and_assert_no_replies(self.tun_if, tx)
1473         node_name = ('/err/%s/unsupported payload' %
1474                      self.tun4_decrypt_node_name)
1475         self.assertEqual(1, self.statistics.get_err_counter(node_name))
1476
1477
1478 class TestIpsecGre6IfEspTra(TemplateIpsec,
1479                             IpsecTun6Tests):
1480     """ Ipsec GRE ESP - TRA tests """
1481     tun6_encrypt_node_name = "esp6-encrypt-tun"
1482     tun6_decrypt_node_name = "esp6-decrypt-tun"
1483     encryption_type = ESP
1484
1485     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1486                           payload_size=100):
1487         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1488                 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1489                                 dst=self.pg0.local_ip6) /
1490                            GRE() /
1491                            IPv6(src=self.pg1.local_ip6,
1492                                 dst=self.pg1.remote_ip6) /
1493                            UDP(sport=1144, dport=2233) /
1494                            Raw(b'X' * payload_size))
1495                 for i in range(count)]
1496
1497     def gen_pkts6(self, sw_intf, src, dst, count=1,
1498                   payload_size=100):
1499         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1500                 IPv6(src="1::1", dst="1::2") /
1501                 UDP(sport=1144, dport=2233) /
1502                 Raw(b'X' * payload_size)
1503                 for i in range(count)]
1504
1505     def verify_decrypted6(self, p, rxs):
1506         for rx in rxs:
1507             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1508             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1509
1510     def verify_encrypted6(self, p, sa, rxs):
1511         for rx in rxs:
1512             try:
1513                 pkt = sa.decrypt(rx[IPv6])
1514                 if not pkt.haslayer(IPv6):
1515                     pkt = IPv6(pkt[Raw].load)
1516                 self.assert_packet_checksums_valid(pkt)
1517                 self.assertTrue(pkt.haslayer(GRE))
1518                 e = pkt[GRE]
1519                 self.assertEqual(e[IPv6].dst, "1::2")
1520             except (IndexError, AssertionError):
1521                 self.logger.debug(ppp("Unexpected packet:", rx))
1522                 try:
1523                     self.logger.debug(ppp("Decrypted packet:", pkt))
1524                 except:
1525                     pass
1526                 raise
1527
1528     def setUp(self):
1529         super(TestIpsecGre6IfEspTra, self).setUp()
1530
1531         self.tun_if = self.pg0
1532
1533         p = self.ipv6_params
1534
1535         bd1 = VppBridgeDomain(self, 1)
1536         bd1.add_vpp_config()
1537
1538         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1539                                   p.auth_algo_vpp_id, p.auth_key,
1540                                   p.crypt_algo_vpp_id, p.crypt_key,
1541                                   self.vpp_esp_protocol)
1542         p.tun_sa_out.add_vpp_config()
1543
1544         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1545                                  p.auth_algo_vpp_id, p.auth_key,
1546                                  p.crypt_algo_vpp_id, p.crypt_key,
1547                                  self.vpp_esp_protocol)
1548         p.tun_sa_in.add_vpp_config()
1549
1550         p.tun_if = VppGreInterface(self,
1551                                    self.pg0.local_ip6,
1552                                    self.pg0.remote_ip6)
1553         p.tun_if.add_vpp_config()
1554
1555         p.tun_protect = VppIpsecTunProtect(self,
1556                                            p.tun_if,
1557                                            p.tun_sa_out,
1558                                            [p.tun_sa_in])
1559         p.tun_protect.add_vpp_config()
1560
1561         p.tun_if.admin_up()
1562         p.tun_if.config_ip6()
1563         config_tra_params(p, self.encryption_type, p.tun_if)
1564
1565         r = VppIpRoute(self, "1::2", 128,
1566                        [VppRoutePath(p.tun_if.remote_ip6,
1567                                      0xffffffff,
1568                                      proto=DpoProto.DPO_PROTO_IP6)])
1569         r.add_vpp_config()
1570
1571     def tearDown(self):
1572         p = self.ipv6_params
1573         p.tun_if.unconfig_ip6()
1574         super(TestIpsecGre6IfEspTra, self).tearDown()
1575
1576
1577 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1578     """ Ipsec mGRE ESP v4 TRA tests """
1579     tun4_encrypt_node_name = "esp4-encrypt-tun"
1580     tun4_decrypt_node_name = "esp4-decrypt-tun"
1581     encryption_type = ESP
1582
1583     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1584                          payload_size=100):
1585         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1586                 sa.encrypt(IP(src=p.tun_dst,
1587                               dst=self.pg0.local_ip4) /
1588                            GRE() /
1589                            IP(src=self.pg1.local_ip4,
1590                               dst=self.pg1.remote_ip4) /
1591                            UDP(sport=1144, dport=2233) /
1592                            Raw(b'X' * payload_size))
1593                 for i in range(count)]
1594
1595     def gen_pkts(self, sw_intf, src, dst, count=1,
1596                  payload_size=100):
1597         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1598                 IP(src="1.1.1.1", dst=dst) /
1599                 UDP(sport=1144, dport=2233) /
1600                 Raw(b'X' * payload_size)
1601                 for i in range(count)]
1602
1603     def verify_decrypted(self, p, rxs):
1604         for rx in rxs:
1605             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1606             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1607
1608     def verify_encrypted(self, p, sa, rxs):
1609         for rx in rxs:
1610             try:
1611                 pkt = sa.decrypt(rx[IP])
1612                 if not pkt.haslayer(IP):
1613                     pkt = IP(pkt[Raw].load)
1614                 self.assert_packet_checksums_valid(pkt)
1615                 self.assertTrue(pkt.haslayer(GRE))
1616                 e = pkt[GRE]
1617                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1618             except (IndexError, AssertionError):
1619                 self.logger.debug(ppp("Unexpected packet:", rx))
1620                 try:
1621                     self.logger.debug(ppp("Decrypted packet:", pkt))
1622                 except:
1623                     pass
1624                 raise
1625
1626     def setUp(self):
1627         super(TestIpsecMGreIfEspTra4, self).setUp()
1628
1629         N_NHS = 16
1630         self.tun_if = self.pg0
1631         p = self.ipv4_params
1632         p.tun_if = VppGreInterface(self,
1633                                    self.pg0.local_ip4,
1634                                    "0.0.0.0",
1635                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1636                                          TUNNEL_API_MODE_MP))
1637         p.tun_if.add_vpp_config()
1638         p.tun_if.admin_up()
1639         p.tun_if.config_ip4()
1640         p.tun_if.generate_remote_hosts(N_NHS)
1641         self.pg0.generate_remote_hosts(N_NHS)
1642         self.pg0.configure_ipv4_neighbors()
1643
1644         # setup some SAs for several next-hops on the interface
1645         self.multi_params = []
1646
1647         for ii in range(N_NHS):
1648             p = copy.copy(self.ipv4_params)
1649
1650             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1651             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1652             p.scapy_tun_spi = p.scapy_tun_spi + ii
1653             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1654             p.vpp_tun_spi = p.vpp_tun_spi + ii
1655
1656             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1657             p.scapy_tra_spi = p.scapy_tra_spi + ii
1658             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1659             p.vpp_tra_spi = p.vpp_tra_spi + ii
1660             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1661                                       p.auth_algo_vpp_id, p.auth_key,
1662                                       p.crypt_algo_vpp_id, p.crypt_key,
1663                                       self.vpp_esp_protocol)
1664             p.tun_sa_out.add_vpp_config()
1665
1666             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1667                                      p.auth_algo_vpp_id, p.auth_key,
1668                                      p.crypt_algo_vpp_id, p.crypt_key,
1669                                      self.vpp_esp_protocol)
1670             p.tun_sa_in.add_vpp_config()
1671
1672             p.tun_protect = VppIpsecTunProtect(
1673                 self,
1674                 p.tun_if,
1675                 p.tun_sa_out,
1676                 [p.tun_sa_in],
1677                 nh=p.tun_if.remote_hosts[ii].ip4)
1678             p.tun_protect.add_vpp_config()
1679             config_tra_params(p, self.encryption_type, p.tun_if)
1680             self.multi_params.append(p)
1681
1682             VppIpRoute(self, p.remote_tun_if_host, 32,
1683                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1684                                      p.tun_if.sw_if_index)]).add_vpp_config()
1685
1686             # in this v4 variant add the teibs after the protect
1687             p.teib = VppTeib(self, p.tun_if,
1688                              p.tun_if.remote_hosts[ii].ip4,
1689                              self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1690             p.tun_dst = self.pg0.remote_hosts[ii].ip4
1691         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1692
1693     def tearDown(self):
1694         p = self.ipv4_params
1695         p.tun_if.unconfig_ip4()
1696         super(TestIpsecMGreIfEspTra4, self).tearDown()
1697
1698     def test_tun_44(self):
1699         """mGRE IPSEC 44"""
1700         N_PKTS = 63
1701         for p in self.multi_params:
1702             self.verify_tun_44(p, count=N_PKTS)
1703             p.teib.remove_vpp_config()
1704             self.verify_tun_dropped_44(p, count=N_PKTS)
1705             p.teib.add_vpp_config()
1706             self.verify_tun_44(p, count=N_PKTS)
1707
1708
1709 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1710     """ Ipsec mGRE ESP v6 TRA tests """
1711     tun6_encrypt_node_name = "esp6-encrypt-tun"
1712     tun6_decrypt_node_name = "esp6-decrypt-tun"
1713     encryption_type = ESP
1714
1715     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1716                           payload_size=100):
1717         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1718                 sa.encrypt(IPv6(src=p.tun_dst,
1719                                 dst=self.pg0.local_ip6) /
1720                            GRE() /
1721                            IPv6(src=self.pg1.local_ip6,
1722                                 dst=self.pg1.remote_ip6) /
1723                            UDP(sport=1144, dport=2233) /
1724                            Raw(b'X' * payload_size))
1725                 for i in range(count)]
1726
1727     def gen_pkts6(self, sw_intf, src, dst, count=1,
1728                   payload_size=100):
1729         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1730                 IPv6(src="1::1", dst=dst) /
1731                 UDP(sport=1144, dport=2233) /
1732                 Raw(b'X' * payload_size)
1733                 for i in range(count)]
1734
1735     def verify_decrypted6(self, p, rxs):
1736         for rx in rxs:
1737             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1738             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1739
1740     def verify_encrypted6(self, p, sa, rxs):
1741         for rx in rxs:
1742             try:
1743                 pkt = sa.decrypt(rx[IPv6])
1744                 if not pkt.haslayer(IPv6):
1745                     pkt = IPv6(pkt[Raw].load)
1746                 self.assert_packet_checksums_valid(pkt)
1747                 self.assertTrue(pkt.haslayer(GRE))
1748                 e = pkt[GRE]
1749                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1750             except (IndexError, AssertionError):
1751                 self.logger.debug(ppp("Unexpected packet:", rx))
1752                 try:
1753                     self.logger.debug(ppp("Decrypted packet:", pkt))
1754                 except:
1755                     pass
1756                 raise
1757
1758     def setUp(self):
1759         super(TestIpsecMGreIfEspTra6, self).setUp()
1760
1761         self.vapi.cli("set logging class ipsec level debug")
1762
1763         N_NHS = 16
1764         self.tun_if = self.pg0
1765         p = self.ipv6_params
1766         p.tun_if = VppGreInterface(self,
1767                                    self.pg0.local_ip6,
1768                                    "::",
1769                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1770                                          TUNNEL_API_MODE_MP))
1771         p.tun_if.add_vpp_config()
1772         p.tun_if.admin_up()
1773         p.tun_if.config_ip6()
1774         p.tun_if.generate_remote_hosts(N_NHS)
1775         self.pg0.generate_remote_hosts(N_NHS)
1776         self.pg0.configure_ipv6_neighbors()
1777
1778         # setup some SAs for several next-hops on the interface
1779         self.multi_params = []
1780
1781         for ii in range(N_NHS):
1782             p = copy.copy(self.ipv6_params)
1783
1784             p.remote_tun_if_host = "1::%d" % (ii + 1)
1785             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1786             p.scapy_tun_spi = p.scapy_tun_spi + ii
1787             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1788             p.vpp_tun_spi = p.vpp_tun_spi + ii
1789
1790             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1791             p.scapy_tra_spi = p.scapy_tra_spi + ii
1792             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1793             p.vpp_tra_spi = p.vpp_tra_spi + ii
1794             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1795                                       p.auth_algo_vpp_id, p.auth_key,
1796                                       p.crypt_algo_vpp_id, p.crypt_key,
1797                                       self.vpp_esp_protocol)
1798             p.tun_sa_out.add_vpp_config()
1799
1800             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1801                                      p.auth_algo_vpp_id, p.auth_key,
1802                                      p.crypt_algo_vpp_id, p.crypt_key,
1803                                      self.vpp_esp_protocol)
1804             p.tun_sa_in.add_vpp_config()
1805
1806             # in this v6 variant add the teibs first then the protection
1807             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1808             VppTeib(self, p.tun_if,
1809                     p.tun_if.remote_hosts[ii].ip6,
1810                     p.tun_dst).add_vpp_config()
1811
1812             p.tun_protect = VppIpsecTunProtect(
1813                 self,
1814                 p.tun_if,
1815                 p.tun_sa_out,
1816                 [p.tun_sa_in],
1817                 nh=p.tun_if.remote_hosts[ii].ip6)
1818             p.tun_protect.add_vpp_config()
1819             config_tra_params(p, self.encryption_type, p.tun_if)
1820             self.multi_params.append(p)
1821
1822             VppIpRoute(self, p.remote_tun_if_host, 128,
1823                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1824                                      p.tun_if.sw_if_index)]).add_vpp_config()
1825             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1826
1827         self.logger.info(self.vapi.cli("sh log"))
1828         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1829         self.logger.info(self.vapi.cli("sh adj 41"))
1830
1831     def tearDown(self):
1832         p = self.ipv6_params
1833         p.tun_if.unconfig_ip6()
1834         super(TestIpsecMGreIfEspTra6, self).tearDown()
1835
1836     def test_tun_66(self):
1837         """mGRE IPSec 66"""
1838         for p in self.multi_params:
1839             self.verify_tun_66(p, count=63)
1840
1841
1842 class TestIpsec4TunProtect(TemplateIpsec,
1843                            TemplateIpsec4TunProtect,
1844                            IpsecTun4):
1845     """ IPsec IPv4 Tunnel protect - transport mode"""
1846
1847     def setUp(self):
1848         super(TestIpsec4TunProtect, self).setUp()
1849
1850         self.tun_if = self.pg0
1851
1852     def tearDown(self):
1853         super(TestIpsec4TunProtect, self).tearDown()
1854
1855     def test_tun_44(self):
1856         """IPSEC tunnel protect"""
1857
1858         p = self.ipv4_params
1859
1860         self.config_network(p)
1861         self.config_sa_tra(p)
1862         self.config_protect(p)
1863
1864         self.verify_tun_44(p, count=127)
1865         c = p.tun_if.get_rx_stats()
1866         self.assertEqual(c['packets'], 127)
1867         c = p.tun_if.get_tx_stats()
1868         self.assertEqual(c['packets'], 127)
1869
1870         self.vapi.cli("clear ipsec sa")
1871         self.verify_tun_64(p, count=127)
1872         c = p.tun_if.get_rx_stats()
1873         self.assertEqual(c['packets'], 254)
1874         c = p.tun_if.get_tx_stats()
1875         self.assertEqual(c['packets'], 254)
1876
1877         # rekey - create new SAs and update the tunnel protection
1878         np = copy.copy(p)
1879         np.crypt_key = b'X' + p.crypt_key[1:]
1880         np.scapy_tun_spi += 100
1881         np.scapy_tun_sa_id += 1
1882         np.vpp_tun_spi += 100
1883         np.vpp_tun_sa_id += 1
1884         np.tun_if.local_spi = p.vpp_tun_spi
1885         np.tun_if.remote_spi = p.scapy_tun_spi
1886
1887         self.config_sa_tra(np)
1888         self.config_protect(np)
1889         self.unconfig_sa(p)
1890
1891         self.verify_tun_44(np, count=127)
1892         c = p.tun_if.get_rx_stats()
1893         self.assertEqual(c['packets'], 381)
1894         c = p.tun_if.get_tx_stats()
1895         self.assertEqual(c['packets'], 381)
1896
1897         # teardown
1898         self.unconfig_protect(np)
1899         self.unconfig_sa(np)
1900         self.unconfig_network(p)
1901
1902
1903 class TestIpsec4TunProtectUdp(TemplateIpsec,
1904                               TemplateIpsec4TunProtect,
1905                               IpsecTun4):
1906     """ IPsec IPv4 Tunnel protect - transport mode"""
1907
1908     def setUp(self):
1909         super(TestIpsec4TunProtectUdp, self).setUp()
1910
1911         self.tun_if = self.pg0
1912
1913         p = self.ipv4_params
1914         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1915                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1916         p.nat_header = UDP(sport=4500, dport=4500)
1917         self.config_network(p)
1918         self.config_sa_tra(p)
1919         self.config_protect(p)
1920
1921     def tearDown(self):
1922         p = self.ipv4_params
1923         self.unconfig_protect(p)
1924         self.unconfig_sa(p)
1925         self.unconfig_network(p)
1926         super(TestIpsec4TunProtectUdp, self).tearDown()
1927
1928     def verify_encrypted(self, p, sa, rxs):
1929         # ensure encrypted packets are recieved with the default UDP ports
1930         for rx in rxs:
1931             self.assertEqual(rx[UDP].sport, 4500)
1932             self.assertEqual(rx[UDP].dport, 4500)
1933         super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
1934
1935     def test_tun_44(self):
1936         """IPSEC UDP tunnel protect"""
1937
1938         p = self.ipv4_params
1939
1940         self.verify_tun_44(p, count=127)
1941         c = p.tun_if.get_rx_stats()
1942         self.assertEqual(c['packets'], 127)
1943         c = p.tun_if.get_tx_stats()
1944         self.assertEqual(c['packets'], 127)
1945
1946     def test_keepalive(self):
1947         """ IPSEC NAT Keepalive """
1948         self.verify_keepalive(self.ipv4_params)
1949
1950
1951 class TestIpsec4TunProtectTun(TemplateIpsec,
1952                               TemplateIpsec4TunProtect,
1953                               IpsecTun4):
1954     """ IPsec IPv4 Tunnel protect - tunnel mode"""
1955
1956     encryption_type = ESP
1957     tun4_encrypt_node_name = "esp4-encrypt-tun"
1958     tun4_decrypt_node_name = "esp4-decrypt-tun"
1959
1960     def setUp(self):
1961         super(TestIpsec4TunProtectTun, self).setUp()
1962
1963         self.tun_if = self.pg0
1964
1965     def tearDown(self):
1966         super(TestIpsec4TunProtectTun, self).tearDown()
1967
1968     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1969                          payload_size=100):
1970         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1971                 sa.encrypt(IP(src=sw_intf.remote_ip4,
1972                               dst=sw_intf.local_ip4) /
1973                            IP(src=src, dst=dst) /
1974                            UDP(sport=1144, dport=2233) /
1975                            Raw(b'X' * payload_size))
1976                 for i in range(count)]
1977
1978     def gen_pkts(self, sw_intf, src, dst, count=1,
1979                  payload_size=100):
1980         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1981                 IP(src=src, dst=dst) /
1982                 UDP(sport=1144, dport=2233) /
1983                 Raw(b'X' * payload_size)
1984                 for i in range(count)]
1985
1986     def verify_decrypted(self, p, rxs):
1987         for rx in rxs:
1988             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1989             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1990             self.assert_packet_checksums_valid(rx)
1991
1992     def verify_encrypted(self, p, sa, rxs):
1993         for rx in rxs:
1994             try:
1995                 pkt = sa.decrypt(rx[IP])
1996                 if not pkt.haslayer(IP):
1997                     pkt = IP(pkt[Raw].load)
1998                 self.assert_packet_checksums_valid(pkt)
1999                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2000                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2001                 inner = pkt[IP].payload
2002                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2003
2004             except (IndexError, AssertionError):
2005                 self.logger.debug(ppp("Unexpected packet:", rx))
2006                 try:
2007                     self.logger.debug(ppp("Decrypted packet:", pkt))
2008                 except:
2009                     pass
2010                 raise
2011
2012     def test_tun_44(self):
2013         """IPSEC tunnel protect """
2014
2015         p = self.ipv4_params
2016
2017         self.config_network(p)
2018         self.config_sa_tun(p)
2019         self.config_protect(p)
2020
2021         # also add an output features on the tunnel and physical interface
2022         # so we test they still work
2023         r_all = AclRule(True,
2024                         src_prefix="0.0.0.0/0",
2025                         dst_prefix="0.0.0.0/0",
2026                         proto=0)
2027         a = VppAcl(self, [r_all]).add_vpp_config()
2028
2029         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2030         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2031
2032         self.verify_tun_44(p, count=127)
2033
2034         c = p.tun_if.get_rx_stats()
2035         self.assertEqual(c['packets'], 127)
2036         c = p.tun_if.get_tx_stats()
2037         self.assertEqual(c['packets'], 127)
2038
2039         # rekey - create new SAs and update the tunnel protection
2040         np = copy.copy(p)
2041         np.crypt_key = b'X' + p.crypt_key[1:]
2042         np.scapy_tun_spi += 100
2043         np.scapy_tun_sa_id += 1
2044         np.vpp_tun_spi += 100
2045         np.vpp_tun_sa_id += 1
2046         np.tun_if.local_spi = p.vpp_tun_spi
2047         np.tun_if.remote_spi = p.scapy_tun_spi
2048
2049         self.config_sa_tun(np)
2050         self.config_protect(np)
2051         self.unconfig_sa(p)
2052
2053         self.verify_tun_44(np, count=127)
2054         c = p.tun_if.get_rx_stats()
2055         self.assertEqual(c['packets'], 254)
2056         c = p.tun_if.get_tx_stats()
2057         self.assertEqual(c['packets'], 254)
2058
2059         # teardown
2060         self.unconfig_protect(np)
2061         self.unconfig_sa(np)
2062         self.unconfig_network(p)
2063
2064
2065 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2066                                   TemplateIpsec4TunProtect,
2067                                   IpsecTun4):
2068     """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2069
2070     encryption_type = ESP
2071     tun4_encrypt_node_name = "esp4-encrypt-tun"
2072     tun4_decrypt_node_name = "esp4-decrypt-tun"
2073
2074     def setUp(self):
2075         super(TestIpsec4TunProtectTunDrop, self).setUp()
2076
2077         self.tun_if = self.pg0
2078
2079     def tearDown(self):
2080         super(TestIpsec4TunProtectTunDrop, self).tearDown()
2081
2082     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2083                          payload_size=100):
2084         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2085                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2086                               dst="5.5.5.5") /
2087                            IP(src=src, dst=dst) /
2088                            UDP(sport=1144, dport=2233) /
2089                            Raw(b'X' * payload_size))
2090                 for i in range(count)]
2091
2092     def test_tun_drop_44(self):
2093         """IPSEC tunnel protect bogus tunnel header """
2094
2095         p = self.ipv4_params
2096
2097         self.config_network(p)
2098         self.config_sa_tun(p)
2099         self.config_protect(p)
2100
2101         tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2102                                    src=p.remote_tun_if_host,
2103                                    dst=self.pg1.remote_ip4,
2104                                    count=63)
2105         self.send_and_assert_no_replies(self.tun_if, tx)
2106
2107         # teardown
2108         self.unconfig_protect(p)
2109         self.unconfig_sa(p)
2110         self.unconfig_network(p)
2111
2112
2113 class TestIpsec6TunProtect(TemplateIpsec,
2114                            TemplateIpsec6TunProtect,
2115                            IpsecTun6):
2116     """ IPsec IPv6 Tunnel protect - transport mode"""
2117
2118     encryption_type = ESP
2119     tun6_encrypt_node_name = "esp6-encrypt-tun"
2120     tun6_decrypt_node_name = "esp6-decrypt-tun"
2121
2122     def setUp(self):
2123         super(TestIpsec6TunProtect, self).setUp()
2124
2125         self.tun_if = self.pg0
2126
2127     def tearDown(self):
2128         super(TestIpsec6TunProtect, self).tearDown()
2129
2130     def test_tun_66(self):
2131         """IPSEC tunnel protect 6o6"""
2132
2133         p = self.ipv6_params
2134
2135         self.config_network(p)
2136         self.config_sa_tra(p)
2137         self.config_protect(p)
2138
2139         self.verify_tun_66(p, count=127)
2140         c = p.tun_if.get_rx_stats()
2141         self.assertEqual(c['packets'], 127)
2142         c = p.tun_if.get_tx_stats()
2143         self.assertEqual(c['packets'], 127)
2144
2145         # rekey - create new SAs and update the tunnel protection
2146         np = copy.copy(p)
2147         np.crypt_key = b'X' + p.crypt_key[1:]
2148         np.scapy_tun_spi += 100
2149         np.scapy_tun_sa_id += 1
2150         np.vpp_tun_spi += 100
2151         np.vpp_tun_sa_id += 1
2152         np.tun_if.local_spi = p.vpp_tun_spi
2153         np.tun_if.remote_spi = p.scapy_tun_spi
2154
2155         self.config_sa_tra(np)
2156         self.config_protect(np)
2157         self.unconfig_sa(p)
2158
2159         self.verify_tun_66(np, count=127)
2160         c = p.tun_if.get_rx_stats()
2161         self.assertEqual(c['packets'], 254)
2162         c = p.tun_if.get_tx_stats()
2163         self.assertEqual(c['packets'], 254)
2164
2165         # bounce the interface state
2166         p.tun_if.admin_down()
2167         self.verify_drop_tun_66(np, count=127)
2168         node = ('/err/ipsec6-tun-input/%s' %
2169                 'ipsec packets received on disabled interface')
2170         self.assertEqual(127, self.statistics.get_err_counter(node))
2171         p.tun_if.admin_up()
2172         self.verify_tun_66(np, count=127)
2173
2174         # 3 phase rekey
2175         #  1) add two input SAs [old, new]
2176         #  2) swap output SA to [new]
2177         #  3) use only [new] input SA
2178         np3 = copy.copy(np)
2179         np3.crypt_key = b'Z' + p.crypt_key[1:]
2180         np3.scapy_tun_spi += 100
2181         np3.scapy_tun_sa_id += 1
2182         np3.vpp_tun_spi += 100
2183         np3.vpp_tun_sa_id += 1
2184         np3.tun_if.local_spi = p.vpp_tun_spi
2185         np3.tun_if.remote_spi = p.scapy_tun_spi
2186
2187         self.config_sa_tra(np3)
2188
2189         # step 1;
2190         p.tun_protect.update_vpp_config(np.tun_sa_out,
2191                                         [np.tun_sa_in, np3.tun_sa_in])
2192         self.verify_tun_66(np, np, count=127)
2193         self.verify_tun_66(np3, np, count=127)
2194
2195         # step 2;
2196         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2197                                         [np.tun_sa_in, np3.tun_sa_in])
2198         self.verify_tun_66(np, np3, count=127)
2199         self.verify_tun_66(np3, np3, count=127)
2200
2201         # step 1;
2202         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2203                                         [np3.tun_sa_in])
2204         self.verify_tun_66(np3, np3, count=127)
2205         self.verify_drop_tun_66(np, count=127)
2206
2207         c = p.tun_if.get_rx_stats()
2208         self.assertEqual(c['packets'], 127*9)
2209         c = p.tun_if.get_tx_stats()
2210         self.assertEqual(c['packets'], 127*8)
2211         self.unconfig_sa(np)
2212
2213         # teardown
2214         self.unconfig_protect(np3)
2215         self.unconfig_sa(np3)
2216         self.unconfig_network(p)
2217
2218     def test_tun_46(self):
2219         """IPSEC tunnel protect 4o6"""
2220
2221         p = self.ipv6_params
2222
2223         self.config_network(p)
2224         self.config_sa_tra(p)
2225         self.config_protect(p)
2226
2227         self.verify_tun_46(p, count=127)
2228         c = p.tun_if.get_rx_stats()
2229         self.assertEqual(c['packets'], 127)
2230         c = p.tun_if.get_tx_stats()
2231         self.assertEqual(c['packets'], 127)
2232
2233         # teardown
2234         self.unconfig_protect(p)
2235         self.unconfig_sa(p)
2236         self.unconfig_network(p)
2237
2238
2239 class TestIpsec6TunProtectTun(TemplateIpsec,
2240                               TemplateIpsec6TunProtect,
2241                               IpsecTun6):
2242     """ IPsec IPv6 Tunnel protect - tunnel mode"""
2243
2244     encryption_type = ESP
2245     tun6_encrypt_node_name = "esp6-encrypt-tun"
2246     tun6_decrypt_node_name = "esp6-decrypt-tun"
2247
2248     def setUp(self):
2249         super(TestIpsec6TunProtectTun, self).setUp()
2250
2251         self.tun_if = self.pg0
2252
2253     def tearDown(self):
2254         super(TestIpsec6TunProtectTun, self).tearDown()
2255
2256     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2257                           payload_size=100):
2258         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2259                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2260                                 dst=sw_intf.local_ip6) /
2261                            IPv6(src=src, dst=dst) /
2262                            UDP(sport=1166, dport=2233) /
2263                            Raw(b'X' * payload_size))
2264                 for i in range(count)]
2265
2266     def gen_pkts6(self, sw_intf, src, dst, count=1,
2267                   payload_size=100):
2268         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2269                 IPv6(src=src, dst=dst) /
2270                 UDP(sport=1166, dport=2233) /
2271                 Raw(b'X' * payload_size)
2272                 for i in range(count)]
2273
2274     def verify_decrypted6(self, p, rxs):
2275         for rx in rxs:
2276             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2277             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2278             self.assert_packet_checksums_valid(rx)
2279
2280     def verify_encrypted6(self, p, sa, rxs):
2281         for rx in rxs:
2282             try:
2283                 pkt = sa.decrypt(rx[IPv6])
2284                 if not pkt.haslayer(IPv6):
2285                     pkt = IPv6(pkt[Raw].load)
2286                 self.assert_packet_checksums_valid(pkt)
2287                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2288                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2289                 inner = pkt[IPv6].payload
2290                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2291
2292             except (IndexError, AssertionError):
2293                 self.logger.debug(ppp("Unexpected packet:", rx))
2294                 try:
2295                     self.logger.debug(ppp("Decrypted packet:", pkt))
2296                 except:
2297                     pass
2298                 raise
2299
2300     def test_tun_66(self):
2301         """IPSEC tunnel protect """
2302
2303         p = self.ipv6_params
2304
2305         self.config_network(p)
2306         self.config_sa_tun(p)
2307         self.config_protect(p)
2308
2309         self.verify_tun_66(p, count=127)
2310
2311         c = p.tun_if.get_rx_stats()
2312         self.assertEqual(c['packets'], 127)
2313         c = p.tun_if.get_tx_stats()
2314         self.assertEqual(c['packets'], 127)
2315
2316         # rekey - create new SAs and update the tunnel protection
2317         np = copy.copy(p)
2318         np.crypt_key = b'X' + p.crypt_key[1:]
2319         np.scapy_tun_spi += 100
2320         np.scapy_tun_sa_id += 1
2321         np.vpp_tun_spi += 100
2322         np.vpp_tun_sa_id += 1
2323         np.tun_if.local_spi = p.vpp_tun_spi
2324         np.tun_if.remote_spi = p.scapy_tun_spi
2325
2326         self.config_sa_tun(np)
2327         self.config_protect(np)
2328         self.unconfig_sa(p)
2329
2330         self.verify_tun_66(np, count=127)
2331         c = p.tun_if.get_rx_stats()
2332         self.assertEqual(c['packets'], 254)
2333         c = p.tun_if.get_tx_stats()
2334         self.assertEqual(c['packets'], 254)
2335
2336         # teardown
2337         self.unconfig_protect(np)
2338         self.unconfig_sa(np)
2339         self.unconfig_network(p)
2340
2341
2342 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2343                                   TemplateIpsec6TunProtect,
2344                                   IpsecTun6):
2345     """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2346
2347     encryption_type = ESP
2348     tun6_encrypt_node_name = "esp6-encrypt-tun"
2349     tun6_decrypt_node_name = "esp6-decrypt-tun"
2350
2351     def setUp(self):
2352         super(TestIpsec6TunProtectTunDrop, self).setUp()
2353
2354         self.tun_if = self.pg0
2355
2356     def tearDown(self):
2357         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2358
2359     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2360                           payload_size=100):
2361         # the IP destination of the revelaed packet does not match
2362         # that assigned to the tunnel
2363         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2364                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2365                                 dst="5::5") /
2366                            IPv6(src=src, dst=dst) /
2367                            UDP(sport=1144, dport=2233) /
2368                            Raw(b'X' * payload_size))
2369                 for i in range(count)]
2370
2371     def test_tun_drop_66(self):
2372         """IPSEC 6 tunnel protect bogus tunnel header """
2373
2374         p = self.ipv6_params
2375
2376         self.config_network(p)
2377         self.config_sa_tun(p)
2378         self.config_protect(p)
2379
2380         tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2381                                     src=p.remote_tun_if_host,
2382                                     dst=self.pg1.remote_ip6,
2383                                     count=63)
2384         self.send_and_assert_no_replies(self.tun_if, tx)
2385
2386         self.unconfig_protect(p)
2387         self.unconfig_sa(p)
2388         self.unconfig_network(p)
2389
2390
2391 class TemplateIpsecItf4(object):
2392     """ IPsec Interface IPv4 """
2393
2394     encryption_type = ESP
2395     tun4_encrypt_node_name = "esp4-encrypt-tun"
2396     tun4_decrypt_node_name = "esp4-decrypt-tun"
2397     tun4_input_node = "ipsec4-tun-input"
2398
2399     def config_sa_tun(self, p, src, dst):
2400         config_tun_params(p, self.encryption_type, None, src, dst)
2401
2402         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2403                                   p.auth_algo_vpp_id, p.auth_key,
2404                                   p.crypt_algo_vpp_id, p.crypt_key,
2405                                   self.vpp_esp_protocol,
2406                                   src, dst,
2407                                   flags=p.flags)
2408         p.tun_sa_out.add_vpp_config()
2409
2410         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2411                                  p.auth_algo_vpp_id, p.auth_key,
2412                                  p.crypt_algo_vpp_id, p.crypt_key,
2413                                  self.vpp_esp_protocol,
2414                                  dst, src,
2415                                  flags=p.flags)
2416         p.tun_sa_in.add_vpp_config()
2417
2418     def config_protect(self, p):
2419         p.tun_protect = VppIpsecTunProtect(self,
2420                                            p.tun_if,
2421                                            p.tun_sa_out,
2422                                            [p.tun_sa_in])
2423         p.tun_protect.add_vpp_config()
2424
2425     def config_network(self, p, instance=0xffffffff):
2426         p.tun_if = VppIpsecInterface(self, instance=instance)
2427
2428         p.tun_if.add_vpp_config()
2429         p.tun_if.admin_up()
2430         p.tun_if.config_ip4()
2431         p.tun_if.config_ip6()
2432
2433         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2434                              [VppRoutePath(p.tun_if.remote_ip4,
2435                                            0xffffffff)])
2436         p.route.add_vpp_config()
2437         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2438                        [VppRoutePath(p.tun_if.remote_ip6,
2439                                      0xffffffff,
2440                                      proto=DpoProto.DPO_PROTO_IP6)])
2441         r.add_vpp_config()
2442
2443     def unconfig_network(self, p):
2444         p.route.remove_vpp_config()
2445         p.tun_if.remove_vpp_config()
2446
2447     def unconfig_protect(self, p):
2448         p.tun_protect.remove_vpp_config()
2449
2450     def unconfig_sa(self, p):
2451         p.tun_sa_out.remove_vpp_config()
2452         p.tun_sa_in.remove_vpp_config()
2453
2454
2455 class TestIpsecItf4(TemplateIpsec,
2456                     TemplateIpsecItf4,
2457                     IpsecTun4):
2458     """ IPsec Interface IPv4 """
2459
2460     def setUp(self):
2461         super(TestIpsecItf4, self).setUp()
2462
2463         self.tun_if = self.pg0
2464
2465     def tearDown(self):
2466         super(TestIpsecItf4, self).tearDown()
2467
2468     def test_tun_instance_44(self):
2469         p = self.ipv4_params
2470         self.config_network(p, instance=3)
2471
2472         with self.assertRaises(CliFailedCommandError):
2473             self.vapi.cli("show interface ipsec0")
2474
2475         output = self.vapi.cli("show interface ipsec3")
2476         self.assertTrue("unknown" not in output)
2477
2478         self.unconfig_network(p)
2479
2480     def test_tun_44(self):
2481         """IPSEC interface IPv4"""
2482
2483         n_pkts = 127
2484         p = self.ipv4_params
2485
2486         self.config_network(p)
2487         self.config_sa_tun(p,
2488                            self.pg0.local_ip4,
2489                            self.pg0.remote_ip4)
2490         self.config_protect(p)
2491
2492         self.verify_tun_44(p, count=n_pkts)
2493         c = p.tun_if.get_rx_stats()
2494         self.assertEqual(c['packets'], n_pkts)
2495         c = p.tun_if.get_tx_stats()
2496         self.assertEqual(c['packets'], n_pkts)
2497
2498         p.tun_if.admin_down()
2499         self.verify_tun_dropped_44(p, count=n_pkts)
2500         p.tun_if.admin_up()
2501         self.verify_tun_44(p, count=n_pkts)
2502
2503         c = p.tun_if.get_rx_stats()
2504         self.assertEqual(c['packets'], 3*n_pkts)
2505         c = p.tun_if.get_tx_stats()
2506         self.assertEqual(c['packets'], 2*n_pkts)
2507
2508         # it's a v6 packet when its encrypted
2509         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2510
2511         self.verify_tun_64(p, count=n_pkts)
2512         c = p.tun_if.get_rx_stats()
2513         self.assertEqual(c['packets'], 4*n_pkts)
2514         c = p.tun_if.get_tx_stats()
2515         self.assertEqual(c['packets'], 3*n_pkts)
2516
2517         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2518
2519         self.vapi.cli("clear interfaces")
2520
2521         # rekey - create new SAs and update the tunnel protection
2522         np = copy.copy(p)
2523         np.crypt_key = b'X' + p.crypt_key[1:]
2524         np.scapy_tun_spi += 100
2525         np.scapy_tun_sa_id += 1
2526         np.vpp_tun_spi += 100
2527         np.vpp_tun_sa_id += 1
2528         np.tun_if.local_spi = p.vpp_tun_spi
2529         np.tun_if.remote_spi = p.scapy_tun_spi
2530
2531         self.config_sa_tun(np,
2532                            self.pg0.local_ip4,
2533                            self.pg0.remote_ip4)
2534         self.config_protect(np)
2535         self.unconfig_sa(p)
2536
2537         self.verify_tun_44(np, count=n_pkts)
2538         c = p.tun_if.get_rx_stats()
2539         self.assertEqual(c['packets'], n_pkts)
2540         c = p.tun_if.get_tx_stats()
2541         self.assertEqual(c['packets'], n_pkts)
2542
2543         # teardown
2544         self.unconfig_protect(np)
2545         self.unconfig_sa(np)
2546         self.unconfig_network(p)
2547
2548     def test_tun_44_null(self):
2549         """IPSEC interface IPv4 NULL auth/crypto"""
2550
2551         n_pkts = 127
2552         p = copy.copy(self.ipv4_params)
2553
2554         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2555                               IPSEC_API_INTEG_ALG_NONE)
2556         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2557                                IPSEC_API_CRYPTO_ALG_NONE)
2558         p.crypt_algo = "NULL"
2559         p.auth_algo = "NULL"
2560
2561         self.config_network(p)
2562         self.config_sa_tun(p,
2563                            self.pg0.local_ip4,
2564                            self.pg0.remote_ip4)
2565         self.config_protect(p)
2566
2567         self.verify_tun_44(p, count=n_pkts)
2568
2569         # teardown
2570         self.unconfig_protect(p)
2571         self.unconfig_sa(p)
2572         self.unconfig_network(p)
2573
2574
2575 class TestIpsecItf4MPLS(TemplateIpsec,
2576                         TemplateIpsecItf4,
2577                         IpsecTun4):
2578     """ IPsec Interface MPLSoIPv4 """
2579
2580     tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
2581
2582     def setUp(self):
2583         super(TestIpsecItf4MPLS, self).setUp()
2584
2585         self.tun_if = self.pg0
2586
2587     def tearDown(self):
2588         super(TestIpsecItf4MPLS, self).tearDown()
2589
2590     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2591                          payload_size=100):
2592         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2593                 sa.encrypt(MPLS(label=44, ttl=3) /
2594                            IP(src=src, dst=dst) /
2595                            UDP(sport=1166, dport=2233) /
2596                            Raw(b'X' * payload_size))
2597                 for i in range(count)]
2598
2599     def verify_encrypted(self, p, sa, rxs):
2600         for rx in rxs:
2601             try:
2602                 pkt = sa.decrypt(rx[IP])
2603                 if not pkt.haslayer(IP):
2604                     pkt = IP(pkt[Raw].load)
2605                 self.assert_packet_checksums_valid(pkt)
2606                 self.assert_equal(pkt[MPLS].label, 44)
2607                 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
2608             except (IndexError, AssertionError):
2609                 self.logger.debug(ppp("Unexpected packet:", rx))
2610                 try:
2611                     self.logger.debug(ppp("Decrypted packet:", pkt))
2612                 except:
2613                     pass
2614                 raise
2615
2616     def test_tun_mpls_o_ip4(self):
2617         """IPSEC interface MPLS over IPv4"""
2618
2619         n_pkts = 127
2620         p = self.ipv4_params
2621         f = FibPathProto
2622
2623         tbl = VppMplsTable(self, 0)
2624         tbl.add_vpp_config()
2625
2626         self.config_network(p)
2627         # deag MPLS routes from the tunnel
2628         r4 = VppMplsRoute(self, 44, 1,
2629                           [VppRoutePath(
2630                               self.pg1.remote_ip4,
2631                               self.pg1.sw_if_index)]).add_vpp_config()
2632         p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
2633                                      p.tun_if.sw_if_index,
2634                                      labels=[VppMplsLabel(44)])])
2635         p.tun_if.enable_mpls()
2636
2637         self.config_sa_tun(p,
2638                            self.pg0.local_ip4,
2639                            self.pg0.remote_ip4)
2640         self.config_protect(p)
2641
2642         self.verify_tun_44(p, count=n_pkts)
2643
2644         # cleanup
2645         p.tun_if.disable_mpls()
2646         self.unconfig_protect(p)
2647         self.unconfig_sa(p)
2648         self.unconfig_network(p)
2649
2650
2651 class TemplateIpsecItf6(object):
2652     """ IPsec Interface IPv6 """
2653
2654     encryption_type = ESP
2655     tun6_encrypt_node_name = "esp6-encrypt-tun"
2656     tun6_decrypt_node_name = "esp6-decrypt-tun"
2657     tun6_input_node = "ipsec6-tun-input"
2658
2659     def config_sa_tun(self, p, src, dst):
2660         config_tun_params(p, self.encryption_type, None, src, dst)
2661
2662         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2663                                   p.auth_algo_vpp_id, p.auth_key,
2664                                   p.crypt_algo_vpp_id, p.crypt_key,
2665                                   self.vpp_esp_protocol,
2666                                   src, dst,
2667                                   flags=p.flags)
2668         p.tun_sa_out.add_vpp_config()
2669
2670         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2671                                  p.auth_algo_vpp_id, p.auth_key,
2672                                  p.crypt_algo_vpp_id, p.crypt_key,
2673                                  self.vpp_esp_protocol,
2674                                  dst, src,
2675                                  flags=p.flags)
2676         p.tun_sa_in.add_vpp_config()
2677
2678     def config_protect(self, p):
2679         p.tun_protect = VppIpsecTunProtect(self,
2680                                            p.tun_if,
2681                                            p.tun_sa_out,
2682                                            [p.tun_sa_in])
2683         p.tun_protect.add_vpp_config()
2684
2685     def config_network(self, p):
2686         p.tun_if = VppIpsecInterface(self)
2687
2688         p.tun_if.add_vpp_config()
2689         p.tun_if.admin_up()
2690         p.tun_if.config_ip4()
2691         p.tun_if.config_ip6()
2692
2693         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2694                        [VppRoutePath(p.tun_if.remote_ip4,
2695                                      0xffffffff)])
2696         r.add_vpp_config()
2697
2698         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2699                              [VppRoutePath(p.tun_if.remote_ip6,
2700                                            0xffffffff,
2701                                            proto=DpoProto.DPO_PROTO_IP6)])
2702         p.route.add_vpp_config()
2703
2704     def unconfig_network(self, p):
2705         p.route.remove_vpp_config()
2706         p.tun_if.remove_vpp_config()
2707
2708     def unconfig_protect(self, p):
2709         p.tun_protect.remove_vpp_config()
2710
2711     def unconfig_sa(self, p):
2712         p.tun_sa_out.remove_vpp_config()
2713         p.tun_sa_in.remove_vpp_config()
2714
2715
2716 class TestIpsecItf6(TemplateIpsec,
2717                     TemplateIpsecItf6,
2718                     IpsecTun6):
2719     """ IPsec Interface IPv6 """
2720
2721     def setUp(self):
2722         super(TestIpsecItf6, self).setUp()
2723
2724         self.tun_if = self.pg0
2725
2726     def tearDown(self):
2727         super(TestIpsecItf6, self).tearDown()
2728
2729     def test_tun_44(self):
2730         """IPSEC interface IPv6"""
2731
2732         n_pkts = 127
2733         p = self.ipv6_params
2734
2735         self.config_network(p)
2736         self.config_sa_tun(p,
2737                            self.pg0.local_ip6,
2738                            self.pg0.remote_ip6)
2739         self.config_protect(p)
2740
2741         self.verify_tun_66(p, count=n_pkts)
2742         c = p.tun_if.get_rx_stats()
2743         self.assertEqual(c['packets'], n_pkts)
2744         c = p.tun_if.get_tx_stats()
2745         self.assertEqual(c['packets'], n_pkts)
2746
2747         p.tun_if.admin_down()
2748         self.verify_drop_tun_66(p, count=n_pkts)
2749         p.tun_if.admin_up()
2750         self.verify_tun_66(p, count=n_pkts)
2751
2752         c = p.tun_if.get_rx_stats()
2753         self.assertEqual(c['packets'], 3*n_pkts)
2754         c = p.tun_if.get_tx_stats()
2755         self.assertEqual(c['packets'], 2*n_pkts)
2756
2757         # it's a v4 packet when its encrypted
2758         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2759
2760         self.verify_tun_46(p, count=n_pkts)
2761         c = p.tun_if.get_rx_stats()
2762         self.assertEqual(c['packets'], 4*n_pkts)
2763         c = p.tun_if.get_tx_stats()
2764         self.assertEqual(c['packets'], 3*n_pkts)
2765
2766         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2767
2768         self.vapi.cli("clear interfaces")
2769
2770         # rekey - create new SAs and update the tunnel protection
2771         np = copy.copy(p)
2772         np.crypt_key = b'X' + p.crypt_key[1:]
2773         np.scapy_tun_spi += 100
2774         np.scapy_tun_sa_id += 1
2775         np.vpp_tun_spi += 100
2776         np.vpp_tun_sa_id += 1
2777         np.tun_if.local_spi = p.vpp_tun_spi
2778         np.tun_if.remote_spi = p.scapy_tun_spi
2779
2780         self.config_sa_tun(np,
2781                            self.pg0.local_ip6,
2782                            self.pg0.remote_ip6)
2783         self.config_protect(np)
2784         self.unconfig_sa(p)
2785
2786         self.verify_tun_66(np, count=n_pkts)
2787         c = p.tun_if.get_rx_stats()
2788         self.assertEqual(c['packets'], n_pkts)
2789         c = p.tun_if.get_tx_stats()
2790         self.assertEqual(c['packets'], n_pkts)
2791
2792         # teardown
2793         self.unconfig_protect(np)
2794         self.unconfig_sa(np)
2795         self.unconfig_network(p)
2796
2797
2798 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
2799     """ Ipsec P2MP ESP v4 tests """
2800     tun4_encrypt_node_name = "esp4-encrypt-tun"
2801     tun4_decrypt_node_name = "esp4-decrypt-tun"
2802     encryption_type = ESP
2803
2804     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2805                          payload_size=100):
2806         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2807                 sa.encrypt(IP(src=self.pg1.local_ip4,
2808                               dst=self.pg1.remote_ip4) /
2809                            UDP(sport=1144, dport=2233) /
2810                            Raw(b'X' * payload_size))
2811                 for i in range(count)]
2812
2813     def gen_pkts(self, sw_intf, src, dst, count=1,
2814                  payload_size=100):
2815         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2816                 IP(src="1.1.1.1", dst=dst) /
2817                 UDP(sport=1144, dport=2233) /
2818                 Raw(b'X' * payload_size)
2819                 for i in range(count)]
2820
2821     def verify_decrypted(self, p, rxs):
2822         for rx in rxs:
2823             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2824             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2825
2826     def verify_encrypted(self, p, sa, rxs):
2827         for rx in rxs:
2828             try:
2829                 self.assertEqual(rx[IP].tos,
2830                                  VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
2831                 pkt = sa.decrypt(rx[IP])
2832                 if not pkt.haslayer(IP):
2833                     pkt = IP(pkt[Raw].load)
2834                 self.assert_packet_checksums_valid(pkt)
2835                 e = pkt[IP]
2836                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2837             except (IndexError, AssertionError):
2838                 self.logger.debug(ppp("Unexpected packet:", rx))
2839                 try:
2840                     self.logger.debug(ppp("Decrypted packet:", pkt))
2841                 except:
2842                     pass
2843                 raise
2844
2845     def setUp(self):
2846         super(TestIpsecMIfEsp4, self).setUp()
2847
2848         N_NHS = 16
2849         self.tun_if = self.pg0
2850         p = self.ipv4_params
2851         p.tun_if = VppIpsecInterface(self,
2852                                      mode=(VppEnum.vl_api_tunnel_mode_t.
2853                                            TUNNEL_API_MODE_MP))
2854         p.tun_if.add_vpp_config()
2855         p.tun_if.admin_up()
2856         p.tun_if.config_ip4()
2857         p.tun_if.unconfig_ip4()
2858         p.tun_if.config_ip4()
2859         p.tun_if.generate_remote_hosts(N_NHS)
2860         self.pg0.generate_remote_hosts(N_NHS)
2861         self.pg0.configure_ipv4_neighbors()
2862
2863         # setup some SAs for several next-hops on the interface
2864         self.multi_params = []
2865
2866         for ii in range(N_NHS):
2867             p = copy.copy(self.ipv4_params)
2868
2869             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2870             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2871             p.scapy_tun_spi = p.scapy_tun_spi + ii
2872             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2873             p.vpp_tun_spi = p.vpp_tun_spi + ii
2874
2875             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2876             p.scapy_tra_spi = p.scapy_tra_spi + ii
2877             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2878             p.vpp_tra_spi = p.vpp_tra_spi + ii
2879             p.tun_sa_out = VppIpsecSA(
2880                 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2881                 p.auth_algo_vpp_id, p.auth_key,
2882                 p.crypt_algo_vpp_id, p.crypt_key,
2883                 self.vpp_esp_protocol,
2884                 self.pg0.local_ip4,
2885                 self.pg0.remote_hosts[ii].ip4,
2886                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
2887             p.tun_sa_out.add_vpp_config()
2888
2889             p.tun_sa_in = VppIpsecSA(
2890                 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2891                 p.auth_algo_vpp_id, p.auth_key,
2892                 p.crypt_algo_vpp_id, p.crypt_key,
2893                 self.vpp_esp_protocol,
2894                 self.pg0.remote_hosts[ii].ip4,
2895                 self.pg0.local_ip4,
2896                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
2897             p.tun_sa_in.add_vpp_config()
2898
2899             p.tun_protect = VppIpsecTunProtect(
2900                 self,
2901                 p.tun_if,
2902                 p.tun_sa_out,
2903                 [p.tun_sa_in],
2904                 nh=p.tun_if.remote_hosts[ii].ip4)
2905             p.tun_protect.add_vpp_config()
2906             config_tun_params(p, self.encryption_type, None,
2907                               self.pg0.local_ip4,
2908                               self.pg0.remote_hosts[ii].ip4)
2909             self.multi_params.append(p)
2910
2911             VppIpRoute(self, p.remote_tun_if_host, 32,
2912                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
2913                                      p.tun_if.sw_if_index)]).add_vpp_config()
2914
2915             p.tun_dst = self.pg0.remote_hosts[ii].ip4
2916
2917     def tearDown(self):
2918         p = self.ipv4_params
2919         p.tun_if.unconfig_ip4()
2920         super(TestIpsecMIfEsp4, self).tearDown()
2921
2922     def test_tun_44(self):
2923         """P2MP IPSEC 44"""
2924         N_PKTS = 63
2925         for p in self.multi_params:
2926             self.verify_tun_44(p, count=N_PKTS)
2927
2928
2929 class TestIpsecItf6MPLS(TemplateIpsec,
2930                         TemplateIpsecItf6,
2931                         IpsecTun6):
2932     """ IPsec Interface MPLSoIPv6 """
2933
2934     tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
2935
2936     def setUp(self):
2937         super(TestIpsecItf6MPLS, self).setUp()
2938
2939         self.tun_if = self.pg0
2940
2941     def tearDown(self):
2942         super(TestIpsecItf6MPLS, self).tearDown()
2943
2944     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2945                           payload_size=100):
2946         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2947                 sa.encrypt(MPLS(label=66, ttl=3) /
2948                            IPv6(src=src, dst=dst) /
2949                            UDP(sport=1166, dport=2233) /
2950                            Raw(b'X' * payload_size))
2951                 for i in range(count)]
2952
2953     def verify_encrypted6(self, p, sa, rxs):
2954         for rx in rxs:
2955             try:
2956                 pkt = sa.decrypt(rx[IPv6])
2957                 if not pkt.haslayer(IPv6):
2958                     pkt = IP(pkt[Raw].load)
2959                 self.assert_packet_checksums_valid(pkt)
2960                 self.assert_equal(pkt[MPLS].label, 66)
2961                 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
2962             except (IndexError, AssertionError):
2963                 self.logger.debug(ppp("Unexpected packet:", rx))
2964                 try:
2965                     self.logger.debug(ppp("Decrypted packet:", pkt))
2966                 except:
2967                     pass
2968                 raise
2969
2970     def test_tun_mpls_o_ip6(self):
2971         """IPSEC interface MPLS over IPv6"""
2972
2973         n_pkts = 127
2974         p = self.ipv6_params
2975         f = FibPathProto
2976
2977         tbl = VppMplsTable(self, 0)
2978         tbl.add_vpp_config()
2979
2980         self.config_network(p)
2981         # deag MPLS routes from the tunnel
2982         r6 = VppMplsRoute(self, 66, 1,
2983                           [VppRoutePath(
2984                               self.pg1.remote_ip6,
2985                               self.pg1.sw_if_index)],
2986                           eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
2987         p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
2988                                      p.tun_if.sw_if_index,
2989                                      labels=[VppMplsLabel(66)])])
2990         p.tun_if.enable_mpls()
2991
2992         self.config_sa_tun(p,
2993                            self.pg0.local_ip6,
2994                            self.pg0.remote_ip6)
2995         self.config_protect(p)
2996
2997         self.verify_tun_66(p, count=n_pkts)
2998
2999         # cleanup
3000         p.tun_if.disable_mpls()
3001         self.unconfig_protect(p)
3002         self.unconfig_sa(p)
3003         self.unconfig_network(p)
3004
3005
3006 if __name__ == '__main__':
3007     unittest.main(testRunner=VppTestRunner)