61a66d40a4e7afeaf0ab2b1ed32ce07e9bfac637
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import (
14     TemplateIpsec,
15     IpsecTun4Tests,
16     IpsecTun6Tests,
17     IpsecTun4,
18     IpsecTun6,
19     IpsecTcpTests,
20     mk_scapy_crypt_key,
21     IpsecTun6HandoffTests,
22     IpsecTun4HandoffTests,
23     config_tun_params,
24 )
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
28     VppIpRoute,
29     VppRoutePath,
30     DpoProto,
31     VppMplsLabel,
32     VppMplsTable,
33     VppMplsRoute,
34     FibPathProto,
35 )
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
40 from util import ppp
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
45
46
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
    """Build the two scapy tunnel-mode SecurityAssociations and store them on p.

    Sets p.tun_src / p.tun_dst (from tun_if when it is truthy, otherwise from
    the src / dst arguments) and then creates:

    * p.scapy_tun_sa -- uses p.vpp_tun_spi, outer header from tun_dst to
      tun_src;
    * p.vpp_tun_sa -- uses p.scapy_tun_spi, outer header from tun_src to
      tun_dst.

    When p.nat_header is set and its dport is not 4500, the scapy-side SA
    gets a UDP header with the two ports swapped and
    bind_layers(UDP, ESP, dport=...) is called for that port.

    :param p: parameter object, read and updated in place.
    :param encryption_type: passed through as the first argument of
        SecurityAssociation.
    :param tun_if: object providing local_ip and remote_ip, or a falsy value.
    :param src: tunnel source, used only when tun_if is falsy.
    :param dst: tunnel destination, used only when tun_if is falsy.
    """
    use_esn = bool(
        p.flags & VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    )
    scapy_key = mk_scapy_crypt_key(p)

    # An interface, when supplied, overrides the explicit endpoints.
    if tun_if:
        src, dst = tun_if.local_ip, tun_if.remote_ip
    p.tun_dst = dst
    p.tun_src = src

    nat_hdr = p.nat_header
    if nat_hdr and nat_hdr.dport != 4500:
        # Non-default port: the scapy side uses the header with ports swapped
        # and scapy is told to dissect ESP on that destination port.
        outbound_nat_hdr = UDP(sport=nat_hdr.dport, dport=nat_hdr.sport)
        bind_layers(UDP, ESP, dport=nat_hdr.dport)
    else:
        outbound_nat_hdr = nat_hdr

    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    # Everything the two SAs have in common.
    common = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=use_esn,
    )

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        nat_t_header=outbound_nat_hdr,
        **common
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(src=p.tun_src, dst=p.tun_dst),
        nat_t_header=nat_hdr,
        **common
    )
93
94
def config_tra_params(p, encryption_type, tun_if):
    """Build two scapy SecurityAssociations without a tunnel header on p.

    Stores tun_if.remote_ip / tun_if.local_ip as p.tun_dst / p.tun_src and
    creates p.scapy_tun_sa (using p.vpp_tun_spi) and p.vpp_tun_sa (using
    p.scapy_tun_spi).  Neither SA is given a tunnel_header.

    When p.nat_header is set and its dport is not 4500, the scapy-side SA
    gets a UDP header with the two ports swapped and
    bind_layers(UDP, ESP, dport=...) is called for that port.

    :param p: parameter object, read and updated in place.
    :param encryption_type: passed through as the first argument of
        SecurityAssociation.
    :param tun_if: object providing local_ip and remote_ip (required here).
    """
    # NOTE(review): this mapping is built but never consulted in this
    # function; kept so the function's behaviour is unchanged.
    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    use_esn = bool(
        p.flags & VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    )
    scapy_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    nat_hdr = p.nat_header
    if nat_hdr and nat_hdr.dport != 4500:
        # Non-default port: swap the ports for the scapy side and make scapy
        # dissect ESP on that destination port.
        outbound_nat_hdr = UDP(sport=nat_hdr.dport, dport=nat_hdr.sport)
        bind_layers(UDP, ESP, dport=nat_hdr.dport)
    else:
        outbound_nat_hdr = nat_hdr

    # Everything the two SAs have in common.
    common = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=use_esn,
    )

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        nat_t_header=outbound_nat_hdr,
        **common
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        nat_t_header=nat_hdr,
        **common
    )
135
136
class TemplateIpsec4TunProtect(object):
    """IPsec IPv4 Tunnel protect

    Mix-in with the configuration steps shared by the IPv4 tunnel-protect
    tests in this file: create an IP-IP tunnel over pg0, create a pair of
    SAs and attach them to the tunnel with VppIpsecTunProtect.  The
    unconfig_* methods undo the individual steps.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create p.tun_sa_out / p.tun_sa_in without tunnel endpoint addresses.

        Also (re)builds the scapy SAs on p through config_tun_params().
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Arguments that are identical for both directions.
        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        outbound = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *algo_args, flags=p.flags
        )
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algo_args, flags=p.flags
        )
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_sa_tun(self, p):
        """Create p.tun_sa_out / p.tun_sa_in with self.tun_if's addresses.

        The outbound SA is given (local, remote), the inbound SA
        (remote, local), both looked up by p.addr_type.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        local = self.tun_if.local_addr[p.addr_type]
        remote = self.tun_if.remote_addr[p.addr_type]
        out_args = algo_args + (local, remote)
        in_args = algo_args + (remote, local)

        outbound = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *out_args, flags=p.flags
        )
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *in_args, flags=p.flags
        )
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Attach the SA pair held on p to p.tun_if."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel p.tun_if and route the remote hosts via it.

        The tunnel destination is p.tun_dst when p already has one, otherwise
        pg0's remote IPv4 address.
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip4

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        # The IPv6 route is not stored on p, so unconfig_network() does not
        # remove it explicitly.
        v6_path = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        v6_route = VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path])
        v6_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove p.route, then the tunnel interface."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel-protect binding."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
251
252
class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    # Base for the IPv4 ESP tests: setUp() builds one protected tunnel from
    # self.ipv4_params on top of pg0.

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # Order matters: the SAs need p.tun_if, the protection needs the SAs.
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        super().tearDown()
279
280
class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec UDP tunnel interface tests"""

    # Base for the IPv4 UDP-encapsulated ESP tests: setUp() enables the
    # UDP_ENCAP SA flag, installs a UDP(5454 -> 4500) NAT header on the
    # parameters and builds one protected tunnel on top of pg0.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check UDP ports, decrypt each packet and check the inner addresses.

        :param p: parameter object; p.nat_header.sport is the expected UDP
            source port.
        :param sa: object whose decrypt() is applied to the IP layer of each
            received packet.
        :param rxs: iterable of received packets.
        :raises IndexError, AssertionError: re-raised after the offending
            packet (and its decrypted form, if any) has been logged.
        """
        for rx in rxs:
            # Reset per packet so that a failure before decryption neither
            # hits an unbound name nor logs the previous packet's payload.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def config_sa_tra(self, p):
        """Create p.tun_sa_out / p.tun_sa_in with the NAT header's UDP ports."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
368
369
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """Ipsec ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_basic64(self):
        """ipsec 6o4 tunnel basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ipsec 6o4 tunnel burst test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ipsec 4o4 tunnel frag basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # Lower the tunnel MTU so the 1800 byte payload is expected as two
        # packets (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
        try:
            self.verify_tun_44(
                self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
            )
        finally:
            # Put the MTU back even when the verification fails.
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
399
400
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP tests"""

    # Runs the inherited IpsecTun4Tests cases on the UDP-encapsulated
    # template and adds a keepalive check.

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        params = self.ipv4_params
        self.verify_keepalive(params)
