ipsec: allow receiving encrypted IP packets with TFC padding
[vpp.git] / test / test_ipsec_tun_if_esp.py
import unittest
import socket
import copy

from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether, GRE, Dot1Q
from scapy.packet import Raw, bind_layers, Padding
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.contrib.mpls import MPLS
from asfframework import VppTestRunner, tag_fixme_vpp_workers
from template_ipsec import (
    TemplateIpsec,
    IpsecTun4Tests,
    IpsecTun6Tests,
    IpsecTun4,
    IpsecTun6,
    IpsecTcpTests,
    mk_scapy_crypt_key,
    IpsecTun6HandoffTests,
    IpsecTun4HandoffTests,
    config_tun_params,
)
from vpp_gre_interface import VppGreInterface
from vpp_ipip_tun_interface import VppIpIpTunInterface
from vpp_ip_route import (
    VppIpRoute,
    VppRoutePath,
    DpoProto,
    VppMplsLabel,
    VppMplsTable,
    VppMplsRoute,
    FibPathProto,
)
from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
from vpp_teib import VppTeib
from util import ppp
from vpp_papi import VppEnum
from vpp_papi_provider import CliFailedCommandError
from vpp_acl import AclRule, VppAcl, VppAclInterface
from vpp_policer import PolicerAction, VppPolicer, Dir
44
45
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
    """Build the tunnel-mode scapy SecurityAssociations and store them on ``p``.

    ``p.tun_src``/``p.tun_dst`` are taken from ``tun_if.local_ip`` and
    ``tun_if.remote_ip`` when ``tun_if`` is truthy, otherwise from the
    ``src``/``dst`` arguments.  Two SAs are then created:

    * ``p.scapy_tun_sa`` - SPI ``p.scapy_tun_spi``, tunnel header going from
      ``p.tun_dst`` to ``p.tun_src``;
    * ``p.vpp_tun_sa`` - SPI ``p.vpp_tun_spi``, tunnel header going from
      ``p.tun_src`` to ``p.tun_dst``.

    When ``p.nat_header`` is set and its destination port is not 4500, the
    scapy-side SA gets a UDP header with the ports swapped and ESP is bound to
    that destination port in scapy's UDP dissector.
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    esn_en = bool(p.flags & (esn_flag))
    scapy_key = mk_scapy_crypt_key(p)

    # tunnel endpoints: the interface wins over the explicit src/dst arguments
    if tun_if:
        p.tun_dst = tun_if.remote_ip
        p.tun_src = tun_if.local_ip
    else:
        p.tun_dst = dst
        p.tun_src = src

    nat_hdr = p.nat_header
    uses_default_port = (not nat_hdr) or nat_hdr.dport == 4500

    if uses_default_port:
        outbound_nat_header = nat_hdr
    else:
        # non-default port: mirror the ports and teach scapy to parse ESP there
        outbound_nat_header = UDP(sport=nat_hdr.dport, dport=nat_hdr.sport)
        bind_layers(UDP, ESP, dport=nat_hdr.dport)

    # keyword arguments shared by both directions
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=esn_en,
    )
    ip_cls = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=ip_cls(src=p.tun_dst, dst=p.tun_src),
        nat_t_header=outbound_nat_header,
        **shared,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=ip_cls(dst=p.tun_dst, src=p.tun_src),
        nat_t_header=nat_hdr,
        **shared,
    )
92
93
def config_tra_params(p, encryption_type, tun_if):
    """Build the scapy SecurityAssociations without a tunnel header.

    Records ``tun_if.remote_ip``/``tun_if.local_ip`` as ``p.tun_dst``/
    ``p.tun_src`` and creates ``p.scapy_tun_sa`` (SPI ``p.scapy_tun_spi``) and
    ``p.vpp_tun_sa`` (SPI ``p.vpp_tun_spi``).  Neither SA is given a
    ``tunnel_header``.

    When ``p.nat_header`` is set and its destination port is not 4500, the
    scapy-side SA gets a UDP header with the ports swapped and ESP is bound to
    that destination port in scapy's UDP dissector.
    """
    # kept for parity with the original body; not used by the SAs built below
    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    esn_en = bool(p.flags & (esn_flag))
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    nat_hdr = p.nat_header
    uses_default_port = (not nat_hdr) or nat_hdr.dport == 4500

    if uses_default_port:
        outbound_nat_header = nat_hdr
    else:
        # non-default port: mirror the ports and teach scapy to parse ESP there
        outbound_nat_header = UDP(sport=nat_hdr.dport, dport=nat_hdr.sport)
        bind_layers(UDP, ESP, dport=nat_hdr.dport)

    # keyword arguments shared by both directions
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=esn_en,
    )

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        nat_t_header=outbound_nat_header,
        **shared,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        nat_t_header=nat_hdr,
        **shared,
    )
134
135
class TemplateIpsec4TunProtect(object):
    """IPsec IPv4 Tunnel protect"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create and add ``p.tun_sa_out``/``p.tun_sa_in`` without tunnel endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm/key/protocol arguments common to both SAs
        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algo_args, flags=p.flags
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *algo_args, flags=p.flags
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and add ``p.tun_sa_out``/``p.tun_sa_in`` with tunnel endpoints.

        The endpoints come from ``self.tun_if``; the inbound SA receives them in
        the opposite order to the outbound SA.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        local_addr = self.tun_if.local_addr[p.addr_type]
        remote_addr = self.tun_if.remote_addr[p.addr_type]

        sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *algo_args,
            local_addr,
            remote_addr,
            flags=p.flags,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *algo_args,
            remote_addr,
            local_addr,
            flags=p.flags,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the outbound SA and the single inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel on pg0 and add host routes through it.

        The tunnel destination is ``p.tun_dst`` when that attribute exists,
        otherwise ``self.pg0.remote_ip4``.  Only the IPv4 route is kept on
        ``p`` (as ``p.route``).
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip4

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_path = VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(
            p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        v6_route = VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path])
        v6_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection object."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
250
251
class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Bring up the tunnel, its SAs and the protection for the IPv4 params."""
        super().setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        # order matters: the SAs need p.tun_if, the protection needs the SAs
        for step in (self.config_network, self.config_sa_tra, self.config_protect):
            step(params)

    def tearDown(self):
        super().tearDown()
278
279
class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec UDP tunnel interface tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of each received packet, then decrypt and verify it.

        On an IndexError or AssertionError the received packet (and, when one
        was produced for this packet, the decrypted packet) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure before decrypt never logs the
            # decrypted contents of a previous packet
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, p.nat_header.dport)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort dump only: never mask the real failure,
                        # but let KeyboardInterrupt/SystemExit propagate
                        pass
                raise

    def config_sa_tra(self, p):
        """Create and add the UDP-encapsulated outbound/inbound SAs.

        Both SAs use the ports of ``p.nat_header``; the inbound SA additionally
        carries the IS_INBOUND flag.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags
            | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        """Enable UDP encapsulation on the IPv4 params and configure the tunnel."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
368
369
class TemplateIpsec4TunTfc:
    """IPsec IPv4 tunnel with TFC"""

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` encrypted frames whose inner packet has trailing padding.

        The inner IP header's ``len`` is fixed at ``28 + payload_size`` and 100
        bytes of ``Padding`` follow the ICMP payload, so the padding is not
        counted by the inner IP length.  ``sa.encrypt`` is invoked once per
        frame.  ``p`` is accepted for signature compatibility and not used.
        """
        # Padding comes from scapy.packet (see the module imports)
        pkt = (
            IP(src=src, dst=dst, len=28 + payload_size)
            / ICMP()
            / Raw(b"X" * payload_size)
            / Padding(b"Y" * 100)
        )
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / sa.encrypt(pkt)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check addresses, IP length and checksums of each decrypted packet."""
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            # the IP length field must match the length of the received IP layer
            self.assert_equal(rx[IP].len, len(rx[IP]))
            self.assert_packet_checksums_valid(rx)
391
392
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """Ipsec ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_basic64(self):
        """ipsec 6o4 tunnel basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        v4_params = self.params[socket.AF_INET]
        self.verify_tun_64(v4_params, count=1)

    def test_tun_burst64(self):
        """ipsec 6o4 tunnel basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        v4_params = self.params[socket.AF_INET]
        self.verify_tun_64(v4_params, count=257)

    def test_tun_basic_frag44(self):
        """ipsec 4o4 tunnel frag basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        def set_tun_mtu(first_mtu):
            # only the first MTU slot is varied; the other three stay 0
            self.vapi.sw_interface_set_mtu(
                p.tun_if.sw_if_index, [first_mtu, 0, 0, 0]
            )

        # a 1800 byte payload over a 1500 MTU is expected to arrive as 2 packets
        set_tun_mtu(1500)
        self.verify_tun_44(
            self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
        )
        set_tun_mtu(9000)
422
423
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        v4_params = self.ipv4_params
        self.verify_keepalive(v4_params)
435
436
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP GCM tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the template set-up, then switch the IPv4 params to AES-GCM-256."""
        super().setUp()

        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # NOTE(review): these are assigned after the parent setUp() has
        # already created the SAs from the params - confirm that is intended.
        p = self.ipv4_params
        # VPP-side algorithm identifiers
        p.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        # scapy-side algorithm names
        p.crypt_algo = "AES-GCM"
        p.auth_algo = "NULL"
        p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        p.salt = 0
