ipsec: add per-SA error counters
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import (
14     TemplateIpsec,
15     IpsecTun4Tests,
16     IpsecTun6Tests,
17     IpsecTun4,
18     IpsecTun6,
19     IpsecTcpTests,
20     mk_scapy_crypt_key,
21     IpsecTun6HandoffTests,
22     IpsecTun4HandoffTests,
23     config_tun_params,
24 )
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
28     VppIpRoute,
29     VppRoutePath,
30     DpoProto,
31     VppMplsLabel,
32     VppMplsTable,
33     VppMplsRoute,
34     FibPathProto,
35 )
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
40 from util import ppp
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
45
46
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
    """Populate *p* with tunnel endpoints and the two scapy tunnel-mode SAs.

    ``p.tun_src``/``p.tun_dst`` are taken from *tun_if* when it is truthy and
    from *src*/*dst* otherwise.  Two ``SecurityAssociation`` objects are then
    stored on *p*:

    * ``p.scapy_tun_sa`` -- keyed by ``p.scapy_tun_spi``, tunnel header going
      from ``p.tun_dst`` to ``p.tun_src``;
    * ``p.vpp_tun_sa`` -- keyed by ``p.vpp_tun_spi``, tunnel header going from
      ``p.tun_src`` to ``p.tun_dst``.

    :param p: parameter object; its flags, SPIs, algorithms, keys,
        ``addr_type`` and ``nat_header`` are read.
    :param encryption_type: first positional argument passed to
        ``SecurityAssociation``.
    :param tun_if: object exposing ``remote_ip`` and ``local_ip``, or a falsy
        value to use *src*/*dst* instead.
    :param src: tunnel source used when *tun_if* is falsy.
    :param dst: tunnel destination used when *tun_if* is falsy.
    """
    header_by_family = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    use_esn = bool(
        p.flags & VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    )
    scapy_key = mk_scapy_crypt_key(p)

    if tun_if:
        p.tun_dst, p.tun_src = tun_if.remote_ip, tun_if.local_ip
    else:
        p.tun_dst, p.tun_src = dst, src

    nat = p.nat_header
    if nat and nat.dport != 4500:
        # Non-default UDP port: the scapy-side SA gets a header with the two
        # ports swapped, and scapy is told to dissect ESP on that port.
        scapy_nat = UDP(sport=nat.dport, dport=nat.sport)
        bind_layers(UDP, ESP, dport=nat.dport)
    else:
        # No NAT header at all, or the default port 4500: reuse it unchanged.
        scapy_nat = nat

    # Everything the two SAs have in common.
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=use_esn,
    )
    tunnel_cls = header_by_family[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=tunnel_cls(src=p.tun_dst, dst=p.tun_src),
        nat_t_header=scapy_nat,
        **shared,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=tunnel_cls(src=p.tun_src, dst=p.tun_dst),
        nat_t_header=p.nat_header,
        **shared,
    )
93
94
def config_tra_params(p, encryption_type, tun_if):
    """Populate *p* with tunnel endpoints and two scapy SAs without tunnel headers.

    ``p.tun_dst``/``p.tun_src`` are copied from ``tun_if.remote_ip`` and
    ``tun_if.local_ip``.  ``p.scapy_tun_sa`` (keyed by ``p.scapy_tun_spi``) and
    ``p.vpp_tun_sa`` (keyed by ``p.vpp_tun_spi``) are then created; neither is
    given a ``tunnel_header``.

    :param p: parameter object; its flags, SPIs, algorithms, keys and
        ``nat_header`` are read.
    :param encryption_type: first positional argument passed to
        ``SecurityAssociation``.
    :param tun_if: object exposing ``remote_ip`` and ``local_ip``.
    """
    use_esn = bool(
        p.flags & VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    )
    scapy_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    nat = p.nat_header
    if nat and nat.dport != 4500:
        # Non-default UDP port: the scapy-side SA gets a header with the two
        # ports swapped, and scapy is told to dissect ESP on that port.
        scapy_nat = UDP(sport=nat.dport, dport=nat.sport)
        bind_layers(UDP, ESP, dport=nat.dport)
    else:
        # No NAT header at all, or the default port 4500: reuse it unchanged.
        scapy_nat = nat

    # Everything the two SAs have in common.
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=use_esn,
    )

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        nat_t_header=scapy_nat,
        **shared,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        nat_t_header=p.nat_header,
        **shared,
    )
135
136
class TemplateIpsec4TunProtect(object):
    """IPsec IPv4 Tunnel protect"""

    # Mixin: pg0, vpp_esp_protocol and tun_if are not defined here and must be
    # supplied by the class it is combined with.
    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create and install ``p.tun_sa_out``/``p.tun_sa_in`` without endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments shared by both SAs.
        crypto = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *crypto, flags=p.flags
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *crypto, flags=p.flags
        )
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install both SAs with explicit tunnel endpoints.

        The outbound SA runs local -> remote and the inbound SA remote ->
        local, using the addresses of ``self.tun_if`` for ``p.addr_type``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto,
            self.tun_if.local_addr[p.addr_type],
            self.tun_if.remote_addr[p.addr_type],
            flags=p.flags,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto,
            self.tun_if.remote_addr[p.addr_type],
            self.tun_if.local_addr[p.addr_type],
            flags=p.flags,
        )
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the outbound SA and the single inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel on pg0 and route the remote hosts through it.

        The tunnel destination is ``p.tun_dst`` when *p* has that attribute,
        otherwise pg0's remote IPv4 address.
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip4

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route; kept on p so unconfig_network() can remove it.
        v4_path = VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        # IPv6 host route; no reference to it is retained.
        v6_path = VppRoutePath(
            p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on *p*."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
251
252
class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Build tunnel, SAs and protection for the IPv4 parameter set."""
        super().setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        super().tearDown()
279
280
class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec UDP tunnel interface tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check UDP ports and decrypted contents of every packet in *rxs*.

        Each packet must carry a UDP layer whose ports equal those of
        ``p.nat_header``; after ``sa.decrypt`` the inner packet must have
        valid checksums, destination ``1.1.1.1`` and pg1's remote IPv4
        address as source.  On IndexError/AssertionError the offending packet
        (and the decrypted one, if decryption got that far) is logged and the
        exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before decryption never logs the
            # decrypted form of an earlier packet.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, p.nat_header.dport)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Logging is best-effort; the original failure
                        # re-raised below is what matters.
                        pass
                raise

    def config_sa_tra(self, p):
        """Create and install UDP-encapsulated SAs.

        Both SAs get the ports of ``p.nat_header``; the inbound SA additionally
        has the IS_INBOUND flag or-ed into ``p.flags``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments shared by both SAs.
        crypto = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto,
            flags=p.flags
            | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        """Enable UDP encapsulation on the IPv4 parameters and configure VPP."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
369
370
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """Ipsec ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_basic64(self):
        """ipsec 6o4 tunnel basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ipsec 6o4 tunnel burst test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ipsec 4o4 tunnel frag basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # Lower the tunnel MTU so the 1800-byte payload is expected to arrive
        # as two packets (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
        try:
            self.verify_tun_44(
                self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
            )
        finally:
            # Put the MTU back even when the verification above fails.
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
400
401
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        params = self.ipv4_params
        self.verify_keepalive(params)
413
414
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP GCM tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the template set-up, then switch the IPv4 parameters to AES-GCM-256."""
        super().setUp()

        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # NOTE(review): these overrides are applied after super().setUp() has
        # already called config_sa_tra() -- confirm that ordering is intended.
        params = self.ipv4_params
        params.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        params.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        params.crypt_algo = "AES-GCM"
        params.auth_algo = "NULL"
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0
431
432
class TestIpsec4TunIfEspUdpUpdate(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP update tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the template set-up, then move both SAs to new UDP ports."""
        super().setUp()

        params = self.ipv4_params
        params.nat_header = UDP(sport=6565, dport=7676)
        # Rebuild the scapy-side SAs for the new NAT header.
        config_tun_params(params, self.encryption_type, params.tun_if)

        nat = params.nat_header
        # The inbound SA is updated with the ports swapped relative to the
        # outbound SA.
        params.tun_sa_in.update_vpp_config(udp_src=nat.dport, udp_dst=nat.sport)
        params.tun_sa_out.update_vpp_config(udp_src=nat.sport, udp_dst=nat.dport)
449
450
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """Ipsec ESP - TCP tests"""

    # Nothing is defined here: set-up and tests are inherited from the bases.
455
456
class TemplateIpsec6TunProtect(object):
    """IPsec IPv6 Tunnel protect"""

    # Mixin: encryption_type, pg0, vpp_esp_protocol and tun_if are not defined
    # here and must be supplied by the class it is combined with.

    def config_sa_tra(self, p):
        """Create and install ``p.tun_sa_out``/``p.tun_sa_in`` without endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments shared by both SAs.
        crypto = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, *crypto)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, *crypto)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install both SAs with explicit tunnel endpoints.

        The outbound SA runs local -> remote and the inbound SA remote ->
        local, using the addresses of ``self.tun_if`` for ``p.addr_type``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto,
            self.tun_if.local_addr[p.addr_type],
            self.tun_if.remote_addr[p.addr_type],
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto,
            self.tun_if.remote_addr[p.addr_type],
            self.tun_if.local_addr[p.addr_type],
        )
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the outbound SA and the single inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel on pg0 and route the remote hosts through it.

        The tunnel destination is ``p.tun_dst`` when *p* has that attribute,
        otherwise pg0's remote IPv6 address.
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip6

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # IPv6 host route; kept on p so unconfig_network() can remove it.
        v6_path = VppRoutePath(
            p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        # IPv4 host route; no reference to it is retained.
        v4_path = VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)
        VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on *p*."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
