tests: skip tests failing on ubuntu 22.04
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204
12 from framework import VppTestRunner
13 from template_ipsec import (
14     TemplateIpsec,
15     IpsecTun4Tests,
16     IpsecTun6Tests,
17     IpsecTun4,
18     IpsecTun6,
19     IpsecTcpTests,
20     mk_scapy_crypt_key,
21     IpsecTun6HandoffTests,
22     IpsecTun4HandoffTests,
23     config_tun_params,
24 )
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
28     VppIpRoute,
29     VppRoutePath,
30     DpoProto,
31     VppMplsLabel,
32     VppMplsTable,
33     VppMplsRoute,
34     FibPathProto,
35 )
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
40 from util import ppp
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
45
46
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
    """Attach the scapy-side tunnel-mode security associations to ``p``.

    ``p.tun_src``/``p.tun_dst`` are taken from ``tun_if`` (its ``local_ip``
    and ``remote_ip``) when ``tun_if`` is truthy, otherwise from the ``src``
    and ``dst`` arguments.  Two scapy ``SecurityAssociation`` objects are
    then stored on ``p``:

    * ``p.scapy_tun_sa`` - uses ``p.vpp_tun_spi``; its tunnel header runs
      from ``p.tun_dst`` to ``p.tun_src``.
    * ``p.vpp_tun_sa`` - uses ``p.scapy_tun_spi``; its tunnel header runs
      from ``p.tun_src`` to ``p.tun_dst``.

    If ``p.nat_header`` is set and its destination port is not 4500,
    ``p.scapy_tun_sa`` gets a UDP header with the two ports swapped and
    scapy is told to dissect UDP to that destination port as ESP.

    NOTE(review): a function of the same name is also imported from
    ``template_ipsec`` at the top of this module; this definition replaces
    it within this module.
    """
    use_esn = bool(
        p.flags & VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    )
    scapy_key = mk_scapy_crypt_key(p)

    # Tunnel endpoints: the interface wins over the explicit arguments.
    if tun_if:
        p.tun_dst = tun_if.remote_ip
        p.tun_src = tun_if.local_ip
    else:
        p.tun_dst = dst
        p.tun_src = src

    # Only a NAT header with a non-default destination port needs the
    # port-swapped copy and the extra UDP->ESP binding.
    if p.nat_header and p.nat_header.dport != 4500:
        reverse_nat_header = UDP(
            sport=p.nat_header.dport, dport=p.nat_header.sport
        )
        bind_layers(UDP, ESP, dport=p.nat_header.dport)
    else:
        reverse_nat_header = p.nat_header

    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    # Settings shared by both directions.
    common = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=use_esn,
    )
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        nat_t_header=reverse_nat_header,
        **common
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        nat_t_header=p.nat_header,
        **common
    )
93
94
def config_tra_params(p, encryption_type, tun_if):
    """Attach scapy security associations without a tunnel header to ``p``.

    ``p.tun_dst``/``p.tun_src`` are set from ``tun_if.remote_ip`` and
    ``tun_if.local_ip``.  Two scapy ``SecurityAssociation`` objects are then
    stored on ``p``; neither is given a ``tunnel_header``:

    * ``p.scapy_tun_sa`` - uses ``p.vpp_tun_spi``.
    * ``p.vpp_tun_sa`` - uses ``p.scapy_tun_spi``.

    If ``p.nat_header`` is set and its destination port is not 4500,
    ``p.scapy_tun_sa`` gets a UDP header with the two ports swapped and
    scapy is told to dissect UDP to that destination port as ESP.
    """
    esn_en = bool(
        p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
    )
    crypt_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # A missing NAT header counts as "default port": nothing to swap or bind.
    is_default_port = p.nat_header.dport == 4500 if p.nat_header else True

    if is_default_port:
        outbound_nat_header = p.nat_header
    else:
        outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
        bind_layers(UDP, ESP, dport=p.nat_header.dport)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=esn_en,
        nat_t_header=outbound_nat_header,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=esn_en,
        nat_t_header=p.nat_header,
    )