412
413
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP GCM tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        super().setUp()

        # NOTE(review): the parent setUp() has already called config_sa_tra()
        # by the time the values below are assigned, so the SAs created there
        # were built from the previous parameter values -- confirm that this
        # ordering is intended.
        params = self.ipv4_params

        # Cipher: AES-GCM with a 32 byte key and a zero salt.
        params.crypt_algo = "AES-GCM"
        params.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        )
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0

        # No separate integrity algorithm.
        params.auth_algo = "NULL"
        params.auth_algo_vpp_id = (
            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )
430
431
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """Ipsec ESP - TCP tests"""

    # No members of its own: it only combines the IPv4 ESP tunnel template
    # with the test cases provided by IpsecTcpTests.
436
437
class TemplateIpsec6TunProtect(object):
    """IPsec IPv6 Tunnel protect

    Mix-in with the configuration steps shared by the IPv6 tunnel-protect
    tests in this file: create an IP-IP tunnel over pg0, create a pair of
    SAs and attach them to the tunnel with VppIpsecTunProtect.  The
    unconfig_* methods undo the individual steps.
    """

    def config_sa_tra(self, p):
        """Create p.tun_sa_out / p.tun_sa_in without tunnel endpoint addresses.

        Unlike the IPv4 counterpart in this file, no flags are passed to
        VppIpsecSA here.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Arguments that are identical for both directions.
        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        outbound = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, *algo_args)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algo_args)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_sa_tun(self, p):
        """Create p.tun_sa_out / p.tun_sa_in with self.tun_if's addresses.

        The outbound SA is given (local, remote), the inbound SA
        (remote, local), both looked up by p.addr_type.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        local = self.tun_if.local_addr[p.addr_type]
        remote = self.tun_if.remote_addr[p.addr_type]
        out_args = algo_args + (local, remote)
        in_args = algo_args + (remote, local)

        outbound = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, *out_args)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, *in_args)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Attach the SA pair held on p to p.tun_if."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel p.tun_if and route the remote hosts via it.

        The tunnel destination is p.tun_dst when p already has one, otherwise
        pg0's remote IPv6 address.
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip6

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        v6_path = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        # The IPv4 route is not stored on p, so unconfig_network() does not
        # remove it explicitly.
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        v4_route = VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path])
        v4_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove p.route, then the tunnel interface."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel-protect binding."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
543
544
class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    # Base for the IPv6 ESP tests: setUp() builds one protected tunnel from
    # self.ipv6_params on top of pg0.

    encryption_type = ESP

    def setUp(self):
        super().setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # Order matters: the SAs need p.tun_if, the protection needs the SAs.
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        super().tearDown()
562
563
class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec6 UDP tunnel interface tests"""

    # Base for the IPv6 UDP-encapsulated ESP tests: setUp() enables the
    # UDP_ENCAP SA flag, installs a UDP(5454 -> 4500) NAT header on the
    # parameters and builds one protected tunnel on top of pg0.

    # NOTE(review): these are named tun4_* although the values are the esp6
    # nodes; the concrete IPv6 test classes in this file define tun6_*
    # themselves -- confirm whether these two are still needed.
    tun4_encrypt_node_name = "esp6-encrypt-tun"
    tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check UDP ports, decrypt each packet and check the inner addresses.

        The IPv6 layer is used throughout: the expected inner addresses are
        IPv6, so looking at the IPv4 layer could never match.

        :param p: parameter object; p.nat_header.sport is the expected UDP
            source port.
        :param sa: object whose decrypt() is applied to the IPv6 layer of
            each received packet.
        :param rxs: iterable of received packets.
        :raises IndexError, AssertionError: re-raised after the offending
            packet (and its decrypted form, if any) has been logged.
        """
        for rx in rxs:
            # Reset per packet so that a failure before decryption neither
            # hits an unbound name nor logs the previous packet's payload.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(
                    pkt[IPv6].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
                )
                self.assert_equal(pkt[IPv6].src, self.pg1.remote_ip6)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def config_sa_tra(self, p):
        """Create p.tun_sa_out / p.tun_sa_in with the NAT header's UDP ports."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        super(TemplateIpsec6TunIfEspUdp, self).setUp()

        p = self.ipv6_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec6TunIfEspUdp, self).tearDown()
653
654
class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP tests"""

    # Runs the inherited IpsecTun6Tests cases on the UDP-encapsulated
    # template and adds a keepalive check.

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC6 NAT Keepalive"""
        params = self.ipv6_params
        self.verify_keepalive(params)
668
669
class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP GCM tests"""

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super().setUp()

        # NOTE(review): the parent setUp() has already called config_sa_tra()
        # by the time the values below are assigned, so the SAs created there
        # were built from the previous parameter values -- confirm that this
        # ordering is intended.
        params = self.ipv6_params

        # Cipher: AES-GCM with a 32 byte key and a zero salt.
        params.crypt_algo = "AES-GCM"
        params.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        )
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0

        # No separate integrity algorithm.
        params.auth_algo = "NULL"
        params.auth_algo_vpp_id = (
            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )
688
689
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """Ipsec ESP - TUN tests"""

    # Adds two 4o6 cases (one packet, and a 257 packet burst) to the
    # inherited IpsecTun6Tests cases.

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_basic46(self):
        """ipsec 4o6 tunnel basic test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ipsec 4o6 tunnel burst test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
705
706
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """Ipsec ESP 6 Handoff tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_handoff_66_police(self):
        """ESP 6o6 tunnel with policer worker hand-off test"""
        for command in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(command)

        burst_size = 15
        params = self.params[socket.AF_INET6]
        counters = ("conform_packets", "exceed_packets", "violate_packets")

        # Every policer outcome transmits, so no traffic is dropped.
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        pol = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        pol.add_vpp_config()

        # Start policing on tun
        pol.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, True)

        for bound_worker in (1, 0):
            pol.bind_vpp_config(bound_worker, True)

            # inject alternately on worker 0 and 1.
            for rx_worker in (0, 1, 0, 1):
                encrypted = self.gen_encrypt_pkts6(
                    params,
                    params.scapy_tun_sa,
                    self.tun_if,
                    src=params.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=burst_size,
                )
                decrypted = self.send_and_expect(
                    self.tun_if, encrypted, self.pg1, worker=rx_worker
                )
                self.verify_decrypted6(params, decrypted)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            overall = pol.get_stats()
            on_worker0 = pol.get_stats(worker=0)
            on_worker1 = pol.get_stats(worker=1)

            if bound_worker != 1:
                # Second pass: both workers should have policed equal amounts
                for per_worker in (on_worker1, on_worker0):
                    self.assertGreater(per_worker["conform_packets"], 0)
                    self.assertEqual(per_worker["exceed_packets"], 0)
                    self.assertGreater(per_worker["violate_packets"], 0)

                self.assertEqual(
                    on_worker0["conform_packets"] + on_worker0["violate_packets"],
                    on_worker1["conform_packets"] + on_worker1["violate_packets"],
                )
            else:
                # First pass: Worker 1, should have done all the policing
                self.assertEqual(overall, on_worker1)

                # Worker 0, should have handed everything off
                for counter in counters:
                    self.assertEqual(on_worker0[counter], 0)

        pol.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        pol.remove_vpp_config()