562
563
class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    encryption_type = ESP

    def setUp(self):
        """Build tunnel, SAs and protection for the IPv6 parameter set."""
        super().setUp()

        self.tun_if = self.pg0

        params = self.ipv6_params
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        super().tearDown()
581
582
class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec6 UDP tunnel interface tests"""

    # NOTE(review): these attributes are named tun4_* but hold esp6 node
    # names -- confirm which names the test templates actually read.
    tun4_encrypt_node_name = "esp6-encrypt-tun"
    tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check UDP ports and decrypted contents of every packet in *rxs*.

        Each packet must carry a UDP layer whose ports equal those of
        ``p.nat_header``; after ``sa.decrypt`` the inner packet must have
        valid checksums, destination ``1111:1111:1111:1111:1111:1111:1111:1111``
        and pg1's remote IPv6 address as source.  On IndexError/AssertionError
        the offending packet (and the decrypted one, if decryption got that
        far) is logged and the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before decryption never logs the
            # decrypted form of an earlier packet.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, p.nat_header.dport)

                # NOTE(review): the IPv4 layer is indexed here although the
                # expected addresses below are IPv6 -- confirm whether this
                # method is exercised with IPv6 traffic.
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(
                    pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
                )
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Logging is best-effort; the original failure
                        # re-raised below is what matters.
                        pass
                raise

    def config_sa_tra(self, p):
        """Create and install UDP-encapsulated SAs.

        Both SAs get the ports of ``p.nat_header``; the inbound SA additionally
        has the IS_INBOUND flag or-ed into ``p.flags``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments shared by both SAs.
        crypto = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto,
            flags=p.flags
            | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        """Enable UDP encapsulation on the IPv6 parameters and configure VPP."""
        super(TemplateIpsec6TunIfEspUdp, self).setUp()

        p = self.ipv6_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec6TunIfEspUdp, self).tearDown()
673
674
class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP tests"""

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC6 NAT Keepalive"""
        params = self.ipv6_params
        self.verify_keepalive(params)
688
689
class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP GCM tests"""

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the template set-up, then switch the IPv6 parameters to AES-GCM-256."""
        super().setUp()

        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # NOTE(review): these overrides are applied after super().setUp() has
        # already called config_sa_tra() -- confirm that ordering is intended.
        params = self.ipv6_params
        params.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        params.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        params.crypt_algo = "AES-GCM"
        params.auth_algo = "NULL"
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0
708
709
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """Ipsec ESP - TUN tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_basic46(self):
        """ipsec 4o6 tunnel basic test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ipsec 4o6 tunnel burst test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