453
454
class TestIpsec4TunIfEspUdpUpdate(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP update tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the template set-up, then move both SAs to new UDP ports."""
        super().setUp()

        p = self.ipv4_params
        p.nat_header = UDP(sport=6565, dport=7676)
        # rebuild the scapy SAs so they use the new NAT header
        config_tun_params(p, self.encryption_type, p.tun_if)

        nat_hdr = p.nat_header
        # the inbound SA takes the ports swapped relative to the outbound SA
        p.tun_sa_in.update_vpp_config(udp_src=nat_hdr.dport, udp_dst=nat_hdr.sport)
        p.tun_sa_out.update_vpp_config(udp_src=nat_hdr.sport, udp_dst=nat_hdr.dport)
471
472
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """Ipsec ESP - TCP tests"""

    # no body of its own: everything comes from the two base classes
477
478
class TemplateIpsec6TunProtect(object):
    """IPsec IPv6 Tunnel protect"""

    def config_sa_tra(self, p):
        """Create and add ``p.tun_sa_out``/``p.tun_sa_in`` without tunnel endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm/key/protocol arguments common to both SAs
        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algo_args)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, *algo_args)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and add ``p.tun_sa_out``/``p.tun_sa_in`` with tunnel endpoints.

        The endpoints come from ``self.tun_if``; the inbound SA receives them in
        the opposite order to the outbound SA.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        local_addr = self.tun_if.local_addr[p.addr_type]
        remote_addr = self.tun_if.remote_addr[p.addr_type]

        sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *algo_args,
            local_addr,
            remote_addr,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *algo_args,
            remote_addr,
            local_addr,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the outbound SA and the single inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel on pg0 and add host routes through it.

        The tunnel destination is ``p.tun_dst`` when that attribute exists,
        otherwise ``self.pg0.remote_ip6``.  Only the IPv6 route is kept on
        ``p`` (as ``p.route``).
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip6

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        v6_path = VppRoutePath(
            p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        v4_path = VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)
        v4_route = VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path])
        v4_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection object."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
584
585
class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    encryption_type = ESP

    def setUp(self):
        """Bring up the tunnel, its SAs and the protection for the IPv6 params."""
        super().setUp()

        self.tun_if = self.pg0

        params = self.ipv6_params
        # order matters: the SAs need p.tun_if, the protection needs the SAs
        for step in (self.config_network, self.config_sa_tra, self.config_protect):
            step(params)

    def tearDown(self):
        super().tearDown()
603
604
class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec6 UDP tunnel interface tests"""

    tun4_encrypt_node_name = "esp6-encrypt-tun"
    tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of each received packet, then decrypt and verify it.

        On an IndexError or AssertionError the received packet (and, when one
        was produced for this packet, the decrypted packet) is logged at debug
        level before the exception is re-raised.
        """
        # NOTE(review): this indexes the IP (v4) layer while comparing against
        # IPv6 addresses - looks copied from the IPv4 template; confirm whether
        # this method is actually exercised by the IPv6 tests.
        for rx in rxs:
            # reset per packet so a failure before decrypt never logs the
            # decrypted contents of a previous packet
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, p.nat_header.dport)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(
                    pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
                )
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort dump only: never mask the real failure,
                        # but let KeyboardInterrupt/SystemExit propagate
                        pass
                raise

    def config_sa_tra(self, p):
        """Create and add the UDP-encapsulated outbound/inbound SAs.

        Both SAs use the ports of ``p.nat_header``; the inbound SA additionally
        carries the IS_INBOUND flag.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags
            | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        """Enable UDP encapsulation on the IPv6 params and configure the tunnel."""
        super(TemplateIpsec6TunIfEspUdp, self).setUp()

        p = self.ipv6_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec6TunIfEspUdp, self).tearDown()
695
696
class TemplateIpsec6TunTfc:
    """IPsec IPv6 tunnel with TFC"""

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` encrypted frames whose inner packet has trailing padding.

        Each inner packet is an ICMPv6 echo request carrying ``payload_size``
        bytes of data followed by 100 bytes of ``Padding``; the hop limit and
        flow label come from ``p``.  A fresh inner packet is built and
        encrypted for every frame.
        """
        # Padding comes from scapy.packet (see the module imports)
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
                / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
                / Padding(b"Y" * 100)
            )
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check addresses, payload length and checksums of each decrypted packet."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            # the payload length field must match the received IPv6 payload
            self.assert_equal(rx[IPv6].plen, len(rx[IPv6].payload))
            self.assert_packet_checksums_valid(rx)
717
718
class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP tests"""

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC6 NAT Keepalive"""
        v6_params = self.ipv6_params
        self.verify_keepalive(v6_params)
732
733
class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP GCM tests"""

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the template set-up, then switch the IPv6 params to AES-GCM-256."""
        super().setUp()

        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # NOTE(review): these are assigned after the parent setUp() has
        # already created the SAs from the params - confirm that is intended.
        p = self.ipv6_params
        # VPP-side algorithm identifiers
        p.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        # scapy-side algorithm names
        p.crypt_algo = "AES-GCM"
        p.auth_algo = "NULL"
        p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        p.salt = 0
752
753
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """Ipsec ESP - TUN tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_basic46(self):
        """ipsec 4o6 tunnel basic test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ipsec 4o6 tunnel burst test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