788
789
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """Ipsec ESP 4 Handoff tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_handoff_44_police(self):
        """ESP 4o4 tunnel with policer worker hand-off test"""
        for command in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(command)

        burst_size = 15
        params = self.params[socket.AF_INET]
        counters = ("conform_packets", "exceed_packets", "violate_packets")

        # Every policer outcome transmits, so no traffic is dropped.
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        pol = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        pol.add_vpp_config()

        # Start policing on tun
        pol.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, True)

        for bound_worker in (1, 0):
            pol.bind_vpp_config(bound_worker, True)

            # inject alternately on worker 0 and 1.
            for rx_worker in (0, 1, 0, 1):
                encrypted = self.gen_encrypt_pkts(
                    params,
                    params.scapy_tun_sa,
                    self.tun_if,
                    src=params.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    count=burst_size,
                )
                decrypted = self.send_and_expect(
                    self.tun_if, encrypted, self.pg1, worker=rx_worker
                )
                self.verify_decrypted(params, decrypted)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            overall = pol.get_stats()
            on_worker0 = pol.get_stats(worker=0)
            on_worker1 = pol.get_stats(worker=1)

            if bound_worker != 1:
                # Second pass: both workers should have policed equal amounts
                for per_worker in (on_worker1, on_worker0):
                    self.assertGreater(per_worker["conform_packets"], 0)
                    self.assertEqual(per_worker["exceed_packets"], 0)
                    self.assertGreater(per_worker["violate_packets"], 0)

                self.assertEqual(
                    on_worker0["conform_packets"] + on_worker0["violate_packets"],
                    on_worker1["conform_packets"] + on_worker1["violate_packets"],
                )
            else:
                # First pass: Worker 1, should have done all the policing
                self.assertEqual(overall, on_worker1)

                # Worker 0, should have handed everything off
                for counter in counters:
                    self.assertEqual(on_worker0[counter], 0)

        pol.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        pol.remove_vpp_config()
871
872
@tag_fixme_vpp_workers
class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Multi Tunnel interface"""

    # setUp() builds several protected tunnels over pg0, one per generated
    # remote host, each from its own shallow copy of self.ipv4_params.

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        # One remote host per tunnel; keep both counts tied together.
        n_tunnels = 10

        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(n_tunnels):
            p = copy.copy(self.ipv4_params)

            # Give every tunnel its own protected host, SA ids, SPIs and
            # tunnel destination by offsetting the template values with ii.
            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            self.multi_params.append(p)
            self.config_network(p)
            self.config_sa_tra(p)
            self.config_protect(p)

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces"""
        n_pkts = 127
        for p in self.multi_params:
            self.verify_tun_44(p, count=n_pkts)
            self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
            self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

    def test_tun_rr_44(self):
        """Round-robin packets across multiple interfaces"""
        # Decrypt direction: encrypted packets for every tunnel in one batch.
        tx = []
        for p in self.multi_params:
            tx = tx + self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
            )
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # Encrypt direction: plain packets towards every tunnel's host.
        tx = []
        for p in self.multi_params:
            tx = tx + self.gen_pkts(
                self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
            )
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
944             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
945
946
class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface all Algos

    A single protected tunnel is re-keyed once per (crypto engine,
    algorithm) combination and traffic is verified after every re-key.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Bring up the network, SAs and tunnel protection for ipv4_params."""
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo the setUp configuration, then run the base-class teardown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_network(params)
        self.unconfig_sa(params)

        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Replace the tunnel's SA pair with a freshly keyed one.

        ``p`` is updated in place: the first key byte is changed, SPIs and
        SA ids are incremented, new SAs are created and installed, the
        tunnel protection is re-applied and only then the previous SAs are
        removed.
        """
        # shallow snapshot taken first: it still references the SA objects
        # that are about to be replaced on ``p``
        previous = copy.copy(p)

        #
        # change the key and the SPI
        #
        p.crypt_key = b"X" + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, p.tun_if)

        # positional arguments common to both directions
        sa_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *sa_args,
            flags=p.flags,
            salt=p.salt
        )
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *sa_args,
            flags=p.flags,
            salt=p.salt
        )
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # switch the protection to the new SAs before dropping the old ones
        self.config_protect(p)
        previous.tun_sa_out.remove_vpp_config()
        previous.tun_sa_in.remove_vpp_config()
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos"""
        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        # foreach crypto algorithm
        algos = [
            {
                "vpp-crypto": crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                "vpp-integ": integ.IPSEC_API_INTEG_ALG_NONE,
                "scapy-crypto": "AES-GCM",
                "scapy-integ": "NULL",
                "key": b"JPjyOWBeVEQiMe7h",
                "salt": 3333,
            },
            {
                "vpp-crypto": crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                "vpp-integ": integ.IPSEC_API_INTEG_ALG_NONE,
                "scapy-crypto": "AES-GCM",
                "scapy-integ": "NULL",
                "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
                "salt": 0,
            },
            {
                "vpp-crypto": crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
                "vpp-integ": integ.IPSEC_API_INTEG_ALG_NONE,
                "scapy-crypto": "AES-GCM",
                "scapy-integ": "NULL",
                "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                "salt": 9999,
            },
            {
                "vpp-crypto": crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                "vpp-integ": integ.IPSEC_API_INTEG_ALG_SHA1_96,
                "scapy-crypto": "AES-CBC",
                "scapy-integ": "HMAC-SHA1-96",
                "key": b"JPjyOWBeVEQiMe7h",
                "salt": 0,
            },
            {
                "vpp-crypto": crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                "vpp-integ": integ.IPSEC_API_INTEG_ALG_SHA_512_256,
                "scapy-crypto": "AES-CBC",
                "scapy-integ": "SHA2-512-256",
                "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
                "salt": 0,
            },
            {
                "vpp-crypto": crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                "vpp-integ": integ.IPSEC_API_INTEG_ALG_SHA_256_128,
                "scapy-crypto": "AES-CBC",
                "scapy-integ": "SHA2-256-128",
                "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                "salt": 0,
            },
            {
                "vpp-crypto": crypto.IPSEC_API_CRYPTO_ALG_NONE,
                "vpp-integ": integ.IPSEC_API_INTEG_ALG_SHA1_96,
                "scapy-crypto": "NULL",
                "scapy-integ": "HMAC-SHA1-96",
                "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                "salt": 0,
            },
        ]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for algo in algos:
                p = self.ipv4_params
                p.auth_algo_vpp_id = algo["vpp-integ"]
                p.crypt_algo_vpp_id = algo["vpp-crypto"]
                p.crypt_algo = algo["scapy-crypto"]
                p.auth_algo = algo["scapy-integ"]
                p.crypt_key = algo["key"]
                p.salt = algo["salt"]

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)
1135
1136
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface no Algos

    Both the integrity and the crypto algorithm are set to NONE/"NULL"
    with empty keys; the test expects that nothing is forwarded.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Switch ipv4_params to NULL integrity and NULL encryption."""
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # no integrity protection
        params.auth_algo_vpp_id = (
            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )
        params.auth_algo = "NULL"
        params.auth_key = []

        # no encryption
        params.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        params.crypt_algo = "NULL"
        params.crypt_key = []

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """IPSec SA with NULL algos"""
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # traffic routed towards the tunnel must not produce any output
        pkts = self.gen_pkts(
            self.pg1, src=self.pg1.remote_ip4, dst=params.remote_tun_if_host
        )
        self.send_and_assert_no_replies(self.pg1, pkts)

        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1176
1177
@tag_fixme_vpp_workers
class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
    """IPsec IPv6 Multi Tunnel interface

    Ten protected tunnels are configured in ``setUp``, each towards a
    different remote host generated on pg0.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Create ten parameter sets and configure one tunnel for each."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        # SA ids and SPIs that get offset by the tunnel index
        # (presumably so the tunnels do not share identifiers)
        offset_attrs = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for index in range(10):
            # shallow copy of the template parameters for this tunnel
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (index + 1)
            for attr in offset_attrs:
                setattr(params, attr, getattr(params, attr) + index)
            params.tun_dst = self.pg0.remote_hosts[index].ip6

            self.multi_params.append(params)
            self.config_network(params)
            self.config_sa_tra(params)
            self.config_protect(params)

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces"""
        pkt_count = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=pkt_count)
            # each tunnel interface must have counted exactly this burst
            self.assertEqual(params.tun_if.get_rx_stats(), pkt_count)
            self.assertEqual(params.tun_if.get_tx_stats(), pkt_count)
1224
1225
class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests

    A GRE tunnel of type TEB between pg0's local and remote addresses is
    protected by an ESP SA pair and bridged with pg1 in bridge domain 1.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP packets wrapping a GRE-carried L2 frame.

        The plaintext is IP(pg0 remote -> pg0 local)/GRE/Ether/IP/UDP with
        ``payload_size`` bytes of payload, encrypted with ``sa`` and framed
        with ``sw_intf``'s MAC addresses. ``p``, ``src`` and ``dst`` are
        accepted but not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used by this
        override.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame kept the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with ``sa`` and check outer and inner headers.

        On a failed check both the received packet and, when available,
        its decrypted form are logged at debug level before the original
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failing decrypt never logs the
            # previous iteration's plaintext as belonging to this packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; it must not mask the
                        # failure that is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
1348
1349
class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB VLAN ESP - TUN tests

    Same layout as the plain GRE TEB test, but the bridge port on the pg1
    side is a dot1q sub-interface (VLAN 11) with a tag rewrite, so frames
    are expected tagged on pg1 and untagged inside the tunnel.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP packets wrapping an untagged GRE-carried frame.

        The plaintext is IP(pg0 remote -> pg0 local)/GRE/Ether/IP/UDP with
        ``payload_size`` bytes of payload, encrypted with ``sa``. ``p``,
        ``src`` and ``dst`` are accepted but not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain frames tagged with VLAN 11.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used by this
        override.
        """
        return [
            Ether(dst=self.omac)
            / Dot1Q(vlan=11)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check inner MAC, VLAN 11 tag and IP dst on each received frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with ``sa`` and check outer and inner headers.

        The inner frame must not carry a dot1q tag. On a failed check both
        the received packet and, when available, its decrypted form are
        logged at debug level before the original exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failing decrypt never logs the
            # previous iteration's plaintext as belonging to this packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; it must not mask the
                        # failure that is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the VLAN sub-interface, SAs, protected tunnel and bridge."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # dot1q sub-interface for VLAN 11 on pg1, with a tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index,
            vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11,
        )
        self.pg1_11.admin_up()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the VLAN sub-interface
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Unconfigure the tunnel, run the base teardown, drop the sub-if."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        # the sub-interface is taken down only after the base teardown
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
1483
1484
class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - Tra tests

    Like the GRE TEB tunnel test, but the SAs are created without tunnel
    endpoint addresses and the scapy side is set up with
    ``config_tra_params`` (presumably transport mode - confirm against
    that helper).
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP packets wrapping a GRE-carried L2 frame.

        The plaintext is IP(pg0 remote -> pg0 local)/GRE/Ether/IP/UDP with
        ``payload_size`` bytes of payload, encrypted with ``sa``. ``p``,
        ``src`` and ``dst`` are accepted but not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used by this
        override.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame kept the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with ``sa`` and check outer and inner headers.

        On a failed check both the received packet and, when available,
        its decrypted form are logged at debug level before the original
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failing decrypt never logs the
            # previous iteration's plaintext as belonging to this packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; it must not mask the
                        # failure that is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # both SAs are created without tunnel source/destination addresses
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1601
1602
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB UDP ESP - Tra tests

    Like the GRE TEB "Tra" test, but both SAs carry the UDP_ENCAP flag
    with ports 5454/4545, and the encrypted output is checked for that
    UDP header.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP packets wrapping a GRE-carried L2 frame.

        The plaintext is IP(pg0 remote -> pg0 local)/GRE/Ether/IP/UDP with
        ``payload_size`` bytes of payload, encrypted with ``sa``. ``p``,
        ``src`` and ``dst`` are accepted but not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used by this
        override.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame kept the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP encapsulation, then decrypt and check the headers.

        On a failed post-decrypt check both the received packet and, when
        available, its decrypted form are logged at debug level before the
        original exception is re-raised.
        """
        for rx in rxs:
            # the ESP packet must be UDP-encapsulated with the SA's ports
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # reset per packet so that a failing decrypt never logs the
            # previous iteration's plaintext as belonging to this packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # the dump is best effort; it must not mask the
                        # failure that is re-raised below
                        pass
                raise

    def setUp(self):
        """Create UDP-encapsulated SAs, the protected tunnel and the bridge."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: UDP 5454 -> 4545
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=5454,
            udp_dst=4545,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: ports mirrored, additionally flagged IS_INBOUND
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=(
                p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
            ),
            udp_src=4545,
            udp_dst=5454,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1734