725
726
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """Ipsec ESP 6 Handoff tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_handoff_66_police(self):
        """ESP 6o6 tunnel with policer worker hand-off test"""
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        N_PKTS = 15
        p = self.params[socket.AF_INET6]

        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()
        try:
            # Start policing on tun
            policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
            try:
                self._verify_policer_handoff6(p, policer, N_PKTS)
            finally:
                # Stop policing even when a verification step failed.
                policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        finally:
            policer.remove_vpp_config()

    def _verify_policer_handoff6(self, p, policer, n_pkts):
        """Bind the policer to worker 1, then worker 0, and check its stats.

        For each binding, *n_pkts* encrypted packets are injected four times,
        alternating between worker 0 and worker 1, and the decrypted output
        on pg1 is verified before the policer counters are compared.
        """
        for pol_bind in [1, 0]:
            policer.bind_vpp_config(pol_bind, True)

            # inject alternately on worker 0 and 1.
            for worker in [0, 1, 0, 1]:
                send_pkts = self.gen_encrypt_pkts6(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=n_pkts,
                )
                recv_pkts = self.send_and_expect(
                    self.tun_if, send_pkts, self.pg1, worker=worker
                )
                self.verify_decrypted6(p, recv_pkts)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            stats = policer.get_stats()
            stats0 = policer.get_stats(worker=0)
            stats1 = policer.get_stats(worker=1)

            if pol_bind == 1:
                # First pass: Worker 1, should have done all the policing
                self.assertEqual(stats, stats1)

                # Worker 0, should have handed everything off
                self.assertEqual(stats0["conform_packets"], 0)
                self.assertEqual(stats0["exceed_packets"], 0)
                self.assertEqual(stats0["violate_packets"], 0)
            else:
                # Second pass: both workers should have policed equal amounts
                self.assertGreater(stats1["conform_packets"], 0)
                self.assertEqual(stats1["exceed_packets"], 0)
                self.assertGreater(stats1["violate_packets"], 0)

                self.assertGreater(stats0["conform_packets"], 0)
                self.assertEqual(stats0["exceed_packets"], 0)
                self.assertGreater(stats0["violate_packets"], 0)

                self.assertEqual(
                    stats0["conform_packets"] + stats0["violate_packets"],
                    stats1["conform_packets"] + stats1["violate_packets"],
                )
808
809
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """Ipsec ESP 4 Handoff tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_handoff_44_police(self):
        """ESP 4o4 tunnel with policer worker hand-off test"""
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        N_PKTS = 15
        p = self.params[socket.AF_INET]

        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()
        try:
            # Start policing on tun
            policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
            try:
                self._verify_policer_handoff4(p, policer, N_PKTS)
            finally:
                # Stop policing even when a verification step failed.
                policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        finally:
            policer.remove_vpp_config()

    def _verify_policer_handoff4(self, p, policer, n_pkts):
        """Bind the policer to worker 1, then worker 0, and check its stats.

        For each binding, *n_pkts* encrypted packets are injected four times,
        alternating between worker 0 and worker 1, and the decrypted output
        on pg1 is verified before the policer counters are compared.
        """
        for pol_bind in [1, 0]:
            policer.bind_vpp_config(pol_bind, True)

            # inject alternately on worker 0 and 1.
            for worker in [0, 1, 0, 1]:
                send_pkts = self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    count=n_pkts,
                )
                recv_pkts = self.send_and_expect(
                    self.tun_if, send_pkts, self.pg1, worker=worker
                )
                self.verify_decrypted(p, recv_pkts)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            stats = policer.get_stats()
            stats0 = policer.get_stats(worker=0)
            stats1 = policer.get_stats(worker=1)

            if pol_bind == 1:
                # First pass: Worker 1, should have done all the policing
                self.assertEqual(stats, stats1)

                # Worker 0, should have handed everything off
                self.assertEqual(stats0["conform_packets"], 0)
                self.assertEqual(stats0["exceed_packets"], 0)
                self.assertEqual(stats0["violate_packets"], 0)
            else:
                # Second pass: both workers should have policed equal amounts
                self.assertGreater(stats1["conform_packets"], 0)
                self.assertEqual(stats1["exceed_packets"], 0)
                self.assertGreater(stats1["violate_packets"], 0)

                self.assertGreater(stats0["conform_packets"], 0)
                self.assertEqual(stats0["exceed_packets"], 0)
                self.assertGreater(stats0["violate_packets"], 0)

                self.assertEqual(
                    stats0["conform_packets"] + stats0["violate_packets"],
                    stats1["conform_packets"] + stats1["violate_packets"],
                )
891
892
@tag_fixme_vpp_workers
class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Multi Tunnel interface

    Sets up ten parameter sets derived from the default IPv4 parameters,
    each with its own SA ids, SPIs, remote host and tunnel destination,
    and runs traffic over all of them.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Configure one tunnel per generated pg0 remote host."""
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        n_tunnels = 10
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        # identifiers that must be distinct per tunnel; each one is shifted
        # by the tunnel index relative to the shared template value
        per_tunnel_ids = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for idx in range(n_tunnels):
            # shallow copy, so only the attributes set below are per-tunnel
            params = copy.copy(self.ipv4_params)

            params.remote_tun_if_host = "1.1.1.%d" % (idx + 1)
            for name in per_tunnel_ids:
                setattr(params, name, getattr(params, name) + idx)
            params.tun_dst = self.pg0.remote_hosts[idx].ip4

            self.multi_params.append(params)
            self.config_network(params)
            self.config_sa_tra(params)
            self.config_protect(params)

    def tearDown(self):
        """Defer entirely to the parent class tear-down."""
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces"""
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_44(params, count=n_pkts)
            # every tunnel interface must have seen exactly its own traffic
            self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
            self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

    def test_tun_rr_44(self):
        """Round-robin packets across multiple interfaces"""
        # decrypt direction: one encrypted packet per tunnel, sent as a batch
        encrypted = [
            pkt
            for params in self.multi_params
            for pkt in self.gen_encrypt_pkts(
                params,
                params.scapy_tun_sa,
                self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
            )
        ]
        received = self.send_and_expect(self.tun_if, encrypted, self.pg1)
        for rx, params in zip(received, self.multi_params):
            self.verify_decrypted(params, [rx])

        # encrypt direction: one plain packet towards each tunnel's host
        plain = [
            pkt
            for params in self.multi_params
            for pkt in self.gen_pkts(
                self.pg1, src=self.pg1.remote_ip4, dst=params.remote_tun_if_host
            )
        ]
        received = self.send_and_expect(self.pg1, plain, self.tun_if)
        for rx, params in zip(received, self.multi_params):
            self.verify_encrypted(params, params.vpp_tun_sa, [rx])
965
966
class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface all Algos

    Re-keys one protected tunnel for every crypto engine / algorithm
    combination and verifies traffic after each re-key.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Configure network, SAs and protection from the default parameters."""
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Remove protection, network and SAs, then run the parent tear-down."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_network(params)
        self.unconfig_sa(params)

        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Replace the SA pair protecting p.tun_if with a freshly keyed pair.

        The first key byte is replaced with "X" and every SPI / SA id is
        bumped by one. The new SAs are installed and the protection is
        re-applied before the previous SAs are removed.
        """
        # snapshot that still references the SAs currently in use
        previous = copy.copy(p)

        p.crypt_key = b"X" + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, p.tun_if)

        # everything except id and SPI is shared by both directions
        shared_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        shared_kwargs = {"flags": p.flags, "salt": p.salt}

        p.tun_sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *shared_args, **shared_kwargs
        )
        p.tun_sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *shared_args, **shared_kwargs
        )
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # switch the tunnel over to the new SAs before dropping the old ones
        self.config_protect(p)
        previous.tun_sa_out.remove_vpp_config()
        previous.tun_sa_in.remove_vpp_config()
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos"""

        crypto_alg = VppEnum.vl_api_ipsec_crypto_alg_t
        integ_alg = VppEnum.vl_api_ipsec_integ_alg_t

        key_16 = b"JPjyOWBeVEQiMe7h"
        key_24 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_32 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # VPP crypto engines to exercise
        engines = ["ia32", "ipsecmb", "openssl"]

        # (vpp crypto, vpp integ, scapy crypto, scapy integ, key, salt)
        algos = [
            (
                crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                integ_alg.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                key_16,
                3333,
            ),
            (
                crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                integ_alg.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                key_24,
                0,
            ),
            (
                crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
                integ_alg.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                key_32,
                9999,
            ),
            (
                crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
                "AES-CBC",
                "HMAC-SHA1-96",
                key_16,
                0,
            ),
            (
                crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                integ_alg.IPSEC_API_INTEG_ALG_SHA_512_256,
                "AES-CBC",
                "SHA2-512-256",
                key_24,
                0,
            ),
            (
                crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                integ_alg.IPSEC_API_INTEG_ALG_SHA_256_128,
                "AES-CBC",
                "SHA2-256-128",
                key_32,
                0,
            ),
            (
                crypto_alg.IPSEC_API_CRYPTO_ALG_NONE,
                integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
                "NULL",
                "HMAC-SHA1-96",
                key_32,
                0,
            ),
        ]

        # the same parameter object is re-used and mutated for every run
        params = self.ipv4_params

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            for vpp_crypto, vpp_integ, scapy_crypto, scapy_integ, key, salt in algos:
                params.auth_algo_vpp_id = vpp_integ
                params.crypt_algo_vpp_id = vpp_crypto
                params.crypt_algo = scapy_crypto
                params.auth_algo = scapy_integ
                params.crypt_key = key
                params.salt = salt

                # install the new algorithm by re-keying, then pass traffic
                self.rekey(params)
                self.verify_tun_44(params, count=127)
1155
1156
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface no Algos

    Configures SAs with neither an integrity nor a crypto algorithm and
    expects that traffic sent towards the tunnel produces no replies.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Switch the default IPv4 parameters to NULL algorithms, empty keys."""
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # integrity: none
        params.auth_algo = "NULL"
        params.auth_key = []
        params.auth_algo_vpp_id = (
            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )

        # encryption: none
        params.crypt_algo = "NULL"
        params.crypt_key = []
        params.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )

    def tearDown(self):
        """Defer entirely to the parent class tear-down."""
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """IPSec SA with NULL algos"""
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # packets routed at the tunnel must not result in anything being sent
        pkts = self.gen_pkts(
            self.pg1, src=self.pg1.remote_ip4, dst=params.remote_tun_if_host
        )
        self.send_and_assert_no_replies(self.pg1, pkts)

        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1196