769
770
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """Ipsec ESP 6 Handoff tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_handoff_66_police(self):
        """ESP 6o6 tunnel with policer worker hand-off test"""
        for command in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(command)

        N_PKTS = 15
        p = self.params[socket.AF_INET6]
        counter_names = ("conform_packets", "exceed_packets", "violate_packets")

        # every policer outcome transmits, so no packet is dropped by policing
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        for pol_bind in (1, 0):
            policer.bind_vpp_config(pol_bind, True)

            # inject alternately on worker 0 and 1.
            for worker in (0, 1, 0, 1):
                tx_pkts = self.gen_encrypt_pkts6(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=N_PKTS,
                )
                rx_pkts = self.send_and_expect(
                    self.tun_if, tx_pkts, self.pg1, worker=worker
                )
                self.verify_decrypted6(p, rx_pkts)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            stats = policer.get_stats()
            stats0 = policer.get_stats(worker=0)
            stats1 = policer.get_stats(worker=1)

            if pol_bind == 1:
                # First pass: Worker 1, should have done all the policing
                self.assertEqual(stats, stats1)

                # Worker 0, should have handed everything off
                for name in counter_names:
                    self.assertEqual(stats0[name], 0)
                continue

            # Second pass: both workers should have policed equal amounts
            for per_worker in (stats1, stats0):
                self.assertGreater(per_worker["conform_packets"], 0)
                self.assertEqual(per_worker["exceed_packets"], 0)
                self.assertGreater(per_worker["violate_packets"], 0)

            self.assertEqual(
                stats0["conform_packets"] + stats0["violate_packets"],
                stats1["conform_packets"] + stats1["violate_packets"],
            )

        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        policer.remove_vpp_config()
852
853
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """Ipsec ESP 4 Handoff tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_handoff_44_police(self):
        """ESP 4o4 tunnel with policer worker hand-off test"""
        for command in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(command)

        N_PKTS = 15
        p = self.params[socket.AF_INET]
        counter_names = ("conform_packets", "exceed_packets", "violate_packets")

        # every policer outcome transmits, so no packet is dropped by policing
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        for pol_bind in (1, 0):
            policer.bind_vpp_config(pol_bind, True)

            # inject alternately on worker 0 and 1.
            for worker in (0, 1, 0, 1):
                tx_pkts = self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    count=N_PKTS,
                )
                rx_pkts = self.send_and_expect(
                    self.tun_if, tx_pkts, self.pg1, worker=worker
                )
                self.verify_decrypted(p, rx_pkts)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            stats = policer.get_stats()
            stats0 = policer.get_stats(worker=0)
            stats1 = policer.get_stats(worker=1)

            if pol_bind == 1:
                # First pass: Worker 1, should have done all the policing
                self.assertEqual(stats, stats1)

                # Worker 0, should have handed everything off
                for name in counter_names:
                    self.assertEqual(stats0[name], 0)
                continue

            # Second pass: both workers should have policed equal amounts
            for per_worker in (stats1, stats0):
                self.assertGreater(per_worker["conform_packets"], 0)
                self.assertEqual(per_worker["exceed_packets"], 0)
                self.assertGreater(per_worker["violate_packets"], 0)

            self.assertEqual(
                stats0["conform_packets"] + stats0["violate_packets"],
                stats1["conform_packets"] + stats1["violate_packets"],
            )

        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        policer.remove_vpp_config()
935
936
@tag_fixme_vpp_workers
class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Multi Tunnel interface

    Ten protected tunnels are configured in setUp(), each derived from a
    copy of ``self.ipv4_params`` with its own SA ids, SPIs, remote host
    and tunnel destination.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Configure ten tunnels and remember their parameters in multi_params."""
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        n_tunnels = 10
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        # identifiers that are offset by the tunnel index so that every
        # tunnel gets distinct values
        offset_attrs = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for ii in range(n_tunnels):
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            for attr in offset_attrs:
                setattr(p, attr, getattr(p, attr) + ii)
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            self.multi_params.append(p)
            self.config_network(p)
            self.config_sa_tra(p)
            self.config_protect(p)

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces"""
        n_pkts = 127
        for p in self.multi_params:
            self.verify_tun_44(p, count=n_pkts)
            # each tunnel interface must have counted exactly this burst
            self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
            self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

    def test_tun_rr_44(self):
        """Round-robin packets across multiple interface"""
        # decrypt direction: packets generated per tunnel (default count),
        # all sent in a single burst
        encrypted = []
        for p in self.multi_params:
            encrypted.extend(
                self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                )
            )
        received = self.send_and_expect(self.tun_if, encrypted, self.pg1)

        for rx, p in zip(received, self.multi_params):
            self.verify_decrypted(p, [rx])

        # encrypt direction: plain packets towards each tunnel's remote host
        plain = []
        for p in self.multi_params:
            plain.extend(
                self.gen_pkts(
                    self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
                )
            )
        received = self.send_and_expect(self.pg1, plain, self.tun_if)

        for rx, p in zip(received, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
1009
1010
class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface all Algos

    A single protected tunnel is configured in setUp(); the test then
    rekeys it for every crypto engine / algorithm combination and checks
    traffic after each rekey.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        params = self.ipv4_params
        for unconfigure in (
            self.unconfig_protect,
            self.unconfig_network,
            self.unconfig_sa,
        ):
            unconfigure(params)

        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Replace the tunnel's SAs with new ones using a new key and SPI.

        The first key byte is replaced by ``X`` and every tunnel SPI / SA id
        is incremented.  New in/out SAs are added and protection is
        reconfigured before the previous SAs are removed.
        """
        # shallow copy taken before p is modified: it keeps referring to the
        # SA objects currently in use so they can be removed at the end
        previous = copy.copy(p)

        #
        # change the key and the SPI
        #
        p.crypt_key = b"X" + p.crypt_key[1:]
        for attr in (
            "scapy_tun_spi",
            "scapy_tun_sa_id",
            "vpp_tun_spi",
            "vpp_tun_sa_id",
        ):
            setattr(p, attr, getattr(p, attr) + 1)
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, p.tun_if)

        # arguments shared by the outbound and inbound SA
        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *algo_args,
            flags=p.flags,
            salt=p.salt,
        )
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *algo_args,
            flags=p.flags,
            salt=p.salt,
        )
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # switch the tunnel to the new SAs, then drop the old ones
        self.config_protect(p)
        previous.tun_sa_out.remove_vpp_config()
        previous.tun_sa_in.remove_vpp_config()
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos"""
        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        key_16 = b"JPjyOWBeVEQiMe7h"
        key_24 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_32 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # foreach crypto algorithm:
        # (vpp crypto, vpp integ, scapy crypto, scapy integ, key, salt)
        combos = [
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                integ.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                key_16,
                3333,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                integ.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                key_24,
                0,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
                integ.IPSEC_API_INTEG_ALG_NONE,
                "AES-GCM",
                "NULL",
                key_32,
                9999,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                integ.IPSEC_API_INTEG_ALG_SHA1_96,
                "AES-CBC",
                "HMAC-SHA1-96",
                key_16,
                0,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                integ.IPSEC_API_INTEG_ALG_SHA_512_256,
                "AES-CBC",
                "SHA2-512-256",
                key_24,
                0,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                integ.IPSEC_API_INTEG_ALG_SHA_256_128,
                "AES-CBC",
                "SHA2-256-128",
                key_32,
                0,
            ),
            (
                crypto.IPSEC_API_CRYPTO_ALG_NONE,
                integ.IPSEC_API_INTEG_ALG_SHA1_96,
                "NULL",
                "HMAC-SHA1-96",
                key_32,
                0,
            ),
        ]

        # foreach VPP crypto engine
        for engine in ("ia32", "ipsecmb", "openssl"):
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for vpp_crypto, vpp_integ, scapy_crypto, scapy_integ, key, salt in combos:
                p = self.ipv4_params
                p.auth_algo_vpp_id = vpp_integ
                p.crypt_algo_vpp_id = vpp_crypto
                p.crypt_algo = scapy_crypto
                p.auth_algo = scapy_integ
                p.crypt_key = key
                p.salt = salt

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)
1199
1200
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface no Algos

    The shared IPv4 parameters are switched to the NONE/NULL integrity and
    crypto algorithms with empty keys before the tunnel is configured.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # no integrity algorithm, no integrity key
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        params.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        params.auth_algo = "NULL"
        params.auth_key = []

        # no encryption algorithm, no encryption key
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        params.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_NONE
        params.crypt_algo = "NULL"
        params.crypt_key = []

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """IPSec SA with NULL algos"""
        params = self.ipv4_params

        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

        # traffic sent from pg1 towards the tunnel host must produce no replies
        plain = self.gen_pkts(
            self.pg1, src=self.pg1.remote_ip4, dst=params.remote_tun_if_host
        )
        self.send_and_assert_no_replies(self.pg1, plain)

        for unconfigure in (
            self.unconfig_protect,
            self.unconfig_sa,
            self.unconfig_network,
        ):
            unconfigure(params)