1735
class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames of Ether / sa.encrypt(IP / GRE / IP / UDP / Raw).

        The outer IP header goes from pg0's remote to pg0's local address and
        the inner one from pg1's local to pg1's remote address. ``p``, ``src``
        and ``dst`` are not used by this implementation.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain Ether / IP / UDP / Raw frames, 1.1.1.1 -> 1.1.1.2.

        ``src`` and ``dst`` are not used; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed (MAC and IPv4) to pg1's remote."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check the result.

        The decrypted packet must have valid checksums, go from pg0's local
        to pg0's remote address, and carry a GRE layer whose inner IP
        destination is 1.1.1.2. On IndexError or AssertionError the received
        packet (and the decrypted one, if decryption got that far) is logged
        at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous iteration's result
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                gre = pkt[GRE]
                self.assertEqual(gre[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid: never mask the original failure
                        pass
                raise

    def setUp(self):
        """Create an SA pair, a GRE tunnel over pg0 protected by it, and a route.

        Both SAs are given pg0's local/remote IPv4 addresses as tunnel
        endpoints. 1.1.1.2/32 is routed via the GRE interface's remote_ip4.
        """
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): nothing visible in this class attaches a port to this
        # bridge domain - confirm whether it is still needed
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the GRE interface's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()
1846
1847
class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TRA tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames of Ether / sa.encrypt(IP / GRE / IP / UDP / Raw).

        The outer IP header goes from pg0's remote to pg0's local address and
        the inner one from pg1's local to pg1's remote address. ``p``, ``src``
        and ``dst`` are not used by this implementation.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames whose GRE payload is UDP, not IP.

        Same outer headers as gen_encrypt_pkts, but the GRE layer is followed
        directly by UDP / Raw. ``src`` and ``dst`` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain Ether / IP / UDP / Raw frames, 1.1.1.1 -> 1.1.1.2.

        ``src`` and ``dst`` are not used; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed (MAC and IPv4) to pg1's remote."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check the result.

        The decrypted packet must have valid checksums and carry a GRE layer
        whose inner IP destination is 1.1.1.2. On IndexError or AssertionError
        the received packet (and the decrypted one, if decryption got that
        far) is logged at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous iteration's result
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                gre = pkt[GRE]
                self.assertEqual(gre[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid: never mask the original failure
                        pass
                raise

    def setUp(self):
        """Create an SA pair, a GRE tunnel over pg0 protected by it, and a route.

        The SAs are created without tunnel endpoint addresses. 1.1.1.2/32 is
        routed via the GRE interface's remote_ip4.
        """
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the GRE interface's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()

    def test_gre_non_ip(self):
        """GRE IPSEC non-IP payload"""
        p = self.ipv4_params
        tx = self.gen_encrypt_non_ip_pkts(
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            # the generator ignores dst; pass the IPv4 address to match this v4 test
            dst=self.pg1.remote_ip4,
        )
        self.send_and_assert_no_replies(self.tun_if, tx)
        # the single packet sent must be counted as an unsupported payload
        node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1973
1974
class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
    """Ipsec GRE ESP - TRA tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames of Ether / sa.encrypt(IPv6 / GRE / IPv6 / UDP / Raw).

        The outer IPv6 header goes from pg0's remote to pg0's local address
        and the inner one from pg1's local to pg1's remote address. ``p``,
        ``src`` and ``dst`` are not used by this implementation.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain Ether / IPv6 / UDP / Raw frames, 1::1 -> 1::2.

        ``p``, ``src`` and ``dst`` are not used; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst="1::2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every received packet is addressed (MAC and IPv6) to pg1's remote."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check the result.

        The decrypted packet must have valid checksums and carry a GRE layer
        whose inner IPv6 destination is 1::2. On IndexError or AssertionError
        the received packet (and the decrypted one, if decryption got that
        far) is logged at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous iteration's result
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                gre = pkt[GRE]
                self.assertEqual(gre[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid: never mask the original failure
                        pass
                raise

    def setUp(self):
        """Create an SA pair, an IPv6 GRE tunnel over pg0 protected by it, and a route.

        The SAs are created without tunnel endpoint addresses. 1::2/128 is
        routed via the GRE interface's remote_ip6.
        """
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): nothing visible in this class attaches a port to this
        # bridge domain - confirm whether it is still needed
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        r = VppIpRoute(
            self,
            "1::2",
            128,
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
                )
            ],
        )
        r.add_vpp_config()

    def tearDown(self):
        """Remove the GRE interface's IPv6 config, then run the parent tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super().tearDown()
2087
2088
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """Ipsec mGRE ESP v4 TRA tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames of Ether / sa.encrypt(IP / GRE / IP / UDP / Raw).

        The outer IP header goes from ``p.tun_dst`` to pg0's local address
        and the inner one from pg1's local to pg1's remote address. ``src``
        and ``dst`` are not used by this implementation.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=p.tun_dst, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain Ether / IP / UDP / Raw frames from 1.1.1.1 to ``dst``.

        ``src`` is not used; the source address is fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed (MAC and IPv4) to pg1's remote."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check the result.

        The decrypted packet must have valid checksums and carry a GRE layer
        whose inner IP destination is ``p.remote_tun_if_host``. On IndexError
        or AssertionError the received packet (and the decrypted one, if
        decryption got that far) is logged at debug level and the exception
        is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous iteration's result
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                gre = pkt[GRE]
                self.assertEqual(gre[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid: never mask the original failure
                        pass
                raise

    def setUp(self):
        """Create a multipoint GRE interface and 16 protected next-hops on it.

        For each next-hop a copy of the IPv4 params gets its own SA ids/SPIs,
        an SA pair, a tunnel protection keyed on the next-hop, a /32 route
        and a TEIB entry; the copies are collected in ``self.multi_params``.
        """
        super().setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every per-next-hop params object shares p.tun_if
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            p.teib = VppTeib(
                self,
                p.tun_if,
                p.tun_if.remote_hosts[ii].ip4,
                self.pg0.remote_hosts[ii].ip4,
            ).add_vpp_config()
            # set after config_tra_params so this value is the one the
            # packet generator reads
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the mGRE interface's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # traffic must drop while the TEIB entry is gone and resume once re-added
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
2242
2243
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """Ipsec mGRE ESP v6 TRA tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames of Ether / sa.encrypt(IPv6 / GRE / IPv6 / UDP / Raw).

        The outer IPv6 header goes from ``p.tun_dst`` to pg0's local address
        and the inner one from pg1's local to pg1's remote address. ``src``
        and ``dst`` are not used by this implementation.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain Ether / IPv6 / UDP / Raw frames from 1::1 to ``dst``.

        ``p`` and ``src`` are not used; the source address is fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every received packet is addressed (MAC and IPv6) to pg1's remote."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check the result.

        The decrypted packet must have valid checksums and carry a GRE layer
        whose inner IPv6 destination is ``p.remote_tun_if_host``. On
        IndexError or AssertionError the received packet (and the decrypted
        one, if decryption got that far) is logged at debug level and the
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous iteration's result
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                gre = pkt[GRE]
                self.assertEqual(gre[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid: never mask the original failure
                        pass
                raise

    def setUp(self):
        """Create a multipoint IPv6 GRE interface and 16 protected next-hops on it.

        For each next-hop a copy of the IPv6 params gets its own SA ids/SPIs,
        an SA pair, a TEIB entry, a tunnel protection keyed on the next-hop
        and a /128 route; the copies are collected in ``self.multi_params``.
        """
        super().setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip6,
            "::",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every per-next-hop params object shares p.tun_if
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(
                self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
            ).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                128,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
            ).add_vpp_config()
            # NOTE(review): set again here, presumably because
            # config_tra_params overwrites tun_dst - verify
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        # NOTE(review): hard-coded adjacency index, looks like a debugging aid
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the mGRE interface's IPv6 config, then run the parent tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super().tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
2395
2396
@tag_fixme_vpp_workers
class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the inherited setUp and use pg0 as ``self.tun_if``."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; delegate to the inherited tearDown."""
        super().tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        burst = 127
        params = self.ipv4_params

        # network first, then the SAs (config_sa_tra), then the protection
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # first burst: 4o4
        self.verify_tun_44(params, count=burst)
        self.assertEqual(params.tun_if.get_rx_stats(), burst)
        self.assertEqual(params.tun_if.get_tx_stats(), burst)

        # second burst after clearing the SA counters: 6o4
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=burst)
        self.assertEqual(params.tun_if.get_rx_stats(), 2 * burst)
        self.assertEqual(params.tun_if.get_tx_stats(), 2 * burst)

        # rekey: a shallow copy of the params with a new key and new SA
        # ids/SPIs; the copy still shares the tunnel interface object
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b"X" + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        # install the new SAs and protection before dropping the old SAs
        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        # third burst on the new SAs; interface counters keep accumulating
        self.verify_tun_44(rekeyed, count=burst)
        self.assertEqual(params.tun_if.get_rx_stats(), 3 * burst)
        self.assertEqual(params.tun_if.get_tx_stats(), 3 * burst)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
2449
2450
@tag_fixme_vpp_workers
class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Enable UDP encapsulation on the IPv4 params and configure the tunnel.

        Sets the UDP_ENCAP SA flag and a 4500/4500 NAT header before the
        network, SAs and protection are configured.
        """
        super().setUp()
        self.tun_if = self.pg0

        params = self.ipv4_params
        params.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo setUp's configuration in reverse order, then run the parent tearDown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super().tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of every packet, then run the inherited checks."""
        # ensure encrypted packets are received with the default UDP ports
        for rx in rxs:
            udp = rx[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super().verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        burst = 127
        params = self.ipv4_params

        self.verify_tun_44(params, count=burst)
        self.assertEqual(params.tun_if.get_rx_stats(), burst)
        self.assertEqual(params.tun_if.get_tx_stats(), burst)

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        self.verify_keepalive(self.ipv4_params)
2493
2494
@tag_fixme_vpp_workers
class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited setUp and use pg0 as ``self.tun_if``."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; delegate to the inherited tearDown."""
        super().tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames of Ether / sa.encrypt(IP / IP / UDP / Raw).

        The outer IP header goes from ``sw_intf``'s remote to its local
        address and the inner one from ``src`` to ``dst``. ``p`` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain Ether / IP / UDP / Raw frames from ``src`` to ``dst``."""
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src=src, dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of every decrypted packet.

        Each packet must go from ``p.remote_tun_if_host`` to pg1's remote
        address and have valid checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check the result.

        The decrypted packet must have valid checksums and go from pg0's
        local to pg0's remote address, and the IP packet it carries must be
        destined to ``p.remote_tun_if_host``. On IndexError or AssertionError
        the received packet (and the decrypted one, if decryption got that
        far) is logged at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a previous iteration's result
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid: never mask the original failure
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # also add an output features on the tunnel and physical interface
        # so we test they still work
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        self.verify_tun_44(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        # (np is a shallow copy, so np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2602
2603
class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited setUp and use pg0 as ``self.tun_if``."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; delegate to the inherited tearDown."""
        super().tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames whose outer IP destination is 5.5.5.5.

        Each frame is Ether / sa.encrypt(IP / IP / UDP / Raw); the outer IP
        source is ``sw_intf``'s remote address and the inner header goes from
        ``src`` to ``dst``. ``p`` is not used.
        """
        frames = []
        for _ in range(count):
            tunnelled = (
                IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / sa.encrypt(tunnelled))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header"""
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # packets built by gen_encrypt_pkts above must produce no replies
        bogus = self.gen_encrypt_pkts(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2654
2655
@tag_fixme_vpp_workers
class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited set-up, then use pg0 as the tunnel-facing port."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tear-down."""
        super().tearDown()

    @staticmethod
    def _next_generation(current, first, key_prefix):
        """Return a shallow copy of ``current`` describing a fresh pair of SAs.

        The copy gets a crypto key made of ``key_prefix`` plus the tail of
        ``first.crypt_key``, SPIs bumped by 100 and SA ids bumped by 1.
        The copy is shallow, so ``tun_if`` is the same object as on
        ``current``; its local/remote SPI fields are set from ``first``.
        """
        nxt = copy.copy(current)
        nxt.crypt_key = key_prefix + first.crypt_key[1:]
        nxt.scapy_tun_spi += 100
        nxt.scapy_tun_sa_id += 1
        nxt.vpp_tun_spi += 100
        nxt.vpp_tun_sa_id += 1
        nxt.tun_if.local_spi = first.vpp_tun_spi
        nxt.tun_if.remote_spi = first.scapy_tun_spi
        return nxt

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        n_pkts = 127
        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = self._next_generation(p, p, b"X")

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 2 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # bounce the interface state; while down, traffic is expected to be
        # dropped and counted against the "disabled" error counter
        p.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=n_pkts)
        disabled_counter = "/err/ipsec6-tun-input/disabled"
        self.assertEqual(n_pkts, self.statistics.get_err_counter(disabled_counter))
        p.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=n_pkts)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        third = self._next_generation(rekeyed, p, b"Z")

        self.config_sa_tra(third)

        # step 1;
        p.tun_protect.update_vpp_config(
            rekeyed.tun_sa_out, [rekeyed.tun_sa_in, third.tun_sa_in]
        )
        self.verify_tun_66(rekeyed, rekeyed, count=n_pkts)
        self.verify_tun_66(third, rekeyed, count=n_pkts)

        # step 2;
        p.tun_protect.update_vpp_config(
            third.tun_sa_out, [rekeyed.tun_sa_in, third.tun_sa_in]
        )
        self.verify_tun_66(rekeyed, third, count=n_pkts)
        self.verify_tun_66(third, third, count=n_pkts)

        # step 3;
        p.tun_protect.update_vpp_config(third.tun_sa_out, [third.tun_sa_in])
        self.verify_tun_66(third, third, count=n_pkts)
        self.verify_drop_tun_rx_66(rekeyed, count=n_pkts)

        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts * 9)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts * 8)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(third)
        self.unconfig_sa(third)
        self.unconfig_network(p)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        n_pkts = 127
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_46(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2767
2768
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited set-up, then use pg0 as the tunnel-facing port."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tear-down."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` encrypted IPv6-in-IPv6 frames.

        Each frame is Ether / sa.encrypt(outer IPv6 / inner IPv6(src, dst) /
        UDP / Raw).  The outer header runs from ``sw_intf``'s remote address
        to its local address.  ``p`` is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` clear-text Ether / IPv6 / UDP / Raw frames.

        ``p`` is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / UDP(sport=1166, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check each received packet's IPv6 addresses and checksums.

        The destination must be pg1's remote address and the source must be
        ``p.remote_tun_if_host``.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check both headers.

        The outer IPv6 header must run from pg0's local address to pg0's
        remote address, and the inner IPv6 destination must be
        ``p.remote_tun_if_host``.  On IndexError/AssertionError the offending
        frame (and its decrypted form, when decryption got that far) is
        logged at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # Reset per frame so a failure before/inside decrypt() cannot log
            # the decrypted packet of a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure to dump the
                        # packet must not mask the original error.
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        # (copy.copy is shallow: np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2868
2869
class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited set-up, then use pg0 as the tunnel-facing port."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tear-down."""
        super().tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` encrypted frames with a bogus IPv6 tunnel header.

        ``p`` is accepted for signature compatibility and is not used here.
        ``sa.encrypt`` is called once per frame.
        """
        # the IP destination of the revealed packet does not match
        # that assigned to the tunnel
        frames = []
        for _ in range(count):
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear_text = (
                IPv6(src=sw_intf.remote_ip6, dst="5::5")
                / IPv6(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            frames.append(l2_hdr / sa.encrypt(clear_text))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header"""

        params = self.ipv6_params

        # bring up the network, the SAs and the tunnel protection, in that order
        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # 63 frames with the bogus header; none may produce a reply
        bogus_frames = self.gen_encrypt_pkts6(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus_frames)

        # teardown, reverse order of configuration
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2921
2922
class TemplateIpsecItf4(object):
    """IPsec Interface IPv4"""

    # Mixin with the configure/unconfigure steps used by the IPv4
    # ipsec-interface tests; all state is stored on the params object ``p``.

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and add the outbound and inbound SAs for ``p``.

        The outbound SA (stored as ``p.tun_sa_out``) uses the scapy-side
        id/SPI with tunnel endpoints (src, dst); the inbound SA
        (``p.tun_sa_in``) uses the VPP-side id/SPI with the endpoints swapped.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # algorithm, key and protocol arguments shared by both directions
        shared_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        outbound = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *shared_args,
            src,
            dst,
            flags=p.flags,
        )
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *shared_args,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs previously stored on ``p``."""
        protection = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p, instance=0xFFFFFFFF):
        """Create the ipsec interface and host routes (IPv4 /32 and IPv6 /128).

        Only the IPv4 route is kept on ``p`` (as ``p.route``); the IPv6 route
        is added but not stored.
        """
        tunnel = VppIpsecInterface(self, instance=instance)
        p.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        host6_route = VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path])
        host6_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the ipsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
3005
3006
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface IPv4"""

    def setUp(self):
        """Run the inherited set-up, then use pg0 as the tunnel-facing port."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tear-down."""
        super().tearDown()

    def test_tun_instance_44(self):
        # Create the interface with instance 3: "ipsec0" must be rejected by
        # the CLI while "ipsec3" must be shown without "unknown" in the output.
        params = self.ipv4_params
        self.config_network(params, instance=3)

        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        shown = self.vapi.cli("show interface ipsec3")
        self.assertTrue("unknown" not in shown)

        self.unconfig_network(params)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        local_addr = self.pg0.local_ip4
        peer_addr = self.pg0.remote_ip4

        # no SAs / protection yet: traffic is expected to be dropped
        config_tun_params(p, self.encryption_type, None, local_addr, peer_addr)
        self.verify_tun_dropped_44(p, count=n_pkts)
        self.config_sa_tun(p, local_addr, peer_addr)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface: dropped while down, working again once up
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts * 3)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts * 2)

        # it's a v6 packet when its encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts * 4)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts * 3)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # counters restart from zero for the rekey phase
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: rekeyed.tun_if is the same object as p.tun_if)
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b"X" + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(rekeyed, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_44(rekeyed, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 127
        # work on a copy so the algorithm overrides stay local to this test
        params = copy.copy(self.ipv4_params)

        params.auth_algo_vpp_id = (
            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )
        params.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        params.crypt_algo = "NULL"
        params.auth_algo = "NULL"

        self.config_network(params)
        self.config_sa_tun(params, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(params)

        self.logger.info(self.vapi.cli("sh ipsec sa"))
        self.verify_tun_44(params, count=n_pkts)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(params)

        # every policer outcome transmits, so policing never drops traffic
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # Start policing on tun
        tun_index = params.tun_if.sw_if_index
        policer.apply_vpp_config(tun_index, Dir.RX, True)

        self.verify_tun_44(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        policed = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(policed["conform_packets"], 0)
        self.assertEqual(policed["exceed_packets"], 0)
        self.assertGreater(policed["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_44(params, count=n_pkts)

        # No new policer stats
        unpoliced = policer.get_stats()
        self.assertEqual(policed, unpoliced)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
3168
3169
class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface MPLSoIPv4"""

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the inherited set-up, then use pg0 as the tunnel-facing port."""
        super(TestIpsecItf4MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tear-down."""
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` frames: Ether / sa.encrypt(MPLS(44) / IP / UDP / Raw).

        ``p`` is not used here.  ``sa.encrypt`` is called once per frame.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=44, ttl=3)
                / IP(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check label and dst.

        The decrypted packet must carry MPLS label 44 and an IP destination of
        ``p.remote_tun_if_host``.  On IndexError/AssertionError the offending
        frame (and its decrypted form, when decryption got that far) is
        logged at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # Reset per frame so a failure before/inside decrypt() cannot log
            # the decrypted packet of a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure to dump the
                        # packet must not mask the original error.
                        pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        ).add_vpp_config()
        # send traffic for the remote host through the tunnel with label 44
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3246
3247
class TemplateIpsecItf6(object):
    """IPsec Interface IPv6"""

    # Mixin with the configure/unconfigure steps used by the IPv6
    # ipsec-interface tests; all state is stored on the params object ``p``.

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and add the outbound and inbound SAs for ``p``.

        ``p.tun_flags`` and ``p.hop_limit`` are defaulted to None and 255 when
        the caller has not set them; only the outbound SA receives them.  The
        inbound SA uses the VPP-side id/SPI with the endpoints swapped.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # fill in optional tunnel attributes the caller did not provide
        for attr, fallback in (("tun_flags", None), ("hop_limit", 255)):
            if not hasattr(p, attr):
                setattr(p, attr, fallback)

        # algorithm, key and protocol arguments shared by both directions
        shared_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        outbound = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *shared_args,
            src,
            dst,
            flags=p.flags,
            tun_flags=p.tun_flags,
            hop_limit=p.hop_limit,
        )
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *shared_args,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs previously stored on ``p``."""
        protection = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the ipsec interface and host routes (IPv4 /32 and IPv6 /128).

        Only the IPv6 route is kept on ``p`` (as ``p.route``); the IPv4 route
        is added but not stored.
        """
        tunnel = VppIpsecInterface(self)
        p.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        host4_route = VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path])
        host4_route.add_vpp_config()

        v6_path = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the ipsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