135
136
class TemplateIpsec4TunProtect(object):
    """IPsec IPv4 Tunnel protect

    Mixin providing the config/unconfig helpers used by the IPv4 tunnel
    test classes in this module.  It relies on the class it is mixed into
    for ``pg0``, ``tun_if`` and ``vpp_esp_protocol``.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` without tunnel endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments that are identical for both SAs.
        shared = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *shared, flags=p.flags
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *shared, flags=p.flags
        )
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` with tunnel endpoints.

        The endpoints come from ``self.tun_if`` and are passed in opposite
        order for the two SAs.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        shared = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        local_end = self.tun_if.local_addr[p.addr_type]
        remote_end = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *shared,
            local_end,
            remote_end,
            flags=p.flags
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *shared,
            remote_end,
            local_end,
            flags=p.flags
        )
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the outbound SA and the inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel over pg0 and route both host prefixes via it.

        The tunnel destination is ``p.tun_dst`` when ``p`` already has that
        attribute, otherwise ``self.pg0.remote_ip4``.  Only the IPv4 route
        is kept on ``p`` (as ``p.route``).
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip4

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        route6 = VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path])
        route6.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        for item in (p.route, p.tun_if):
            item.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
251
252
class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests

    Fixture that, for every test, builds the tunnel network, the SAs
    (via config_sa_tra) and the tunnel protection for ``self.ipv4_params``.
    """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params

        # Network first: the SA helpers read p.tun_if created there.
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        super().tearDown()
279
280
class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec UDP tunnel interface tests

    Fixture for UDP-encapsulated ESP over an IPv4 tunnel: setUp() enables
    the UDP_ENCAP SA flag, installs a UDP(5454 -> 4500) header on the
    parameters and then configures network, SAs and tunnel protection.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of each packet in ``rxs``, then decrypt it with
        ``sa`` and check checksums and inner addresses.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption of *this* packet got that far) is logged at
        debug level and the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before decrypt never logs the
            # decrypted packet left over from a previous iteration.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Diagnostics are best-effort: never let a logging
                        # problem replace the verification failure below.
                        pass
                raise

    def config_sa_tra(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` carrying the UDP ports of
        ``p.nat_header`` (same source/destination ports on both SAs)."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        # These must be in place before config_sa_tra() reads them.
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
368
369
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """Ipsec ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_basic64(self):
        """ipsec 6o4 tunnel basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ipsec 6o4 tunnel burst test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ipsec 4o4 tunnel frag basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # Shrink the tunnel MTU so the 1800-byte payload yields two packets
        # (n_rx=2); always put the MTU back, even when verification fails.
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
        try:
            self.verify_tun_44(
                self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
            )
        finally:
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
399
400
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP tests

    Runs the IpsecTun4Tests suite on the UDP-encapsulated fixture and adds
    a keepalive check.
    """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        params = self.ipv4_params
        self.verify_keepalive(params)
412
413
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP GCM tests

    Same fixture as the UDP tests, with the IPv4 parameters switched to
    AES-GCM-256 and no separate integrity algorithm.
    """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        super().setUp()

        # NOTE(review): the parent setUp() has already called
        # config_sa_tra(p) by this point, so these values are assigned
        # after the SAs were created - confirm that is intended.
        enums = VppEnum
        params = self.ipv4_params
        params.auth_algo_vpp_id = (
            enums.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )
        params.crypt_algo_vpp_id = (
            enums.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        )
        params.crypt_algo = "AES-GCM"
        params.auth_algo = "NULL"
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0
430
431
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """Ipsec ESP - TCP tests

    Adds nothing of its own: it only combines the IPv4 tunnel fixture with
    the IpsecTcpTests suite.
    """
436
437
class TemplateIpsec6TunProtect(object):
    """IPsec IPv6 Tunnel protect

    Mixin providing the config/unconfig helpers used by the IPv6 tunnel
    test classes in this module.  It relies on the class it is mixed into
    for ``pg0``, ``tun_if``, ``encryption_type`` and ``vpp_esp_protocol``.
    """

    def config_sa_tra(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` without tunnel endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments that are identical for both SAs.
        shared = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, *shared)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, *shared)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` with tunnel endpoints.

        The endpoints come from ``self.tun_if`` and are passed in opposite
        order for the two SAs.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        shared = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        local_end = self.tun_if.local_addr[p.addr_type]
        remote_end = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *shared, local_end, remote_end
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *shared, remote_end, local_end
        )
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the outbound SA and the inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel over pg0 and route both host prefixes via it.

        The tunnel destination is ``p.tun_dst`` when ``p`` already has that
        attribute, otherwise ``self.pg0.remote_ip6``.  Only the IPv6 route
        is kept on ``p`` (as ``p.route``).
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip6

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        v6_path = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        route4 = VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path])
        route4.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        for item in (p.route, p.tun_if):
            item.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
543
544
class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests

    Fixture that, for every test, builds the tunnel network, the SAs
    (via config_sa_tra) and the tunnel protection for ``self.ipv6_params``.
    """

    encryption_type = ESP

    def setUp(self):
        super().setUp()

        self.tun_if = self.pg0

        params = self.ipv6_params

        # Network first: the SA helpers read p.tun_if created there.
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        super().tearDown()
562
563
class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec6 UDP tunnel interface tests

    Fixture for UDP-encapsulated ESP over an IPv6 tunnel: setUp() enables
    the UDP_ENCAP SA flag, installs a UDP(5454 -> 4500) header on the
    parameters and then configures network, SAs and tunnel protection.
    """

    # NOTE(review): these carry "tun4" names but esp6 node names; the
    # concrete IPv6 test classes in this module set tun6_* attributes
    # instead - confirm whether these two are still needed.
    tun4_encrypt_node_name = "esp6-encrypt-tun"
    tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of each packet in ``rxs``, then decrypt it with
        ``sa`` and check checksums and inner IPv6 addresses.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption of *this* packet got that far) is logged at
        debug level and the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before decrypt never logs the
            # decrypted packet left over from a previous iteration.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, 4500)

                # The tunnel is built over IPv6 and the expected addresses
                # below are IPv6, so the IPv6 layer is the one to look at.
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(
                    pkt[IPv6].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
                )
                self.assert_equal(pkt[IPv6].src, self.pg1.remote_ip6)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Diagnostics are best-effort: never let a logging
                        # problem replace the verification failure below.
                        pass
                raise

    def config_sa_tra(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` carrying the UDP ports of
        ``p.nat_header`` (same source/destination ports on both SAs)."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        super(TemplateIpsec6TunIfEspUdp, self).setUp()

        p = self.ipv6_params
        # These must be in place before config_sa_tra() reads them.
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec6TunIfEspUdp, self).tearDown()
653
654
class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP tests

    Runs the IpsecTun6Tests suite on the UDP-encapsulated IPv6 fixture and
    adds a keepalive check.
    """

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC6 NAT Keepalive"""
        params = self.ipv6_params
        self.verify_keepalive(params)
668
669
class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP GCM tests

    Same fixture as the IPv6 UDP tests, with the IPv6 parameters switched
    to AES-GCM-256 and no separate integrity algorithm.
    """

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super().setUp()

        # NOTE(review): the parent setUp() has already called
        # config_sa_tra(p) by this point, so these values are assigned
        # after the SAs were created - confirm that is intended.
        enums = VppEnum
        params = self.ipv6_params
        params.auth_algo_vpp_id = (
            enums.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )
        params.crypt_algo_vpp_id = (
            enums.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        )
        params.crypt_algo = "AES-GCM"
        params.auth_algo = "NULL"
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0
688
689
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """Ipsec ESP - TUN tests

    Runs the IpsecTun6Tests suite on the IPv6 tunnel fixture and adds two
    4o6 checks (one packet, and a 257-packet burst).
    """

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_basic46(self):
        """ipsec 4o6 tunnel basic test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ipsec 4o6 tunnel burst test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
705
706
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """Ipsec ESP 6 Handoff tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_handoff_66_police(self):
        """ESP 6o6 tunnel with policer worker hand-off test

        An RX policer is applied to the tunnel interface and bound first to
        worker 1, then to worker 0.  In each pass, encrypted packets are
        injected alternately on workers 0 and 1 and the per-worker policer
        statistics are checked.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        pkts_per_burst = 15
        params = self.params[socket.AF_INET6]

        # Every outcome transmits, so policing never drops test traffic.
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, True)

        for bound_worker in (1, 0):
            policer.bind_vpp_config(bound_worker, True)

            # inject alternately on worker 0 and 1.
            for worker in (0, 1, 0, 1):
                tx = self.gen_encrypt_pkts6(
                    params,
                    params.scapy_tun_sa,
                    self.tun_if,
                    src=params.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=pkts_per_burst,
                )
                rx = self.send_and_expect(self.tun_if, tx, self.pg1, worker=worker)
                self.verify_decrypted6(params, rx)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            overall = policer.get_stats()
            on_worker0 = policer.get_stats(worker=0)
            on_worker1 = policer.get_stats(worker=1)

            if bound_worker == 1:
                # First pass: Worker 1, should have done all the policing
                self.assertEqual(overall, on_worker1)

                # Worker 0, should have handed everything off
                for counter in (
                    "conform_packets",
                    "exceed_packets",
                    "violate_packets",
                ):
                    self.assertEqual(on_worker0[counter], 0)
                continue

            # Second pass: both workers should have policed equal amounts
            for per_worker in (on_worker1, on_worker0):
                self.assertGreater(per_worker["conform_packets"], 0)
                self.assertEqual(per_worker["exceed_packets"], 0)
                self.assertGreater(per_worker["violate_packets"], 0)

            self.assertEqual(
                on_worker0["conform_packets"] + on_worker0["violate_packets"],
                on_worker1["conform_packets"] + on_worker1["violate_packets"],
            )

        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        policer.remove_vpp_config()