1240
1241
@tag_fixme_vpp_workers
class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
    """IPsec IPv6 Multi Tunnel interface

    Ten protected tunnels are configured in setUp(), each derived from a
    copy of ``self.ipv6_params`` with its own SA ids, SPIs, remote host
    and tunnel destination.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Configure ten tunnels and remember their parameters in multi_params."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        n_tunnels = 10
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv6_neighbors()

        # identifiers that are offset by the tunnel index so that every
        # tunnel gets distinct values
        offset_attrs = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for ii in range(n_tunnels):
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1111::%d" % (ii + 1)
            for attr in offset_attrs:
                setattr(p, attr, getattr(p, attr) + ii)
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

            self.multi_params.append(p)
            self.config_network(p)
            self.config_sa_tra(p)
            self.config_protect(p)

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces"""
        n_pkts = 127
        for p in self.multi_params:
            self.verify_tun_66(p, count=n_pkts)
            # each tunnel interface must have counted exactly this burst
            self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
            self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
1288
1289
class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests

    A GRE TEB tunnel between pg0's local and remote addresses is protected
    with an ESP SA pair and bridged with pg1 in bridge domain 1.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP packets carrying a GRE-encapsulated L2 frame.

        The outer and inner addresses are fixed, so ``p``, ``src`` and
        ``dst`` are accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that every received frame is the expected inner L2 frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug level
        and the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a packet decrypted in an earlier iteration
            # is never reported as belonging to this one
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid only; the original failure
                        # is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge domain."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA, given pg0's local then remote address
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA, given pg0's remote then local address
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
1412
1413
class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests

    Same layout as the plain GRE TEB test, except that the bridge port on
    the pg1 side is a dot1q sub-interface (VLAN 11) configured with a
    pop-1 VLAN tag rewrite.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP packets carrying a GRE-encapsulated L2 frame.

        The inner frame carries no VLAN tag.  The outer and inner addresses
        are fixed, so ``p``, ``src`` and ``dst`` are accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain L2 frames tagged with VLAN 11.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used.
        """
        return [
            Ether(dst=self.omac)
            / Dot1Q(vlan=11)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that every received frame is the inner frame tagged VLAN 11."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        The tunnelled frame must not carry a Dot1Q header.  On an IndexError
        or AssertionError the received packet (and the decrypted one, if
        decryption got that far) is logged at debug level and the error is
        re-raised.
        """
        for rx in rxs:
            # reset per packet so a packet decrypted in an earlier iteration
            # is never reported as belonging to this one
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid only; the original failure
                        # is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the VLAN sub-interface, SAs, protected tunnel and bridge."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # dot1q sub-interface of pg1 for VLAN 11 with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index,
            vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11,
        )
        self.pg1_11.admin_up()

        # outbound SA, given pg0's local then remote address
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA, given pg0's remote then local address
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the VLAN sub-interface
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        # the sub-interface created in setUp() is removed after the base
        # class tear-down has run
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
1547
1548
class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - Tra tests

    A GRE TEB tunnel between pg0's local and remote addresses is protected
    with an ESP SA pair created without tunnel endpoint addresses, and is
    bridged with pg1 in bridge domain 1.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP packets carrying a GRE-encapsulated L2 frame.

        The outer and inner addresses are fixed, so ``p``, ``src`` and
        ``dst`` are accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that every received frame is the expected inner L2 frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug level
        and the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a packet decrypted in an earlier iteration
            # is never reported as belonging to this one
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid only; the original failure
                        # is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge domain."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA; no endpoint addresses are passed
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA; no endpoint addresses are passed
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1665
1666
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB UDP ESP - Tra tests

    Like the GRE TEB "Tra" test, but both SAs carry the UDP_ENCAP flag and
    use UDP ports 5454/4545.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` ESP packets carrying a GRE-encapsulated L2 frame.

        The outer and inner addresses are fixed, so ``p``, ``src`` and
        ``dst`` are accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that every received frame is the expected inner L2 frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP encapsulation, then decrypt and check each packet.

        Every packet must carry a UDP header with sport 5454 and dport 4545.
        On an IndexError or AssertionError during decryption or checking,
        the received packet (and the decrypted one, if decryption got that
        far) is logged at debug level and the error is re-raised.
        """
        for rx in rxs:
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # reset per packet so a packet decrypted in an earlier iteration
            # is never reported as belonging to this one
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug aid only; the original failure
                        # is re-raised below
                        pass
                raise

    def setUp(self):
        """Create UDP-encapsulated SAs, the protected tunnel and the bridge."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: UDP source 5454, destination 4545
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=5454,
            udp_dst=4545,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: ports swapped and the IS_INBOUND flag added
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=(
                p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
            ),
            udp_src=4545,
            udp_dst=5454,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1798
1799
class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TUN tests"""

    # graph node names consulted by the inherited tunnel tests
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames, each an encrypted IP/GRE/IP/UDP packet.

        The outer IP header runs from pg0's remote to its local address and
        the inner one from pg1's local to its remote address. ``p``, ``src``
        and ``dst`` are part of the signature but are not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are accepted but ignored; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IP."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and validate its GRE encapsulation.

        Verifies checksums, that the outer header goes from pg0's local to its
        remote address, that a GRE layer is present and that the IP packet
        under GRE is destined to 1.1.1.2. On failure the offending packet(s)
        are logged at debug level and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failed decrypt never logs the packet
            # left over from the previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug output only; it must not mask
                        # the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create an SA pair, a GRE tunnel over pg0 and protect the tunnel.

        Both SAs are given pg0's addresses as extra positional arguments
        (local/remote for outbound, swapped for inbound). A /32 route for
        1.1.1.2 is added via the tunnel's remote address.
        """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is ever added to this bridge domain within
        # this class - confirm whether it is still needed
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1910
1911
class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TRA tests"""

    # graph node names consulted by the inherited tunnel tests
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames, each an encrypted IP/GRE/IP/UDP packet.

        The outer IP header runs from pg0's remote to its local address and
        the inner one from pg1's local to its remote address. ``p``, ``src``
        and ``dst`` are part of the signature but are not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames whose GRE payload is not IP.

        The GRE header is followed directly by UDP. ``src`` and ``dst`` are
        accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are accepted but ignored; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IP."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and validate its GRE encapsulation.

        Verifies checksums, that a GRE layer is present and that the IP packet
        under GRE is destined to 1.1.1.2. On failure the offending packet(s)
        are logged at debug level and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failed decrypt never logs the packet
            # left over from the previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug output only; it must not mask
                        # the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create an SA pair, a GRE tunnel over pg0 and protect the tunnel.

        The scapy-side parameters are filled in by config_tra_params and a /32
        route for 1.1.1.2 is added via the tunnel's remote address.
        """
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        # Send one encrypted packet whose GRE payload is not IP, expect no
        # reply, and expect the "unsup_payload" error to be counted exactly
        # once on the first decrypt node and on the inbound SA.
        p = self.ipv4_params
        tx = self.gen_encrypt_non_ip_pkts(
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
        )
        self.send_and_assert_no_replies(self.tun_if, tx)
        node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
        err = p.tun_sa_in.get_err("unsup_payload")
        self.assertEqual(err, 1)
2039
2040
class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
    """Ipsec GRE ESP - TRA tests"""

    # graph node names consulted by the inherited tunnel tests
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames, each an encrypted IPv6/GRE/IPv6/UDP packet.

        The outer IPv6 header runs from pg0's remote to its local address and
        the inner one from pg1's local to its remote address. ``p``, ``src``
        and ``dst`` are part of the signature but are not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1::1 to 1::2.

        ``p``, ``src`` and ``dst`` are accepted but ignored; the addresses are
        fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst="1::2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IPv6."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet and validate its GRE encapsulation.

        Verifies checksums, that a GRE layer is present and that the IPv6
        packet under GRE is destined to 1::2. On failure the offending
        packet(s) are logged at debug level and the original exception is
        re-raised.
        """
        for rx in rxs:
            # reset per packet so a failed decrypt never logs the packet
            # left over from the previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug output only; it must not mask
                        # the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create an SA pair, an IPv6 GRE tunnel over pg0 and protect it.

        The scapy-side parameters are filled in by config_tra_params and a
        /128 route for 1::2 is added via the tunnel's remote address.
        """
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): no port is ever added to this bridge domain within
        # this class - confirm whether it is still needed
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        r = VppIpRoute(
            self,
            "1::2",
            128,
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
                )
            ],
        )
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 configuration, then run the base teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
2153
2154
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """Ipsec mGRE ESP v4 TRA tests"""

    # graph node names consulted by the inherited tunnel helpers
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames, each an encrypted IP/GRE/IP/UDP packet.

        The outer source is ``p.tun_dst`` (the per-next-hop peer address) and
        the outer destination is pg0's local address. ``src`` and ``dst`` are
        accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=p.tun_dst, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to ``dst``.

        ``src`` is accepted but ignored; the source address is fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IP."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and validate its GRE encapsulation.

        Verifies checksums, that a GRE layer is present and that the IP packet
        under GRE is destined to ``p.remote_tun_if_host``. On failure the
        offending packet(s) are logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failed decrypt never logs the packet
            # left over from the previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug output only; it must not mask
                        # the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create one multipoint GRE tunnel and N_NHS protected next-hops.

        For each next-hop a shallow copy of the IPv4 parameters gets its own
        SA/SPI identifiers, an SA pair, a tunnel protection keyed on the
        next-hop, a /32 route and a TEIB entry. The copies are collected in
        ``self.multi_params``.
        """
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the single tunnel interface
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            # (test_tun_44 later calls methods on p.teib, so this relies on
            # add_vpp_config() returning the VppTeib object)
            p.teib = VppTeib(
                self,
                p.tun_if,
                p.tun_if.remote_hosts[ii].ip4,
                self.pg0.remote_hosts[ii].ip4,
            ).add_vpp_config()
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            # traffic passes, is dropped once the TEIB entry is removed, and
            # passes again after the entry is restored
            self.verify_tun_44(p, count=N_PKTS)
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
2308
2309
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """Ipsec mGRE ESP v6 TRA tests"""

    # graph node names consulted by the inherited tunnel helpers
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames, each an encrypted IPv6/GRE/IPv6/UDP packet.

        The outer source is ``p.tun_dst`` (the per-next-hop peer address) and
        the outer destination is pg0's local address. ``src`` and ``dst`` are
        accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1::1 to ``dst``.

        ``p`` and ``src`` are accepted but ignored; the source address is
        fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IPv6."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet and validate its GRE encapsulation.

        Verifies checksums, that a GRE layer is present and that the IPv6
        packet under GRE is destined to ``p.remote_tun_if_host``. On failure
        the offending packet(s) are logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failed decrypt never logs the packet
            # left over from the previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort debug output only; it must not mask
                        # the failure being re-raised below
                        pass
                raise

    def setUp(self):
        """Create one multipoint IPv6 GRE tunnel and N_NHS protected next-hops.

        For each next-hop a shallow copy of the IPv6 parameters gets its own
        SA/SPI identifiers, an SA pair, a TEIB entry, a tunnel protection
        keyed on the next-hop and a /128 route. The copies are collected in
        ``self.multi_params``.
        """
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip6,
            "::",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the single tunnel interface
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(
                self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
            ).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                128,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
            ).add_vpp_config()
            # NOTE(review): tun_dst is assigned a second time here,
            # presumably because config_tra_params overwrites it - confirm
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 configuration, then run the base teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
2461
2462
@tag_fixme_vpp_workers
class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the base setup and use pg0 as the tunnel-side interface."""
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer to the base class; the test itself undoes its configuration."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        batch = 127
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # first batch: v4 traffic over the protected tunnel
        self.verify_tun_44(params, count=batch)
        self.assertEqual(params.tun_if.get_rx_stats(), batch)
        self.assertEqual(params.tun_if.get_tx_stats(), batch)

        # second batch after clearing the SA counters: interface stats keep
        # accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=batch)
        self.assertEqual(params.tun_if.get_rx_stats(), 2 * batch)
        self.assertEqual(params.tun_if.get_tx_stats(), 2 * batch)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b"X" + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        # third batch runs on the new SAs
        self.verify_tun_44(rekeyed, count=batch)
        self.assertEqual(params.tun_if.get_rx_stats(), 3 * batch)
        self.assertEqual(params.tun_if.get_tx_stats(), 3 * batch)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
2515
2516
@tag_fixme_vpp_workers
class TestIpsec4TunProtectTfc(TemplateIpsec4TunTfc, TestIpsec4TunProtect):
    """IPsec IPv4 Tunnel protect with TFC - transport mode"""

    # No body of its own: inherits everything from TestIpsec4TunProtect, with
    # TemplateIpsec4TunTfc listed first so its overrides take precedence.
    # NOTE(review): TemplateIpsec4TunTfc is defined outside this excerpt;
    # presumably it adds TFC padding to the generated packets - confirm.
2520
2521
@tag_fixme_vpp_workers
class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 UDP Tunnel protect - transport mode"""

    def setUp(self):
        """Configure network, SAs and protection with UDP encapsulation on."""
        super(TestIpsec4TunProtectUdp, self).setUp()
        self.tun_if = self.pg0

        params = self.ipv4_params
        params.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo protection, SAs and network, then run the base teardown."""
        params = self.ipv4_params

        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)

        super(TestIpsec4TunProtectUdp, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of each packet, then run the inherited checks."""
        # ensure encrypted packets are received with the default UDP ports
        for rx in rxs:
            udp = rx[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        batch = 127
        params = self.ipv4_params

        self.verify_tun_44(params, count=batch)
        self.assertEqual(params.tun_if.get_rx_stats(), batch)
        self.assertEqual(params.tun_if.get_tx_stats(), batch)

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        self.verify_keepalive(self.ipv4_params)
2564
2565
@tag_fixme_vpp_workers
class TestIpsec4TunProtectUdpTfc(TemplateIpsec4TunTfc, TestIpsec4TunProtectUdp):
    """IPsec IPv4 UDP Tunnel protect with TFC - transport mode"""

    # No body of its own: inherits everything from TestIpsec4TunProtectUdp,
    # with TemplateIpsec4TunTfc listed first so its overrides take precedence.
    # NOTE(review): TemplateIpsec4TunTfc is defined outside this excerpt;
    # presumably it adds TFC padding to the generated packets - confirm.
2569
2570
2571 @tag_fixme_vpp_workers
2572 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2573     """IPsec IPv4 Tunnel protect - tunnel mode"""
2574
2575     encryption_type = ESP
2576     tun4_encrypt_node_name = "esp4-encrypt-tun"
2577     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2578
    def setUp(self):
        """Run the base setup and use pg0 as the tunnel-side interface."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0
2583
    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super(TestIpsec4TunProtectTun, self).tearDown()
2586
2587     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2588         return [
2589             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2590             / sa.encrypt(
2591                 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2592                 / IP(src=src, dst=dst)
2593                 / UDP(sport=1144, dport=2233)
2594                 / Raw(b"X" * payload_size)
2595             )
2596             for i in range(count)
2597         ]
2598
2599     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2600         return [
2601             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2602             / IP(src=src, dst=dst)
2603             / UDP(sport=1144, dport=2233)
2604             / Raw(b"X" * payload_size)
2605             for i in range(count)
2606         ]
2607
2608     def verify_decrypted(self, p, rxs):
2609         for rx in rxs:
2610             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2611             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2612             self.assert_packet_checksums_valid(rx)
2613
2614     def verify_encrypted(self, p, sa, rxs):
2615         for rx in rxs:
2616             try:
2617                 pkt = sa.decrypt(rx[IP])
2618                 if not pkt.haslayer(IP):
2619                     pkt = IP(pkt[Raw].load)
2620                 self.assert_packet_checksums_valid(pkt)
2621                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2622                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2623                 inner = pkt[IP].payload
2624                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2625
2626             except (IndexError, AssertionError):
2627                 self.logger.debug(ppp("Unexpected packet:", rx))
2628                 try:
2629                     self.logger.debug(ppp("Decrypted packet:", pkt))
2630                 except:
2631                     pass
2632                 raise
2633
2634     def test_tun_44(self):
2635         """IPSEC tunnel protect"""
2636
2637         p = self.ipv4_params
2638
2639         self.config_network(p)
2640         self.config_sa_tun(p)
2641         self.config_protect(p)
2642
2643         # also add an output features on the tunnel and physical interface
2644         # so we test they still work
2645         r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2646         a = VppAcl(self, [r_all]).add_vpp_config()
2647
2648         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2649         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2650
2651         self.verify_tun_44(p, count=127)
2652
2653         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2654         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2655
2656         # rekey - create new SAs and update the tunnel protection
2657         np = copy.copy(p)
2658         np.crypt_key = b"X" + p.crypt_key[1:]
2659         np.scapy_tun_spi += 100
2660         np.scapy_tun_sa_id += 1
2661         np.vpp_tun_spi += 100
2662         np.vpp_tun_sa_id += 1
2663         np.tun_if.local_spi = p.vpp_tun_spi
2664         np.tun_if.remote_spi = p.scapy_tun_spi
2665
2666         self.config_sa_tun(np)
2667         self.config_protect(np)
2668         self.unconfig_sa(p)
2669
2670         self.verify_tun_44(np, count=127)
2671         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2672         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2673
2674         # teardown
2675         self.unconfig_protect(np)
2676         self.unconfig_sa(np)
2677         self.unconfig_network(p)
2678
2679
class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec4TunProtectTunDrop, self).setUp()

        # use pg0 as self.tun_if for the test in this class
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted frames whose revealed outer header is bogus.

        The outer IPv4 destination inside the encrypted payload is the fixed
        address 5.5.5.5 rather than an address of `sw_intf`. `p` is not used.
        """
        frames = []
        for _ in range(count):
            plaintext = (
                IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            frames.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
                / sa.encrypt(plaintext)
            )
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header"""

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # nothing may come back for packets carrying the bogus tunnel header
        bogus_pkts = self.gen_encrypt_pkts(
            p,
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2730
2731
@tag_fixme_vpp_workers
class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec6TunProtect, self).setUp()

        # use pg0 as self.tun_if for every test in this class
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        n_pkts = 127
        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b"X" + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        # the old SAs go away only once the protection uses the new ones
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 2 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # bounce the interface state
        p.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=n_pkts)
        disabled_counter = "/err/ipsec6-tun-input/disabled"
        self.assertEqual(n_pkts, self.statistics.get_err_counter(disabled_counter))
        p.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=n_pkts)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        rekeyed3 = copy.copy(rekeyed)
        rekeyed3.crypt_key = b"Z" + p.crypt_key[1:]
        rekeyed3.scapy_tun_spi += 100
        rekeyed3.scapy_tun_sa_id += 1
        rekeyed3.vpp_tun_spi += 100
        rekeyed3.vpp_tun_sa_id += 1
        rekeyed3.tun_if.local_spi = p.vpp_tun_spi
        rekeyed3.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed3)

        # step 1: old output SA, both input SAs accepted
        p.tun_protect.update_vpp_config(
            rekeyed.tun_sa_out, [rekeyed.tun_sa_in, rekeyed3.tun_sa_in]
        )
        self.verify_tun_66(rekeyed, rekeyed, count=n_pkts)
        self.verify_tun_66(rekeyed3, rekeyed, count=n_pkts)

        # step 2: new output SA, both input SAs still accepted
        p.tun_protect.update_vpp_config(
            rekeyed3.tun_sa_out, [rekeyed.tun_sa_in, rekeyed3.tun_sa_in]
        )
        self.verify_tun_66(rekeyed, rekeyed3, count=n_pkts)
        self.verify_tun_66(rekeyed3, rekeyed3, count=n_pkts)

        # step 3: only the new input SA remains; the old one must be dropped
        p.tun_protect.update_vpp_config(rekeyed3.tun_sa_out, [rekeyed3.tun_sa_in])
        self.verify_tun_66(rekeyed3, rekeyed3, count=n_pkts)
        self.verify_drop_tun_rx_66(rekeyed, count=n_pkts)

        # tx matches the eight successful verify_tun_66 rounds above; rx is
        # one round higher (presumably one of the dropped rounds is still
        # counted on receive - confirm against the interface counters)
        self.assertEqual(p.tun_if.get_rx_stats(), 9 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 8 * n_pkts)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(rekeyed3)
        self.unconfig_sa(rekeyed3)
        self.unconfig_network(p)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        n_pkts = 127
        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_46(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2843
2844
# No members of its own: TemplateIpsec6TunTfc is placed ahead of
# TestIpsec6TunProtect in the MRO so that every inherited test is re-run
# with whatever the TFC template overrides (the template is defined outside
# this section - presumably it turns on TFC padding; confirm there).
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTfc(TemplateIpsec6TunTfc, TestIpsec6TunProtect):
    """IPsec IPv6 Tunnel protect with TFC - transport mode"""
2848
2849
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        # use pg0 as self.tun_if for every test in this class
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` Ethernet frames carrying SA-encrypted IPv6-in-IPv6.

        The plaintext given to ``sa.encrypt`` is an outer IPv6 header
        (``sw_intf`` remote -> local address) wrapping an inner IPv6/UDP
        packet from `src` to `dst` with `payload_size` bytes of b"X".
        `p` is accepted for signature compatibility and is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` plain Ethernet/IPv6/UDP frames from `src` to `dst`.

        `p` is accepted for signature compatibility and is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / UDP(sport=1166, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every packet in `rxs` is addressed host -> pg1 with good checksums."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in `rxs` with `sa` and check both IPv6 headers.

        The outer header must run pg0 local -> pg0 remote and the inner
        destination must be ``p.remote_tun_if_host``.

        Raises:
            IndexError, AssertionError: re-raised after the offending packet
                (and its decrypted form, when available) has been logged.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the previous
            # iteration's decrypted packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # best-effort diagnostics only: a failure to render the
                    # packet must not mask the original error, but
                    # KeyboardInterrupt/SystemExit must still propagate
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        # the old SAs are removed only after the protection has been updated
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        # counters are cumulative across the rekey: 2 x 127
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2949
2950
class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        # use pg0 as self.tun_if for the test in this class
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted frames whose revealed outer header is bogus.

        `p` is accepted for signature compatibility and is not used here.
        """
        frames = []
        for _ in range(count):
            # the IP destination of the revealed packet does not match
            # that assigned to the tunnel
            plaintext = (
                IPv6(src=sw_intf.remote_ip6, dst="5::5")
                / IPv6(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            frames.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
                / sa.encrypt(plaintext)
            )
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # nothing may come back for packets carrying the bogus tunnel header
        bogus_pkts = self.gen_encrypt_pkts6(
            p,
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3002
3003
class TemplateIpsecItf4(object):
    """IPsec Interface IPv4

    Mixin holding the configure/unconfigure helpers shared by the IPv4
    IPsec-interface test classes; everything created is stored on the
    parameter object `p` so the matching unconfig_* helper can remove it.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the outbound and inbound SAs for a `src` -> `dst` tunnel.

        Stores them as ``p.tun_sa_out`` and ``p.tun_sa_in``; the inbound SA
        uses the reversed endpoints and carries the IS_INBOUND flag.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            src,
            dst,
            flags=p.flags,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        inbound_flags = (
            p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
        )
        sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            dst,
            src,
            flags=inbound_flags,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs stored on `p` (one inbound SA)."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p, instance=0xFFFFFFFF):
        """Create the IPsec interface and route both remote hosts through it.

        `instance` is passed straight to VppIpsecInterface (the default
        0xFFFFFFFF presumably means "no specific instance" - confirm).
        Only the IPv4 route is kept on `p` (as ``p.route``); the IPv6 route
        is created but not stored.
        """
        tun_if = VppIpsecInterface(self, instance=instance)
        p.tun_if = tun_if

        tun_if.add_vpp_config()
        tun_if.admin_up()
        tun_if.config_ip4()
        tun_if.config_ip6()

        v4_paths = [VppRoutePath(tun_if.remote_ip4, 0xFFFFFFFF)]
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, v4_paths)
        p.route.add_vpp_config()

        v6_paths = [
            VppRoutePath(tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6)
        ]
        v6_route = VppIpRoute(self, p.remote_tun_if_host6, 128, v6_paths)
        v6_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on `p`."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound and then the inbound SA stored on `p`."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
3087
3088
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface IPv4"""

    def setUp(self):
        super(TestIpsecItf4, self).setUp()

        # use pg0 as self.tun_if for every test in this class
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf4, self).tearDown()

    def test_tun_instance_44(self):
        """IPSEC interface IPv4 with explicit instance"""
        p = self.ipv4_params
        self.config_network(p, instance=3)

        # with instance=3 there must be no ipsec0 ...
        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        # ... and ipsec3 must be a known interface
        output = self.vapi.cli("show interface ipsec3")
        self.assertNotIn("unknown", output)

        self.unconfig_network(p)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        config_tun_params(
            p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
        )
        # no SAs and no protection yet: traffic must be dropped
        self.verify_tun_dropped_44(p, count=n_pkts)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface: dropped while down, working again once up
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v6 packet when its encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # update the SA tunnel
        config_tun_params(
            p, self.encryption_type, None, self.pg2.local_ip4, self.pg2.remote_ip4
        )
        p.tun_sa_in.update_vpp_config(
            is_tun=True, tun_src=self.pg2.remote_ip4, tun_dst=self.pg2.local_ip4
        )
        p.tun_sa_out.update_vpp_config(
            is_tun=True, tun_src=self.pg2.local_ip4, tun_dst=self.pg2.remote_ip4
        )
        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 5 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 4 * n_pkts)

        # reset the counters so the rekey checks below start from zero
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 127
        # work on a copy so the algorithm overrides stay local to this test
        p = copy.copy(self.ipv4_params)

        p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.logger.info(self.vapi.cli("sh ipsec sa"))
        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        # every policer outcome transmits, so policing never drops traffic
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_44(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3264
3265
# No members of its own: TemplateIpsec4TunTfc is placed ahead of
# TestIpsecItf4 in the MRO so that every inherited test is re-run with
# whatever the TFC template overrides (the template is defined outside this
# section - presumably it turns on TFC padding; confirm there).
@tag_fixme_vpp_workers
class TestIpsecItf4Tfc(TemplateIpsec4TunTfc, TestIpsecItf4):
    """IPsec Interface IPv4 with TFC"""
3269
3270
class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface MPLSoIPv4"""

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        super(TestIpsecItf4MPLS, self).setUp()

        # use pg0 as self.tun_if for the test in this class
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` Ethernet frames carrying SA-encrypted MPLS/IPv4/UDP.

        The plaintext is an MPLS header (label 44, ttl 3) over an IPv4/UDP
        packet from `src` to `dst` with `payload_size` bytes of b"X".
        `p` is accepted for signature compatibility and is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=44, ttl=3)
                / IP(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in `rxs` and check the MPLS label and IP dst.

        Raises:
            IndexError, AssertionError: re-raised after the offending packet
                (and its decrypted form, when available) has been logged.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the previous
            # iteration's decrypted packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # best-effort diagnostics only: a failure to render the
                    # packet must not mask the original error, but
                    # KeyboardInterrupt/SystemExit must still propagate
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        ).add_vpp_config()
        # send traffic for the remote host into the tunnel with label 44,
        # the same label verify_encrypted expects after decryption
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3347
3348
class TemplateIpsecItf6(object):
    """IPsec Interface IPv6

    Mixin holding the configure/unconfigure helpers shared by the IPv6
    IPsec-interface test classes; everything created is stored on the
    parameter object `p` so the matching unconfig_* helper can remove it.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the outbound and inbound SAs for a `src` -> `dst` tunnel.

        Stores them as ``p.tun_sa_out`` and ``p.tun_sa_in``. ``p.tun_flags``
        and ``p.hop_limit`` are given defaults (None and 255) when the caller
        has not set them; only the outbound SA receives them.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # fill in the tunnel attributes the caller did not provide
        for attr, fallback in (("tun_flags", None), ("hop_limit", 255)):
            if not hasattr(p, attr):
                setattr(p, attr, fallback)

        sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            src,
            dst,
            flags=p.flags,
            tun_flags=p.tun_flags,
            hop_limit=p.hop_limit,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        # NOTE(review): unlike TemplateIpsecItf4.config_sa_tun, this inbound
        # SA is created without IPSEC_API_SAD_FLAG_IS_INBOUND - confirm that
        # is intentional.
        sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs stored on `p` (one inbound SA)."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPsec interface and route both remote hosts through it.

        Only the IPv6 route is kept on `p` (as ``p.route``); the IPv4 route
        is created but not stored.
        """
        tun_if = VppIpsecInterface(self)
        p.tun_if = tun_if

        tun_if.add_vpp_config()
        tun_if.admin_up()
        tun_if.config_ip4()
        tun_if.config_ip6()

        v4_paths = [VppRoutePath(tun_if.remote_ip4, 0xFFFFFFFF)]
        v4_route = VppIpRoute(self, p.remote_tun_if_host4, 32, v4_paths)
        v4_route.add_vpp_config()

        v6_paths = [
            VppRoutePath(tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6)
        ]
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, v6_paths)
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on `p`."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound and then the inbound SA stored on `p`."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
3439
3440
3441 @tag_fixme_vpp_workers
3442 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3443     """IPsec Interface IPv6"""
3444
3445     def setUp(self):
3446         super(TestIpsecItf6, self).setUp()
3447
3448         self.tun_if = self.pg0
3449
3450     def tearDown(self):
3451         super(TestIpsecItf6, self).tearDown()
3452
3453     def test_tun_66(self):
3454         """IPSEC interface IPv6"""
3455
3456         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3457         n_pkts = 127
3458         p = self.ipv6_params
3459         p.inner_hop_limit = 24
3460         p.outer_hop_limit = 23
3461         p.outer_flow_label = 243224
3462         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3463
3464         self.config_network(p)
3465         config_tun_params(
3466             p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
3467         )
3468         self.verify_drop_tun_66(p, count=n_pkts)
3469         self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3470         self.config_protect(p)
3471
3472         self.verify_tun_66(p, count=n_pkts)
3473         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3474         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3475
3476         p.tun_if.admin_down()
3477         self.verify_drop_tun_66(p, count=n_pkts)
3478         p.tun_if.admin_up()
3479         self.verify_tun_66(p, count=n_pkts)
3480
3481         self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3482         self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3483
3484         # it's a v4 packet when its encrypted
3485         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3486
3487         self.verify_tun_46(p, count=n_pkts)
3488         self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3489         self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3490
3491         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
3492
3493         self.vapi.cli("clear interfaces")
3494
3495         # rekey - create new SAs and update the tunnel protection
3496         np = copy.copy(p)
3497         np.crypt_key = b"X" + p.crypt_key[1:]
3498         np.scapy_tun_spi += 100
3499         np.scapy_tun_sa_id += 1
3500         np.vpp_tun_spi += 100
3501         np.vpp_tun_sa_id += 1
3502         np.tun_if.local_spi = p.vpp_tun_spi
3503         np.tun_if.remote_spi = p.scapy_tun_spi
3504         np.inner_hop_limit = 24
3505         np.outer_hop_limit = 128
3506         np.inner_flow_label = 0xABCDE
3507         np.outer_flow_label = 0xABCDE
3508         np.hop_limit = 128
3509         np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3510
3511         self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3512         self.config_protect(np)
3513         self.unconfig_sa(p)
3514
3515         self.verify_tun_66(np, count=n_pkts)
3516         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3517         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3518
3519         # teardown
3520         self.unconfig_protect(np)
3521         self.unconfig_sa(np)
3522         self.unconfig_network(p)
3523
3524     def test_tun_66_police(self):
3525         """IPSEC interface IPv6 with input policer"""
3526         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3527         n_pkts = 127
3528         p = self.ipv6_params
3529         p.inner_hop_limit = 24
3530         p.outer_hop_limit = 23
3531         p.outer_flow_label = 243224
3532         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3533
3534         self.config_network(p)
3535         self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3536         self.config_protect(p)
3537
3538         action_tx = PolicerAction(
3539             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3540         )
3541         policer = VppPolicer(
3542             self,
3543             "pol1",
3544             80,
3545             0,
3546             1000,
3547             0,
3548             conform_action=action_tx,
3549             exceed_action=action_tx,
3550             violate_action=action_tx,
3551         )
3552         policer.add_vpp_config()
3553
3554         # Start policing on tun
3555         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3556
3557         self.verify_tun_66(p, count=n_pkts)
3558         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3559         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3560
3561         stats = policer.get_stats()
3562
3563         # Single rate, 2 colour policer - expect conform, violate but no exceed
3564         self.assertGreater(stats["conform_packets"], 0)
3565         self.assertEqual(stats["exceed_packets"], 0)
3566         self.assertGreater(stats["violate_packets"], 0)
3567
3568         # Stop policing on tun
3569         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3570         self.verify_tun_66(p, count=n_pkts)
3571
3572         # No new policer stats
3573         statsnew = policer.get_stats()
3574         self.assertEqual(stats, statsnew)
3575
3576         # teardown
3577         policer.remove_vpp_config()
3578         self.unconfig_protect(p)
3579         self.unconfig_sa(p)
3580         self.unconfig_network(p)
3581
3582
@tag_fixme_vpp_workers
class TestIpsecItf6Tfc(TemplateIpsec6TunTfc, TestIpsecItf6):
    """IPsec Interface IPv6 with TFC

    Adds no members of its own: everything comes from TestIpsecItf6 and
    TemplateIpsec6TunTfc, with TemplateIpsec6TunTfc first in the method
    resolution order so its definitions take precedence.
    NOTE(review): TFC presumably stands for Traffic Flow Confidentiality
    padding -- confirm against TemplateIpsec6TunTfc.
    """