1197
@tag_fixme_vpp_workers
class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
    """IPsec IPv6 Multi Tunnel interface

    Sets up ten parameter sets derived from the default IPv6 parameters,
    each with its own SA ids, SPIs, remote host and tunnel destination,
    and runs traffic over all of them.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Configure one tunnel per generated pg0 remote host."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        n_tunnels = 10
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv6_neighbors()

        # identifiers that must be distinct per tunnel; each one is shifted
        # by the tunnel index relative to the shared template value
        per_tunnel_ids = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for idx in range(n_tunnels):
            # shallow copy, so only the attributes set below are per-tunnel
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (idx + 1)
            for name in per_tunnel_ids:
                setattr(params, name, getattr(params, name) + idx)
            params.tun_dst = self.pg0.remote_hosts[idx].ip6

            self.multi_params.append(params)
            self.config_network(params)
            self.config_sa_tra(params)
            self.config_protect(params)

    def tearDown(self):
        """Defer entirely to the parent class tear-down."""
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces"""
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=n_pkts)
            # every tunnel interface must have seen exactly its own traffic
            self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
            self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)
1244
1245
class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests

    An ESP-protected GRE TEB tunnel over pg0 is bridged with pg1 in
    bridge domain 1. The packet generators and verifiers below override
    the template versions so that the tunnel payload is an Ethernet frame.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged inner frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` ESP packets carrying IP/GRE/Ether/IP/UDP.

        The outer IP header runs from pg0's remote to pg0's local address
        and is encrypted with `sa`. `p`, `src` and `dst` are not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used here.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each received frame has the expected MAC and IP destination."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in `rxs` with `sa` and check its contents.

        Checks the checksums, the outer addresses (pg0 local to pg0 remote),
        the presence of GRE and the inner frame's MAC and IP destination.
        On failure the offending packet is logged, together with its
        decrypted form when decryption got that far, and the error is
        re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet's
            # decrypted form
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypted payload was left as raw bytes; parse it as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # NOTE(review): the two trailing positional arguments look like the
        # SA's tunnel source and destination - confirm against VppIpsecSA
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        # return values are discarded; NOTE(review): presumably issued only
        # so the output lands in the test log - confirm
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tear-down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
1368
1369
class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests

    Same topology as the plain GRE TEB test, except that the bridge port
    facing pg1 is a dot1q sub-interface (VLAN 11) with a pop-1 tag rewrite.
    Frames on pg1 carry the tag; frames inside the tunnel must not.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged inner frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` ESP packets carrying IP/GRE/Ether/IP/UDP.

        The outer IP header runs from pg0's remote to pg0's local address
        and is encrypted with `sa`; the inner frame is untagged. `p`,
        `src` and `dst` are not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` VLAN 11 tagged Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used here.
        """
        return [
            Ether(dst=self.omac)
            / Dot1Q(vlan=11)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each received frame's MAC, VLAN 11 tag and IP destination."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in `rxs` with `sa` and check its contents.

        Checks the checksums, the outer addresses (pg0 local to pg0 remote),
        the presence of GRE, and that the inner frame has the expected MAC
        and IP destination and carries no dot1q tag. On failure the
        offending packet is logged, together with its decrypted form when
        decryption got that far, and the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet's
            # decrypted form
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypted payload was left as raw bytes; parse it as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the VLAN sub-interface, SAs, protected tunnel and bridge."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # VLAN 11 sub-interface on pg1 with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index,
            vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11,
        )
        self.pg1_11.admin_up()

        # NOTE(review): the two trailing positional arguments look like the
        # SA's tunnel source and destination - confirm against VppIpsecSA
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the VLAN sub-interface
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Tear down the tunnel config, the parent state, then the sub-if."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        # the sub-interface is removed only after the parent tear-down ran
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
1503
1504
class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - Tra tests

    An ESP-protected GRE TEB tunnel over pg0 is bridged with pg1 in
    bridge domain 1. Unlike the TUN variant, the SAs are created without
    tunnel endpoint addresses and the scapy side is set up through
    config_tra_params.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged inner frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` ESP packets carrying IP/GRE/Ether/IP/UDP.

        The IP header runs from pg0's remote to pg0's local address and the
        packet is encrypted with `sa`. `p`, `src` and `dst` are not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used here.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each received frame has the expected MAC and IP destination."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in `rxs` with `sa` and check its contents.

        Checks the checksums, the IP addresses (pg0 local to pg0 remote),
        the presence of GRE and the inner frame's MAC and IP destination.
        On failure the offending packet is logged, together with its
        decrypted form when decryption got that far, and the error is
        re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet's
            # decrypted form
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypted payload was left as raw bytes; parse it as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # no tunnel endpoint addresses are passed to either SA
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tear-down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1621
1622
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB UDP ESP - Tra tests

    Same as the GRE TEB Tra test, but both SAs carry the UDP-encap flag
    with ports 5454 / 4545, and every encrypted packet leaving VPP is
    expected to have a UDP header with those ports.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged inner frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` ESP packets carrying IP/GRE/Ether/IP/UDP.

        The IP header runs from pg0's remote to pg0's local address and the
        packet is encrypted with `sa`. `p`, `src` and `dst` are not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used here.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each received frame has the expected MAC and IP destination."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP encapsulation, then decrypt and check each packet.

        Every packet must have a UDP layer with source port 5454 and
        destination port 4545. After decryption with `sa` the checksums,
        the IP addresses (pg0 local to pg0 remote), the presence of GRE and
        the inner frame's MAC and IP destination are checked. On a
        decrypt/content failure the offending packet is logged, together
        with its decrypted form when decryption got that far, and the
        error is re-raised.
        """
        for rx in rxs:
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # reset per packet so a failure never logs a previous packet's
            # decrypted form
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypted payload was left as raw bytes; parse it as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create UDP-encapsulated SAs, the protected tunnel and the bridge."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=5454,
            udp_dst=4545,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: ports mirrored and the inbound flag added
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=(
                p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
            ),
            udp_src=4545,
            udp_dst=5454,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tear-down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1754
1755
class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted IPv4-in-GRE frames for ``sw_intf``.

        The outer IP header goes pg0 remote -> pg0 local and the inner IP
        header goes pg1 local -> pg1 remote.  ``p``, ``src`` and ``dst`` are
        part of the signature but are not used by this variant.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # sa.encrypt() is invoked once per generated frame
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are part of the signature but are not used; the
        addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every frame in ``rxs`` is addressed to pg1's remote MAC and IPv4."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every frame in ``rxs`` with ``sa`` and validate its headers.

        Checks the checksums, that the outer header goes pg0 local ->
        pg0 remote, that a GRE layer is present and that the IP header
        under GRE is destined to 1.1.1.2.  On failure both the received and
        (if available) the decrypted packet are logged before re-raising.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: ``pkt`` may be unbound if the
                    # failure happened before/while decrypting.  Catch
                    # Exception rather than everything so that
                    # KeyboardInterrupt/SystemExit still propagate.
                    pass
                raise

    def setUp(self):
        """Create the SAs, a GRE tunnel over pg0 protected by them, and a route.

        1.1.1.2/32 is routed via the tunnel's remote address.
        """
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): this bridge domain is not referenced again in this
        # class - possibly a leftover from the TEB variant; confirm before
        # removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()
1866
1867
class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TRA tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted IPv4-in-GRE frames for ``sw_intf``.

        The outer IP header goes pg0 remote -> pg0 local and the inner IP
        header goes pg1 local -> pg1 remote.  ``p``, ``src`` and ``dst`` are
        part of the signature but are not used by this variant.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # sa.encrypt() is invoked once per generated frame
            for _ in range(count)
        ]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted GRE frames with no IP header under GRE.

        The GRE header is followed directly by UDP.  ``src`` and ``dst`` are
        part of the signature but are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are part of the signature but are not used; the
        addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every frame in ``rxs`` is addressed to pg1's remote MAC and IPv4."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every frame in ``rxs`` with ``sa`` and validate its contents.

        Checks the checksums, that a GRE layer is present and that the IP
        header under GRE is destined to 1.1.1.2.  On failure both the
        received and (if available) the decrypted packet are logged before
        re-raising.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: ``pkt`` may be unbound if the
                    # failure happened before/while decrypting.  Catch
                    # Exception rather than everything so that
                    # KeyboardInterrupt/SystemExit still propagate.
                    pass
                raise

    def setUp(self):
        """Create the SAs, a GRE tunnel over pg0 protected by them, and a route.

        The SAs are created without tunnel endpoint addresses.
        1.1.1.2/32 is routed via the tunnel's remote address.
        """
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()

    def test_gre_non_ip(self):
        """GRE IPSEC non-IP payload is dropped and counted"""
        p = self.ipv4_params
        # src/dst are ignored by gen_encrypt_non_ip_pkts(); an IPv4 address
        # is passed for consistency with the rest of this IPv4 test class.
        tx = self.gen_encrypt_non_ip_pkts(
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
        )
        self.send_and_assert_no_replies(self.tun_if, tx)
        # the drop must show up both on the decrypt node and on the inbound SA
        node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
        err = p.tun_sa_in.get_err("unsup_payload")
        self.assertEqual(err, 1)
1995
1996
class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
    """Ipsec GRE ESP - TRA tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted IPv6-in-GRE frames for ``sw_intf``.

        The outer IPv6 header goes pg0 remote -> pg0 local and the inner
        IPv6 header goes pg1 local -> pg1 remote.  ``p``, ``src`` and ``dst``
        are part of the signature but are not used by this variant.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # sa.encrypt() is invoked once per generated frame
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1::1 to 1::2.

        ``p``, ``src`` and ``dst`` are part of the signature but are not
        used; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst="1::2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every frame in ``rxs`` is addressed to pg1's remote MAC and IPv6."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every frame in ``rxs`` with ``sa`` and validate its contents.

        Checks the checksums, that a GRE layer is present and that the IPv6
        header under GRE is destined to 1::2.  On failure both the received
        and (if available) the decrypted packet are logged before
        re-raising.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: ``pkt`` may be unbound if the
                    # failure happened before/while decrypting.  Catch
                    # Exception rather than everything so that
                    # KeyboardInterrupt/SystemExit still propagate.
                    pass
                raise

    def setUp(self):
        """Create the SAs, an IPv6 GRE tunnel over pg0 protected by them, and a route.

        1::2/128 is routed via the tunnel's remote address.
        """
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): this bridge domain is not referenced again in this
        # class - possibly a leftover from the TEB variant; confirm before
        # removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        r = VppIpRoute(
            self,
            "1::2",
            128,
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
                )
            ],
        )
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 configuration, then run the base teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super().tearDown()
2109
2110
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """Ipsec mGRE ESP v4 TRA tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted IPv4-in-GRE frames for ``sw_intf``.

        The outer IP header goes ``p.tun_dst`` -> pg0 local and the inner IP
        header goes pg1 local -> pg1 remote.  ``src`` and ``dst`` are part
        of the signature but are not used by this variant.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=p.tun_dst, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # sa.encrypt() is invoked once per generated frame
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to ``dst``.

        ``src`` is part of the signature but is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every frame in ``rxs`` is addressed to pg1's remote MAC and IPv4."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every frame in ``rxs`` with ``sa`` and validate its contents.

        Checks the checksums, that a GRE layer is present and that the IP
        header under GRE is destined to ``p.remote_tun_if_host``.  On
        failure both the received and (if available) the decrypted packet
        are logged before re-raising.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: ``pkt`` may be unbound if the
                    # failure happened before/while decrypting.  Catch
                    # Exception rather than everything so that
                    # KeyboardInterrupt/SystemExit still propagate.
                    pass
                raise

    def setUp(self):
        """Create one multipoint GRE tunnel and a protected next-hop per peer.

        For each of ``N_NHS`` next-hops a copy of the IPv4 parameters gets
        its own SA ids/SPIs, SA pair, tunnel protection, host route and TEIB
        entry; the copies are collected in ``self.multi_params``.
        """
        super().setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: the tunnel interface object is shared by all copies
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            p.teib = VppTeib(
                self,
                p.tun_if,
                p.tun_if.remote_hosts[ii].ip4,
                self.pg0.remote_hosts[ii].ip4,
            ).add_vpp_config()
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # without the TEIB entry traffic for this peer must be dropped
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            # ... and forwarding must resume once it is re-added
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
2264
2265
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """Ipsec mGRE ESP v6 TRA tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted IPv6-in-GRE frames for ``sw_intf``.

        The outer IPv6 header goes ``p.tun_dst`` -> pg0 local and the inner
        IPv6 header goes pg1 local -> pg1 remote.  ``src`` and ``dst`` are
        part of the signature but are not used by this variant.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # sa.encrypt() is invoked once per generated frame
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1::1 to ``dst``.

        ``p`` and ``src`` are part of the signature but are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every frame in ``rxs`` is addressed to pg1's remote MAC and IPv6."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every frame in ``rxs`` with ``sa`` and validate its contents.

        Checks the checksums, that a GRE layer is present and that the IPv6
        header under GRE is destined to ``p.remote_tun_if_host``.  On
        failure both the received and (if available) the decrypted packet
        are logged before re-raising.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: ``pkt`` may be unbound if the
                    # failure happened before/while decrypting.  Catch
                    # Exception rather than everything so that
                    # KeyboardInterrupt/SystemExit still propagate.
                    pass
                raise

    def setUp(self):
        """Create one multipoint IPv6 GRE tunnel and a protected next-hop per peer.

        For each of ``N_NHS`` next-hops a copy of the IPv6 parameters gets
        its own SA ids/SPIs, SA pair, TEIB entry, tunnel protection and host
        route; the copies are collected in ``self.multi_params``.
        """
        super().setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip6,
            "::",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: the tunnel interface object is shared by all copies
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(
                self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
            ).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                128,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
            ).add_vpp_config()
            # NOTE(review): tun_dst is assigned a second time here, presumably
            # because config_tra_params() overwrites it - confirm before
            # treating this line as redundant.
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 configuration, then run the base teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super().tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
2417
2418
@tag_fixme_vpp_workers
class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the base setup and use pg0 as the interface carrying the tunnel."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; defer to the base classes."""
        super().tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        p = self.ipv4_params

        def check_counters(expected):
            # rx is checked before tx; both tunnel counters must match
            self.assertEqual(p.tun_if.get_rx_stats(), expected)
            self.assertEqual(p.tun_if.get_tx_stats(), expected)

        for configure in (self.config_network, self.config_sa_tra, self.config_protect):
            configure(p)

        self.verify_tun_44(p, count=127)
        check_counters(127)

        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(p, count=127)
        check_counters(254)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy, so ``rekeyed.tun_if`` is the same object as ``p.tun_if``)
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b"X" + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_44(rekeyed, count=127)
        check_counters(381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)
2471
2472
@tag_fixme_vpp_workers
class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Configure network, transport SAs and protection with UDP encap enabled."""
        super().setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        params.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)
        for configure in (self.config_network, self.config_sa_tra, self.config_protect):
            configure(params)

    def tearDown(self):
        """Undo protection, SAs and network (in that order), then the base teardown."""
        params = self.ipv4_params
        for unconfigure in (
            self.unconfig_protect,
            self.unconfig_sa,
            self.unconfig_network,
        ):
            unconfigure(params)
        super().tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of every frame, then run the inherited checks."""
        # ensure encrypted packets are received with the default UDP ports
        for rx in rxs:
            udp = rx[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super().verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        params = self.ipv4_params
        tunnel = params.tun_if

        self.verify_tun_44(params, count=127)
        self.assertEqual(tunnel.get_rx_stats(), 127)
        self.assertEqual(tunnel.get_tx_stats(), 127)

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        self.verify_keepalive(self.ipv4_params)
2515
2516
@tag_fixme_vpp_workers
class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the base setup and use pg0 as the interface carrying the tunnel."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; defer to the base classes."""
        super().tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted IP-in-IP frames for ``sw_intf``.

        The outer IP header goes ``sw_intf`` remote -> ``sw_intf`` local and
        the inner IP header goes ``src`` -> ``dst``.  ``p`` is part of the
        signature but is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # sa.encrypt() is invoked once per generated frame
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from ``src`` to ``dst``."""
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src=src, dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of every decrypted frame in ``rxs``.

        Each frame must go from ``p.remote_tun_if_host`` to pg1's remote
        IPv4 address.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every frame in ``rxs`` with ``sa`` and validate its headers.

        Checks the checksums, that the outer header goes pg0 local ->
        pg0 remote and that the inner IP header is destined to
        ``p.remote_tun_if_host``.  On failure both the received and (if
        available) the decrypted packet are logged before re-raising.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: ``pkt`` may be unbound if the
                    # failure happened before/while decrypting.  Catch
                    # Exception rather than everything so that
                    # KeyboardInterrupt/SystemExit still propagate.
                    pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # also add an output features on the tunnel and physical interface
        # so we test they still work
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        self.verify_tun_44(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy, so ``np.tun_if`` is the same object as ``p.tun_if``)
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2624
2625
class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the base set-up and send the encrypted traffic through pg0."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """No class-specific clean-up; defer to the base classes."""
        super().tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` encrypted frames whose revealed tunnel header is bogus.

        Each frame goes from ``sw_intf``'s remote MAC to its local MAC and
        carries an ``sa``-encrypted IPv4-in-IPv4 UDP packet. The header that
        is revealed by decryption is addressed to 5.5.5.5 (presumably not an
        address assigned to the tunnel - this is the "bogus tunnel header"
        the test is named after). ``p`` is accepted for signature
        compatibility and is not used.
        """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear = (
                IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # encrypt one packet at a time: every frame gets its own ESP
            # encapsulation from the SA
            frames.append(l2 / sa.encrypt(clear))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header"""

        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        bogus = self.gen_encrypt_pkts(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
            count=63,
        )
        # nothing may come out of VPP for these packets
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2676
2677
@tag_fixme_vpp_workers
class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base set-up and send the encrypted traffic through pg0."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """No class-specific clean-up; defer to the base classes."""
        super().tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        n_pkts = 127
        p = self.ipv6_params

        def assert_counters(rx, tx):
            # compare the tunnel interface counters with the expected totals
            self.assertEqual(p.tun_if.get_rx_stats(), rx)
            self.assertEqual(p.tun_if.get_tx_stats(), tx)

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        assert_counters(n_pkts, n_pkts)

        # Simple rekey: derive a second parameter set (new key, SPIs and SA
        # ids), install its SAs, re-protect the tunnel, then drop the old SAs.
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b"X" + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=n_pkts)
        assert_counters(2 * n_pkts, 2 * n_pkts)

        # Bounce the interface: while it is admin-down every packet must be
        # dropped and accounted for by the "disabled" error counter.
        p.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=n_pkts)
        node = "/err/ipsec6-tun-input/disabled"
        self.assertEqual(n_pkts, self.statistics.get_err_counter(node))
        p.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=n_pkts)

        # Three phase rekey:
        #  1) accept two input SAs [old, new]
        #  2) swap the output SA to [new]
        #  3) accept only the [new] input SA
        rekeyed3 = copy.copy(rekeyed)
        rekeyed3.crypt_key = b"Z" + p.crypt_key[1:]
        rekeyed3.scapy_tun_spi += 100
        rekeyed3.scapy_tun_sa_id += 1
        rekeyed3.vpp_tun_spi += 100
        rekeyed3.vpp_tun_sa_id += 1
        rekeyed3.tun_if.local_spi = p.vpp_tun_spi
        rekeyed3.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed3)

        # phase 1: old output SA, both input SAs
        p.tun_protect.update_vpp_config(
            rekeyed.tun_sa_out, [rekeyed.tun_sa_in, rekeyed3.tun_sa_in]
        )
        self.verify_tun_66(rekeyed, rekeyed, count=n_pkts)
        self.verify_tun_66(rekeyed3, rekeyed, count=n_pkts)

        # phase 2: new output SA, both input SAs
        p.tun_protect.update_vpp_config(
            rekeyed3.tun_sa_out, [rekeyed.tun_sa_in, rekeyed3.tun_sa_in]
        )
        self.verify_tun_66(rekeyed, rekeyed3, count=n_pkts)
        self.verify_tun_66(rekeyed3, rekeyed3, count=n_pkts)

        # phase 3: new output SA, only the new input SA; traffic encrypted
        # with the old SA must now be dropped on receive
        p.tun_protect.update_vpp_config(rekeyed3.tun_sa_out, [rekeyed3.tun_sa_in])
        self.verify_tun_66(rekeyed3, rekeyed3, count=n_pkts)
        self.verify_drop_tun_rx_66(rekeyed, count=n_pkts)

        assert_counters(9 * n_pkts, 8 * n_pkts)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(rekeyed3)
        self.unconfig_sa(rekeyed3)
        self.unconfig_network(p)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        n_pkts = 127
        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_46(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2789
2790
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base set-up and send the encrypted traffic through pg0."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """No class-specific clean-up; defer to the base classes."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` frames carrying ``sa``-encrypted IPv6-in-IPv6 UDP.

        The outer IPv6 header runs between ``sw_intf``'s remote and local
        addresses, the inner one between ``src`` and ``dst``. ``p`` is
        accepted for signature compatibility and is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` plain (unencrypted) IPv6 UDP frames for ``sw_intf``."""
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / UDP(sport=1166, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check the addresses and checksums of packets VPP decrypted."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet VPP encrypted and check both IPv6 headers.

        On a failed check the received packet (and, when available, its
        decrypted form) is logged at debug level before the error is
        re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: pkt is unbound when the
                    # failure happened before decryption completed. Unlike a
                    # bare except, this lets KeyboardInterrupt and SystemExit
                    # propagate.
                    pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        # install the new SAs and protection before removing the old SAs
        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # the interface counters keep accumulating across the rekey
        self.verify_tun_66(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2890
2891
class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base set-up and send the encrypted traffic through pg0."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """No class-specific clean-up; defer to the base classes."""
        super().tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` encrypted frames whose revealed tunnel header is bogus.

        The IP destination of the revealed packet (5::5) does not match the
        one assigned to the tunnel. ``p`` is accepted for signature
        compatibility and is not used.
        """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear = (
                IPv6(src=sw_intf.remote_ip6, dst="5::5")
                / IPv6(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # encrypt one packet at a time: every frame gets its own ESP
            # encapsulation from the SA
            frames.append(l2 / sa.encrypt(clear))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header"""

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        bogus = self.gen_encrypt_pkts6(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=63,
        )
        # nothing may come out of VPP for these packets
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2943
2944
class TemplateIpsecItf4:
    """IPsec Interface IPv4"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the outbound and inbound tunnel SAs and store them on ``p``.

        ``src``/``dst`` are the tunnel endpoints as seen by the outbound SA;
        the inbound SA is built with the two swapped and with the IS_INBOUND
        flag added. The results are kept in ``p.tun_sa_out`` / ``p.tun_sa_in``.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            src,
            dst,
            flags=p.flags,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        inbound_flags = (
            p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
        )
        sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            dst,
            src,
            flags=inbound_flags,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SA pair and keep the handle on ``p``."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p, instance=0xFFFFFFFF):
        """Create the IPsec interface and route the remote hosts through it.

        ``instance`` is handed to VppIpsecInterface unchanged (the default
        0xFFFFFFFF presumably lets VPP choose the instance - confirm against
        the API). Only the IPv4 host route is remembered, as ``p.route``.
        """
        tunnel = VppIpsecInterface(self, instance=instance)
        p.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 /32 towards the remote host, via the tunnel's peer address
        via4 = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via4])
        p.route.add_vpp_config()

        # IPv6 /128 towards the remote host; this route is not stored on p,
        # so unconfig_network() does not remove it explicitly
        via6 = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        VppIpRoute(self, p.remote_tun_if_host6, 128, [via6]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove the IPv4 host route, then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs created by config_sa_tun(), outbound first."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
3028
3029
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface IPv4"""

    def setUp(self):
        """Run the base set-up and send the encrypted traffic through pg0."""
        super(TestIpsecItf4, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """No class-specific clean-up; defer to the base classes."""
        super(TestIpsecItf4, self).tearDown()

    def test_tun_instance_44(self):
        """IPSEC interface IPv4 with user-specified instance"""
        p = self.ipv4_params
        self.config_network(p, instance=3)

        # only the requested instance may exist: ipsec0 must be unknown to
        # the CLI while ipsec3 must be shown
        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        output = self.vapi.cli("show interface ipsec3")
        # assertNotIn reports the offending output when the check fails
        self.assertNotIn("unknown", output)

        self.unconfig_network(p)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        # prepare the scapy-side SAs first, so encrypted traffic can be sent
        # and shown to be dropped before VPP has any SA or protection
        config_tun_params(
            p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
        )
        self.verify_tun_dropped_44(p, count=n_pkts)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface: traffic is dropped while it is admin-down
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        # the batch sent while down is expected in rx but not in tx
        self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v6 packet when its encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # update the SA tunnel: move both SAs to the pg2 endpoints (source
        # and destination swapped for the inbound SA)
        config_tun_params(
            p, self.encryption_type, None, self.pg2.local_ip4, self.pg2.remote_ip4
        )
        p.tun_sa_in.update_vpp_config(
            is_tun=True, tun_src=self.pg2.remote_ip4, tun_dst=self.pg2.local_ip4
        )
        p.tun_sa_out.update_vpp_config(
            is_tun=True, tun_src=self.pg2.local_ip4, tun_dst=self.pg2.remote_ip4
        )
        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 5 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 4 * n_pkts)

        # reset the counters so the checks after the rekey start from zero
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 127
        # work on a copy so the algorithm overrides stay local to this test
        p = copy.copy(self.ipv4_params)

        p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.logger.info(self.vapi.cli("sh ipsec sa"))
        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        # every policer outcome transmits, so policing only counts packets
        # and never drops them
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_44(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3205
3206
class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface MPLSoIPv4"""

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base set-up and send the encrypted traffic through pg0."""
        super(TestIpsecItf4MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """No class-specific clean-up; defer to the base classes."""
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` frames carrying ``sa``-encrypted MPLS (label 44).

        The encrypted payload is an MPLS header followed by an IPv4 UDP
        packet from ``src`` to ``dst``. ``p`` is accepted for signature
        compatibility and is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=44, ttl=3)
                / IP(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet VPP encrypted and check the MPLS label and IP dst.

        On a failed check the received packet (and, when available, its
        decrypted form) is logged at debug level before the error is
        re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: pkt is unbound when the
                    # failure happened before decryption completed. Unlike a
                    # bare except, this lets KeyboardInterrupt and SystemExit
                    # propagate.
                    pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        ).add_vpp_config()
        # send traffic for the remote host through the tunnel with label 44
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3283
3284
class TemplateIpsecItf6:
    """IPsec Interface IPv6"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the outbound and inbound tunnel SAs and store them on ``p``.

        ``src``/``dst`` are the tunnel endpoints as seen by the outbound SA;
        the inbound SA is built with the two swapped. ``p.tun_flags`` and
        ``p.hop_limit`` are given defaults (None and 255) when the caller has
        not set them; both are applied to the outbound SA only.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # fill in the optional knobs without overriding caller-set values
        for attr, fallback in (("tun_flags", None), ("hop_limit", 255)):
            if not hasattr(p, attr):
                setattr(p, attr, fallback)

        sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            src,
            dst,
            flags=p.flags,
            tun_flags=p.tun_flags,
            hop_limit=p.hop_limit,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        # NOTE(review): unlike TemplateIpsecItf4, the inbound SA here is
        # created without the IS_INBOUND flag - confirm this is intentional.
        sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SA pair and keep the handle on ``p``."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPsec interface and route the remote hosts through it.

        Only the IPv6 host route is remembered, as ``p.route``.
        """
        tunnel = VppIpsecInterface(self)
        p.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 /32 towards the remote host; this route is not stored on p,
        # so unconfig_network() does not remove it explicitly
        via4 = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        VppIpRoute(self, p.remote_tun_if_host4, 32, [via4]).add_vpp_config()

        # IPv6 /128 towards the remote host, via the tunnel's peer address
        via6 = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [via6])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the IPv6 host route, then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs created by config_sa_tun(), outbound first."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
3375
3376
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface IPv6"""

    def setUp(self):
        """Run the base set-up and send the encrypted traffic through pg0."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """No class-specific clean-up; defer to the base classes."""
        super().tearDown()

    def test_tun_66(self):
        """IPSEC interface IPv6"""

        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        local6 = self.pg0.local_ip6
        remote6 = self.pg0.remote_ip6

        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        def assert_counters(rx, tx):
            # compare the tunnel interface counters with the expected totals
            self.assertEqual(p.tun_if.get_rx_stats(), rx)
            self.assertEqual(p.tun_if.get_tx_stats(), tx)

        self.config_network(p)
        # prepare the scapy-side SAs first, so encrypted traffic can be sent
        # and shown to be dropped before VPP has any SA or protection
        config_tun_params(p, self.encryption_type, None, local6, remote6)
        self.verify_drop_tun_66(p, count=n_pkts)
        self.config_sa_tun(p, local6, remote6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        assert_counters(n_pkts, n_pkts)

        # bounce the interface: traffic is dropped while it is admin-down
        p.tun_if.admin_down()
        self.verify_drop_tun_66(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_66(p, count=n_pkts)

        # the batch sent while down is expected in rx but not in tx
        assert_counters(3 * n_pkts, 2 * n_pkts)

        # a v4 payload is still handled by the v4-named encrypt node, so
        # point the checks at it for the 4o6 run and restore it afterwards
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(p, count=n_pkts)
        assert_counters(4 * n_pkts, 3 * n_pkts)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # reset the counters so the checks after the rekey start from zero
        self.vapi.cli("clear interfaces")

        # Rekey: new key, SPIs and SA ids, and this time copy the flow label
        # (instead of the hop limit) into the outer header.
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b"X" + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi
        rekeyed.inner_hop_limit = 24
        rekeyed.outer_hop_limit = 128
        rekeyed.inner_flow_label = 0xABCDE
        rekeyed.outer_flow_label = 0xABCDE
        rekeyed.hop_limit = 128
        rekeyed.tun_flags = (
            encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
        )

        self.config_sa_tun(rekeyed, local6, remote6)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=n_pkts)
        assert_counters(n_pkts, n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)

    def test_tun_66_police(self):
        """IPSEC interface IPv6 with input policer"""
        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127

        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        # every policer outcome transmits, so policing only counts packets
        # and never drops them
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # start policing received traffic on the tunnel interface
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        policed = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(policed["conform_packets"], 0)
        self.assertEqual(policed["exceed_packets"], 0)
        self.assertGreater(policed["violate_packets"], 0)

        # stop policing; further traffic must leave the policer stats untouched
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_66(p, count=n_pkts)

        unpoliced = policer.get_stats()
        self.assertEqual(policed, unpoliced)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3517
3518
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """Ipsec P2MP ESP v4 tests"""

    # A single multipoint IPsec interface is protected towards several
    # next-hops, each with its own SA pair; setUp() builds that layout and
    # stores the per-next-hop parameters in self.multi_params.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` frames carrying an `sa`-encrypted IPv4/UDP packet.

        The inner packet always goes from pg1.local_ip4 to pg1.remote_ip4;
        `p`, `src` and `dst` are accepted but not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain IPv4/UDP frames addressed to `dst`.

        The IP source is fixed to 1.1.1.1; `src` is accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received frame is addressed to the pg1 remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check outer header fields, then decrypt and check the inner packet.

        On failure the offending packet (and its decrypted form, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs a packet that was
            # decrypted in an earlier iteration.
            pkt = None
            try:
                # Outer header must carry the SA's DSCP (EF) and hop limit.
                self.assertEqual(
                    rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
                )
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # Payload was not dissected as IP; re-parse the raw bytes.
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertEqual(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: never let a logging
                        # problem replace the real failure raised below.
                        pass
                raise

    def setUp(self):
        """Create the multipoint tunnel and one protected next-hop per SA pair."""
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(
            self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        # The address is configured, removed and configured again before use.
        # NOTE(review): presumably this exercises re-adding the address on
        # the tunnel - confirm intent before simplifying.
        p.tun_if.config_ip4()
        p.tun_if.unconfig_ip4()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # Attach the same match-all ACL to both the underlay interface and
        # the tunnel interface.
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # Shallow copy: every entry shares the tun_if created above but
            # gets its own SA ids, SPIs, hop limit and remote host.
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.hop_limit = ii + 10
            # Outbound SA: pg0 local address towards next-hop ii.
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_out.add_vpp_config()

            # Inbound SA: next-hop ii towards pg0 local address.
            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_in.add_vpp_config()

            # Protect the tunnel towards this next-hop with the SA pair.
            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tun_params(
                p,
                self.encryption_type,
                None,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
            )
            self.multi_params.append(p)

            # Host route for this entry's remote host via the tunnel next-hop.
            p.via_tun_route = VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)

        # remove one tunnel protect, the rest should still work
        self.multi_params[0].tun_protect.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
        self.multi_params[0].via_tun_route.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)

        for p in self.multi_params[1:]:
            self.verify_tun_44(p, count=N_PKTS)

        # restore the first next-hop; all of them must pass traffic again
        self.multi_params[0].tun_protect.add_vpp_config()
        self.multi_params[0].via_tun_route.add_vpp_config()

        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
3696
3697
class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface MPLSoIPv6"""

    # Name of the encrypt node used for this tunnel type.
    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base setup and use pg0 as the tunnel underlay interface."""
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base class teardown."""
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` frames carrying an `sa`-encrypted MPLS/IPv6/UDP packet.

        The encrypted payload is an MPLS header (label 66, ttl 3) followed by
        an IPv6/UDP packet from `src` to `dst`; `p` is accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=66, ttl=3)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet and check its MPLS label and IPv6 destination.

        On failure the offending packet (and its decrypted form, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs a packet that was
            # decrypted in an earlier iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # NOTE(review): this fallback re-parses the payload as
                    # IPv4 although the test expects MPLS over IPv6; it looks
                    # copied from the IPv4 variant - confirm before changing.
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: never let a logging
                        # problem replace the real failure raised below.
                        pass
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self,
            66,
            1,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
        ).add_vpp_config()
        # Re-point the route at the tunnel with MPLS label 66 imposed, to
        # match the label the deag route above is keyed on.
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3778
3779
# When executed directly as a script, run this module's tests with VppTestRunner.
3780 if __name__ == "__main__":
3781     unittest.main(testRunner=VppTestRunner)