788
789
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """Ipsec ESP 4 Handoff tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_handoff_44_police(self):
        """ESP 4o4 tunnel with policer worker hand-off test

        An RX policer is applied to the tunnel interface and bound first to
        worker 1, then to worker 0.  In each pass, encrypted packets are
        injected alternately on workers 0 and 1 and the per-worker policer
        statistics are checked.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        pkts_per_burst = 15
        params = self.params[socket.AF_INET]

        # Every outcome transmits, so policing never drops test traffic.
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, True)

        for bound_worker in (1, 0):
            policer.bind_vpp_config(bound_worker, True)

            # inject alternately on worker 0 and 1.
            for worker in (0, 1, 0, 1):
                tx = self.gen_encrypt_pkts(
                    params,
                    params.scapy_tun_sa,
                    self.tun_if,
                    src=params.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    count=pkts_per_burst,
                )
                rx = self.send_and_expect(self.tun_if, tx, self.pg1, worker=worker)
                self.verify_decrypted(params, rx)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            overall = policer.get_stats()
            on_worker0 = policer.get_stats(worker=0)
            on_worker1 = policer.get_stats(worker=1)

            if bound_worker == 1:
                # First pass: Worker 1, should have done all the policing
                self.assertEqual(overall, on_worker1)

                # Worker 0, should have handed everything off
                for counter in (
                    "conform_packets",
                    "exceed_packets",
                    "violate_packets",
                ):
                    self.assertEqual(on_worker0[counter], 0)
                continue

            # Second pass: both workers should have policed equal amounts
            for per_worker in (on_worker1, on_worker0):
                self.assertGreater(per_worker["conform_packets"], 0)
                self.assertEqual(per_worker["exceed_packets"], 0)
                self.assertGreater(per_worker["violate_packets"], 0)

            self.assertEqual(
                on_worker0["conform_packets"] + on_worker0["violate_packets"],
                on_worker1["conform_packets"] + on_worker1["violate_packets"],
            )

        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        policer.remove_vpp_config()
871
872
@tag_fixme_vpp_workers
class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Multi Tunnel interface

    setUp() creates ten protected tunnels, each from its own shallow copy
    of ``self.ipv4_params`` with SA ids, SPIs, remote host and tunnel
    destination offset by the tunnel index.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        super().setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv4_neighbors()

        # Attributes that must differ between tunnels; each is shifted by
        # the tunnel index.
        per_tunnel_ids = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for idx in range(10):
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.{}".format(idx + 1)
            for attr in per_tunnel_ids:
                setattr(p, attr, getattr(p, attr) + idx)
            p.tun_dst = self.pg0.remote_hosts[idx].ip4

            self.multi_params.append(p)
            self.config_network(p)
            self.config_sa_tra(p)
            self.config_protect(p)

    def tearDown(self):
        super().tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces"""
        n_pkts = 127
        for p in self.multi_params:
            self.verify_tun_44(p, count=n_pkts)
            self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
            self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

    def test_tun_rr_44(self):
        """Round-robin packets across multiple interface"""
        # Decrypt direction: packets for every tunnel, sent together.
        encrypted = []
        for p in self.multi_params:
            encrypted.extend(
                self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                )
            )
        received = self.send_and_expect(self.tun_if, encrypted, self.pg1)

        for rx, p in zip(received, self.multi_params):
            self.verify_decrypted(p, [rx])

        # Encrypt direction: plain packets towards every tunnel's host.
        plain = []
        for p in self.multi_params:
            plain.extend(
                self.gen_pkts(
                    self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
                )
            )
        received = self.send_and_expect(self.pg1, plain, self.tun_if)

        for rx, p in zip(received, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
945
946
@tag_fixme_ubuntu2204
class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface all Algos"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Configure network, SAs and tunnel protection for the IPv4 params."""
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        for configure in (self.config_network, self.config_sa_tra, self.config_protect):
            configure(params)

    def tearDown(self):
        """Undo setUp: protection first, then network, then the SAs."""
        params = self.ipv4_params

        for unconfigure in (
            self.unconfig_protect,
            self.unconfig_network,
            self.unconfig_sa,
        ):
            unconfigure(params)

        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Replace the tunnel's SAs with new ones using a new key and new SPIs.

        `p` is updated in place; the SAs it referenced on entry are removed
        from VPP once the new pair protects the tunnel.
        """
        # keep a handle on the SAs currently in use so they can be removed last
        previous = copy.copy(p)

        # new key (first byte replaced) and bumped SPIs / SA ids
        p.crypt_key = b"X" + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1

        tunnel = p.tun_if
        tunnel.local_spi = p.vpp_tun_spi
        tunnel.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, tunnel)

        # arguments shared by both directions
        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        extra = {"flags": p.flags, "salt": p.salt}

        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *algo_args, **extra
        )
        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algo_args, **extra
        )
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # switch the protection to the new SAs before dropping the old ones
        self.config_protect(p)
        previous.tun_sa_out.remove_vpp_config()
        previous.tun_sa_in.remove_vpp_config()
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos"""

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        # one row per crypto/integrity combination, columns named by `fields`
        fields = (
            "vpp-crypto",
            "vpp-integ",
            "scapy-crypto",
            "scapy-integ",
            "key",
            "salt",
        )
        rows = (
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                integ.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                b"JPjyOWBeVEQiMe7h",
                3333,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                integ.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                b"JPjyOWBeVEQiMe7hJPjyOWBe",
                0,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
                integ.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                9999,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                integ.IPSEC_API_INTEG_ALG_SHA1_96,
                "AES-CBC",
                "HMAC-SHA1-96",
                b"JPjyOWBeVEQiMe7h",
                0,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                integ.IPSEC_API_INTEG_ALG_SHA_512_256,
                "AES-CBC",
                "SHA2-512-256",
                b"JPjyOWBeVEQiMe7hJPjyOWBe",
                0,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                integ.IPSEC_API_INTEG_ALG_SHA_256_128,
                "AES-CBC",
                "SHA2-256-128",
                b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                0,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_NONE,
                integ.IPSEC_API_INTEG_ALG_SHA1_96,
                "NULL",
                "HMAC-SHA1-96",
                b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                0,
            ),
        )
        algos = [dict(zip(fields, row)) for row in rows]

        # every algorithm is exercised once per VPP crypto engine
        for engine in ("ia32", "ipsecmb", "openssl"):
            self.vapi.cli("set crypto handler all %s" % engine)

            for algo in algos:
                p = self.ipv4_params
                p.auth_algo_vpp_id = algo["vpp-integ"]
                p.crypt_algo_vpp_id = algo["vpp-crypto"]
                p.crypt_algo = algo["scapy-crypto"]
                p.auth_algo = algo["scapy-integ"]
                p.crypt_key = algo["key"]
                p.salt = algo["salt"]

                # install SAs for this algorithm, then pass traffic over them
                self.rekey(p)
                self.verify_tun_44(p, count=127)