3586
3587
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """Ipsec P2MP ESP v4 tests

    One IPsec interface in multipoint mode is protected by a separate pair
    of SAs per next-hop; setUp() builds N_NHS such next-hops and stores one
    parameter object per next-hop in self.multi_params.
    """

    # graph node names handed to the IpsecTun4 helpers
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted frames to inject on `sw_intf`.

        The inner packet always goes from pg1's local address to pg1's
        remote address; `p`, `src` and `dst` are accepted for signature
        compatibility with the base-class helper but are not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` plain-text frames towards `dst`.

        The source address is fixed at 1.1.1.1; `src` is ignored.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every decrypted packet is addressed to pg1's remote peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check the outer header of each packet, then decrypt and check it.

        The outer TOS must carry DSCP EF and the outer TTL must equal
        p.hop_limit; the decrypted packet must have valid checksums and be
        destined to p.remote_tun_if_host.  On failure the offending packet
        (and its decrypted form, when decryption got that far) is dumped to
        the debug log before the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet: otherwise a failure before decrypt() would
            # either hit an unbound name or log the previous packet's
            # plaintext as if it belonged to this one.
            pkt = None
            try:
                self.assertEqual(
                    rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
                )
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # payload came back undissected; parse it as IPv4
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                e = pkt[IP]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Dumping is best-effort; never let it mask the
                        # real failure that is re-raised below.
                        self.logger.debug("Decrypted packet could not be dumped")
                raise

    def setUp(self):
        """Create the multipoint IPsec interface and N_NHS protected peers."""
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(
            self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        # The address is configured, removed and configured again.
        # NOTE(review): presumably exercises the unconfigure path -- confirm.
        p.tun_if.config_ip4()
        p.tun_if.unconfig_ip4()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # one ACL with a single rule matching any src/dst prefix, bound to
        # both the underlay interface and the tunnel interface
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every per-peer object shares the same tun_if
            p = copy.copy(self.ipv4_params)

            # give each peer its own host address, SA ids, SPIs and TTL
            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.hop_limit = ii + 10

            # outbound SA: local -> this peer's underlay address
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_out.add_vpp_config()

            # inbound SA: this peer's underlay address -> local
            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_in.add_vpp_config()

            # bind the SA pair to the tunnel for this overlay next-hop only
            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tun_params(
                p,
                self.encryption_type,
                None,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
            )
            self.multi_params.append(p)

            # host route to the peer's protected address via the tunnel
            p.via_tun_route = VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            # kept from the original: tunnel destination is the peer's
            # underlay address
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        """Remove the tunnel interface address, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)

        # remove one tunnel protect, the rest should still work
        self.multi_params[0].tun_protect.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
        self.multi_params[0].via_tun_route.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)

        for p in self.multi_params[1:]:
            self.verify_tun_44(p, count=N_PKTS)

        # restore the first peer; all peers must pass traffic again
        self.multi_params[0].tun_protect.add_vpp_config()
        self.multi_params[0].via_tun_route.add_vpp_config()

        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
3765
3766
class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface MPLSoIPv6

    Sends MPLS-labelled IPv6 traffic through an IPsec interface whose
    outer header is IPv6.
    """

    # graph node name handed to the IpsecTun6 helpers
    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base setup and use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; defer to the base teardown."""
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted frames whose payload is MPLS over IPv6.

        Each inner packet carries MPLS label 66 (TTL 3) in front of an
        IPv6/UDP packet from `src` to `dst`.  `p` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=66, ttl=3)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet and check its MPLS label and IPv6 destination.

        On failure the offending packet (and its decrypted form, when
        decryption got that far) is dumped to the debug log before the
        original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet: otherwise a failure inside decrypt() would
            # either hit an unbound name or log the previous packet's
            # plaintext as if it belonged to this one.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # NOTE(review): the fallback parses the payload as IPv4
                    # although the checks below need MPLS and IPv6 layers;
                    # left unchanged -- confirm the intended dissector.
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Dumping is best-effort; never let it mask the
                        # real failure that is re-raised below.
                        self.logger.debug("Decrypted packet could not be dumped")
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params
        f = FibPathProto

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel: label 66 is forwarded to pg1's
        # IPv6 peer (the returned route object is not needed afterwards)
        VppMplsRoute(
            self,
            66,
            1,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            eos_proto=f.FIB_PATH_NH_PROTO_IP6,
        ).add_vpp_config()
        # traffic routed into the tunnel gets label 66 imposed
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3847
3848
# When executed directly as a script, run this module's tests with the
# VppTestRunner imported at the top of the file.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)