3338
3339
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface IPv6"""

    def setUp(self):
        """Run the inherited set-up, then use pg0 as the tunnel-facing port."""
        super().setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tear-down."""
        super().tearDown()

    def test_tun_66(self):
        """IPSEC interface IPv6"""

        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        local_addr = self.pg0.local_ip6
        peer_addr = self.pg0.remote_ip6

        # no SAs / protection yet: traffic is expected to be dropped
        config_tun_params(p, self.encryption_type, None, local_addr, peer_addr)
        self.verify_drop_tun_66(p, count=n_pkts)
        self.config_sa_tun(p, local_addr, peer_addr)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface: dropped while down, working again once up
        p.tun_if.admin_down()
        self.verify_drop_tun_66(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_66(p, count=n_pkts)

        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts * 3)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts * 2)

        # it's a v4 packet when its encrypted
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts * 4)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts * 3)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # counters restart from zero for the rekey phase
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: rekeyed.tun_if is the same object as p.tun_if)
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b"X" + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi
        # the new SAs copy the flow label instead of the hop limit
        rekeyed.inner_hop_limit = 24
        rekeyed.outer_hop_limit = 128
        rekeyed.inner_flow_label = 0xABCDE
        rekeyed.outer_flow_label = 0xABCDE
        rekeyed.hop_limit = 128
        rekeyed.tun_flags = (
            encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
        )

        self.config_sa_tun(rekeyed, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)

    def test_tun_66_police(self):
        """IPSEC interface IPv6 with input policer"""
        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        params = self.ipv6_params
        params.inner_hop_limit = 24
        params.outer_hop_limit = 23
        params.outer_flow_label = 243224
        params.tun_flags = (
            encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
        )

        self.config_network(params)
        self.config_sa_tun(params, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(params)

        # every policer outcome transmits, so policing never drops traffic
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # Start policing on tun
        tun_index = params.tun_if.sw_if_index
        policer.apply_vpp_config(tun_index, Dir.RX, True)

        self.verify_tun_66(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        policed = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(policed["conform_packets"], 0)
        self.assertEqual(policed["exceed_packets"], 0)
        self.assertGreater(policed["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_66(params, count=n_pkts)

        # No new policer stats
        unpoliced = policer.get_stats()
        self.assertEqual(policed, unpoliced)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
3480
3481
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """Ipsec P2MP ESP v4 tests"""

    # A single multipoint (MP mode) IPsec interface is shared by N_NHS
    # next-hops.  setUp() gives every next-hop its own SA pair, tunnel-protect
    # entry and /32 route, and stores the per-next-hop parameters in
    # self.multi_params.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` frames whose payload is encrypted with ``sa``.

        The inner packet always goes from pg1.local_ip4 to pg1.remote_ip4;
        ``p``, ``src`` and ``dst`` are not used by this override.

        :param sa: object providing ``encrypt()`` for the inner packet
        :param sw_intf: interface whose remote/local MACs form the Ethernet
            header
        :param count: number of frames to generate
        :param payload_size: number of ``X`` bytes in the UDP payload
        :returns: list of ``count`` frames
        """
        # sa.encrypt() is called once per frame on purpose: do not hoist it
        # out of the comprehension.
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` plain-text frames addressed to ``dst``.

        The IP source is fixed to ``1.1.1.1``; ``src`` is not used by this
        override.

        :param sw_intf: interface whose remote/local MACs form the Ethernet
            header
        :param dst: destination IPv4 address of every packet
        :param count: number of frames to generate
        :param payload_size: number of ``X`` bytes in the UDP payload
        :returns: list of ``count`` frames
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that every received packet is addressed to the pg1 peer.

        :param p: unused by this override
        :param rxs: iterable of received packets
        """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check outer header fields and the decrypted payload of each packet.

        For every packet the outer TOS must carry DSCP EF, the outer TTL must
        equal ``p.hop_limit`` and, after decryption with ``sa``, the inner
        destination must equal ``p.remote_tun_if_host``.

        :param p: parameters providing ``hop_limit`` and
            ``remote_tun_if_host``
        :param sa: object providing ``decrypt()`` for the outer IP packet
        :param rxs: iterable of received packets
        :raises IndexError, AssertionError: re-raised after the offending
            packet(s) have been logged at debug level
        """
        expected_tos = VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
        for rx in rxs:
            # Reset per packet so that the failure path never logs a payload
            # decrypted from an earlier packet.
            pkt = None
            try:
                self.assertEqual(rx[IP].tos, expected_tos)
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertEqual(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure to dump the
                        # packet must not mask the original error.
                        pass
                raise

    def setUp(self):
        """Create the MP IPsec interface and one protected tunnel per next-hop."""
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        base = self.ipv4_params
        base.tun_if = VppIpsecInterface(
            self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
        )
        base.tun_if.add_vpp_config()
        base.tun_if.admin_up()
        # The address is configured, removed and configured again before use.
        base.tun_if.config_ip4()
        base.tun_if.unconfig_ip4()
        base.tun_if.config_ip4()
        base.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # A single match-all rule, attached to both the underlay interface
        # and the tunnel interface.
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, base.tun_if.sw_if_index, [a]).add_vpp_config()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # Shallow copy: every per-next-hop object shares base.tun_if.
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            # Offset all SA ids and SPIs by the next-hop index so that they
            # differ between next-hops.
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            # Distinct per next-hop; verify_encrypted() checks the outer TTL
            # against this value.
            p.hop_limit = ii + 10
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tun_params(
                p,
                self.encryption_type,
                None,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
            )
            self.multi_params.append(p)

            p.via_tun_route = VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        """Remove the tunnel interface address, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)

        # remove one tunnel protect, the rest should still work
        first = self.multi_params[0]
        first.tun_protect.remove_vpp_config()
        self.verify_tun_dropped_44(first, count=N_PKTS)
        first.via_tun_route.remove_vpp_config()
        self.verify_tun_dropped_44(first, count=N_PKTS)

        for p in self.multi_params[1:]:
            self.verify_tun_44(p, count=N_PKTS)

        # restore the removed protection and route; every tunnel works again
        first.tun_protect.add_vpp_config()
        first.via_tun_route.add_vpp_config()

        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
3659
3660
class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface MPLSoIPv6"""

    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel underlay interface."""
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base tearDown."""
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` frames carrying an encrypted MPLS-over-IPv6 payload.

        The inner packet is ``MPLS(label=66, ttl=3) / IPv6(src, dst) / UDP /
        Raw``; ``p`` is not used by this override.

        :param sa: object providing ``encrypt()`` for the inner packet
        :param sw_intf: interface whose remote/local MACs form the Ethernet
            header
        :param src: inner IPv6 source address
        :param dst: inner IPv6 destination address
        :param count: number of frames to generate
        :param payload_size: number of ``X`` bytes in the UDP payload
        :returns: list of ``count`` frames
        """
        # sa.encrypt() is called once per frame on purpose: do not hoist it
        # out of the comprehension.
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=66, ttl=3)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted6(self, p, sa, rxs):
        """Check that each packet decrypts to MPLS label 66 over IPv6.

        After decryption with ``sa`` the payload must carry MPLS label 66 and
        an IPv6 destination equal to ``p.remote_tun_if_host``.

        :param p: parameters providing ``remote_tun_if_host``
        :param sa: object providing ``decrypt()`` for the outer IPv6 packet
        :param rxs: iterable of received packets
        :raises IndexError, AssertionError: re-raised after the offending
            packet(s) have been logged at debug level
        """
        for rx in rxs:
            # Reset per packet so that the failure path never logs a payload
            # decrypted from an earlier packet.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # NOTE(review): the fallback re-parses the raw payload as
                    # IPv4 although this test expects MPLS/IPv6 - confirm
                    # whether this branch is ever taken and which layer is
                    # intended here.
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure to dump the
                        # packet must not mask the original error.
                        pass
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self,
            66,
            1,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
        ).add_vpp_config()
        # re-point the route via the tunnel so that it imposes label 66
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3741
3742
# Run the tests with the VPP test runner when this file is executed directly.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)