1136
1137
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface no Algos"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Switch the IPv4 parameters to NULL integrity and NULL crypto."""
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # no integrity algorithm, empty key
        params.auth_algo_vpp_id = (
            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )
        params.auth_algo, params.auth_key = "NULL", []

        # no encryption algorithm, empty key
        params.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        params.crypt_algo, params.crypt_key = "NULL", []

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """IPSec SA with NULL algos"""
        params = self.ipv4_params

        for configure in (self.config_network, self.config_sa_tra, self.config_protect):
            configure(params)

        # traffic routed towards the tunnel must not produce any reply
        pkts = self.gen_pkts(
            self.pg1, src=self.pg1.remote_ip4, dst=params.remote_tun_if_host
        )
        self.send_and_assert_no_replies(self.pg1, pkts)

        for unconfigure in (
            self.unconfig_protect,
            self.unconfig_sa,
            self.unconfig_network,
        ):
            unconfigure(params)
1177
1178
@tag_fixme_vpp_workers
class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
    """IPsec IPv6 Multi Tunnel interface"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Build ten per-tunnel IPv6 parameter sets and configure each one."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        # SA ids and SPIs (tunnel and transport) that get a per-tunnel offset
        shifted_fields = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for idx in range(10):
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (idx + 1)
            for field in shifted_fields:
                setattr(params, field, getattr(params, field) + idx)
            # each tunnel terminates on a different remote host of pg0
            params.tun_dst = self.pg0.remote_hosts[idx].ip6

            self.multi_params.append(params)
            self.config_network(params)
            self.config_sa_tra(params)
            self.config_protect(params)

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces"""
        expected = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=expected)
            tunnel = params.tun_if
            self.assertEqual(tunnel.get_rx_stats(), expected)
            self.assertEqual(tunnel.get_tx_stats(), expected)
1225
1226
class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for the inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` frames carrying an `sa`-encrypted GRE/Ethernet packet.

        The outer IP header goes from pg0's remote to pg0's local address.
        `p`, `src` and `dst` are not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` clear Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used by this override.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check the inner frame's destination MAC and IP of each packet."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with `sa` and check the GRE/Ethernet payload.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged and the
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a packet that was
            # decrypted in an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best-effort diagnostics; it must not
                        # replace the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs and a protected GRE TEB tunnel bridged with pg1."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and pg1 are the two ports of bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
1349
1350
class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB VLAN ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for the inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` frames carrying an `sa`-encrypted GRE/Ethernet packet.

        The inner frame is untagged. `p`, `src` and `dst` are not used by
        this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` clear frames tagged with VLAN 11.

        `sw_intf`, `src` and `dst` are not used by this override.
        """
        return [
            Ether(dst=self.omac)
            / Dot1Q(vlan=11)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check destination MAC, VLAN 11 tag and destination IP of each packet."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with `sa` and check the untagged GRE payload.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged and the
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a packet that was
            # decrypted in an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                # the VLAN tag must not be present inside the tunnel
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best-effort diagnostics; it must not
                        # replace the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs and a protected GRE TEB tunnel bridged with pg1.11."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # VLAN 11 sub-interface of pg1 with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index,
            vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11,
        )
        self.pg1_11.admin_up()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and the VLAN sub-interface are the two bridge ports
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Unconfigure the tunnel, run the base tearDown, remove the sub-interface."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
1484
1485
class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - Tra tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for the inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` frames carrying an `sa`-encrypted GRE/Ethernet packet.

        `p`, `src` and `dst` are not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` clear Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used by this override.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check the inner frame's destination MAC and IP of each packet."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with `sa` and check the GRE/Ethernet payload.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged and the
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a packet that was
            # decrypted in an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best-effort diagnostics; it must not
                        # replace the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create address-less SAs and a protected GRE TEB tunnel bridged with pg1."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # unlike the TUN variant, no tunnel endpoint addresses are passed
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and pg1 are the two ports of bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1602
1603
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB UDP ESP - Tra tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for the inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` frames carrying an `sa`-encrypted GRE/Ethernet packet.

        `p`, `src` and `dst` are not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` clear Ethernet frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used by this override.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check the inner frame's destination MAC and IP of each packet."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP encapsulation ports, then decrypt and check the payload.

        On an IndexError or AssertionError raised while decrypting/checking,
        the received packet (and the decrypted one, when decryption got that
        far) is logged and the exception is re-raised.
        """
        for rx in rxs:
            # ports match the udp_src/udp_dst given to the outbound SA in setUp
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # reset per packet so a failure never logs a packet that was
            # decrypted in an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best-effort diagnostics; it must not
                        # replace the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create UDP-encapsulated SAs and a protected GRE TEB tunnel on bd 1."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=5454,
            udp_dst=4545,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: same flags plus IS_INBOUND, UDP ports swapped
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=(
                p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
            ),
            udp_src=4545,
            udp_dst=5454,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and pg1 are the two ports of bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1735
