hs-test: fix cpu allocator
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP, ICMP
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
10 from scapy.contrib.mpls import MPLS
11 from asfframework import VppTestRunner, tag_fixme_vpp_workers
12 from template_ipsec import (
13     TemplateIpsec,
14     IpsecTun4Tests,
15     IpsecTun6Tests,
16     IpsecTun4,
17     IpsecTun6,
18     IpsecTcpTests,
19     mk_scapy_crypt_key,
20     IpsecTun6HandoffTests,
21     IpsecTun4HandoffTests,
22     config_tun_params,
23 )
24 from vpp_gre_interface import VppGreInterface
25 from vpp_ipip_tun_interface import VppIpIpTunInterface
26 from vpp_ip_route import (
27     VppIpRoute,
28     VppRoutePath,
29     DpoProto,
30     VppMplsLabel,
31     VppMplsTable,
32     VppMplsRoute,
33     FibPathProto,
34 )
35 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
36 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
37 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
38 from vpp_teib import VppTeib
39 from util import ppp
40 from vpp_papi import VppEnum
41 from vpp_papi_provider import CliFailedCommandError
42 from vpp_acl import AclRule, VppAcl, VppAclInterface
43 from vpp_policer import PolicerAction, VppPolicer, Dir
44
45
46 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
47     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
48     esn_en = bool(
49         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
50     )
51     crypt_key = mk_scapy_crypt_key(p)
52     if tun_if:
53         p.tun_dst = tun_if.remote_ip
54         p.tun_src = tun_if.local_ip
55     else:
56         p.tun_dst = dst
57         p.tun_src = src
58
59     if p.nat_header:
60         is_default_port = p.nat_header.dport == 4500
61     else:
62         is_default_port = True
63
64     if is_default_port:
65         outbound_nat_header = p.nat_header
66     else:
67         outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
68         bind_layers(UDP, ESP, dport=p.nat_header.dport)
69
70     p.scapy_tun_sa = SecurityAssociation(
71         encryption_type,
72         spi=p.scapy_tun_spi,
73         crypt_algo=p.crypt_algo,
74         crypt_key=crypt_key,
75         auth_algo=p.auth_algo,
76         auth_key=p.auth_key,
77         tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
78         nat_t_header=outbound_nat_header,
79         esn_en=esn_en,
80     )
81     p.vpp_tun_sa = SecurityAssociation(
82         encryption_type,
83         spi=p.vpp_tun_spi,
84         crypt_algo=p.crypt_algo,
85         crypt_key=crypt_key,
86         auth_algo=p.auth_algo,
87         auth_key=p.auth_key,
88         tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
89         nat_t_header=p.nat_header,
90         esn_en=esn_en,
91     )
92
93
94 def config_tra_params(p, encryption_type, tun_if):
95     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
96     esn_en = bool(
97         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
98     )
99     crypt_key = mk_scapy_crypt_key(p)
100     p.tun_dst = tun_if.remote_ip
101     p.tun_src = tun_if.local_ip
102
103     if p.nat_header:
104         is_default_port = p.nat_header.dport == 4500
105     else:
106         is_default_port = True
107
108     if is_default_port:
109         outbound_nat_header = p.nat_header
110     else:
111         outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
112         bind_layers(UDP, ESP, dport=p.nat_header.dport)
113
114     p.scapy_tun_sa = SecurityAssociation(
115         encryption_type,
116         spi=p.scapy_tun_spi,
117         crypt_algo=p.crypt_algo,
118         crypt_key=crypt_key,
119         auth_algo=p.auth_algo,
120         auth_key=p.auth_key,
121         esn_en=esn_en,
122         nat_t_header=outbound_nat_header,
123     )
124     p.vpp_tun_sa = SecurityAssociation(
125         encryption_type,
126         spi=p.vpp_tun_spi,
127         crypt_algo=p.crypt_algo,
128         crypt_key=crypt_key,
129         auth_algo=p.auth_algo,
130         auth_key=p.auth_key,
131         esn_en=esn_en,
132         nat_t_header=p.nat_header,
133     )
134
135
136 class TemplateIpsec4TunProtect(object):
137     """IPsec IPv4 Tunnel protect"""
138
139     encryption_type = ESP
140     tun4_encrypt_node_name = "esp4-encrypt-tun"
141     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
142     tun4_input_node = "ipsec4-tun-input"
143
144     def config_sa_tra(self, p):
145         config_tun_params(p, self.encryption_type, p.tun_if)
146
147         p.tun_sa_out = VppIpsecSA(
148             self,
149             p.vpp_tun_sa_id,
150             p.vpp_tun_spi,
151             p.auth_algo_vpp_id,
152             p.auth_key,
153             p.crypt_algo_vpp_id,
154             p.crypt_key,
155             self.vpp_esp_protocol,
156             flags=p.flags,
157         )
158         p.tun_sa_out.add_vpp_config()
159
160         p.tun_sa_in = VppIpsecSA(
161             self,
162             p.scapy_tun_sa_id,
163             p.scapy_tun_spi,
164             p.auth_algo_vpp_id,
165             p.auth_key,
166             p.crypt_algo_vpp_id,
167             p.crypt_key,
168             self.vpp_esp_protocol,
169             flags=p.flags,
170         )
171         p.tun_sa_in.add_vpp_config()
172
173     def config_sa_tun(self, p):
174         config_tun_params(p, self.encryption_type, p.tun_if)
175
176         p.tun_sa_out = VppIpsecSA(
177             self,
178             p.vpp_tun_sa_id,
179             p.vpp_tun_spi,
180             p.auth_algo_vpp_id,
181             p.auth_key,
182             p.crypt_algo_vpp_id,
183             p.crypt_key,
184             self.vpp_esp_protocol,
185             self.tun_if.local_addr[p.addr_type],
186             self.tun_if.remote_addr[p.addr_type],
187             flags=p.flags,
188         )
189         p.tun_sa_out.add_vpp_config()
190
191         p.tun_sa_in = VppIpsecSA(
192             self,
193             p.scapy_tun_sa_id,
194             p.scapy_tun_spi,
195             p.auth_algo_vpp_id,
196             p.auth_key,
197             p.crypt_algo_vpp_id,
198             p.crypt_key,
199             self.vpp_esp_protocol,
200             self.tun_if.remote_addr[p.addr_type],
201             self.tun_if.local_addr[p.addr_type],
202             flags=p.flags,
203         )
204         p.tun_sa_in.add_vpp_config()
205
206     def config_protect(self, p):
207         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
208         p.tun_protect.add_vpp_config()
209
210     def config_network(self, p):
211         if hasattr(p, "tun_dst"):
212             tun_dst = p.tun_dst
213         else:
214             tun_dst = self.pg0.remote_ip4
215         p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
216         p.tun_if.add_vpp_config()
217         p.tun_if.admin_up()
218         p.tun_if.config_ip4()
219         p.tun_if.config_ip6()
220
221         p.route = VppIpRoute(
222             self,
223             p.remote_tun_if_host,
224             32,
225             [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
226         )
227         p.route.add_vpp_config()
228         r = VppIpRoute(
229             self,
230             p.remote_tun_if_host6,
231             128,
232             [
233                 VppRoutePath(
234                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
235                 )
236             ],
237         )
238         r.add_vpp_config()
239
240     def unconfig_network(self, p):
241         p.route.remove_vpp_config()
242         p.tun_if.remove_vpp_config()
243
244     def unconfig_protect(self, p):
245         p.tun_protect.remove_vpp_config()
246
247     def unconfig_sa(self, p):
248         p.tun_sa_out.remove_vpp_config()
249         p.tun_sa_in.remove_vpp_config()
250
251
252 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
253     """IPsec tunnel interface tests"""
254
255     encryption_type = ESP
256
257     @classmethod
258     def setUpClass(cls):
259         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
260
261     @classmethod
262     def tearDownClass(cls):
263         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
264
265     def setUp(self):
266         super(TemplateIpsec4TunIfEsp, self).setUp()
267
268         self.tun_if = self.pg0
269
270         p = self.ipv4_params
271
272         self.config_network(p)
273         self.config_sa_tra(p)
274         self.config_protect(p)
275
276     def tearDown(self):
277         super(TemplateIpsec4TunIfEsp, self).tearDown()
278
279
280 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
281     """IPsec UDP tunnel interface tests"""
282
283     tun4_encrypt_node_name = "esp4-encrypt-tun"
284     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
285     encryption_type = ESP
286
287     @classmethod
288     def setUpClass(cls):
289         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
290
291     @classmethod
292     def tearDownClass(cls):
293         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
294
295     def verify_encrypted(self, p, sa, rxs):
296         for rx in rxs:
297             try:
298                 # ensure the UDP ports are correct before we decrypt
299                 # which strips them
300                 self.assertTrue(rx.haslayer(UDP))
301                 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
302                 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
303
304                 pkt = sa.decrypt(rx[IP])
305                 if not pkt.haslayer(IP):
306                     pkt = IP(pkt[Raw].load)
307
308                 self.assert_packet_checksums_valid(pkt)
309                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
310                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
311             except (IndexError, AssertionError):
312                 self.logger.debug(ppp("Unexpected packet:", rx))
313                 try:
314                     self.logger.debug(ppp("Decrypted packet:", pkt))
315                 except:
316                     pass
317                 raise
318
319     def config_sa_tra(self, p):
320         config_tun_params(p, self.encryption_type, p.tun_if)
321
322         p.tun_sa_out = VppIpsecSA(
323             self,
324             p.vpp_tun_sa_id,
325             p.vpp_tun_spi,
326             p.auth_algo_vpp_id,
327             p.auth_key,
328             p.crypt_algo_vpp_id,
329             p.crypt_key,
330             self.vpp_esp_protocol,
331             flags=p.flags,
332             udp_src=p.nat_header.sport,
333             udp_dst=p.nat_header.dport,
334         )
335         p.tun_sa_out.add_vpp_config()
336
337         p.tun_sa_in = VppIpsecSA(
338             self,
339             p.scapy_tun_sa_id,
340             p.scapy_tun_spi,
341             p.auth_algo_vpp_id,
342             p.auth_key,
343             p.crypt_algo_vpp_id,
344             p.crypt_key,
345             self.vpp_esp_protocol,
346             flags=p.flags
347             | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
348             udp_src=p.nat_header.sport,
349             udp_dst=p.nat_header.dport,
350         )
351         p.tun_sa_in.add_vpp_config()
352
353     def setUp(self):
354         super(TemplateIpsec4TunIfEspUdp, self).setUp()
355
356         p = self.ipv4_params
357         p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
358         p.nat_header = UDP(sport=5454, dport=4500)
359
360         self.tun_if = self.pg0
361
362         self.config_network(p)
363         self.config_sa_tra(p)
364         self.config_protect(p)
365
366     def tearDown(self):
367         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
368
369
370 class TemplateIpsec4TunTfc:
371     """IPsec IPv4 tunnel with TFC"""
372
373     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
374         pkt = (
375             IP(src=src, dst=dst, len=28 + payload_size)
376             / ICMP()
377             / Raw(b"X" * payload_size)
378             / Padding(b"Y" * 100)
379         )
380         return [
381             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / sa.encrypt(pkt)
382             for i in range(count)
383         ]
384
385     def verify_decrypted(self, p, rxs):
386         for rx in rxs:
387             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
388             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
389             self.assert_equal(rx[IP].len, len(rx[IP]))
390             self.assert_packet_checksums_valid(rx)
391
392
393 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
394     """Ipsec ESP - TUN tests"""
395
396     tun4_encrypt_node_name = "esp4-encrypt-tun"
397     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
398
399     def test_tun_basic64(self):
400         """ipsec 6o4 tunnel basic test"""
401         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
402
403         self.verify_tun_64(self.params[socket.AF_INET], count=1)
404
405     def test_tun_burst64(self):
406         """ipsec 6o4 tunnel basic test"""
407         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
408
409         self.verify_tun_64(self.params[socket.AF_INET], count=257)
410
411     def test_tun_basic_frag44(self):
412         """ipsec 4o4 tunnel frag basic test"""
413         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
414
415         p = self.ipv4_params
416
417         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
418         self.verify_tun_44(
419             self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
420         )
421         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
422
423
424 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
425     """Ipsec ESP UDP tests"""
426
427     tun4_input_node = "ipsec4-tun-input"
428
429     def setUp(self):
430         super(TestIpsec4TunIfEspUdp, self).setUp()
431
432     def test_keepalive(self):
433         """IPSEC NAT Keepalive"""
434         self.verify_keepalive(self.ipv4_params)
435
436
437 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
438     """Ipsec ESP UDP GCM tests"""
439
440     tun4_input_node = "ipsec4-tun-input"
441
442     def setUp(self):
443         super(TestIpsec4TunIfEspUdpGCM, self).setUp()
444         p = self.ipv4_params
445         p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
446         p.crypt_algo_vpp_id = (
447             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
448         )
449         p.crypt_algo = "AES-GCM"
450         p.auth_algo = "NULL"
451         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
452         p.salt = 0
453
454
455 class TestIpsec4TunIfEspUdpUpdate(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
456     """Ipsec ESP UDP update tests"""
457
458     tun4_input_node = "ipsec4-tun-input"
459
460     def setUp(self):
461         super(TestIpsec4TunIfEspUdpUpdate, self).setUp()
462         p = self.ipv4_params
463         p.nat_header = UDP(sport=6565, dport=7676)
464         config_tun_params(p, self.encryption_type, p.tun_if)
465         p.tun_sa_in.update_vpp_config(
466             udp_src=p.nat_header.dport, udp_dst=p.nat_header.sport
467         )
468         p.tun_sa_out.update_vpp_config(
469             udp_src=p.nat_header.sport, udp_dst=p.nat_header.dport
470         )
471
472
473 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
474     """Ipsec ESP - TCP tests"""
475
476     pass
477
478
479 class TemplateIpsec6TunProtect(object):
480     """IPsec IPv6 Tunnel protect"""
481
482     def config_sa_tra(self, p):
483         config_tun_params(p, self.encryption_type, p.tun_if)
484
485         p.tun_sa_out = VppIpsecSA(
486             self,
487             p.vpp_tun_sa_id,
488             p.vpp_tun_spi,
489             p.auth_algo_vpp_id,
490             p.auth_key,
491             p.crypt_algo_vpp_id,
492             p.crypt_key,
493             self.vpp_esp_protocol,
494         )
495         p.tun_sa_out.add_vpp_config()
496
497         p.tun_sa_in = VppIpsecSA(
498             self,
499             p.scapy_tun_sa_id,
500             p.scapy_tun_spi,
501             p.auth_algo_vpp_id,
502             p.auth_key,
503             p.crypt_algo_vpp_id,
504             p.crypt_key,
505             self.vpp_esp_protocol,
506         )
507         p.tun_sa_in.add_vpp_config()
508
509     def config_sa_tun(self, p):
510         config_tun_params(p, self.encryption_type, p.tun_if)
511
512         p.tun_sa_out = VppIpsecSA(
513             self,
514             p.vpp_tun_sa_id,
515             p.vpp_tun_spi,
516             p.auth_algo_vpp_id,
517             p.auth_key,
518             p.crypt_algo_vpp_id,
519             p.crypt_key,
520             self.vpp_esp_protocol,
521             self.tun_if.local_addr[p.addr_type],
522             self.tun_if.remote_addr[p.addr_type],
523         )
524         p.tun_sa_out.add_vpp_config()
525
526         p.tun_sa_in = VppIpsecSA(
527             self,
528             p.scapy_tun_sa_id,
529             p.scapy_tun_spi,
530             p.auth_algo_vpp_id,
531             p.auth_key,
532             p.crypt_algo_vpp_id,
533             p.crypt_key,
534             self.vpp_esp_protocol,
535             self.tun_if.remote_addr[p.addr_type],
536             self.tun_if.local_addr[p.addr_type],
537         )
538         p.tun_sa_in.add_vpp_config()
539
540     def config_protect(self, p):
541         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
542         p.tun_protect.add_vpp_config()
543
544     def config_network(self, p):
545         if hasattr(p, "tun_dst"):
546             tun_dst = p.tun_dst
547         else:
548             tun_dst = self.pg0.remote_ip6
549         p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
550         p.tun_if.add_vpp_config()
551         p.tun_if.admin_up()
552         p.tun_if.config_ip6()
553         p.tun_if.config_ip4()
554
555         p.route = VppIpRoute(
556             self,
557             p.remote_tun_if_host,
558             128,
559             [
560                 VppRoutePath(
561                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
562                 )
563             ],
564         )
565         p.route.add_vpp_config()
566         r = VppIpRoute(
567             self,
568             p.remote_tun_if_host4,
569             32,
570             [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
571         )
572         r.add_vpp_config()
573
574     def unconfig_network(self, p):
575         p.route.remove_vpp_config()
576         p.tun_if.remove_vpp_config()
577
578     def unconfig_protect(self, p):
579         p.tun_protect.remove_vpp_config()
580
581     def unconfig_sa(self, p):
582         p.tun_sa_out.remove_vpp_config()
583         p.tun_sa_in.remove_vpp_config()
584
585
586 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
587     """IPsec tunnel interface tests"""
588
589     encryption_type = ESP
590
591     def setUp(self):
592         super(TemplateIpsec6TunIfEsp, self).setUp()
593
594         self.tun_if = self.pg0
595
596         p = self.ipv6_params
597         self.config_network(p)
598         self.config_sa_tra(p)
599         self.config_protect(p)
600
601     def tearDown(self):
602         super(TemplateIpsec6TunIfEsp, self).tearDown()
603
604
605 class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
606     """IPsec6 UDP tunnel interface tests"""
607
608     tun4_encrypt_node_name = "esp6-encrypt-tun"
609     tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
610     encryption_type = ESP
611
612     @classmethod
613     def setUpClass(cls):
614         super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()
615
616     @classmethod
617     def tearDownClass(cls):
618         super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()
619
620     def verify_encrypted(self, p, sa, rxs):
621         for rx in rxs:
622             try:
623                 # ensure the UDP ports are correct before we decrypt
624                 # which strips them
625                 self.assertTrue(rx.haslayer(UDP))
626                 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
627                 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
628
629                 pkt = sa.decrypt(rx[IP])
630                 if not pkt.haslayer(IP):
631                     pkt = IP(pkt[Raw].load)
632
633                 self.assert_packet_checksums_valid(pkt)
634                 self.assert_equal(
635                     pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
636                 )
637                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
638             except (IndexError, AssertionError):
639                 self.logger.debug(ppp("Unexpected packet:", rx))
640                 try:
641                     self.logger.debug(ppp("Decrypted packet:", pkt))
642                 except:
643                     pass
644                 raise
645
646     def config_sa_tra(self, p):
647         config_tun_params(p, self.encryption_type, p.tun_if)
648
649         p.tun_sa_out = VppIpsecSA(
650             self,
651             p.vpp_tun_sa_id,
652             p.vpp_tun_spi,
653             p.auth_algo_vpp_id,
654             p.auth_key,
655             p.crypt_algo_vpp_id,
656             p.crypt_key,
657             self.vpp_esp_protocol,
658             flags=p.flags,
659             udp_src=p.nat_header.sport,
660             udp_dst=p.nat_header.dport,
661         )
662         p.tun_sa_out.add_vpp_config()
663
664         p.tun_sa_in = VppIpsecSA(
665             self,
666             p.scapy_tun_sa_id,
667             p.scapy_tun_spi,
668             p.auth_algo_vpp_id,
669             p.auth_key,
670             p.crypt_algo_vpp_id,
671             p.crypt_key,
672             self.vpp_esp_protocol,
673             flags=p.flags
674             | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
675             udp_src=p.nat_header.sport,
676             udp_dst=p.nat_header.dport,
677         )
678         p.tun_sa_in.add_vpp_config()
679
680     def setUp(self):
681         super(TemplateIpsec6TunIfEspUdp, self).setUp()
682
683         p = self.ipv6_params
684         p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
685         p.nat_header = UDP(sport=5454, dport=4500)
686
687         self.tun_if = self.pg0
688
689         self.config_network(p)
690         self.config_sa_tra(p)
691         self.config_protect(p)
692
693     def tearDown(self):
694         super(TemplateIpsec6TunIfEspUdp, self).tearDown()
695
696
697 class TemplateIpsec6TunTfc:
698     """IPsec IPv6 tunnel with TFC"""
699
700     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
701         return [
702             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
703             / sa.encrypt(
704                 IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
705                 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
706                 / Padding(b"Y" * 100)
707             )
708             for i in range(count)
709         ]
710
711     def verify_decrypted6(self, p, rxs):
712         for rx in rxs:
713             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
714             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
715             self.assert_equal(rx[IPv6].plen, len(rx[IPv6].payload))
716             self.assert_packet_checksums_valid(rx)
717
718
719 class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
720     """Ipsec ESP 6 UDP tests"""
721
722     tun6_input_node = "ipsec6-tun-input"
723     tun6_encrypt_node_name = "esp6-encrypt-tun"
724     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
725
726     def setUp(self):
727         super(TestIpsec6TunIfEspUdp, self).setUp()
728
729     def test_keepalive(self):
730         """IPSEC6 NAT Keepalive"""
731         self.verify_keepalive(self.ipv6_params)
732
733
734 class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
735     """Ipsec ESP 6 UDP GCM tests"""
736
737     tun6_input_node = "ipsec6-tun-input"
738     tun6_encrypt_node_name = "esp6-encrypt-tun"
739     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
740
741     def setUp(self):
742         super(TestIpsec6TunIfEspUdpGCM, self).setUp()
743         p = self.ipv6_params
744         p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
745         p.crypt_algo_vpp_id = (
746             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
747         )
748         p.crypt_algo = "AES-GCM"
749         p.auth_algo = "NULL"
750         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
751         p.salt = 0
752
753
754 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
755     """Ipsec ESP - TUN tests"""
756
757     tun6_encrypt_node_name = "esp6-encrypt-tun"
758     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
759
760     def test_tun_basic46(self):
761         """ipsec 4o6 tunnel basic test"""
762         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
763         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
764
765     def test_tun_burst46(self):
766         """ipsec 4o6 tunnel burst test"""
767         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
768         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
769
770
771 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
772     """Ipsec ESP 6 Handoff tests"""
773
774     tun6_encrypt_node_name = "esp6-encrypt-tun"
775     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
776
777     def test_tun_handoff_66_police(self):
778         """ESP 6o6 tunnel with policer worker hand-off test"""
779         self.vapi.cli("clear errors")
780         self.vapi.cli("clear ipsec sa")
781
782         N_PKTS = 15
783         p = self.params[socket.AF_INET6]
784
785         action_tx = PolicerAction(
786             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
787         )
788         policer = VppPolicer(
789             self,
790             "pol1",
791             80,
792             0,
793             1000,
794             0,
795             conform_action=action_tx,
796             exceed_action=action_tx,
797             violate_action=action_tx,
798         )
799         policer.add_vpp_config()
800
801         # Start policing on tun
802         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
803
804         for pol_bind in [1, 0]:
805             policer.bind_vpp_config(pol_bind, True)
806
807             # inject alternately on worker 0 and 1.
808             for worker in [0, 1, 0, 1]:
809                 send_pkts = self.gen_encrypt_pkts6(
810                     p,
811                     p.scapy_tun_sa,
812                     self.tun_if,
813                     src=p.remote_tun_if_host,
814                     dst=self.pg1.remote_ip6,
815                     count=N_PKTS,
816                 )
817                 recv_pkts = self.send_and_expect(
818                     self.tun_if, send_pkts, self.pg1, worker=worker
819                 )
820                 self.verify_decrypted6(p, recv_pkts)
821                 self.logger.debug(self.vapi.cli("show trace max 100"))
822
823             stats = policer.get_stats()
824             stats0 = policer.get_stats(worker=0)
825             stats1 = policer.get_stats(worker=1)
826
827             if pol_bind == 1:
828                 # First pass: Worker 1, should have done all the policing
829                 self.assertEqual(stats, stats1)
830
831                 # Worker 0, should have handed everything off
832                 self.assertEqual(stats0["conform_packets"], 0)
833                 self.assertEqual(stats0["exceed_packets"], 0)
834                 self.assertEqual(stats0["violate_packets"], 0)
835             else:
836                 # Second pass: both workers should have policed equal amounts
837                 self.assertGreater(stats1["conform_packets"], 0)
838                 self.assertEqual(stats1["exceed_packets"], 0)
839                 self.assertGreater(stats1["violate_packets"], 0)
840
841                 self.assertGreater(stats0["conform_packets"], 0)
842                 self.assertEqual(stats0["exceed_packets"], 0)
843                 self.assertGreater(stats0["violate_packets"], 0)
844
845                 self.assertEqual(
846                     stats0["conform_packets"] + stats0["violate_packets"],
847                     stats1["conform_packets"] + stats1["violate_packets"],
848                 )
849
850         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
851         policer.remove_vpp_config()
852
853
854 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
855     """Ipsec ESP 4 Handoff tests"""
856
857     tun4_encrypt_node_name = "esp4-encrypt-tun"
858     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
859
860     def test_tun_handoff_44_police(self):
861         """ESP 4o4 tunnel with policer worker hand-off test"""
862         self.vapi.cli("clear errors")
863         self.vapi.cli("clear ipsec sa")
864
865         N_PKTS = 15
866         p = self.params[socket.AF_INET]
867
868         action_tx = PolicerAction(
869             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
870         )
871         policer = VppPolicer(
872             self,
873             "pol1",
874             80,
875             0,
876             1000,
877             0,
878             conform_action=action_tx,
879             exceed_action=action_tx,
880             violate_action=action_tx,
881         )
882         policer.add_vpp_config()
883
884         # Start policing on tun
885         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
886
887         for pol_bind in [1, 0]:
888             policer.bind_vpp_config(pol_bind, True)
889
890             # inject alternately on worker 0 and 1.
891             for worker in [0, 1, 0, 1]:
892                 send_pkts = self.gen_encrypt_pkts(
893                     p,
894                     p.scapy_tun_sa,
895                     self.tun_if,
896                     src=p.remote_tun_if_host,
897                     dst=self.pg1.remote_ip4,
898                     count=N_PKTS,
899                 )
900                 recv_pkts = self.send_and_expect(
901                     self.tun_if, send_pkts, self.pg1, worker=worker
902                 )
903                 self.verify_decrypted(p, recv_pkts)
904                 self.logger.debug(self.vapi.cli("show trace max 100"))
905
906             stats = policer.get_stats()
907             stats0 = policer.get_stats(worker=0)
908             stats1 = policer.get_stats(worker=1)
909
910             if pol_bind == 1:
911                 # First pass: Worker 1, should have done all the policing
912                 self.assertEqual(stats, stats1)
913
914                 # Worker 0, should have handed everything off
915                 self.assertEqual(stats0["conform_packets"], 0)
916                 self.assertEqual(stats0["exceed_packets"], 0)
917                 self.assertEqual(stats0["violate_packets"], 0)
918             else:
919                 # Second pass: both workers should have policed equal amounts
920                 self.assertGreater(stats1["conform_packets"], 0)
921                 self.assertEqual(stats1["exceed_packets"], 0)
922                 self.assertGreater(stats1["violate_packets"], 0)
923
924                 self.assertGreater(stats0["conform_packets"], 0)
925                 self.assertEqual(stats0["exceed_packets"], 0)
926                 self.assertGreater(stats0["violate_packets"], 0)
927
928                 self.assertEqual(
929                     stats0["conform_packets"] + stats0["violate_packets"],
930                     stats1["conform_packets"] + stats1["violate_packets"],
931                 )
932
933         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
934         policer.remove_vpp_config()
935
936
937 @tag_fixme_vpp_workers
938 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
939     """IPsec IPv4 Multi Tunnel interface"""
940
941     encryption_type = ESP
942     tun4_encrypt_node_name = "esp4-encrypt-tun"
943     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
944
945     def setUp(self):
946         super(TestIpsec4MultiTunIfEsp, self).setUp()
947
948         self.tun_if = self.pg0
949
950         self.multi_params = []
951         self.pg0.generate_remote_hosts(10)
952         self.pg0.configure_ipv4_neighbors()
953
954         for ii in range(10):
955             p = copy.copy(self.ipv4_params)
956
957             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
958             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
959             p.scapy_tun_spi = p.scapy_tun_spi + ii
960             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
961             p.vpp_tun_spi = p.vpp_tun_spi + ii
962
963             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
964             p.scapy_tra_spi = p.scapy_tra_spi + ii
965             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
966             p.vpp_tra_spi = p.vpp_tra_spi + ii
967             p.tun_dst = self.pg0.remote_hosts[ii].ip4
968
969             self.multi_params.append(p)
970             self.config_network(p)
971             self.config_sa_tra(p)
972             self.config_protect(p)
973
974     def tearDown(self):
975         super(TestIpsec4MultiTunIfEsp, self).tearDown()
976
977     def test_tun_44(self):
978         """Multiple IPSEC tunnel interfaces"""
979         for p in self.multi_params:
980             self.verify_tun_44(p, count=127)
981             self.assertEqual(p.tun_if.get_rx_stats(), 127)
982             self.assertEqual(p.tun_if.get_tx_stats(), 127)
983
984     def test_tun_rr_44(self):
985         """Round-robin packets acrros multiple interface"""
986         tx = []
987         for p in self.multi_params:
988             tx = tx + self.gen_encrypt_pkts(
989                 p,
990                 p.scapy_tun_sa,
991                 self.tun_if,
992                 src=p.remote_tun_if_host,
993                 dst=self.pg1.remote_ip4,
994             )
995         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
996
997         for rx, p in zip(rxs, self.multi_params):
998             self.verify_decrypted(p, [rx])
999
1000         tx = []
1001         for p in self.multi_params:
1002             tx = tx + self.gen_pkts(
1003                 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
1004             )
1005         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
1006
1007         for rx, p in zip(rxs, self.multi_params):
1008             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
1009
1010
1011 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1012     """IPsec IPv4 Tunnel interface all Algos"""
1013
1014     encryption_type = ESP
1015     tun4_encrypt_node_name = "esp4-encrypt-tun"
1016     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1017
1018     def setUp(self):
1019         super(TestIpsec4TunIfEspAll, self).setUp()
1020
1021         self.tun_if = self.pg0
1022         p = self.ipv4_params
1023
1024         self.config_network(p)
1025         self.config_sa_tra(p)
1026         self.config_protect(p)
1027
1028     def tearDown(self):
1029         p = self.ipv4_params
1030         self.unconfig_protect(p)
1031         self.unconfig_network(p)
1032         self.unconfig_sa(p)
1033
1034         super(TestIpsec4TunIfEspAll, self).tearDown()
1035
1036     def rekey(self, p):
1037         #
1038         # change the key and the SPI
1039         #
1040         np = copy.copy(p)
1041         p.crypt_key = b"X" + p.crypt_key[1:]
1042         p.scapy_tun_spi += 1
1043         p.scapy_tun_sa_id += 1
1044         p.vpp_tun_spi += 1
1045         p.vpp_tun_sa_id += 1
1046         p.tun_if.local_spi = p.vpp_tun_spi
1047         p.tun_if.remote_spi = p.scapy_tun_spi
1048
1049         config_tun_params(p, self.encryption_type, p.tun_if)
1050
1051         p.tun_sa_out = VppIpsecSA(
1052             self,
1053             p.vpp_tun_sa_id,
1054             p.vpp_tun_spi,
1055             p.auth_algo_vpp_id,
1056             p.auth_key,
1057             p.crypt_algo_vpp_id,
1058             p.crypt_key,
1059             self.vpp_esp_protocol,
1060             flags=p.flags,
1061             salt=p.salt,
1062         )
1063         p.tun_sa_in = VppIpsecSA(
1064             self,
1065             p.scapy_tun_sa_id,
1066             p.scapy_tun_spi,
1067             p.auth_algo_vpp_id,
1068             p.auth_key,
1069             p.crypt_algo_vpp_id,
1070             p.crypt_key,
1071             self.vpp_esp_protocol,
1072             flags=p.flags,
1073             salt=p.salt,
1074         )
1075         p.tun_sa_in.add_vpp_config()
1076         p.tun_sa_out.add_vpp_config()
1077
1078         self.config_protect(p)
1079         np.tun_sa_out.remove_vpp_config()
1080         np.tun_sa_in.remove_vpp_config()
1081         self.logger.info(self.vapi.cli("sh ipsec sa"))
1082
1083     def test_tun_44(self):
1084         """IPSEC tunnel all algos"""
1085
1086         # foreach VPP crypto engine
1087         engines = ["ia32", "ipsecmb", "openssl"]
1088
1089         # foreach crypto algorithm
1090         algos = [
1091             {
1092                 "vpp-crypto": (
1093                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
1094                 ),
1095                 "vpp-integ": (
1096                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1097                 ),
1098                 "scapy-crypto": "AES-GCM",
1099                 "scapy-integ": "NULL",
1100                 "key": b"JPjyOWBeVEQiMe7h",
1101                 "salt": 3333,
1102             },
1103             {
1104                 "vpp-crypto": (
1105                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
1106                 ),
1107                 "vpp-integ": (
1108                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1109                 ),
1110                 "scapy-crypto": "AES-GCM",
1111                 "scapy-integ": "NULL",
1112                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1113                 "salt": 0,
1114             },
1115             {
1116                 "vpp-crypto": (
1117                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
1118                 ),
1119                 "vpp-integ": (
1120                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1121                 ),
1122                 "scapy-crypto": "AES-GCM",
1123                 "scapy-integ": "NULL",
1124                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1125                 "salt": 9999,
1126             },
1127             {
1128                 "vpp-crypto": (
1129                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
1130                 ),
1131                 "vpp-integ": (
1132                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1133                 ),
1134                 "scapy-crypto": "AES-CBC",
1135                 "scapy-integ": "HMAC-SHA1-96",
1136                 "salt": 0,
1137                 "key": b"JPjyOWBeVEQiMe7h",
1138             },
1139             {
1140                 "vpp-crypto": (
1141                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
1142                 ),
1143                 "vpp-integ": (
1144                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
1145                 ),
1146                 "scapy-crypto": "AES-CBC",
1147                 "scapy-integ": "SHA2-512-256",
1148                 "salt": 0,
1149                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1150             },
1151             {
1152                 "vpp-crypto": (
1153                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
1154                 ),
1155                 "vpp-integ": (
1156                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
1157                 ),
1158                 "scapy-crypto": "AES-CBC",
1159                 "scapy-integ": "SHA2-256-128",
1160                 "salt": 0,
1161                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1162             },
1163             {
1164                 "vpp-crypto": (
1165                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1166                 ),
1167                 "vpp-integ": (
1168                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1169                 ),
1170                 "scapy-crypto": "NULL",
1171                 "scapy-integ": "HMAC-SHA1-96",
1172                 "salt": 0,
1173                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1174             },
1175         ]
1176
1177         for engine in engines:
1178             self.vapi.cli("set crypto handler all %s" % engine)
1179
1180             #
1181             # loop through each of the algorithms
1182             #
1183             for algo in algos:
1184                 # with self.subTest(algo=algo['scapy']):
1185
1186                 p = self.ipv4_params
1187                 p.auth_algo_vpp_id = algo["vpp-integ"]
1188                 p.crypt_algo_vpp_id = algo["vpp-crypto"]
1189                 p.crypt_algo = algo["scapy-crypto"]
1190                 p.auth_algo = algo["scapy-integ"]
1191                 p.crypt_key = algo["key"]
1192                 p.salt = algo["salt"]
1193
1194                 #
1195                 # rekey the tunnel
1196                 #
1197                 self.rekey(p)
1198                 self.verify_tun_44(p, count=127)
1199
1200
1201 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1202     """IPsec IPv4 Tunnel interface no Algos"""
1203
1204     encryption_type = ESP
1205     tun4_encrypt_node_name = "esp4-encrypt-tun"
1206     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1207
1208     def setUp(self):
1209         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1210
1211         self.tun_if = self.pg0
1212         p = self.ipv4_params
1213         p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1214         p.auth_algo = "NULL"
1215         p.auth_key = []
1216
1217         p.crypt_algo_vpp_id = (
1218             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1219         )
1220         p.crypt_algo = "NULL"
1221         p.crypt_key = []
1222
1223     def tearDown(self):
1224         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1225
1226     def test_tun_44(self):
1227         """IPSec SA with NULL algos"""
1228         p = self.ipv4_params
1229
1230         self.config_network(p)
1231         self.config_sa_tra(p)
1232         self.config_protect(p)
1233
1234         tx = self.gen_pkts(
1235             self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=127
1236         )
1237         self.send_and_assert_no_replies(self.pg1, tx)
1238
1239         self.unconfig_protect(p)
1240         self.unconfig_sa(p)
1241         self.unconfig_network(p)
1242
1243     def test_tun_44_async(self):
1244         """IPSec SA with NULL algos using async crypto"""
1245         p = self.ipv4_params
1246
1247         self.vapi.ipsec_set_async_mode(async_enable=True)
1248         self.config_network(p)
1249         self.config_sa_tra(p)
1250         self.config_protect(p)
1251
1252         tx = self.gen_pkts(
1253             self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=127
1254         )
1255         self.send_and_assert_no_replies(self.pg1, tx)
1256
1257         self.unconfig_protect(p)
1258         self.unconfig_sa(p)
1259         self.unconfig_network(p)
1260
1261         self.vapi.ipsec_set_async_mode(async_enable=False)
1262
1263
1264 @tag_fixme_vpp_workers
1265 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1266     """IPsec IPv6 Multi Tunnel interface"""
1267
1268     encryption_type = ESP
1269     tun6_encrypt_node_name = "esp6-encrypt-tun"
1270     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1271
1272     def setUp(self):
1273         super(TestIpsec6MultiTunIfEsp, self).setUp()
1274
1275         self.tun_if = self.pg0
1276
1277         self.multi_params = []
1278         self.pg0.generate_remote_hosts(10)
1279         self.pg0.configure_ipv6_neighbors()
1280
1281         for ii in range(10):
1282             p = copy.copy(self.ipv6_params)
1283
1284             p.remote_tun_if_host = "1111::%d" % (ii + 1)
1285             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1286             p.scapy_tun_spi = p.scapy_tun_spi + ii
1287             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1288             p.vpp_tun_spi = p.vpp_tun_spi + ii
1289
1290             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1291             p.scapy_tra_spi = p.scapy_tra_spi + ii
1292             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1293             p.vpp_tra_spi = p.vpp_tra_spi + ii
1294             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1295
1296             self.multi_params.append(p)
1297             self.config_network(p)
1298             self.config_sa_tra(p)
1299             self.config_protect(p)
1300
1301     def tearDown(self):
1302         super(TestIpsec6MultiTunIfEsp, self).tearDown()
1303
1304     def test_tun_66(self):
1305         """Multiple IPSEC tunnel interfaces"""
1306         for p in self.multi_params:
1307             self.verify_tun_66(p, count=127)
1308             self.assertEqual(p.tun_if.get_rx_stats(), 127)
1309             self.assertEqual(p.tun_if.get_tx_stats(), 127)
1310
1311
1312 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1313     """Ipsec GRE TEB ESP - TUN tests"""
1314
1315     tun4_encrypt_node_name = "esp4-encrypt-tun"
1316     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1317     encryption_type = ESP
1318     omac = "00:11:22:33:44:55"
1319
1320     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1321         return [
1322             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1323             / sa.encrypt(
1324                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1325                 / GRE()
1326                 / Ether(dst=self.omac)
1327                 / IP(src="1.1.1.1", dst="1.1.1.2")
1328                 / UDP(sport=1144, dport=2233)
1329                 / Raw(b"X" * payload_size)
1330             )
1331             for i in range(count)
1332         ]
1333
1334     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1335         return [
1336             Ether(dst=self.omac)
1337             / IP(src="1.1.1.1", dst="1.1.1.2")
1338             / UDP(sport=1144, dport=2233)
1339             / Raw(b"X" * payload_size)
1340             for i in range(count)
1341         ]
1342
1343     def verify_decrypted(self, p, rxs):
1344         for rx in rxs:
1345             self.assert_equal(rx[Ether].dst, self.omac)
1346             self.assert_equal(rx[IP].dst, "1.1.1.2")
1347
1348     def verify_encrypted(self, p, sa, rxs):
1349         for rx in rxs:
1350             try:
1351                 pkt = sa.decrypt(rx[IP])
1352                 if not pkt.haslayer(IP):
1353                     pkt = IP(pkt[Raw].load)
1354                 self.assert_packet_checksums_valid(pkt)
1355                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1356                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1357                 self.assertTrue(pkt.haslayer(GRE))
1358                 e = pkt[Ether]
1359                 self.assertEqual(e[Ether].dst, self.omac)
1360                 self.assertEqual(e[IP].dst, "1.1.1.2")
1361             except (IndexError, AssertionError):
1362                 self.logger.debug(ppp("Unexpected packet:", rx))
1363                 try:
1364                     self.logger.debug(ppp("Decrypted packet:", pkt))
1365                 except:
1366                     pass
1367                 raise
1368
1369     def setUp(self):
1370         super(TestIpsecGreTebIfEsp, self).setUp()
1371
1372         self.tun_if = self.pg0
1373
1374         p = self.ipv4_params
1375
1376         bd1 = VppBridgeDomain(self, 1)
1377         bd1.add_vpp_config()
1378
1379         p.tun_sa_out = VppIpsecSA(
1380             self,
1381             p.vpp_tun_sa_id,
1382             p.vpp_tun_spi,
1383             p.auth_algo_vpp_id,
1384             p.auth_key,
1385             p.crypt_algo_vpp_id,
1386             p.crypt_key,
1387             self.vpp_esp_protocol,
1388             self.pg0.local_ip4,
1389             self.pg0.remote_ip4,
1390         )
1391         p.tun_sa_out.add_vpp_config()
1392
1393         p.tun_sa_in = VppIpsecSA(
1394             self,
1395             p.scapy_tun_sa_id,
1396             p.scapy_tun_spi,
1397             p.auth_algo_vpp_id,
1398             p.auth_key,
1399             p.crypt_algo_vpp_id,
1400             p.crypt_key,
1401             self.vpp_esp_protocol,
1402             self.pg0.remote_ip4,
1403             self.pg0.local_ip4,
1404         )
1405         p.tun_sa_in.add_vpp_config()
1406
1407         p.tun_if = VppGreInterface(
1408             self,
1409             self.pg0.local_ip4,
1410             self.pg0.remote_ip4,
1411             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1412         )
1413         p.tun_if.add_vpp_config()
1414
1415         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1416
1417         p.tun_protect.add_vpp_config()
1418
1419         p.tun_if.admin_up()
1420         p.tun_if.config_ip4()
1421         config_tun_params(p, self.encryption_type, p.tun_if)
1422
1423         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1424         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1425
1426         self.vapi.cli("clear ipsec sa")
1427         self.vapi.cli("sh adj")
1428         self.vapi.cli("sh ipsec tun")
1429
1430     def tearDown(self):
1431         p = self.ipv4_params
1432         p.tun_if.unconfig_ip4()
1433         super(TestIpsecGreTebIfEsp, self).tearDown()
1434
1435
1436 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1437     """Ipsec GRE TEB ESP - TUN tests"""
1438
1439     tun4_encrypt_node_name = "esp4-encrypt-tun"
1440     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1441     encryption_type = ESP
1442     omac = "00:11:22:33:44:55"
1443
1444     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1445         return [
1446             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1447             / sa.encrypt(
1448                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1449                 / GRE()
1450                 / Ether(dst=self.omac)
1451                 / IP(src="1.1.1.1", dst="1.1.1.2")
1452                 / UDP(sport=1144, dport=2233)
1453                 / Raw(b"X" * payload_size)
1454             )
1455             for i in range(count)
1456         ]
1457
1458     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1459         return [
1460             Ether(dst=self.omac)
1461             / Dot1Q(vlan=11)
1462             / IP(src="1.1.1.1", dst="1.1.1.2")
1463             / UDP(sport=1144, dport=2233)
1464             / Raw(b"X" * payload_size)
1465             for i in range(count)
1466         ]
1467
1468     def verify_decrypted(self, p, rxs):
1469         for rx in rxs:
1470             self.assert_equal(rx[Ether].dst, self.omac)
1471             self.assert_equal(rx[Dot1Q].vlan, 11)
1472             self.assert_equal(rx[IP].dst, "1.1.1.2")
1473
1474     def verify_encrypted(self, p, sa, rxs):
1475         for rx in rxs:
1476             try:
1477                 pkt = sa.decrypt(rx[IP])
1478                 if not pkt.haslayer(IP):
1479                     pkt = IP(pkt[Raw].load)
1480                 self.assert_packet_checksums_valid(pkt)
1481                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1482                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1483                 self.assertTrue(pkt.haslayer(GRE))
1484                 e = pkt[Ether]
1485                 self.assertEqual(e[Ether].dst, self.omac)
1486                 self.assertFalse(e.haslayer(Dot1Q))
1487                 self.assertEqual(e[IP].dst, "1.1.1.2")
1488             except (IndexError, AssertionError):
1489                 self.logger.debug(ppp("Unexpected packet:", rx))
1490                 try:
1491                     self.logger.debug(ppp("Decrypted packet:", pkt))
1492                 except:
1493                     pass
1494                 raise
1495
1496     def setUp(self):
1497         super(TestIpsecGreTebVlanIfEsp, self).setUp()
1498
1499         self.tun_if = self.pg0
1500
1501         p = self.ipv4_params
1502
1503         bd1 = VppBridgeDomain(self, 1)
1504         bd1.add_vpp_config()
1505
1506         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1507         self.vapi.l2_interface_vlan_tag_rewrite(
1508             sw_if_index=self.pg1_11.sw_if_index,
1509             vtr_op=L2_VTR_OP.L2_POP_1,
1510             push_dot1q=11,
1511         )
1512         self.pg1_11.admin_up()
1513
1514         p.tun_sa_out = VppIpsecSA(
1515             self,
1516             p.vpp_tun_sa_id,
1517             p.vpp_tun_spi,
1518             p.auth_algo_vpp_id,
1519             p.auth_key,
1520             p.crypt_algo_vpp_id,
1521             p.crypt_key,
1522             self.vpp_esp_protocol,
1523             self.pg0.local_ip4,
1524             self.pg0.remote_ip4,
1525         )
1526         p.tun_sa_out.add_vpp_config()
1527
1528         p.tun_sa_in = VppIpsecSA(
1529             self,
1530             p.scapy_tun_sa_id,
1531             p.scapy_tun_spi,
1532             p.auth_algo_vpp_id,
1533             p.auth_key,
1534             p.crypt_algo_vpp_id,
1535             p.crypt_key,
1536             self.vpp_esp_protocol,
1537             self.pg0.remote_ip4,
1538             self.pg0.local_ip4,
1539         )
1540         p.tun_sa_in.add_vpp_config()
1541
1542         p.tun_if = VppGreInterface(
1543             self,
1544             self.pg0.local_ip4,
1545             self.pg0.remote_ip4,
1546             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1547         )
1548         p.tun_if.add_vpp_config()
1549
1550         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1551
1552         p.tun_protect.add_vpp_config()
1553
1554         p.tun_if.admin_up()
1555         p.tun_if.config_ip4()
1556         config_tun_params(p, self.encryption_type, p.tun_if)
1557
1558         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1559         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1560
1561         self.vapi.cli("clear ipsec sa")
1562
1563     def tearDown(self):
1564         p = self.ipv4_params
1565         p.tun_if.unconfig_ip4()
1566         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1567         self.pg1_11.admin_down()
1568         self.pg1_11.remove_vpp_config()
1569
1570
1571 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1572     """Ipsec GRE TEB ESP - Tra tests"""
1573
1574     tun4_encrypt_node_name = "esp4-encrypt-tun"
1575     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1576     encryption_type = ESP
1577     omac = "00:11:22:33:44:55"
1578
1579     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1580         return [
1581             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1582             / sa.encrypt(
1583                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1584                 / GRE()
1585                 / Ether(dst=self.omac)
1586                 / IP(src="1.1.1.1", dst="1.1.1.2")
1587                 / UDP(sport=1144, dport=2233)
1588                 / Raw(b"X" * payload_size)
1589             )
1590             for i in range(count)
1591         ]
1592
1593     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1594         return [
1595             Ether(dst=self.omac)
1596             / IP(src="1.1.1.1", dst="1.1.1.2")
1597             / UDP(sport=1144, dport=2233)
1598             / Raw(b"X" * payload_size)
1599             for i in range(count)
1600         ]
1601
1602     def verify_decrypted(self, p, rxs):
1603         for rx in rxs:
1604             self.assert_equal(rx[Ether].dst, self.omac)
1605             self.assert_equal(rx[IP].dst, "1.1.1.2")
1606
1607     def verify_encrypted(self, p, sa, rxs):
1608         for rx in rxs:
1609             try:
1610                 pkt = sa.decrypt(rx[IP])
1611                 if not pkt.haslayer(IP):
1612                     pkt = IP(pkt[Raw].load)
1613                 self.assert_packet_checksums_valid(pkt)
1614                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1615                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1616                 self.assertTrue(pkt.haslayer(GRE))
1617                 e = pkt[Ether]
1618                 self.assertEqual(e[Ether].dst, self.omac)
1619                 self.assertEqual(e[IP].dst, "1.1.1.2")
1620             except (IndexError, AssertionError):
1621                 self.logger.debug(ppp("Unexpected packet:", rx))
1622                 try:
1623                     self.logger.debug(ppp("Decrypted packet:", pkt))
1624                 except:
1625                     pass
1626                 raise
1627
1628     def setUp(self):
1629         super(TestIpsecGreTebIfEspTra, self).setUp()
1630
1631         self.tun_if = self.pg0
1632
1633         p = self.ipv4_params
1634
1635         bd1 = VppBridgeDomain(self, 1)
1636         bd1.add_vpp_config()
1637
1638         p.tun_sa_out = VppIpsecSA(
1639             self,
1640             p.vpp_tun_sa_id,
1641             p.vpp_tun_spi,
1642             p.auth_algo_vpp_id,
1643             p.auth_key,
1644             p.crypt_algo_vpp_id,
1645             p.crypt_key,
1646             self.vpp_esp_protocol,
1647         )
1648         p.tun_sa_out.add_vpp_config()
1649
1650         p.tun_sa_in = VppIpsecSA(
1651             self,
1652             p.scapy_tun_sa_id,
1653             p.scapy_tun_spi,
1654             p.auth_algo_vpp_id,
1655             p.auth_key,
1656             p.crypt_algo_vpp_id,
1657             p.crypt_key,
1658             self.vpp_esp_protocol,
1659         )
1660         p.tun_sa_in.add_vpp_config()
1661
1662         p.tun_if = VppGreInterface(
1663             self,
1664             self.pg0.local_ip4,
1665             self.pg0.remote_ip4,
1666             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1667         )
1668         p.tun_if.add_vpp_config()
1669
1670         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1671
1672         p.tun_protect.add_vpp_config()
1673
1674         p.tun_if.admin_up()
1675         p.tun_if.config_ip4()
1676         config_tra_params(p, self.encryption_type, p.tun_if)
1677
1678         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1679         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1680
1681         self.vapi.cli("clear ipsec sa")
1682
1683     def tearDown(self):
1684         p = self.ipv4_params
1685         p.tun_if.unconfig_ip4()
1686         super(TestIpsecGreTebIfEspTra, self).tearDown()
1687
1688
1689 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1690     """Ipsec GRE TEB UDP ESP - Tra tests"""
1691
1692     tun4_encrypt_node_name = "esp4-encrypt-tun"
1693     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1694     encryption_type = ESP
1695     omac = "00:11:22:33:44:55"
1696
1697     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1698         return [
1699             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1700             / sa.encrypt(
1701                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1702                 / GRE()
1703                 / Ether(dst=self.omac)
1704                 / IP(src="1.1.1.1", dst="1.1.1.2")
1705                 / UDP(sport=1144, dport=2233)
1706                 / Raw(b"X" * payload_size)
1707             )
1708             for i in range(count)
1709         ]
1710
1711     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1712         return [
1713             Ether(dst=self.omac)
1714             / IP(src="1.1.1.1", dst="1.1.1.2")
1715             / UDP(sport=1144, dport=2233)
1716             / Raw(b"X" * payload_size)
1717             for i in range(count)
1718         ]
1719
1720     def verify_decrypted(self, p, rxs):
1721         for rx in rxs:
1722             self.assert_equal(rx[Ether].dst, self.omac)
1723             self.assert_equal(rx[IP].dst, "1.1.1.2")
1724
1725     def verify_encrypted(self, p, sa, rxs):
1726         for rx in rxs:
1727             self.assertTrue(rx.haslayer(UDP))
1728             self.assertEqual(rx[UDP].dport, 4545)
1729             self.assertEqual(rx[UDP].sport, 5454)
1730             try:
1731                 pkt = sa.decrypt(rx[IP])
1732                 if not pkt.haslayer(IP):
1733                     pkt = IP(pkt[Raw].load)
1734                 self.assert_packet_checksums_valid(pkt)
1735                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1736                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1737                 self.assertTrue(pkt.haslayer(GRE))
1738                 e = pkt[Ether]
1739                 self.assertEqual(e[Ether].dst, self.omac)
1740                 self.assertEqual(e[IP].dst, "1.1.1.2")
1741             except (IndexError, AssertionError):
1742                 self.logger.debug(ppp("Unexpected packet:", rx))
1743                 try:
1744                     self.logger.debug(ppp("Decrypted packet:", pkt))
1745                 except:
1746                     pass
1747                 raise
1748
1749     def setUp(self):
1750         super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1751
1752         self.tun_if = self.pg0
1753
1754         p = self.ipv4_params
1755         p = self.ipv4_params
1756         p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1757         p.nat_header = UDP(sport=5454, dport=4545)
1758
1759         bd1 = VppBridgeDomain(self, 1)
1760         bd1.add_vpp_config()
1761
1762         p.tun_sa_out = VppIpsecSA(
1763             self,
1764             p.vpp_tun_sa_id,
1765             p.vpp_tun_spi,
1766             p.auth_algo_vpp_id,
1767             p.auth_key,
1768             p.crypt_algo_vpp_id,
1769             p.crypt_key,
1770             self.vpp_esp_protocol,
1771             flags=p.flags,
1772             udp_src=5454,
1773             udp_dst=4545,
1774         )
1775         p.tun_sa_out.add_vpp_config()
1776
1777         p.tun_sa_in = VppIpsecSA(
1778             self,
1779             p.scapy_tun_sa_id,
1780             p.scapy_tun_spi,
1781             p.auth_algo_vpp_id,
1782             p.auth_key,
1783             p.crypt_algo_vpp_id,
1784             p.crypt_key,
1785             self.vpp_esp_protocol,
1786             flags=(
1787                 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1788             ),
1789             udp_src=4545,
1790             udp_dst=5454,
1791         )
1792         p.tun_sa_in.add_vpp_config()
1793
1794         p.tun_if = VppGreInterface(
1795             self,
1796             self.pg0.local_ip4,
1797             self.pg0.remote_ip4,
1798             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1799         )
1800         p.tun_if.add_vpp_config()
1801
1802         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1803
1804         p.tun_protect.add_vpp_config()
1805
1806         p.tun_if.admin_up()
1807         p.tun_if.config_ip4()
1808         config_tra_params(p, self.encryption_type, p.tun_if)
1809
1810         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1811         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1812
1813         self.vapi.cli("clear ipsec sa")
1814         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1815
1816     def tearDown(self):
1817         p = self.ipv4_params
1818         p.tun_if.unconfig_ip4()
1819         super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1820
1821
1822 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1823     """Ipsec GRE ESP - TUN tests"""
1824
1825     tun4_encrypt_node_name = "esp4-encrypt-tun"
1826     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1827     encryption_type = ESP
1828
1829     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1830         return [
1831             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1832             / sa.encrypt(
1833                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1834                 / GRE()
1835                 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1836                 / UDP(sport=1144, dport=2233)
1837                 / Raw(b"X" * payload_size)
1838             )
1839             for i in range(count)
1840         ]
1841
1842     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1843         return [
1844             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1845             / IP(src="1.1.1.1", dst="1.1.1.2")
1846             / UDP(sport=1144, dport=2233)
1847             / Raw(b"X" * payload_size)
1848             for i in range(count)
1849         ]
1850
1851     def verify_decrypted(self, p, rxs):
1852         for rx in rxs:
1853             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1854             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1855
1856     def verify_encrypted(self, p, sa, rxs):
1857         for rx in rxs:
1858             try:
1859                 pkt = sa.decrypt(rx[IP])
1860                 if not pkt.haslayer(IP):
1861                     pkt = IP(pkt[Raw].load)
1862                 self.assert_packet_checksums_valid(pkt)
1863                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1864                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1865                 self.assertTrue(pkt.haslayer(GRE))
1866                 e = pkt[GRE]
1867                 self.assertEqual(e[IP].dst, "1.1.1.2")
1868             except (IndexError, AssertionError):
1869                 self.logger.debug(ppp("Unexpected packet:", rx))
1870                 try:
1871                     self.logger.debug(ppp("Decrypted packet:", pkt))
1872                 except:
1873                     pass
1874                 raise
1875
1876     def setUp(self):
1877         super(TestIpsecGreIfEsp, self).setUp()
1878
1879         self.tun_if = self.pg0
1880
1881         p = self.ipv4_params
1882
1883         bd1 = VppBridgeDomain(self, 1)
1884         bd1.add_vpp_config()
1885
1886         p.tun_sa_out = VppIpsecSA(
1887             self,
1888             p.vpp_tun_sa_id,
1889             p.vpp_tun_spi,
1890             p.auth_algo_vpp_id,
1891             p.auth_key,
1892             p.crypt_algo_vpp_id,
1893             p.crypt_key,
1894             self.vpp_esp_protocol,
1895             self.pg0.local_ip4,
1896             self.pg0.remote_ip4,
1897         )
1898         p.tun_sa_out.add_vpp_config()
1899
1900         p.tun_sa_in = VppIpsecSA(
1901             self,
1902             p.scapy_tun_sa_id,
1903             p.scapy_tun_spi,
1904             p.auth_algo_vpp_id,
1905             p.auth_key,
1906             p.crypt_algo_vpp_id,
1907             p.crypt_key,
1908             self.vpp_esp_protocol,
1909             self.pg0.remote_ip4,
1910             self.pg0.local_ip4,
1911         )
1912         p.tun_sa_in.add_vpp_config()
1913
1914         p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1915         p.tun_if.add_vpp_config()
1916
1917         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1918         p.tun_protect.add_vpp_config()
1919
1920         p.tun_if.admin_up()
1921         p.tun_if.config_ip4()
1922         config_tun_params(p, self.encryption_type, p.tun_if)
1923
1924         VppIpRoute(
1925             self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1926         ).add_vpp_config()
1927
1928     def tearDown(self):
1929         p = self.ipv4_params
1930         p.tun_if.unconfig_ip4()
1931         super(TestIpsecGreIfEsp, self).tearDown()
1932
1933
1934 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1935     """Ipsec GRE ESP - TRA tests"""
1936
1937     tun4_encrypt_node_name = "esp4-encrypt-tun"
1938     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1939     encryption_type = ESP
1940
1941     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1942         return [
1943             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1944             / sa.encrypt(
1945                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1946                 / GRE()
1947                 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1948                 / UDP(sport=1144, dport=2233)
1949                 / Raw(b"X" * payload_size)
1950             )
1951             for i in range(count)
1952         ]
1953
1954     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1955         return [
1956             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1957             / sa.encrypt(
1958                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1959                 / GRE()
1960                 / UDP(sport=1144, dport=2233)
1961                 / Raw(b"X" * payload_size)
1962             )
1963             for i in range(count)
1964         ]
1965
1966     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1967         return [
1968             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1969             / IP(src="1.1.1.1", dst="1.1.1.2")
1970             / UDP(sport=1144, dport=2233)
1971             / Raw(b"X" * payload_size)
1972             for i in range(count)
1973         ]
1974
1975     def verify_decrypted(self, p, rxs):
1976         for rx in rxs:
1977             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1978             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1979
1980     def verify_encrypted(self, p, sa, rxs):
1981         for rx in rxs:
1982             try:
1983                 pkt = sa.decrypt(rx[IP])
1984                 if not pkt.haslayer(IP):
1985                     pkt = IP(pkt[Raw].load)
1986                 self.assert_packet_checksums_valid(pkt)
1987                 self.assertTrue(pkt.haslayer(GRE))
1988                 e = pkt[GRE]
1989                 self.assertEqual(e[IP].dst, "1.1.1.2")
1990             except (IndexError, AssertionError):
1991                 self.logger.debug(ppp("Unexpected packet:", rx))
1992                 try:
1993                     self.logger.debug(ppp("Decrypted packet:", pkt))
1994                 except:
1995                     pass
1996                 raise
1997
1998     def setUp(self):
1999         super(TestIpsecGreIfEspTra, self).setUp()
2000
2001         self.tun_if = self.pg0
2002
2003         p = self.ipv4_params
2004
2005         p.tun_sa_out = VppIpsecSA(
2006             self,
2007             p.vpp_tun_sa_id,
2008             p.vpp_tun_spi,
2009             p.auth_algo_vpp_id,
2010             p.auth_key,
2011             p.crypt_algo_vpp_id,
2012             p.crypt_key,
2013             self.vpp_esp_protocol,
2014         )
2015         p.tun_sa_out.add_vpp_config()
2016
2017         p.tun_sa_in = VppIpsecSA(
2018             self,
2019             p.scapy_tun_sa_id,
2020             p.scapy_tun_spi,
2021             p.auth_algo_vpp_id,
2022             p.auth_key,
2023             p.crypt_algo_vpp_id,
2024             p.crypt_key,
2025             self.vpp_esp_protocol,
2026         )
2027         p.tun_sa_in.add_vpp_config()
2028
2029         p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
2030         p.tun_if.add_vpp_config()
2031
2032         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2033         p.tun_protect.add_vpp_config()
2034
2035         p.tun_if.admin_up()
2036         p.tun_if.config_ip4()
2037         config_tra_params(p, self.encryption_type, p.tun_if)
2038
2039         VppIpRoute(
2040             self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
2041         ).add_vpp_config()
2042
2043     def tearDown(self):
2044         p = self.ipv4_params
2045         p.tun_if.unconfig_ip4()
2046         super(TestIpsecGreIfEspTra, self).tearDown()
2047
2048     def test_gre_non_ip(self):
2049         p = self.ipv4_params
2050         tx = self.gen_encrypt_non_ip_pkts(
2051             p.scapy_tun_sa,
2052             self.tun_if,
2053             src=p.remote_tun_if_host,
2054             dst=self.pg1.remote_ip6,
2055         )
2056         self.send_and_assert_no_replies(self.tun_if, tx)
2057         node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
2058         self.assertEqual(1, self.statistics.get_err_counter(node_name))
2059         err = p.tun_sa_in.get_err("unsup_payload")
2060         self.assertEqual(err, 1)
2061
2062
2063 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
2064     """Ipsec GRE ESP - TRA tests"""
2065
2066     tun6_encrypt_node_name = "esp6-encrypt-tun"
2067     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2068     encryption_type = ESP
2069
2070     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2071         return [
2072             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2073             / sa.encrypt(
2074                 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2075                 / GRE()
2076                 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2077                 / UDP(sport=1144, dport=2233)
2078                 / Raw(b"X" * payload_size)
2079             )
2080             for i in range(count)
2081         ]
2082
2083     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2084         return [
2085             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2086             / IPv6(src="1::1", dst="1::2")
2087             / UDP(sport=1144, dport=2233)
2088             / Raw(b"X" * payload_size)
2089             for i in range(count)
2090         ]
2091
2092     def verify_decrypted6(self, p, rxs):
2093         for rx in rxs:
2094             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2095             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2096
2097     def verify_encrypted6(self, p, sa, rxs):
2098         for rx in rxs:
2099             try:
2100                 pkt = sa.decrypt(rx[IPv6])
2101                 if not pkt.haslayer(IPv6):
2102                     pkt = IPv6(pkt[Raw].load)
2103                 self.assert_packet_checksums_valid(pkt)
2104                 self.assertTrue(pkt.haslayer(GRE))
2105                 e = pkt[GRE]
2106                 self.assertEqual(e[IPv6].dst, "1::2")
2107             except (IndexError, AssertionError):
2108                 self.logger.debug(ppp("Unexpected packet:", rx))
2109                 try:
2110                     self.logger.debug(ppp("Decrypted packet:", pkt))
2111                 except:
2112                     pass
2113                 raise
2114
2115     def setUp(self):
2116         super(TestIpsecGre6IfEspTra, self).setUp()
2117
2118         self.tun_if = self.pg0
2119
2120         p = self.ipv6_params
2121
2122         bd1 = VppBridgeDomain(self, 1)
2123         bd1.add_vpp_config()
2124
2125         p.tun_sa_out = VppIpsecSA(
2126             self,
2127             p.vpp_tun_sa_id,
2128             p.vpp_tun_spi,
2129             p.auth_algo_vpp_id,
2130             p.auth_key,
2131             p.crypt_algo_vpp_id,
2132             p.crypt_key,
2133             self.vpp_esp_protocol,
2134         )
2135         p.tun_sa_out.add_vpp_config()
2136
2137         p.tun_sa_in = VppIpsecSA(
2138             self,
2139             p.scapy_tun_sa_id,
2140             p.scapy_tun_spi,
2141             p.auth_algo_vpp_id,
2142             p.auth_key,
2143             p.crypt_algo_vpp_id,
2144             p.crypt_key,
2145             self.vpp_esp_protocol,
2146         )
2147         p.tun_sa_in.add_vpp_config()
2148
2149         p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
2150         p.tun_if.add_vpp_config()
2151
2152         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2153         p.tun_protect.add_vpp_config()
2154
2155         p.tun_if.admin_up()
2156         p.tun_if.config_ip6()
2157         config_tra_params(p, self.encryption_type, p.tun_if)
2158
2159         r = VppIpRoute(
2160             self,
2161             "1::2",
2162             128,
2163             [
2164                 VppRoutePath(
2165                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
2166                 )
2167             ],
2168         )
2169         r.add_vpp_config()
2170
2171     def tearDown(self):
2172         p = self.ipv6_params
2173         p.tun_if.unconfig_ip6()
2174         super(TestIpsecGre6IfEspTra, self).tearDown()
2175
2176
2177 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
2178     """Ipsec mGRE ESP v4 TRA tests"""
2179
2180     tun4_encrypt_node_name = "esp4-encrypt-tun"
2181     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2182     encryption_type = ESP
2183
2184     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2185         return [
2186             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2187             / sa.encrypt(
2188                 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
2189                 / GRE()
2190                 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
2191                 / UDP(sport=1144, dport=2233)
2192                 / Raw(b"X" * payload_size)
2193             )
2194             for i in range(count)
2195         ]
2196
2197     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2198         return [
2199             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2200             / IP(src="1.1.1.1", dst=dst)
2201             / UDP(sport=1144, dport=2233)
2202             / Raw(b"X" * payload_size)
2203             for i in range(count)
2204         ]
2205
2206     def verify_decrypted(self, p, rxs):
2207         for rx in rxs:
2208             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2209             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2210
2211     def verify_encrypted(self, p, sa, rxs):
2212         for rx in rxs:
2213             try:
2214                 pkt = sa.decrypt(rx[IP])
2215                 if not pkt.haslayer(IP):
2216                     pkt = IP(pkt[Raw].load)
2217                 self.assert_packet_checksums_valid(pkt)
2218                 self.assertTrue(pkt.haslayer(GRE))
2219                 e = pkt[GRE]
2220                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2221             except (IndexError, AssertionError):
2222                 self.logger.debug(ppp("Unexpected packet:", rx))
2223                 try:
2224                     self.logger.debug(ppp("Decrypted packet:", pkt))
2225                 except:
2226                     pass
2227                 raise
2228
2229     def setUp(self):
2230         super(TestIpsecMGreIfEspTra4, self).setUp()
2231
2232         N_NHS = 16
2233         self.tun_if = self.pg0
2234         p = self.ipv4_params
2235         p.tun_if = VppGreInterface(
2236             self,
2237             self.pg0.local_ip4,
2238             "0.0.0.0",
2239             mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2240         )
2241         p.tun_if.add_vpp_config()
2242         p.tun_if.admin_up()
2243         p.tun_if.config_ip4()
2244         p.tun_if.generate_remote_hosts(N_NHS)
2245         self.pg0.generate_remote_hosts(N_NHS)
2246         self.pg0.configure_ipv4_neighbors()
2247
2248         # setup some SAs for several next-hops on the interface
2249         self.multi_params = []
2250
2251         for ii in range(N_NHS):
2252             p = copy.copy(self.ipv4_params)
2253
2254             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2255             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2256             p.scapy_tun_spi = p.scapy_tun_spi + ii
2257             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2258             p.vpp_tun_spi = p.vpp_tun_spi + ii
2259
2260             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2261             p.scapy_tra_spi = p.scapy_tra_spi + ii
2262             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2263             p.vpp_tra_spi = p.vpp_tra_spi + ii
2264             p.tun_sa_out = VppIpsecSA(
2265                 self,
2266                 p.vpp_tun_sa_id,
2267                 p.vpp_tun_spi,
2268                 p.auth_algo_vpp_id,
2269                 p.auth_key,
2270                 p.crypt_algo_vpp_id,
2271                 p.crypt_key,
2272                 self.vpp_esp_protocol,
2273             )
2274             p.tun_sa_out.add_vpp_config()
2275
2276             p.tun_sa_in = VppIpsecSA(
2277                 self,
2278                 p.scapy_tun_sa_id,
2279                 p.scapy_tun_spi,
2280                 p.auth_algo_vpp_id,
2281                 p.auth_key,
2282                 p.crypt_algo_vpp_id,
2283                 p.crypt_key,
2284                 self.vpp_esp_protocol,
2285             )
2286             p.tun_sa_in.add_vpp_config()
2287
2288             p.tun_protect = VppIpsecTunProtect(
2289                 self,
2290                 p.tun_if,
2291                 p.tun_sa_out,
2292                 [p.tun_sa_in],
2293                 nh=p.tun_if.remote_hosts[ii].ip4,
2294             )
2295             p.tun_protect.add_vpp_config()
2296             config_tra_params(p, self.encryption_type, p.tun_if)
2297             self.multi_params.append(p)
2298
2299             VppIpRoute(
2300                 self,
2301                 p.remote_tun_if_host,
2302                 32,
2303                 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2304             ).add_vpp_config()
2305
2306             # in this v4 variant add the teibs after the protect
2307             p.teib = VppTeib(
2308                 self,
2309                 p.tun_if,
2310                 p.tun_if.remote_hosts[ii].ip4,
2311                 self.pg0.remote_hosts[ii].ip4,
2312             ).add_vpp_config()
2313             p.tun_dst = self.pg0.remote_hosts[ii].ip4
2314         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2315
2316     def tearDown(self):
2317         p = self.ipv4_params
2318         p.tun_if.unconfig_ip4()
2319         super(TestIpsecMGreIfEspTra4, self).tearDown()
2320
2321     def test_tun_44(self):
2322         """mGRE IPSEC 44"""
2323         N_PKTS = 63
2324         for p in self.multi_params:
2325             self.verify_tun_44(p, count=N_PKTS)
2326             p.teib.remove_vpp_config()
2327             self.verify_tun_dropped_44(p, count=N_PKTS)
2328             p.teib.add_vpp_config()
2329             self.verify_tun_44(p, count=N_PKTS)
2330
2331
2332 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2333     """Ipsec mGRE ESP v6 TRA tests"""
2334
2335     tun6_encrypt_node_name = "esp6-encrypt-tun"
2336     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2337     encryption_type = ESP
2338
2339     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2340         return [
2341             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2342             / sa.encrypt(
2343                 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2344                 / GRE()
2345                 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2346                 / UDP(sport=1144, dport=2233)
2347                 / Raw(b"X" * payload_size)
2348             )
2349             for i in range(count)
2350         ]
2351
2352     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2353         return [
2354             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2355             / IPv6(src="1::1", dst=dst)
2356             / UDP(sport=1144, dport=2233)
2357             / Raw(b"X" * payload_size)
2358             for i in range(count)
2359         ]
2360
2361     def verify_decrypted6(self, p, rxs):
2362         for rx in rxs:
2363             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2364             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2365
2366     def verify_encrypted6(self, p, sa, rxs):
2367         for rx in rxs:
2368             try:
2369                 pkt = sa.decrypt(rx[IPv6])
2370                 if not pkt.haslayer(IPv6):
2371                     pkt = IPv6(pkt[Raw].load)
2372                 self.assert_packet_checksums_valid(pkt)
2373                 self.assertTrue(pkt.haslayer(GRE))
2374                 e = pkt[GRE]
2375                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
2376             except (IndexError, AssertionError):
2377                 self.logger.debug(ppp("Unexpected packet:", rx))
2378                 try:
2379                     self.logger.debug(ppp("Decrypted packet:", pkt))
2380                 except:
2381                     pass
2382                 raise
2383
2384     def setUp(self):
2385         super(TestIpsecMGreIfEspTra6, self).setUp()
2386
2387         self.vapi.cli("set logging class ipsec level debug")
2388
2389         N_NHS = 16
2390         self.tun_if = self.pg0
2391         p = self.ipv6_params
2392         p.tun_if = VppGreInterface(
2393             self,
2394             self.pg0.local_ip6,
2395             "::",
2396             mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2397         )
2398         p.tun_if.add_vpp_config()
2399         p.tun_if.admin_up()
2400         p.tun_if.config_ip6()
2401         p.tun_if.generate_remote_hosts(N_NHS)
2402         self.pg0.generate_remote_hosts(N_NHS)
2403         self.pg0.configure_ipv6_neighbors()
2404
2405         # setup some SAs for several next-hops on the interface
2406         self.multi_params = []
2407
2408         for ii in range(N_NHS):
2409             p = copy.copy(self.ipv6_params)
2410
2411             p.remote_tun_if_host = "1::%d" % (ii + 1)
2412             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2413             p.scapy_tun_spi = p.scapy_tun_spi + ii
2414             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2415             p.vpp_tun_spi = p.vpp_tun_spi + ii
2416
2417             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2418             p.scapy_tra_spi = p.scapy_tra_spi + ii
2419             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2420             p.vpp_tra_spi = p.vpp_tra_spi + ii
2421             p.tun_sa_out = VppIpsecSA(
2422                 self,
2423                 p.vpp_tun_sa_id,
2424                 p.vpp_tun_spi,
2425                 p.auth_algo_vpp_id,
2426                 p.auth_key,
2427                 p.crypt_algo_vpp_id,
2428                 p.crypt_key,
2429                 self.vpp_esp_protocol,
2430             )
2431             p.tun_sa_out.add_vpp_config()
2432
2433             p.tun_sa_in = VppIpsecSA(
2434                 self,
2435                 p.scapy_tun_sa_id,
2436                 p.scapy_tun_spi,
2437                 p.auth_algo_vpp_id,
2438                 p.auth_key,
2439                 p.crypt_algo_vpp_id,
2440                 p.crypt_key,
2441                 self.vpp_esp_protocol,
2442             )
2443             p.tun_sa_in.add_vpp_config()
2444
2445             # in this v6 variant add the teibs first then the protection
2446             p.tun_dst = self.pg0.remote_hosts[ii].ip6
2447             VppTeib(
2448                 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
2449             ).add_vpp_config()
2450
2451             p.tun_protect = VppIpsecTunProtect(
2452                 self,
2453                 p.tun_if,
2454                 p.tun_sa_out,
2455                 [p.tun_sa_in],
2456                 nh=p.tun_if.remote_hosts[ii].ip6,
2457             )
2458             p.tun_protect.add_vpp_config()
2459             config_tra_params(p, self.encryption_type, p.tun_if)
2460             self.multi_params.append(p)
2461
2462             VppIpRoute(
2463                 self,
2464                 p.remote_tun_if_host,
2465                 128,
2466                 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
2467             ).add_vpp_config()
2468             p.tun_dst = self.pg0.remote_hosts[ii].ip6
2469
2470         self.logger.info(self.vapi.cli("sh log"))
2471         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2472         self.logger.info(self.vapi.cli("sh adj 41"))
2473
2474     def tearDown(self):
2475         p = self.ipv6_params
2476         p.tun_if.unconfig_ip6()
2477         super(TestIpsecMGreIfEspTra6, self).tearDown()
2478
2479     def test_tun_66(self):
2480         """mGRE IPSec 66"""
2481         for p in self.multi_params:
2482             self.verify_tun_66(p, count=63)
2483
2484
2485 @tag_fixme_vpp_workers
2486 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2487     """IPsec IPv4 Tunnel protect - transport mode"""
2488
2489     def setUp(self):
2490         super(TestIpsec4TunProtect, self).setUp()
2491
2492         self.tun_if = self.pg0
2493
2494     def tearDown(self):
2495         super(TestIpsec4TunProtect, self).tearDown()
2496
2497     def test_tun_44(self):
2498         """IPSEC tunnel protect"""
2499
2500         p = self.ipv4_params
2501
2502         self.config_network(p)
2503         self.config_sa_tra(p)
2504         self.config_protect(p)
2505
2506         self.verify_tun_44(p, count=127)
2507         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2508         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2509
2510         self.vapi.cli("clear ipsec sa")
2511         self.verify_tun_64(p, count=127)
2512         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2513         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2514
2515         # rekey - create new SAs and update the tunnel protection
2516         np = copy.copy(p)
2517         np.crypt_key = b"X" + p.crypt_key[1:]
2518         np.scapy_tun_spi += 100
2519         np.scapy_tun_sa_id += 1
2520         np.vpp_tun_spi += 100
2521         np.vpp_tun_sa_id += 1
2522         np.tun_if.local_spi = p.vpp_tun_spi
2523         np.tun_if.remote_spi = p.scapy_tun_spi
2524
2525         self.config_sa_tra(np)
2526         self.config_protect(np)
2527         self.unconfig_sa(p)
2528
2529         self.verify_tun_44(np, count=127)
2530         self.assertEqual(p.tun_if.get_rx_stats(), 381)
2531         self.assertEqual(p.tun_if.get_tx_stats(), 381)
2532
2533         # teardown
2534         self.unconfig_protect(np)
2535         self.unconfig_sa(np)
2536         self.unconfig_network(p)
2537
2538
2539 @tag_fixme_vpp_workers
2540 class TestIpsec4TunProtectTfc(TemplateIpsec4TunTfc, TestIpsec4TunProtect):
2541     """IPsec IPv4 Tunnel protect with TFC - transport mode"""
2542
2543
2544 @tag_fixme_vpp_workers
2545 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2546     """IPsec IPv4 UDP Tunnel protect - transport mode"""
2547
2548     def setUp(self):
2549         super(TestIpsec4TunProtectUdp, self).setUp()
2550
2551         self.tun_if = self.pg0
2552
2553         p = self.ipv4_params
2554         p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2555         p.nat_header = UDP(sport=4500, dport=4500)
2556         self.config_network(p)
2557         self.config_sa_tra(p)
2558         self.config_protect(p)
2559
2560     def tearDown(self):
2561         p = self.ipv4_params
2562         self.unconfig_protect(p)
2563         self.unconfig_sa(p)
2564         self.unconfig_network(p)
2565         super(TestIpsec4TunProtectUdp, self).tearDown()
2566
2567     def verify_encrypted(self, p, sa, rxs):
2568         # ensure encrypted packets are recieved with the default UDP ports
2569         for rx in rxs:
2570             self.assertEqual(rx[UDP].sport, 4500)
2571             self.assertEqual(rx[UDP].dport, 4500)
2572         super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2573
2574     def test_tun_44(self):
2575         """IPSEC UDP tunnel protect"""
2576
2577         p = self.ipv4_params
2578
2579         self.verify_tun_44(p, count=127)
2580         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2581         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2582
2583     def test_keepalive(self):
2584         """IPSEC NAT Keepalive"""
2585         self.verify_keepalive(self.ipv4_params)
2586
2587
2588 @tag_fixme_vpp_workers
2589 class TestIpsec4TunProtectUdpTfc(TemplateIpsec4TunTfc, TestIpsec4TunProtectUdp):
2590     """IPsec IPv4 UDP Tunnel protect with TFC - transport mode"""
2591
2592
2593 @tag_fixme_vpp_workers
2594 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2595     """IPsec IPv4 Tunnel protect - tunnel mode"""
2596
2597     encryption_type = ESP
2598     tun4_encrypt_node_name = "esp4-encrypt-tun"
2599     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2600
2601     def setUp(self):
2602         super(TestIpsec4TunProtectTun, self).setUp()
2603
2604         self.tun_if = self.pg0
2605
2606     def tearDown(self):
2607         super(TestIpsec4TunProtectTun, self).tearDown()
2608
2609     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2610         return [
2611             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2612             / sa.encrypt(
2613                 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2614                 / IP(src=src, dst=dst)
2615                 / UDP(sport=1144, dport=2233)
2616                 / Raw(b"X" * payload_size)
2617             )
2618             for i in range(count)
2619         ]
2620
2621     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2622         return [
2623             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2624             / IP(src=src, dst=dst)
2625             / UDP(sport=1144, dport=2233)
2626             / Raw(b"X" * payload_size)
2627             for i in range(count)
2628         ]
2629
2630     def verify_decrypted(self, p, rxs):
2631         for rx in rxs:
2632             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2633             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2634             self.assert_packet_checksums_valid(rx)
2635
2636     def verify_encrypted(self, p, sa, rxs):
2637         for rx in rxs:
2638             try:
2639                 pkt = sa.decrypt(rx[IP])
2640                 if not pkt.haslayer(IP):
2641                     pkt = IP(pkt[Raw].load)
2642                 self.assert_packet_checksums_valid(pkt)
2643                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2644                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2645                 inner = pkt[IP].payload
2646                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2647
2648             except (IndexError, AssertionError):
2649                 self.logger.debug(ppp("Unexpected packet:", rx))
2650                 try:
2651                     self.logger.debug(ppp("Decrypted packet:", pkt))
2652                 except:
2653                     pass
2654                 raise
2655
2656     def test_tun_44(self):
2657         """IPSEC tunnel protect"""
2658
2659         p = self.ipv4_params
2660
2661         self.config_network(p)
2662         self.config_sa_tun(p)
2663         self.config_protect(p)
2664
2665         # also add an output features on the tunnel and physical interface
2666         # so we test they still work
2667         r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2668         a = VppAcl(self, [r_all]).add_vpp_config()
2669
2670         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2671         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2672
2673         self.verify_tun_44(p, count=127)
2674
2675         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2676         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2677
2678         # rekey - create new SAs and update the tunnel protection
2679         np = copy.copy(p)
2680         np.crypt_key = b"X" + p.crypt_key[1:]
2681         np.scapy_tun_spi += 100
2682         np.scapy_tun_sa_id += 1
2683         np.vpp_tun_spi += 100
2684         np.vpp_tun_sa_id += 1
2685         np.tun_if.local_spi = p.vpp_tun_spi
2686         np.tun_if.remote_spi = p.scapy_tun_spi
2687
2688         self.config_sa_tun(np)
2689         self.config_protect(np)
2690         self.unconfig_sa(p)
2691
2692         self.verify_tun_44(np, count=127)
2693         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2694         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2695
2696         # teardown
2697         self.unconfig_protect(np)
2698         self.unconfig_sa(np)
2699         self.unconfig_network(p)
2700
2701
2702 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2703     """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2704
2705     encryption_type = ESP
2706     tun4_encrypt_node_name = "esp4-encrypt-tun"
2707     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2708
2709     def setUp(self):
2710         super(TestIpsec4TunProtectTunDrop, self).setUp()
2711
2712         self.tun_if = self.pg0
2713
2714     def tearDown(self):
2715         super(TestIpsec4TunProtectTunDrop, self).tearDown()
2716
2717     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2718         return [
2719             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2720             / sa.encrypt(
2721                 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2722                 / IP(src=src, dst=dst)
2723                 / UDP(sport=1144, dport=2233)
2724                 / Raw(b"X" * payload_size)
2725             )
2726             for i in range(count)
2727         ]
2728
2729     def test_tun_drop_44(self):
2730         """IPSEC tunnel protect bogus tunnel header"""
2731
2732         p = self.ipv4_params
2733
2734         self.config_network(p)
2735         self.config_sa_tun(p)
2736         self.config_protect(p)
2737
2738         tx = self.gen_encrypt_pkts(
2739             p,
2740             p.scapy_tun_sa,
2741             self.tun_if,
2742             src=p.remote_tun_if_host,
2743             dst=self.pg1.remote_ip4,
2744             count=63,
2745         )
2746         self.send_and_assert_no_replies(self.tun_if, tx)
2747
2748         # teardown
2749         self.unconfig_protect(p)
2750         self.unconfig_sa(p)
2751         self.unconfig_network(p)
2752
2753
2754 @tag_fixme_vpp_workers
2755 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2756     """IPsec IPv6 Tunnel protect - transport mode"""
2757
2758     encryption_type = ESP
2759     tun6_encrypt_node_name = "esp6-encrypt-tun"
2760     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2761
2762     def setUp(self):
2763         super(TestIpsec6TunProtect, self).setUp()
2764
2765         self.tun_if = self.pg0
2766
2767     def tearDown(self):
2768         super(TestIpsec6TunProtect, self).tearDown()
2769
2770     def test_tun_66(self):
2771         """IPSEC tunnel protect 6o6"""
2772
2773         p = self.ipv6_params
2774
2775         self.config_network(p)
2776         self.config_sa_tra(p)
2777         self.config_protect(p)
2778
2779         self.verify_tun_66(p, count=127)
2780         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2781         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2782
2783         # rekey - create new SAs and update the tunnel protection
2784         np = copy.copy(p)
2785         np.crypt_key = b"X" + p.crypt_key[1:]
2786         np.scapy_tun_spi += 100
2787         np.scapy_tun_sa_id += 1
2788         np.vpp_tun_spi += 100
2789         np.vpp_tun_sa_id += 1
2790         np.tun_if.local_spi = p.vpp_tun_spi
2791         np.tun_if.remote_spi = p.scapy_tun_spi
2792
2793         self.config_sa_tra(np)
2794         self.config_protect(np)
2795         self.unconfig_sa(p)
2796
2797         self.verify_tun_66(np, count=127)
2798         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2799         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2800
2801         # bounce the interface state
2802         p.tun_if.admin_down()
2803         self.verify_drop_tun_66(np, count=127)
2804         node = "/err/ipsec6-tun-input/disabled"
2805         self.assertEqual(127, self.statistics.get_err_counter(node))
2806         p.tun_if.admin_up()
2807         self.verify_tun_66(np, count=127)
2808
2809         # 3 phase rekey
2810         #  1) add two input SAs [old, new]
2811         #  2) swap output SA to [new]
2812         #  3) use only [new] input SA
2813         np3 = copy.copy(np)
2814         np3.crypt_key = b"Z" + p.crypt_key[1:]
2815         np3.scapy_tun_spi += 100
2816         np3.scapy_tun_sa_id += 1
2817         np3.vpp_tun_spi += 100
2818         np3.vpp_tun_sa_id += 1
2819         np3.tun_if.local_spi = p.vpp_tun_spi
2820         np3.tun_if.remote_spi = p.scapy_tun_spi
2821
2822         self.config_sa_tra(np3)
2823
2824         # step 1;
2825         p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2826         self.verify_tun_66(np, np, count=127)
2827         self.verify_tun_66(np3, np, count=127)
2828
2829         # step 2;
2830         p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2831         self.verify_tun_66(np, np3, count=127)
2832         self.verify_tun_66(np3, np3, count=127)
2833
2834         # step 1;
2835         p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2836         self.verify_tun_66(np3, np3, count=127)
2837         self.verify_drop_tun_rx_66(np, count=127)
2838
2839         self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2840         self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2841         self.unconfig_sa(np)
2842
2843         # teardown
2844         self.unconfig_protect(np3)
2845         self.unconfig_sa(np3)
2846         self.unconfig_network(p)
2847
2848     def test_tun_46(self):
2849         """IPSEC tunnel protect 4o6"""
2850
2851         p = self.ipv6_params
2852
2853         self.config_network(p)
2854         self.config_sa_tra(p)
2855         self.config_protect(p)
2856
2857         self.verify_tun_46(p, count=127)
2858         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2859         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2860
2861         # teardown
2862         self.unconfig_protect(p)
2863         self.unconfig_sa(p)
2864         self.unconfig_network(p)
2865
2866
2867 @tag_fixme_vpp_workers
2868 class TestIpsec6TunProtectTfc(TemplateIpsec6TunTfc, TestIpsec6TunProtect):
2869     """IPsec IPv6 Tunnel protect with TFC - transport mode"""
2870
2871
2872 @tag_fixme_vpp_workers
2873 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2874     """IPsec IPv6 Tunnel protect - tunnel mode"""
2875
2876     encryption_type = ESP
2877     tun6_encrypt_node_name = "esp6-encrypt-tun"
2878     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2879
2880     def setUp(self):
2881         super(TestIpsec6TunProtectTun, self).setUp()
2882
2883         self.tun_if = self.pg0
2884
2885     def tearDown(self):
2886         super(TestIpsec6TunProtectTun, self).tearDown()
2887
2888     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2889         return [
2890             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2891             / sa.encrypt(
2892                 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2893                 / IPv6(src=src, dst=dst)
2894                 / UDP(sport=1166, dport=2233)
2895                 / Raw(b"X" * payload_size)
2896             )
2897             for i in range(count)
2898         ]
2899
2900     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2901         return [
2902             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2903             / IPv6(src=src, dst=dst)
2904             / UDP(sport=1166, dport=2233)
2905             / Raw(b"X" * payload_size)
2906             for i in range(count)
2907         ]
2908
2909     def verify_decrypted6(self, p, rxs):
2910         for rx in rxs:
2911             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2912             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2913             self.assert_packet_checksums_valid(rx)
2914
2915     def verify_encrypted6(self, p, sa, rxs):
2916         for rx in rxs:
2917             try:
2918                 pkt = sa.decrypt(rx[IPv6])
2919                 if not pkt.haslayer(IPv6):
2920                     pkt = IPv6(pkt[Raw].load)
2921                 self.assert_packet_checksums_valid(pkt)
2922                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2923                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2924                 inner = pkt[IPv6].payload
2925                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2926
2927             except (IndexError, AssertionError):
2928                 self.logger.debug(ppp("Unexpected packet:", rx))
2929                 try:
2930                     self.logger.debug(ppp("Decrypted packet:", pkt))
2931                 except:
2932                     pass
2933                 raise
2934
2935     def test_tun_66(self):
2936         """IPSEC tunnel protect"""
2937
2938         p = self.ipv6_params
2939
2940         self.config_network(p)
2941         self.config_sa_tun(p)
2942         self.config_protect(p)
2943
2944         self.verify_tun_66(p, count=127)
2945
2946         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2947         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2948
2949         # rekey - create new SAs and update the tunnel protection
2950         np = copy.copy(p)
2951         np.crypt_key = b"X" + p.crypt_key[1:]
2952         np.scapy_tun_spi += 100
2953         np.scapy_tun_sa_id += 1
2954         np.vpp_tun_spi += 100
2955         np.vpp_tun_sa_id += 1
2956         np.tun_if.local_spi = p.vpp_tun_spi
2957         np.tun_if.remote_spi = p.scapy_tun_spi
2958
2959         self.config_sa_tun(np)
2960         self.config_protect(np)
2961         self.unconfig_sa(p)
2962
2963         self.verify_tun_66(np, count=127)
2964         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2965         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2966
2967         # teardown
2968         self.unconfig_protect(np)
2969         self.unconfig_sa(np)
2970         self.unconfig_network(p)
2971
2972
2973 class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2974     """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2975
2976     encryption_type = ESP
2977     tun6_encrypt_node_name = "esp6-encrypt-tun"
2978     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2979
2980     def setUp(self):
2981         super(TestIpsec6TunProtectTunDrop, self).setUp()
2982
2983         self.tun_if = self.pg0
2984
2985     def tearDown(self):
2986         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2987
2988     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2989         # the IP destination of the revelaed packet does not match
2990         # that assigned to the tunnel
2991         return [
2992             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2993             / sa.encrypt(
2994                 IPv6(src=sw_intf.remote_ip6, dst="5::5")
2995                 / IPv6(src=src, dst=dst)
2996                 / UDP(sport=1144, dport=2233)
2997                 / Raw(b"X" * payload_size)
2998             )
2999             for i in range(count)
3000         ]
3001
3002     def test_tun_drop_66(self):
3003         """IPSEC 6 tunnel protect bogus tunnel header"""
3004
3005         p = self.ipv6_params
3006
3007         self.config_network(p)
3008         self.config_sa_tun(p)
3009         self.config_protect(p)
3010
3011         tx = self.gen_encrypt_pkts6(
3012             p,
3013             p.scapy_tun_sa,
3014             self.tun_if,
3015             src=p.remote_tun_if_host,
3016             dst=self.pg1.remote_ip6,
3017             count=63,
3018         )
3019         self.send_and_assert_no_replies(self.tun_if, tx)
3020
3021         self.unconfig_protect(p)
3022         self.unconfig_sa(p)
3023         self.unconfig_network(p)
3024
3025
3026 class TemplateIpsecItf4(object):
3027     """IPsec Interface IPv4"""
3028
3029     encryption_type = ESP
3030     tun4_encrypt_node_name = "esp4-encrypt-tun"
3031     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3032     tun4_input_node = "ipsec4-tun-input"
3033
3034     def config_sa_tun(self, p, src, dst):
3035         config_tun_params(p, self.encryption_type, None, src, dst)
3036
3037         p.tun_sa_out = VppIpsecSA(
3038             self,
3039             p.vpp_tun_sa_id,
3040             p.vpp_tun_spi,
3041             p.auth_algo_vpp_id,
3042             p.auth_key,
3043             p.crypt_algo_vpp_id,
3044             p.crypt_key,
3045             self.vpp_esp_protocol,
3046             src,
3047             dst,
3048             flags=p.flags,
3049         )
3050         p.tun_sa_out.add_vpp_config()
3051
3052         p.tun_sa_in = VppIpsecSA(
3053             self,
3054             p.scapy_tun_sa_id,
3055             p.scapy_tun_spi,
3056             p.auth_algo_vpp_id,
3057             p.auth_key,
3058             p.crypt_algo_vpp_id,
3059             p.crypt_key,
3060             self.vpp_esp_protocol,
3061             dst,
3062             src,
3063             flags=p.flags
3064             | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
3065         )
3066         p.tun_sa_in.add_vpp_config()
3067
3068     def config_protect(self, p):
3069         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
3070         p.tun_protect.add_vpp_config()
3071
3072     def config_network(self, p, instance=0xFFFFFFFF):
3073         p.tun_if = VppIpsecInterface(self, instance=instance)
3074
3075         p.tun_if.add_vpp_config()
3076         p.tun_if.admin_up()
3077         p.tun_if.config_ip4()
3078         p.tun_if.config_ip6()
3079
3080         p.route = VppIpRoute(
3081             self,
3082             p.remote_tun_if_host,
3083             32,
3084             [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3085         )
3086         p.route.add_vpp_config()
3087         r = VppIpRoute(
3088             self,
3089             p.remote_tun_if_host6,
3090             128,
3091             [
3092                 VppRoutePath(
3093                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3094                 )
3095             ],
3096         )
3097         r.add_vpp_config()
3098
3099     def unconfig_network(self, p):
3100         p.route.remove_vpp_config()
3101         p.tun_if.remove_vpp_config()
3102
3103     def unconfig_protect(self, p):
3104         p.tun_protect.remove_vpp_config()
3105
3106     def unconfig_sa(self, p):
3107         p.tun_sa_out.remove_vpp_config()
3108         p.tun_sa_in.remove_vpp_config()
3109
3110
3111 @tag_fixme_vpp_workers
3112 class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3113     """IPsec Interface IPv4"""
3114
3115     def setUp(self):
3116         super(TestIpsecItf4, self).setUp()
3117
3118         self.tun_if = self.pg0
3119
3120     def tearDown(self):
3121         super(TestIpsecItf4, self).tearDown()
3122
3123     def test_tun_instance_44(self):
3124         p = self.ipv4_params
3125         self.config_network(p, instance=3)
3126
3127         with self.assertRaises(CliFailedCommandError):
3128             self.vapi.cli("show interface ipsec0")
3129
3130         output = self.vapi.cli("show interface ipsec3")
3131         self.assertTrue("unknown" not in output)
3132
3133         self.unconfig_network(p)
3134
3135     def test_tun_44(self):
3136         """IPSEC interface IPv4"""
3137
3138         n_pkts = 127
3139         p = self.ipv4_params
3140
3141         self.config_network(p)
3142         config_tun_params(
3143             p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
3144         )
3145         self.verify_tun_dropped_44(p, count=n_pkts)
3146         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3147         self.config_protect(p)
3148
3149         self.verify_tun_44(p, count=n_pkts)
3150         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3151         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3152
3153         p.tun_if.admin_down()
3154         self.verify_tun_dropped_44(p, count=n_pkts)
3155         p.tun_if.admin_up()
3156         self.verify_tun_44(p, count=n_pkts)
3157
3158         self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3159         self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3160
3161         # it's a v6 packet when its encrypted
3162         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
3163
3164         self.verify_tun_64(p, count=n_pkts)
3165         self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3166         self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3167
3168         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
3169
3170         # update the SA tunnel
3171         config_tun_params(
3172             p, self.encryption_type, None, self.pg2.local_ip4, self.pg2.remote_ip4
3173         )
3174         p.tun_sa_in.update_vpp_config(
3175             is_tun=True, tun_src=self.pg2.remote_ip4, tun_dst=self.pg2.local_ip4
3176         )
3177         p.tun_sa_out.update_vpp_config(
3178             is_tun=True, tun_src=self.pg2.local_ip4, tun_dst=self.pg2.remote_ip4
3179         )
3180         self.verify_tun_44(p, count=n_pkts)
3181         self.assertEqual(p.tun_if.get_rx_stats(), 5 * n_pkts)
3182         self.assertEqual(p.tun_if.get_tx_stats(), 4 * n_pkts)
3183
3184         self.vapi.cli("clear interfaces")
3185
3186         # rekey - create new SAs and update the tunnel protection
3187         np = copy.copy(p)
3188         np.crypt_key = b"X" + p.crypt_key[1:]
3189         np.scapy_tun_spi += 100
3190         np.scapy_tun_sa_id += 1
3191         np.vpp_tun_spi += 100
3192         np.vpp_tun_sa_id += 1
3193         np.tun_if.local_spi = p.vpp_tun_spi
3194         np.tun_if.remote_spi = p.scapy_tun_spi
3195
3196         self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
3197         self.config_protect(np)
3198         self.unconfig_sa(p)
3199
3200         self.verify_tun_44(np, count=n_pkts)
3201         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3202         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3203
3204         # teardown
3205         self.unconfig_protect(np)
3206         self.unconfig_sa(np)
3207         self.unconfig_network(p)
3208
3209     def test_tun_44_null(self):
3210         """IPSEC interface IPv4 NULL auth/crypto"""
3211
3212         n_pkts = 127
3213         p = copy.copy(self.ipv4_params)
3214
3215         p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
3216         p.crypt_algo_vpp_id = (
3217             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
3218         )
3219         p.crypt_algo = "NULL"
3220         p.auth_algo = "NULL"
3221
3222         self.config_network(p)
3223         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3224         self.config_protect(p)
3225
3226         self.logger.info(self.vapi.cli("sh ipsec sa"))
3227         self.verify_tun_44(p, count=n_pkts)
3228
3229         # teardown
3230         self.unconfig_protect(p)
3231         self.unconfig_sa(p)
3232         self.unconfig_network(p)
3233
3234     def test_tun_44_police(self):
3235         """IPSEC interface IPv4 with input policer"""
3236         n_pkts = 127
3237         p = self.ipv4_params
3238
3239         self.config_network(p)
3240         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3241         self.config_protect(p)
3242
3243         action_tx = PolicerAction(
3244             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3245         )
3246         policer = VppPolicer(
3247             self,
3248             "pol1",
3249             80,
3250             0,
3251             1000,
3252             0,
3253             conform_action=action_tx,
3254             exceed_action=action_tx,
3255             violate_action=action_tx,
3256         )
3257         policer.add_vpp_config()
3258
3259         # Start policing on tun
3260         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3261
3262         self.verify_tun_44(p, count=n_pkts)
3263         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3264         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3265
3266         stats = policer.get_stats()
3267
3268         # Single rate, 2 colour policer - expect conform, violate but no exceed
3269         self.assertGreater(stats["conform_packets"], 0)
3270         self.assertEqual(stats["exceed_packets"], 0)
3271         self.assertGreater(stats["violate_packets"], 0)
3272
3273         # Stop policing on tun
3274         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3275         self.verify_tun_44(p, count=n_pkts)
3276
3277         # No new policer stats
3278         statsnew = policer.get_stats()
3279         self.assertEqual(stats, statsnew)
3280
3281         # teardown
3282         policer.remove_vpp_config()
3283         self.unconfig_protect(p)
3284         self.unconfig_sa(p)
3285         self.unconfig_network(p)
3286
3287
3288 @tag_fixme_vpp_workers
3289 class TestIpsecItf4Tfc(TemplateIpsec4TunTfc, TestIpsecItf4):
3290     """IPsec Interface IPv4 with TFC"""
3291
3292
3293 class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3294     """IPsec Interface MPLSoIPv4"""
3295
3296     tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
3297
3298     def setUp(self):
3299         super(TestIpsecItf4MPLS, self).setUp()
3300
3301         self.tun_if = self.pg0
3302
3303     def tearDown(self):
3304         super(TestIpsecItf4MPLS, self).tearDown()
3305
3306     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3307         return [
3308             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3309             / sa.encrypt(
3310                 MPLS(label=44, ttl=3)
3311                 / IP(src=src, dst=dst)
3312                 / UDP(sport=1166, dport=2233)
3313                 / Raw(b"X" * payload_size)
3314             )
3315             for i in range(count)
3316         ]
3317
3318     def verify_encrypted(self, p, sa, rxs):
3319         for rx in rxs:
3320             try:
3321                 pkt = sa.decrypt(rx[IP])
3322                 if not pkt.haslayer(IP):
3323                     pkt = IP(pkt[Raw].load)
3324                 self.assert_packet_checksums_valid(pkt)
3325                 self.assert_equal(pkt[MPLS].label, 44)
3326                 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
3327             except (IndexError, AssertionError):
3328                 self.logger.debug(ppp("Unexpected packet:", rx))
3329                 try:
3330                     self.logger.debug(ppp("Decrypted packet:", pkt))
3331                 except:
3332                     pass
3333                 raise
3334
3335     def test_tun_mpls_o_ip4(self):
3336         """IPSEC interface MPLS over IPv4"""
3337
3338         n_pkts = 127
3339         p = self.ipv4_params
3340         f = FibPathProto
3341
3342         tbl = VppMplsTable(self, 0)
3343         tbl.add_vpp_config()
3344
3345         self.config_network(p)
3346         # deag MPLS routes from the tunnel
3347         r4 = VppMplsRoute(
3348             self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
3349         ).add_vpp_config()
3350         p.route.modify(
3351             [
3352                 VppRoutePath(
3353                     p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
3354                 )
3355             ]
3356         )
3357         p.tun_if.enable_mpls()
3358
3359         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3360         self.config_protect(p)
3361
3362         self.verify_tun_44(p, count=n_pkts)
3363
3364         # cleanup
3365         p.tun_if.disable_mpls()
3366         self.unconfig_protect(p)
3367         self.unconfig_sa(p)
3368         self.unconfig_network(p)
3369
3370
3371 class TemplateIpsecItf6(object):
3372     """IPsec Interface IPv6"""
3373
3374     encryption_type = ESP
3375     tun6_encrypt_node_name = "esp6-encrypt-tun"
3376     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
3377     tun6_input_node = "ipsec6-tun-input"
3378
3379     def config_sa_tun(self, p, src, dst):
3380         config_tun_params(p, self.encryption_type, None, src, dst)
3381
3382         if not hasattr(p, "tun_flags"):
3383             p.tun_flags = None
3384         if not hasattr(p, "hop_limit"):
3385             p.hop_limit = 255
3386
3387         p.tun_sa_out = VppIpsecSA(
3388             self,
3389             p.vpp_tun_sa_id,
3390             p.vpp_tun_spi,
3391             p.auth_algo_vpp_id,
3392             p.auth_key,
3393             p.crypt_algo_vpp_id,
3394             p.crypt_key,
3395             self.vpp_esp_protocol,
3396             src,
3397             dst,
3398             flags=p.flags,
3399             tun_flags=p.tun_flags,
3400             hop_limit=p.hop_limit,
3401         )
3402         p.tun_sa_out.add_vpp_config()
3403
3404         p.tun_sa_in = VppIpsecSA(
3405             self,
3406             p.scapy_tun_sa_id,
3407             p.scapy_tun_spi,
3408             p.auth_algo_vpp_id,
3409             p.auth_key,
3410             p.crypt_algo_vpp_id,
3411             p.crypt_key,
3412             self.vpp_esp_protocol,
3413             dst,
3414             src,
3415             flags=p.flags,
3416         )
3417         p.tun_sa_in.add_vpp_config()
3418
3419     def config_protect(self, p):
3420         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
3421         p.tun_protect.add_vpp_config()
3422
3423     def config_network(self, p):
3424         p.tun_if = VppIpsecInterface(self)
3425
3426         p.tun_if.add_vpp_config()
3427         p.tun_if.admin_up()
3428         p.tun_if.config_ip4()
3429         p.tun_if.config_ip6()
3430
3431         r = VppIpRoute(
3432             self,
3433             p.remote_tun_if_host4,
3434             32,
3435             [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3436         )
3437         r.add_vpp_config()
3438
3439         p.route = VppIpRoute(
3440             self,
3441             p.remote_tun_if_host,
3442             128,
3443             [
3444                 VppRoutePath(
3445                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3446                 )
3447             ],
3448         )
3449         p.route.add_vpp_config()
3450
3451     def unconfig_network(self, p):
3452         p.route.remove_vpp_config()
3453         p.tun_if.remove_vpp_config()
3454
3455     def unconfig_protect(self, p):
3456         p.tun_protect.remove_vpp_config()
3457
3458     def unconfig_sa(self, p):
3459         p.tun_sa_out.remove_vpp_config()
3460         p.tun_sa_in.remove_vpp_config()
3461
3462
3463 @tag_fixme_vpp_workers
3464 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3465     """IPsec Interface IPv6"""
3466
3467     def setUp(self):
3468         super(TestIpsecItf6, self).setUp()
3469
3470         self.tun_if = self.pg0
3471
3472     def tearDown(self):
3473         super(TestIpsecItf6, self).tearDown()
3474
3475     def test_tun_66(self):
3476         """IPSEC interface IPv6"""
3477
3478         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3479         n_pkts = 127
3480         p = self.ipv6_params
3481         p.inner_hop_limit = 24
3482         p.outer_hop_limit = 23
3483         p.outer_flow_label = 243224
3484         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3485
3486         self.config_network(p)
3487         config_tun_params(
3488             p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
3489         )
3490         self.verify_drop_tun_66(p, count=n_pkts)
3491         self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3492         self.config_protect(p)
3493
3494         self.verify_tun_66(p, count=n_pkts)
3495         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3496         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3497
3498         p.tun_if.admin_down()
3499         self.verify_drop_tun_66(p, count=n_pkts)
3500         p.tun_if.admin_up()
3501         self.verify_tun_66(p, count=n_pkts)
3502
3503         self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3504         self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3505
3506         # it's a v4 packet when its encrypted
3507         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3508
3509         self.verify_tun_46(p, count=n_pkts)
3510         self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3511         self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3512
3513         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
3514
3515         self.vapi.cli("clear interfaces")
3516
3517         # rekey - create new SAs and update the tunnel protection
3518         np = copy.copy(p)
3519         np.crypt_key = b"X" + p.crypt_key[1:]
3520         np.scapy_tun_spi += 100
3521         np.scapy_tun_sa_id += 1
3522         np.vpp_tun_spi += 100
3523         np.vpp_tun_sa_id += 1
3524         np.tun_if.local_spi = p.vpp_tun_spi
3525         np.tun_if.remote_spi = p.scapy_tun_spi
3526         np.inner_hop_limit = 24
3527         np.outer_hop_limit = 128
3528         np.inner_flow_label = 0xABCDE
3529         np.outer_flow_label = 0xABCDE
3530         np.hop_limit = 128
3531         np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3532
3533         self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3534         self.config_protect(np)
3535         self.unconfig_sa(p)
3536
3537         self.verify_tun_66(np, count=n_pkts)
3538         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3539         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3540
3541         # teardown
3542         self.unconfig_protect(np)
3543         self.unconfig_sa(np)
3544         self.unconfig_network(p)
3545
3546     def test_tun_66_police(self):
3547         """IPSEC interface IPv6 with input policer"""
3548         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3549         n_pkts = 127
3550         p = self.ipv6_params
3551         p.inner_hop_limit = 24
3552         p.outer_hop_limit = 23
3553         p.outer_flow_label = 243224
3554         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3555
3556         self.config_network(p)
3557         self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3558         self.config_protect(p)
3559
3560         action_tx = PolicerAction(
3561             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3562         )
3563         policer = VppPolicer(
3564             self,
3565             "pol1",
3566             80,
3567             0,
3568             1000,
3569             0,
3570             conform_action=action_tx,
3571             exceed_action=action_tx,
3572             violate_action=action_tx,
3573         )
3574         policer.add_vpp_config()
3575
3576         # Start policing on tun
3577         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3578
3579         self.verify_tun_66(p, count=n_pkts)
3580         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3581         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3582
3583         stats = policer.get_stats()
3584
3585         # Single rate, 2 colour policer - expect conform, violate but no exceed
3586         self.assertGreater(stats["conform_packets"], 0)
3587         self.assertEqual(stats["exceed_packets"], 0)
3588         self.assertGreater(stats["violate_packets"], 0)
3589
3590         # Stop policing on tun
3591         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3592         self.verify_tun_66(p, count=n_pkts)
3593
3594         # No new policer stats
3595         statsnew = policer.get_stats()
3596         self.assertEqual(stats, statsnew)
3597
3598         # teardown
3599         policer.remove_vpp_config()
3600         self.unconfig_protect(p)
3601         self.unconfig_sa(p)
3602         self.unconfig_network(p)
3603
3604
3605 @tag_fixme_vpp_workers
3606 class TestIpsecItf6Tfc(TemplateIpsec6TunTfc, TestIpsecItf6):
3607     """IPsec Interface IPv6 with TFC"""
3608
3609
3610 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3611     """Ipsec P2MP ESP v4 tests"""
3612
3613     tun4_encrypt_node_name = "esp4-encrypt-tun"
3614     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3615     encryption_type = ESP
3616
3617     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3618         return [
3619             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3620             / sa.encrypt(
3621                 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
3622                 / UDP(sport=1144, dport=2233)
3623                 / Raw(b"X" * payload_size)
3624             )
3625             for i in range(count)
3626         ]
3627
3628     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
3629         return [
3630             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3631             / IP(src="1.1.1.1", dst=dst)
3632             / UDP(sport=1144, dport=2233)
3633             / Raw(b"X" * payload_size)
3634             for i in range(count)
3635         ]
3636
3637     def verify_decrypted(self, p, rxs):
3638         for rx in rxs:
3639             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3640             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3641
3642     def verify_encrypted(self, p, sa, rxs):
3643         for rx in rxs:
3644             try:
3645                 self.assertEqual(
3646                     rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
3647                 )
3648                 self.assertEqual(rx[IP].ttl, p.hop_limit)
3649                 pkt = sa.decrypt(rx[IP])
3650                 if not pkt.haslayer(IP):
3651                     pkt = IP(pkt[Raw].load)
3652                 self.assert_packet_checksums_valid(pkt)
3653                 e = pkt[IP]
3654                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3655             except (IndexError, AssertionError):
3656                 self.logger.debug(ppp("Unexpected packet:", rx))
3657                 try:
3658                     self.logger.debug(ppp("Decrypted packet:", pkt))
3659                 except:
3660                     pass
3661                 raise
3662
3663     def setUp(self):
3664         super(TestIpsecMIfEsp4, self).setUp()
3665
3666         N_NHS = 16
3667         self.tun_if = self.pg0
3668         p = self.ipv4_params
3669         p.tun_if = VppIpsecInterface(
3670             self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
3671         )
3672         p.tun_if.add_vpp_config()
3673         p.tun_if.admin_up()
3674         p.tun_if.config_ip4()
3675         p.tun_if.unconfig_ip4()
3676         p.tun_if.config_ip4()
3677         p.tun_if.generate_remote_hosts(N_NHS)
3678         self.pg0.generate_remote_hosts(N_NHS)
3679         self.pg0.configure_ipv4_neighbors()
3680
3681         r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
3682         a = VppAcl(self, [r_all]).add_vpp_config()
3683
3684         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3685         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3686
3687         # setup some SAs for several next-hops on the interface
3688         self.multi_params = []
3689
3690         for ii in range(N_NHS):
3691             p = copy.copy(self.ipv4_params)
3692
3693             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3694             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3695             p.scapy_tun_spi = p.scapy_tun_spi + ii
3696             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3697             p.vpp_tun_spi = p.vpp_tun_spi + ii
3698
3699             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3700             p.scapy_tra_spi = p.scapy_tra_spi + ii
3701             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3702             p.vpp_tra_spi = p.vpp_tra_spi + ii
3703             p.hop_limit = ii + 10
3704             p.tun_sa_out = VppIpsecSA(
3705                 self,
3706                 p.vpp_tun_sa_id,
3707                 p.vpp_tun_spi,
3708                 p.auth_algo_vpp_id,
3709                 p.auth_key,
3710                 p.crypt_algo_vpp_id,
3711                 p.crypt_key,
3712                 self.vpp_esp_protocol,
3713                 self.pg0.local_ip4,
3714                 self.pg0.remote_hosts[ii].ip4,
3715                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3716                 hop_limit=p.hop_limit,
3717             )
3718             p.tun_sa_out.add_vpp_config()
3719
3720             p.tun_sa_in = VppIpsecSA(
3721                 self,
3722                 p.scapy_tun_sa_id,
3723                 p.scapy_tun_spi,
3724                 p.auth_algo_vpp_id,
3725                 p.auth_key,
3726                 p.crypt_algo_vpp_id,
3727                 p.crypt_key,
3728                 self.vpp_esp_protocol,
3729                 self.pg0.remote_hosts[ii].ip4,
3730                 self.pg0.local_ip4,
3731                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3732                 hop_limit=p.hop_limit,
3733             )
3734             p.tun_sa_in.add_vpp_config()
3735
3736             p.tun_protect = VppIpsecTunProtect(
3737                 self,
3738                 p.tun_if,
3739                 p.tun_sa_out,
3740                 [p.tun_sa_in],
3741                 nh=p.tun_if.remote_hosts[ii].ip4,
3742             )
3743             p.tun_protect.add_vpp_config()
3744             config_tun_params(
3745                 p,
3746                 self.encryption_type,
3747                 None,
3748                 self.pg0.local_ip4,
3749                 self.pg0.remote_hosts[ii].ip4,
3750             )
3751             self.multi_params.append(p)
3752
3753             p.via_tun_route = VppIpRoute(
3754                 self,
3755                 p.remote_tun_if_host,
3756                 32,
3757                 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
3758             ).add_vpp_config()
3759
3760             p.tun_dst = self.pg0.remote_hosts[ii].ip4
3761
3762     def tearDown(self):
3763         p = self.ipv4_params
3764         p.tun_if.unconfig_ip4()
3765         super(TestIpsecMIfEsp4, self).tearDown()
3766
3767     def test_tun_44(self):
3768         """P2MP IPSEC 44"""
3769         N_PKTS = 63
3770         for p in self.multi_params:
3771             self.verify_tun_44(p, count=N_PKTS)
3772
3773         # remove one tunnel protect, the rest should still work
3774         self.multi_params[0].tun_protect.remove_vpp_config()
3775         self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3776         self.multi_params[0].via_tun_route.remove_vpp_config()
3777         self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3778
3779         for p in self.multi_params[1:]:
3780             self.verify_tun_44(p, count=N_PKTS)
3781
3782         self.multi_params[0].tun_protect.add_vpp_config()
3783         self.multi_params[0].via_tun_route.add_vpp_config()
3784
3785         for p in self.multi_params:
3786             self.verify_tun_44(p, count=N_PKTS)
3787
3788
3789 class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3790     """IPsec Interface MPLSoIPv6"""
3791
3792     tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3793
3794     def setUp(self):
3795         super(TestIpsecItf6MPLS, self).setUp()
3796
3797         self.tun_if = self.pg0
3798
3799     def tearDown(self):
3800         super(TestIpsecItf6MPLS, self).tearDown()
3801
3802     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3803         return [
3804             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3805             / sa.encrypt(
3806                 MPLS(label=66, ttl=3)
3807                 / IPv6(src=src, dst=dst)
3808                 / UDP(sport=1166, dport=2233)
3809                 / Raw(b"X" * payload_size)
3810             )
3811             for i in range(count)
3812         ]
3813
3814     def verify_encrypted6(self, p, sa, rxs):
3815         for rx in rxs:
3816             try:
3817                 pkt = sa.decrypt(rx[IPv6])
3818                 if not pkt.haslayer(IPv6):
3819                     pkt = IP(pkt[Raw].load)
3820                 self.assert_packet_checksums_valid(pkt)
3821                 self.assert_equal(pkt[MPLS].label, 66)
3822                 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3823             except (IndexError, AssertionError):
3824                 self.logger.debug(ppp("Unexpected packet:", rx))
3825                 try:
3826                     self.logger.debug(ppp("Decrypted packet:", pkt))
3827                 except:
3828                     pass
3829                 raise
3830
3831     def test_tun_mpls_o_ip6(self):
3832         """IPSEC interface MPLS over IPv6"""
3833
3834         n_pkts = 127
3835         p = self.ipv6_params
3836         f = FibPathProto
3837
3838         tbl = VppMplsTable(self, 0)
3839         tbl.add_vpp_config()
3840
3841         self.config_network(p)
3842         # deag MPLS routes from the tunnel
3843         r6 = VppMplsRoute(
3844             self,
3845             66,
3846             1,
3847             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3848             eos_proto=f.FIB_PATH_NH_PROTO_IP6,
3849         ).add_vpp_config()
3850         p.route.modify(
3851             [
3852                 VppRoutePath(
3853                     p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
3854                 )
3855             ]
3856         )
3857         p.tun_if.enable_mpls()
3858
3859         self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3860         self.config_protect(p)
3861
3862         self.verify_tun_66(p, count=n_pkts)
3863
3864         # cleanup
3865         p.tun_if.disable_mpls()
3866         self.unconfig_protect(p)
3867         self.unconfig_sa(p)
3868         self.unconfig_network(p)
3869
3870
3871 if __name__ == "__main__":
3872     unittest.main(testRunner=VppTestRunner)