1736
class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TUN tests

    An IPv4 GRE tunnel between pg0's local and remote addresses is
    protected by a pair of ESP SAs that are also given pg0's addresses.
    No test methods are defined here; they are presumably inherited from
    IpsecTun4Tests, which drives the gen_*/verify_* hooks below.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted packets to inject on `sw_intf`.

        Each packet is IPv4 (pg0 remote -> pg0 local) / GRE / IPv4 (pg1
        local -> pg1 remote) / UDP / payload, encrypted with `sa` and
        wrapped in an Ethernet header. `p`, `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain IPv4/UDP packets from 1.1.1.1 to 1.1.1.2.

        1.1.1.2 is the host routed via the GRE tunnel in setUp. `src` and
        `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every decrypted packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check its tunnel and GRE headers.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; never mask the real failure
                        pass
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel, its protection and the host route."""
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): this bridge domain is created but nothing visible in
        # this class attaches a port to it - confirm whether it is needed
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # same as above with the two pg0 addresses swapped
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # send traffic for the test host 1.1.1.2 via the tunnel's peer
        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1847
1848
class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TRA tests

    An IPv4 GRE tunnel between pg0's local and remote addresses is
    protected by a pair of ESP SAs created without any addresses. Besides
    the tests presumably inherited from IpsecTun4Tests, this class checks
    that a GRE packet with a non-IP payload is dropped.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted packets to inject on `sw_intf`.

        Each packet is IPv4 (pg0 remote -> pg0 local) / GRE / IPv4 (pg1
        local -> pg1 remote) / UDP / payload, encrypted with `sa` and
        wrapped in an Ethernet header. `p`, `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted packets whose GRE payload is not IP.

        Same as gen_encrypt_pkts but UDP follows GRE directly, with no
        inner IP header. `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain IPv4/UDP packets from 1.1.1.1 to 1.1.1.2.

        1.1.1.2 is the host routed via the GRE tunnel in setUp. `src` and
        `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every decrypted packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check its GRE payload.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; never mask the real failure
                        pass
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel, its protection and the host route."""
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for the test host 1.1.1.2 via the tunnel's peer
        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        """GRE packet with a non-IP payload is dropped"""
        p = self.ipv4_params
        # src/dst are ignored by gen_encrypt_non_ip_pkts
        tx = self.gen_encrypt_non_ip_pkts(
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
        )
        self.send_and_assert_no_replies(self.tun_if, tx)
        # the single packet sent must be counted against the first decrypt node
        node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1974
1975
class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
    """Ipsec GRE ESP - TRA tests

    IPv6 variant: an IPv6 GRE tunnel between pg0's local and remote
    addresses is protected by a pair of ESP SAs created without any
    addresses. No test methods are defined here; they are presumably
    inherited from IpsecTun6Tests, which drives the hooks below.
    """

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted packets to inject on `sw_intf`.

        Each packet is IPv6 (pg0 remote -> pg0 local) / GRE / IPv6 (pg1
        local -> pg1 remote) / UDP / payload, encrypted with `sa` and
        wrapped in an Ethernet header. `p`, `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain IPv6/UDP packets from 1::1 to 1::2.

        1::2 is the host routed via the GRE tunnel in setUp. `p`, `src`
        and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst="1::2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every decrypted packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every received packet and check its GRE payload.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; never mask the real failure
                        pass
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel, its protection and the host route."""
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): this bridge domain is created but nothing visible in
        # this class attaches a port to it - confirm whether it is needed
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for the test host 1::2 via the tunnel's peer
        r = VppIpRoute(
            self,
            "1::2",
            128,
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
                )
            ],
        )
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
2088
2089
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """Ipsec mGRE ESP v4 TRA tests

    One multipoint GRE interface sourced from pg0 carries 16 next-hops.
    Each next-hop gets its own SA pair, tunnel protection, /32 host route
    and TEIB entry; the per-next-hop parameter copies are kept in
    self.multi_params.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted packets sent from next-hop `p.tun_dst`.

        Each packet is IPv4 (p.tun_dst -> pg0 local) / GRE / IPv4 (pg1
        local -> pg1 remote) / UDP / payload, encrypted with `sa` and
        wrapped in an Ethernet header. `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=p.tun_dst, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain IPv4/UDP packets from 1.1.1.1 to `dst`.

        `src` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every decrypted packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check its GRE payload.

        The inner destination must be `p.remote_tun_if_host`. On an
        IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; never mask the real failure
                        pass
                raise

    def setUp(self):
        """Create the mGRE interface and the per-next-hop SAs, routes and TEIBs."""
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        # multipoint tunnel: the destination is left as 0.0.0.0
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the one tun_if created above
            p = copy.copy(self.ipv4_params)

            # offset every id/SPI by ii so each next-hop's SAs are distinct
            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            p.teib = VppTeib(
                self,
                p.tun_if,
                p.tun_if.remote_hosts[ii].ip4,
                self.pg0.remote_hosts[ii].ip4,
            ).add_vpp_config()
            # NOTE(review): set after config_tra_params, which presumably
            # assigns tun_dst itself - keep this ordering
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # without its TEIB entry the next-hop's traffic must be dropped
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            # and it must flow again once the entry is restored
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
2243
2244
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """Ipsec mGRE ESP v6 TRA tests

    One multipoint GRE interface sourced from pg0 carries 16 next-hops.
    Each next-hop gets its own SA pair, TEIB entry, tunnel protection and
    /128 host route; the per-next-hop parameter copies are kept in
    self.multi_params.
    """

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted packets sent from next-hop `p.tun_dst`.

        Each packet is IPv6 (p.tun_dst -> pg0 local) / GRE / IPv6 (pg1
        local -> pg1 remote) / UDP / payload, encrypted with `sa` and
        wrapped in an Ethernet header. `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain IPv6/UDP packets from 1::1 to `dst`.

        `p` and `src` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every decrypted packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every received packet and check its GRE payload.

        The inner destination must be `p.remote_tun_if_host`. On an
        IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; never mask the real failure
                        pass
                raise

    def setUp(self):
        """Create the mGRE interface and the per-next-hop SAs, TEIBs and routes."""
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        # multipoint tunnel: the destination is left as ::
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip6,
            "::",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the one tun_if created above
            p = copy.copy(self.ipv6_params)

            # offset every id/SPI by ii so each next-hop's SAs are distinct
            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(
                self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
            ).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                128,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
            ).add_vpp_config()
            # NOTE(review): assigned again after config_tra_params, which
            # presumably overwrites tun_dst - keep this ordering
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        # NOTE(review): adjacency index 41 is hard-coded; this looks like
        # leftover debug output - confirm it is still wanted
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
2396
2397
@tag_fixme_vpp_workers
class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the base setUp and use pg0 as the underlay interface."""
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer to the base tearDown; the test cleans up after itself."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        n_pkts = 127
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # first pass: 4o4 traffic, counters reach one batch
        self.verify_tun_44(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # second pass after clearing the SA state: counters reach two batches
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), 2 * n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), 2 * n_pkts)

        # rekey: a shallow copy with a changed key and bumped ids/SPIs
        # replaces the SAs under the tunnel protection
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b"X" + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        # third pass on the new SAs: counters reach three batches
        self.verify_tun_44(rekeyed, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), 3 * n_pkts)

        # undo everything this test configured
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
2450
2451
@tag_fixme_vpp_workers
class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Enable UDP encapsulation on the params, then configure the tunnel."""
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo setUp's configuration in reverse order, then run the base tearDown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of each packet, then run the inherited checks."""
        # ensure encrypted packets are received with the default UDP ports
        expected_port = 4500
        for rx in rxs:
            udp = rx[UDP]
            self.assertEqual(udp.sport, expected_port)
            self.assertEqual(udp.dport, expected_port)
        super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        n_pkts = 127
        params = self.ipv4_params

        self.verify_tun_44(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        params = self.ipv4_params
        self.verify_keepalive(params)
2494
2495
@tag_fixme_vpp_workers
class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode

    Packets carry an extra outer IPv4 header between pg0's addresses
    around the inner IPv4 packet. The test also attaches a permit-all ACL
    to pg0 and to the tunnel, and rekeys the tunnel protection.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the base setUp and use pg0 as the underlay interface."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer to the base tearDown; the test cleans up after itself."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted IP-in-IP packets to inject on `sw_intf`.

        Each packet is IPv4 (sw_intf remote -> sw_intf local) / IPv4
        (src -> dst) / UDP / payload, encrypted with `sa` and wrapped in
        an Ethernet header. `p` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain IPv4/UDP packets from `src` to `dst`."""
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src=src, dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of every decrypted packet."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet and check outer and inner headers.

        The outer header must run from pg0's local to its remote address
        and the inner destination must be `p.remote_tun_if_host`. On an
        IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; never mask the real failure
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # also add an output features on the tunnel and physical interface
        # so we test they still work
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        self.verify_tun_44(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        # (np is a shallow copy, so np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # the tunnel counters keep accumulating across the rekey
        self.verify_tun_44(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2603
2604
class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the base setUp and use pg0 as the underlay interface."""
        super(TestIpsec4TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer to the base tearDown; the test cleans up after itself."""
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted packets whose outer destination is 5.5.5.5.

        The inner packet is IPv4 (src -> dst) / UDP / payload. `p` is not
        used.
        """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            # 5.5.5.5 as outer destination is the bogus tunnel header under test
            cleartext = (
                IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            pkts.append(l2 / sa.encrypt(cleartext))
        return pkts

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header"""
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # none of the 63 packets may produce a reply
        bogus = self.gen_encrypt_pkts(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # undo everything this test configured
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2655
2656
@tag_fixme_vpp_workers
class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - transport mode

    Exercises 6o6 and 4o6 traffic over a protected tunnel. The 6o6 test
    additionally covers a simple rekey, an admin down/up bounce of the
    tunnel interface and a three-phase rekey.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing port."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super().tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""
        n_pkts = 127
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_66(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # Rekey: derive a new parameter set (new key, SPIs and SA ids) from
        # a shallow copy, install its SAs and re-protect the tunnel with
        # them, then drop the old SAs.
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b"X" + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.vpp_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_66(rekeyed, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), 2 * n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), 2 * n_pkts)

        # Bounce the interface: while it is down every packet must be
        # dropped and accounted for by the "disabled" error counter.
        params.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=n_pkts)
        disabled_counter = "/err/ipsec6-tun-input/disabled"
        self.assertEqual(n_pkts, self.statistics.get_err_counter(disabled_counter))
        params.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=n_pkts)

        # Three-phase rekey:
        #  1) accept two input SAs [old, new]
        #  2) switch the output SA to [new]
        #  3) accept only the [new] input SA
        phase3 = copy.copy(rekeyed)
        phase3.crypt_key = b"Z" + params.crypt_key[1:]
        phase3.scapy_tun_spi += 100
        phase3.vpp_tun_spi += 100
        phase3.scapy_tun_sa_id += 1
        phase3.vpp_tun_sa_id += 1
        phase3.tun_if.local_spi = params.vpp_tun_spi
        phase3.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(phase3)

        # step 1: old output SA, both input SAs
        params.tun_protect.update_vpp_config(
            rekeyed.tun_sa_out, [rekeyed.tun_sa_in, phase3.tun_sa_in]
        )
        self.verify_tun_66(rekeyed, rekeyed, count=n_pkts)
        self.verify_tun_66(phase3, rekeyed, count=n_pkts)

        # step 2: new output SA, both input SAs
        params.tun_protect.update_vpp_config(
            phase3.tun_sa_out, [rekeyed.tun_sa_in, phase3.tun_sa_in]
        )
        self.verify_tun_66(rekeyed, phase3, count=n_pkts)
        self.verify_tun_66(phase3, phase3, count=n_pkts)

        # step 3: new output SA, only the new input SA; traffic encrypted
        # with the old parameters must now be dropped on receive
        params.tun_protect.update_vpp_config(phase3.tun_sa_out, [phase3.tun_sa_in])
        self.verify_tun_66(phase3, phase3, count=n_pkts)
        self.verify_drop_tun_rx_66(rekeyed, count=n_pkts)

        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts * 9)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts * 8)
        self.unconfig_sa(rekeyed)

        # undo what is still configured
        self.unconfig_protect(phase3)
        self.unconfig_sa(phase3)
        self.unconfig_network(params)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""
        n_pkts = 127
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_46(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # undo the configuration in reverse order
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2768
2769
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode

    Overrides the packet generators and verifiers so that the encrypted
    payload carries an explicit IPv6-in-IPv6 encapsulation, then runs a
    6o6 test including a rekey.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing port."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` Ethernet frames carrying ``sa``-encrypted traffic.

        The plaintext is IPv6/IPv6/UDP/Raw: the leading header runs between
        ``sw_intf``'s remote and local IPv6 addresses, the following one
        from ``src`` to ``dst``. ``p`` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text Ether/IPv6/UDP/Raw frames.

        ``p`` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / UDP(sport=1166, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of each received clear-text packet."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its headers.

        On failure the offending packet (and its decrypted form, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the decrypted form of
            # a previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; never mask the
                        # failure being reported
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2869
2870
class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode - drop

    The encrypted packets built by this class carry a leading IPv6 header
    whose destination does not match the tunnel; the test asserts that
    nothing is forwarded for them.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing port."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super().tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` Ethernet frames carrying ``sa``-encrypted traffic.

        The plaintext is IPv6/IPv6/UDP/Raw with the leading header destined
        to the fixed address 5::5. ``p`` is not used.
        """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            # the IP destination of the revealed packet does not match
            # that assigned to the tunnel
            clear = (
                IPv6(src=sw_intf.remote_ip6, dst="5::5")
                / IPv6(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            frames.append(l2 / sa.encrypt(clear))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header"""
        params = self.ipv6_params

        # interface first, then the SAs, then bind them to the interface
        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        bogus = self.gen_encrypt_pkts6(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=63,
        )
        # none of the 63 packets may produce a reply
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # undo the configuration in reverse order
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2922
2923
class TemplateIpsecItf4(object):
    """IPsec Interface IPv4

    Mixin providing the configure/unconfigure helpers (interface, routes,
    SAs, tunnel protection) shared by the IPv4 IPsec-interface tests.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and install the outbound and inbound SAs for ``p``.

        The outbound SA uses the scapy SA id/SPI with tunnel ``src`` ->
        ``dst``; the inbound SA uses the VPP SA id/SPI with the endpoints
        swapped. Both are stored on ``p`` (``tun_sa_out`` / ``tun_sa_in``).
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        def build_sa(sa_id, spi, tun_src, tun_dst):
            # algorithms, keys and flags are shared by both directions
            return VppIpsecSA(
                self,
                sa_id,
                spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                tun_src,
                tun_dst,
                flags=p.flags,
            )

        p.tun_sa_out = build_sa(p.scapy_tun_sa_id, p.scapy_tun_spi, src, dst)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = build_sa(p.vpp_tun_sa_id, p.vpp_tun_spi, dst, src)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p``'s outbound SA and inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p, instance=0xFFFFFFFF):
        """Create the IPsec interface and host routes through it.

        ``instance`` is forwarded to ``VppIpsecInterface``. The interface
        is stored as ``p.tun_if`` and the IPv4 /32 route as ``p.route``;
        the IPv6 /128 route is installed but not kept on ``p``.
        """
        tun = VppIpsecInterface(self, instance=instance)
        p.tun_if = tun

        tun.add_vpp_config()
        tun.admin_up()
        tun.config_ip4()
        tun.config_ip6()

        via_v4 = VppRoutePath(tun.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_v4])
        p.route.add_vpp_config()

        via_v6 = VppRoutePath(
            tun.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        VppIpRoute(self, p.remote_tun_if_host6, 128, [via_v6]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove ``p``'s outbound SA, then its inbound SA."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
3006
3007
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface IPv4

    Tests for an IPsec interface carrying IPv4: explicit instance numbers,
    basic traffic with rekey, NULL algorithms and an input policer.
    """

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing port."""
        super(TestIpsecItf4, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super(TestIpsecItf4, self).tearDown()

    def test_tun_instance_44(self):
        """IPSEC interface IPv4 explicit instance"""
        p = self.ipv4_params
        self.config_network(p, instance=3)

        # only the requested instance may exist
        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        output = self.vapi.cli("show interface ipsec3")
        # assertNotIn reports the offending output on failure
        self.assertNotIn("unknown", output)

        self.unconfig_network(p)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        # set up the scapy side only: with no SAs on the interface yet the
        # traffic must be dropped
        config_tun_params(
            p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
        )
        self.verify_tun_dropped_44(p, count=n_pkts)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface: dropped while down, forwarded again once up
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v6 packet when its encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # the counter checks after the rekey expect freshly cleared counters
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 127
        # work on a copy so the algorithm overrides stay local to this test
        p = copy.copy(self.ipv4_params)

        p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.logger.info(self.vapi.cli("sh ipsec sa"))
        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        # every policer outcome transmits, so policing never drops traffic
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_44(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3169
3170
class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface MPLSoIPv4

    Sends MPLS-labelled (label 44) IPv4 traffic through an IPsec interface
    in both directions.
    """

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing port."""
        super(TestIpsecItf4MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` Ethernet frames carrying ``sa``-encrypted traffic.

        The plaintext is MPLS(label 44)/IP/UDP/Raw with the IP header going
        from ``src`` to ``dst``. ``p`` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=44, ttl=3)
                / IP(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check label and destination.

        On failure the offending packet (and its decrypted form, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the decrypted form of
            # a previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; never mask the
                        # failure being reported
                        pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        ).add_vpp_config()
        # re-point the host route so traffic into the tunnel gets label 44
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3247
3248
class TemplateIpsecItf6(object):
    """IPsec Interface IPv6

    Mixin providing the configure/unconfigure helpers (interface, routes,
    SAs, tunnel protection) shared by the IPv6 IPsec-interface tests.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and install the outbound and inbound SAs for ``p``.

        ``p.tun_flags`` and ``p.hop_limit`` are defaulted to ``None`` and
        ``255`` when absent; only the outbound SA receives them. The
        inbound SA uses the VPP SA id/SPI with the endpoints swapped.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # fill in optional parameters the caller did not provide
        for attr, fallback in (("tun_flags", None), ("hop_limit", 255)):
            if not hasattr(p, attr):
                setattr(p, attr, fallback)

        outbound = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            src,
            dst,
            flags=p.flags,
            tun_flags=p.tun_flags,
            hop_limit=p.hop_limit,
        )
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p``'s outbound SA and inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPsec interface and host routes through it.

        The interface is stored as ``p.tun_if`` and the IPv6 /128 route as
        ``p.route``; the IPv4 /32 route is installed but not kept on ``p``.
        """
        tun = VppIpsecInterface(self)
        p.tun_if = tun

        tun.add_vpp_config()
        tun.admin_up()
        tun.config_ip4()
        tun.config_ip6()

        via_v4 = VppRoutePath(tun.remote_ip4, 0xFFFFFFFF)
        VppIpRoute(self, p.remote_tun_if_host4, 32, [via_v4]).add_vpp_config()

        via_v6 = VppRoutePath(
            tun.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [via_v6])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove ``p``'s outbound SA, then its inbound SA."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
3339
3340
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface IPv6

    Tests for an IPsec interface carrying IPv6: basic traffic with an
    interface bounce and a rekey, and an input policer.
    """

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing port."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super().tearDown()

    def test_tun_66(self):
        """IPSEC interface IPv6"""
        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127

        params = self.ipv6_params
        params.inner_hop_limit = 24
        params.outer_hop_limit = 23
        params.outer_flow_label = 243224
        params.tun_flags = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(params)
        # set up the scapy side only: with no SAs on the interface yet the
        # traffic must be dropped
        config_tun_params(
            params,
            self.encryption_type,
            None,
            self.pg0.local_ip6,
            self.pg0.remote_ip6,
        )
        self.verify_drop_tun_66(params, count=n_pkts)
        self.config_sa_tun(params, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(params)

        self.verify_tun_66(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface: dropped while down, forwarded again once up
        params.tun_if.admin_down()
        self.verify_drop_tun_66(params, count=n_pkts)
        params.tun_if.admin_up()
        self.verify_tun_66(params, count=n_pkts)

        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts * 3)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts * 2)

        # v4 payload over this tunnel: the verifier is pointed at the v4
        # encrypt node for the duration of the check, then restored
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts * 4)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts * 3)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # the counter checks after the rekey expect freshly cleared counters
        self.vapi.cli("clear interfaces")

        # Rekey: new key, SPIs and SA ids on a shallow copy, plus different
        # hop-limit / flow-label expectations and tunnel flags.
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b"X" + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.vpp_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi
        rekeyed.inner_hop_limit = 24
        rekeyed.outer_hop_limit = 128
        rekeyed.inner_flow_label = 0xABCDE
        rekeyed.outer_flow_label = 0xABCDE
        rekeyed.hop_limit = 128
        rekeyed.tun_flags = (
            encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
        )

        self.config_sa_tun(rekeyed, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_66(rekeyed, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # undo what is still configured
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)

    def test_tun_66_police(self):
        """IPSEC interface IPv6 with input policer"""
        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127

        params = self.ipv6_params
        params.inner_hop_limit = 24
        params.outer_hop_limit = 23
        params.outer_flow_label = 243224
        params.tun_flags = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(params)
        self.config_sa_tun(params, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(params)

        # every policer outcome transmits, so policing never drops traffic
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # attach the policer to the tunnel's receive direction
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_66(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        policed = policer.get_stats()

        # single rate, two colour: conform and violate are hit, exceed is not
        self.assertGreater(policed["conform_packets"], 0)
        self.assertEqual(policed["exceed_packets"], 0)
        self.assertGreater(policed["violate_packets"], 0)

        # detach the policer and send traffic again
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_66(params, count=n_pkts)

        # the detached policer must not have counted anything new
        unpoliced = policer.get_stats()
        self.assertEqual(policed, unpoliced)

        # undo the configuration
        policer.remove_vpp_config()
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
3481
3482
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """Ipsec P2MP ESP v4 tests

    setUp() creates one multipoint IPsec interface and protects it with a
    separate SA pair per next-hop; the per-next-hop parameter sets are kept
    in ``self.multi_params``.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted frames addressed to ``sw_intf``.

        The inner packet always goes from pg1's local to pg1's remote IPv4
        address; ``p``, ``src`` and ``dst`` are not used by this override.

        :param sa: scapy security association used to encrypt the payload
        :param sw_intf: interface whose MAC addresses fill the Ethernet header
        :param count: number of packets to build
        :param payload_size: number of ``X`` bytes carried after the UDP header
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames destined to ``dst``.

        The IPv4 source is fixed to ``1.1.1.1``; ``src`` is not used by this
        override.

        :param sw_intf: interface whose MAC addresses fill the Ethernet header
        :param dst: IPv4 destination address of every packet
        :param count: number of packets to build
        :param payload_size: number of ``X`` bytes carried after the UDP header
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed (L2 and L3) to pg1's peer.

        ``p`` is not used by this override.
        """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check outer header fields of each packet, then decrypt and verify.

        The outer IPv4 header must carry DSCP EF (shifted into the TOS byte)
        and the hop limit of ``p``; the decrypted inner packet must have valid
        checksums and be destined to ``p.remote_tun_if_host``.

        On failure the offending packet (and its decrypted form, when one was
        obtained) is logged before the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so the failure log never shows a packet
            # decrypted in an earlier iteration.
            pkt = None
            try:
                self.assertEqual(
                    rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
                )
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # payload was not dissected as IP; parse the raw bytes
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertEqual(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure to render
                        # the packet must not mask the original error.
                        pass
                raise

    def setUp(self):
        """Create the multipoint interface and one protected SA pair per NH."""
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        base = self.ipv4_params
        base.tun_if = VppIpsecInterface(
            self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
        )
        base.tun_if.add_vpp_config()
        base.tun_if.admin_up()
        # config -> unconfig -> config: the address is removed and re-added
        # before use (presumably deliberate coverage - confirm).
        base.tun_if.config_ip4()
        base.tun_if.unconfig_ip4()
        base.tun_if.config_ip4()
        base.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # one catch-all rule, bound to both the underlay and the tunnel
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        acl = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [acl]).add_vpp_config()
        VppAclInterface(self, base.tun_if.sw_if_index, [acl]).add_vpp_config()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every parameter set shares the same tun_if object
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            # offset every SA id / SPI by the next-hop index
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            # distinct hop limit per next-hop; checked in verify_encrypted()
            p.hop_limit = ii + 10
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tun_params(
                p,
                self.encryption_type,
                None,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
            )
            self.multi_params.append(p)

            p.via_tun_route = VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            # (re)assert the per-next-hop tunnel destination
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        """Remove the tunnel interface address, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)

        # remove one tunnel protect, the rest should still work
        first = self.multi_params[0]
        first.tun_protect.remove_vpp_config()
        self.verify_tun_dropped_44(first, count=N_PKTS)
        first.via_tun_route.remove_vpp_config()
        self.verify_tun_dropped_44(first, count=N_PKTS)

        for p in self.multi_params[1:]:
            self.verify_tun_44(p, count=N_PKTS)

        # restore the removed protection and route; all next-hops work again
        first.tun_protect.add_vpp_config()
        first.via_tun_route.add_vpp_config()

        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
3660
3661
class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface MPLSoIPv6"""

    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base setup and use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base teardown."""
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP-encrypted frames carrying MPLS over IPv6.

        The encrypted payload is an MPLS header (label 66, TTL 3) followed by
        an IPv6/UDP packet from ``src`` to ``dst``. ``p`` is not used by this
        override.

        :param sa: scapy security association used to encrypt the payload
        :param sw_intf: interface whose MAC addresses fill the Ethernet header
        :param src: inner IPv6 source address
        :param dst: inner IPv6 destination address
        :param count: number of packets to build
        :param payload_size: number of ``X`` bytes carried after the UDP header
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=66, ttl=3)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet and check its MPLS label and inner destination.

        The decrypted packet must have valid checksums, carry MPLS label 66
        and an inner IPv6 destination of ``p.remote_tun_if_host``.

        On failure the offending packet (and its decrypted form, when one was
        obtained) is logged before the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so the failure log never shows a packet
            # decrypted in an earlier iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # Payload was left undissected. It is expected to start
                    # with the MPLS header asserted below, so re-parse it as
                    # MPLS rather than IPv4, which could never yield the
                    # MPLS/IPv6 layers checked next.
                    pkt = MPLS(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure to render
                        # the packet must not mask the original error.
                        pass
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self,
            66,
            1,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
        ).add_vpp_config()
        # send traffic for the tunnelled prefix through the tunnel with
        # label 66 imposed
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3742
3743
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)