tests: refactor asf framework code
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from asfframework import VppTestRunner, tag_fixme_vpp_workers
12 from template_ipsec import (
13     TemplateIpsec,
14     IpsecTun4Tests,
15     IpsecTun6Tests,
16     IpsecTun4,
17     IpsecTun6,
18     IpsecTcpTests,
19     mk_scapy_crypt_key,
20     IpsecTun6HandoffTests,
21     IpsecTun4HandoffTests,
22     config_tun_params,
23 )
24 from vpp_gre_interface import VppGreInterface
25 from vpp_ipip_tun_interface import VppIpIpTunInterface
26 from vpp_ip_route import (
27     VppIpRoute,
28     VppRoutePath,
29     DpoProto,
30     VppMplsLabel,
31     VppMplsTable,
32     VppMplsRoute,
33     FibPathProto,
34 )
35 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
36 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
37 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
38 from vpp_teib import VppTeib
39 from util import ppp
40 from vpp_papi import VppEnum
41 from vpp_papi_provider import CliFailedCommandError
42 from vpp_acl import AclRule, VppAcl, VppAclInterface
43 from vpp_policer import PolicerAction, VppPolicer, Dir
44
45
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
    """Attach the tunnel-mode scapy security associations to ``p``.

    Note that this module-level definition rebinds the ``config_tun_params``
    name that the module also imports from ``template_ipsec``.

    ``p.tun_src``/``p.tun_dst`` are taken from ``tun_if.local_ip`` /
    ``tun_if.remote_ip`` when ``tun_if`` is truthy, otherwise from ``src`` /
    ``dst``. Two ``SecurityAssociation`` objects are then stored on ``p``:

    * ``p.scapy_tun_sa`` uses ``p.scapy_tun_spi`` and a tunnel header going
      from ``p.tun_dst`` to ``p.tun_src``.
    * ``p.vpp_tun_sa`` uses ``p.vpp_tun_spi``, a tunnel header going from
      ``p.tun_src`` to ``p.tun_dst`` and ``p.nat_header`` as it is.

    :param p: IPsec parameter object; updated in place
    :param encryption_type: first positional argument given to
        ``SecurityAssociation``
    :param tun_if: interface providing the tunnel endpoints, or a falsy value
    :param src: tunnel source used when ``tun_if`` is falsy
    :param dst: tunnel destination used when ``tun_if`` is falsy
    """
    use_esn = bool(
        p.flags & VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    )
    scapy_key = mk_scapy_crypt_key(p)

    if tun_if:
        p.tun_dst = tun_if.remote_ip
        p.tun_src = tun_if.local_ip
    else:
        p.tun_dst = dst
        p.tun_src = src

    nat_hdr = p.nat_header
    # no NAT header at all counts as "default port" as well
    uses_default_port = not nat_hdr or nat_hdr.dport == 4500

    if uses_default_port:
        scapy_nat_hdr = nat_hdr
    else:
        # the scapy-side SA gets the ports swapped, and ESP is bound to UDP
        # for the non-default destination port
        scapy_nat_hdr = UDP(sport=nat_hdr.dport, dport=nat_hdr.sport)
        bind_layers(UDP, ESP, dport=nat_hdr.dport)

    ip_cls = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    # keyword arguments that are identical for both directions
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=use_esn,
    )

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=ip_cls(src=p.tun_dst, dst=p.tun_src),
        nat_t_header=scapy_nat_hdr,
        **shared,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=ip_cls(dst=p.tun_dst, src=p.tun_src),
        nat_t_header=nat_hdr,
        **shared,
    )
92
93
def config_tra_params(p, encryption_type, tun_if):
    """Attach scapy security associations without a tunnel header to ``p``.

    ``p.tun_dst``/``p.tun_src`` are set from ``tun_if.remote_ip`` /
    ``tun_if.local_ip`` (``tun_if`` is dereferenced unconditionally here).
    Two ``SecurityAssociation`` objects are then stored on ``p``:

    * ``p.scapy_tun_sa`` uses ``p.scapy_tun_spi``; when ``p.nat_header`` has
      a destination port other than 4500 its NAT header is a new UDP header
      with the ports swapped, otherwise ``p.nat_header`` itself.
    * ``p.vpp_tun_sa`` uses ``p.vpp_tun_spi`` and ``p.nat_header`` as it is.

    :param p: IPsec parameter object; updated in place
    :param encryption_type: first positional argument given to
        ``SecurityAssociation``
    :param tun_if: interface providing ``remote_ip`` and ``local_ip``
    """
    esn_en = bool(
        p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
    )
    crypt_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    if p.nat_header:
        is_default_port = p.nat_header.dport == 4500
    else:
        # no NAT header at all counts as "default port" as well
        is_default_port = True

    if is_default_port:
        outbound_nat_header = p.nat_header
    else:
        # the scapy-side SA gets the ports swapped, and ESP is bound to UDP
        # for the non-default destination port
        outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
        bind_layers(UDP, ESP, dport=p.nat_header.dport)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=esn_en,
        nat_t_header=outbound_nat_header,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=esn_en,
        nat_t_header=p.nat_header,
    )
134
135
class TemplateIpsec4TunProtect(object):
    """IPsec IPv4 Tunnel protect"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create the SA pair for ``p`` without tunnel endpoint addresses.

        The SAs are stored as ``p.tun_sa_out`` (VPP SA id / SPI) and
        ``p.tun_sa_in`` (scapy SA id / SPI) and each is added to VPP.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # positional arguments shared by both directions
        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algo_args, flags=p.flags
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *algo_args, flags=p.flags
        )
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the SA pair for ``p`` with tunnel endpoint addresses.

        Same as ``config_sa_tra`` but each SA additionally receives the
        ``self.tun_if`` local/remote addresses for ``p.addr_type``; the
        inbound SA gets them in the opposite order to the outbound one.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        local_end = self.tun_if.local_addr[p.addr_type]
        remote_end = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *algo_args,
            local_end,
            remote_end,
            flags=p.flags,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *algo_args,
            remote_end,
            local_end,
            flags=p.flags,
        )
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p.tun_sa_out`` and ``[p.tun_sa_in]``."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPIP tunnel on pg0 and route the remote hosts via it.

        The tunnel destination is ``p.tun_dst`` when ``p`` has that
        attribute, otherwise ``self.pg0.remote_ip4``. Only the IPv4 host
        route is kept on ``p`` (as ``p.route``).
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip4

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route through the tunnel
        path4 = VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [path4])
        p.route.add_vpp_config()

        # IPv6 host route through the same tunnel
        path6 = VppRoutePath(
            p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        route6 = VppIpRoute(self, p.remote_tun_if_host6, 128, [path6])
        route6.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then ``p.tun_if`` from VPP."""
        for item in (p.route, p.tun_if):
            item.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound and then the inbound SA stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
250
251
class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Configure network, SA pair and protection for the IPv4 params."""
        super().setUp()

        # encrypted traffic is exchanged over pg0
        self.tun_if = self.pg0

        params = self.ipv4_params
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        super().tearDown()
278
279
class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec UDP tunnel interface tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check UDP ports, then decrypt and validate every packet in ``rxs``.

        :param p: IPsec parameters; ``p.nat_header`` supplies the expected
            UDP source and destination ports
        :param sa: security association used to decrypt each packet
        :param rxs: received packets
        :raises IndexError, AssertionError: re-raised after the offending
            packet (and its decrypted form, if available) has been logged
        """
        for rx in rxs:
            # reset per packet so a failure before decryption never logs the
            # decrypted form of an earlier packet
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, p.nat_header.dport)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # diagnostics are best effort; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def config_sa_tra(self, p):
        """Create the SA pair for ``p`` with UDP encapsulation ports.

        Both SAs receive ``p.nat_header.sport``/``dport`` as ``udp_src`` /
        ``udp_dst``; the inbound SA additionally has the IS_INBOUND flag set.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags
            | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        """Enable UDP encapsulation on the IPv4 params and configure them."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
368
369
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """Ipsec ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_basic64(self):
        """ipsec 6o4 tunnel basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ipsec 6o4 tunnel burst test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ipsec 4o4 tunnel frag basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # lower the tunnel MTU so the 1800 byte payload is expected as 2
        # received packets
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
        try:
            self.verify_tun_44(
                self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
            )
        finally:
            # put the MTU back even when the verification fails
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
399
400
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        params = self.ipv4_params
        self.verify_keepalive(params)
412
413
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP GCM tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the template setUp, then switch the IPv4 params to AES-GCM-256."""
        super().setUp()

        # NOTE(review): these overrides are applied after the template setUp
        # has already called config_sa_tra() - confirm this ordering is
        # intended.
        gcm_overrides = (
            (
                "auth_algo_vpp_id",
                VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE,
            ),
            (
                "crypt_algo_vpp_id",
                VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
            ),
            ("crypt_algo", "AES-GCM"),
            ("auth_algo", "NULL"),
            ("crypt_key", b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"),
            ("salt", 0),
        )
        params = self.ipv4_params
        for name, value in gcm_overrides:
            setattr(params, name, value)
430
431
class TestIpsec4TunIfEspUdpUpdate(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP update tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the template setUp, then move both SAs to new UDP ports."""
        super().setUp()

        params = self.ipv4_params
        params.nat_header = UDP(sport=6565, dport=7676)
        # rebuild the scapy SAs for the new NAT header
        config_tun_params(params, self.encryption_type, params.tun_if)

        nat = params.nat_header
        # inbound SA: ports swapped relative to the NAT header
        params.tun_sa_in.update_vpp_config(udp_src=nat.dport, udp_dst=nat.sport)
        # outbound SA: ports exactly as in the NAT header
        params.tun_sa_out.update_vpp_config(udp_src=nat.sport, udp_dst=nat.dport)
448
449
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """Ipsec ESP - TCP tests"""

    # Nothing is defined here: everything comes from the two base classes.
454
455
class TemplateIpsec6TunProtect(object):
    """IPsec IPv6 Tunnel protect"""

    def config_sa_tra(self, p):
        """Create the SA pair for ``p`` without tunnel endpoint addresses.

        The SAs are stored as ``p.tun_sa_out`` (VPP SA id / SPI) and
        ``p.tun_sa_in`` (scapy SA id / SPI) and each is added to VPP.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # positional arguments shared by both directions
        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        p.tun_sa_out = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algo_args)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *algo_args
        )
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the SA pair for ``p`` with tunnel endpoint addresses.

        Same as ``config_sa_tra`` but each SA additionally receives the
        ``self.tun_if`` local/remote addresses for ``p.addr_type``; the
        inbound SA gets them in the opposite order to the outbound one.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        algo_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        local_end = self.tun_if.local_addr[p.addr_type]
        remote_end = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *algo_args, local_end, remote_end
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *algo_args,
            remote_end,
            local_end,
        )
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p.tun_sa_out`` and ``[p.tun_sa_in]``."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPIP tunnel on pg0 and route the remote hosts via it.

        The tunnel destination is ``p.tun_dst`` when ``p`` has that
        attribute, otherwise ``self.pg0.remote_ip6``. Only the IPv6 host
        route is kept on ``p`` (as ``p.route``).
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip6

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # IPv6 host route through the tunnel
        path6 = VppRoutePath(
            p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [path6])
        p.route.add_vpp_config()

        # IPv4 host route through the same tunnel
        path4 = VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)
        route4 = VppIpRoute(self, p.remote_tun_if_host4, 32, [path4])
        route4.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then ``p.tun_if`` from VPP."""
        for item in (p.route, p.tun_if):
            item.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound and then the inbound SA stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
561
562
class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    encryption_type = ESP

    def setUp(self):
        """Configure network, SA pair and protection for the IPv6 params."""
        super().setUp()

        # encrypted traffic is exchanged over pg0
        self.tun_if = self.pg0

        params = self.ipv6_params
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        super().tearDown()
580
581
class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec6 UDP tunnel interface tests"""

    # NOTE(review): these are tun4_* attribute names holding esp6 node names;
    # the TestIpsec6* subclasses in this file set tun6_* instead - confirm
    # whether the tun4_* names are intended here.
    tun4_encrypt_node_name = "esp6-encrypt-tun"
    tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check UDP ports, then decrypt and validate every packet in ``rxs``.

        NOTE(review): the packets are indexed with the IPv4 ``IP`` layer
        while the expected addresses are IPv6 - this looks copied from the
        IPv4 template; confirm against the callers before relying on it.

        :param p: IPsec parameters; ``p.nat_header`` supplies the expected
            UDP source and destination ports
        :param sa: security association used to decrypt each packet
        :param rxs: received packets
        :raises IndexError, AssertionError: re-raised after the offending
            packet (and its decrypted form, if available) has been logged
        """
        for rx in rxs:
            # reset per packet so a failure before decryption never logs the
            # decrypted form of an earlier packet
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, p.nat_header.dport)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(
                    pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
                )
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # diagnostics are best effort; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def config_sa_tra(self, p):
        """Create the SA pair for ``p`` with UDP encapsulation ports.

        Both SAs receive ``p.nat_header.sport``/``dport`` as ``udp_src`` /
        ``udp_dst``; the inbound SA additionally has the IS_INBOUND flag set.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags
            | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        """Enable UDP encapsulation on the IPv6 params and configure them."""
        super(TemplateIpsec6TunIfEspUdp, self).setUp()

        p = self.ipv6_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec6TunIfEspUdp, self).tearDown()
672
673
class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP tests"""

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super().setUp()

    def test_keepalive(self):
        """IPSEC6 NAT Keepalive"""
        params = self.ipv6_params
        self.verify_keepalive(params)
687
688
class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
    """Ipsec ESP 6 UDP GCM tests"""

    tun6_input_node = "ipsec6-tun-input"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the template setUp, then switch the IPv6 params to AES-GCM-256."""
        super().setUp()

        # NOTE(review): these overrides are applied after the template setUp
        # has already called config_sa_tra() - confirm this ordering is
        # intended.
        gcm_overrides = (
            (
                "auth_algo_vpp_id",
                VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE,
            ),
            (
                "crypt_algo_vpp_id",
                VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
            ),
            ("crypt_algo", "AES-GCM"),
            ("auth_algo", "NULL"),
            ("crypt_key", b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"),
            ("salt", 0),
        )
        params = self.ipv6_params
        for name, value in gcm_overrides:
            setattr(params, name, value)
707
708
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """Ipsec ESP - TUN tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_basic46(self):
        """ipsec 4o6 tunnel basic test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        params = self.params[socket.AF_INET6]
        # a single packet
        self.verify_tun_46(params, count=1)

    def test_tun_burst46(self):
        """ipsec 4o6 tunnel burst test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        params = self.params[socket.AF_INET6]
        # a burst of 257 packets
        self.verify_tun_46(params, count=257)
724
725
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """Ipsec ESP 6 Handoff tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_handoff_66_police(self):
        """ESP 6o6 tunnel with policer worker hand-off test"""
        for command in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(command)

        pkts_per_burst = 15
        p = self.params[socket.AF_INET6]

        # every policer outcome simply transmits the packet
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # police what is received on the tunnel interface
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        for pol_bind in (1, 0):
            policer.bind_vpp_config(pol_bind, True)

            # alternate the injecting worker: 0, 1, 0, 1
            for worker in (0, 1, 0, 1):
                encrypted = self.gen_encrypt_pkts6(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=pkts_per_burst,
                )
                received = self.send_and_expect(
                    self.tun_if, encrypted, self.pg1, worker=worker
                )
                self.verify_decrypted6(p, received)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            overall = policer.get_stats()
            on_worker0 = policer.get_stats(worker=0)
            on_worker1 = policer.get_stats(worker=1)

            if pol_bind == 1:
                # first pass: worker 1 accounts for all the policing ...
                self.assertEqual(overall, on_worker1)

                # ... and worker 0 has no policed packets at all
                for counter in (
                    "conform_packets",
                    "exceed_packets",
                    "violate_packets",
                ):
                    self.assertEqual(on_worker0[counter], 0)
            else:
                # second pass: each worker has conforming and violating
                # packets but no exceeding ones
                for per_worker in (on_worker1, on_worker0):
                    self.assertGreater(per_worker["conform_packets"], 0)
                    self.assertEqual(per_worker["exceed_packets"], 0)
                    self.assertGreater(per_worker["violate_packets"], 0)

                # and both workers policed the same number of packets
                self.assertEqual(
                    on_worker0["conform_packets"] + on_worker0["violate_packets"],
                    on_worker1["conform_packets"] + on_worker1["violate_packets"],
                )

        # stop policing and delete the policer
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        policer.remove_vpp_config()
807
808
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """Ipsec ESP 4 Handoff tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_handoff_44_police(self):
        """ESP 4o4 tunnel with policer worker hand-off test"""
        for command in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(command)

        pkts_per_burst = 15
        p = self.params[socket.AF_INET]

        # every policer outcome simply transmits the packet
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # police what is received on the tunnel interface
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        for pol_bind in (1, 0):
            policer.bind_vpp_config(pol_bind, True)

            # alternate the injecting worker: 0, 1, 0, 1
            for worker in (0, 1, 0, 1):
                encrypted = self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    count=pkts_per_burst,
                )
                received = self.send_and_expect(
                    self.tun_if, encrypted, self.pg1, worker=worker
                )
                self.verify_decrypted(p, received)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            overall = policer.get_stats()
            on_worker0 = policer.get_stats(worker=0)
            on_worker1 = policer.get_stats(worker=1)

            if pol_bind == 1:
                # first pass: worker 1 accounts for all the policing ...
                self.assertEqual(overall, on_worker1)

                # ... and worker 0 has no policed packets at all
                for counter in (
                    "conform_packets",
                    "exceed_packets",
                    "violate_packets",
                ):
                    self.assertEqual(on_worker0[counter], 0)
            else:
                # second pass: each worker has conforming and violating
                # packets but no exceeding ones
                for per_worker in (on_worker1, on_worker0):
                    self.assertGreater(per_worker["conform_packets"], 0)
                    self.assertEqual(per_worker["exceed_packets"], 0)
                    self.assertGreater(per_worker["violate_packets"], 0)

                # and both workers policed the same number of packets
                self.assertEqual(
                    on_worker0["conform_packets"] + on_worker0["violate_packets"],
                    on_worker1["conform_packets"] + on_worker1["violate_packets"],
                )

        # stop policing and delete the policer
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        policer.remove_vpp_config()
890
891
@tag_fixme_vpp_workers
class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Multi Tunnel interface

    Configures ten protected tunnels, one per generated pg0 remote host,
    and pushes traffic through each of them.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Derive ten parameter sets from ipv4_params and configure each one."""
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(10):
            # shallow copy: each tunnel starts from the shared template and
            # only the fields assigned below are made unique per tunnel
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)

            # offset every SA id / SPI by the tunnel index so they do not clash
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            self.multi_params.append(p)
            self.config_network(p)
            self.config_sa_tra(p)
            self.config_protect(p)

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces"""
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)
            # each tunnel interface must have counted exactly its own traffic
            self.assertEqual(p.tun_if.get_rx_stats(), 127)
            self.assertEqual(p.tun_if.get_tx_stats(), 127)

    def test_tun_rr_44(self):
        """Round-robin packets across multiple interfaces"""
        # decrypt direction: encrypted packets for every tunnel in one burst
        tx = []
        for p in self.multi_params:
            tx.extend(
                self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                )
            )
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        # NOTE(review): pairing rx with params by position assumes one packet
        # per tunnel (generator default count) and in-order delivery - confirm
        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # encrypt direction: plain packets towards each tunnel's remote host
        tx = []
        for p in self.multi_params:
            tx.extend(
                self.gen_pkts(
                    self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
                )
            )
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
964
965
class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface all Algos

    Re-keys a single protected tunnel once per crypto engine and algorithm
    combination and verifies traffic after every re-key.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Bring up the network, the SAs and the tunnel protection."""
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Remove protection, network and SAs before the base class teardown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_network(params)
        self.unconfig_sa(params)

        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Install fresh SAs on the tunnel described by p and drop the old ones.

        The first key byte is replaced and every SPI / SA id is bumped by
        one, so the new SAs differ from the ones they replace.
        """
        # shallow snapshot that still references the SA objects being replaced
        previous = copy.copy(p)

        # new key material and identifiers
        p.crypt_key = b"X" + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, p.tun_if)

        def build_sa(sa_id, spi):
            # both directions share everything except the id and the SPI
            return VppIpsecSA(
                self,
                sa_id,
                spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                flags=p.flags,
                salt=p.salt,
            )

        p.tun_sa_out = build_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
        p.tun_sa_in = build_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # switch the protection to the new SAs before deleting the old ones
        self.config_protect(p)
        previous.tun_sa_out.remove_vpp_config()
        previous.tun_sa_in.remove_vpp_config()
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos"""

        crypto_alg = VppEnum.vl_api_ipsec_crypto_alg_t
        integ_alg = VppEnum.vl_api_ipsec_integ_alg_t

        # key material sized for 128, 192 and 256 bit ciphers
        key_16 = b"JPjyOWBeVEQiMe7h"
        key_24 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_32 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # VPP crypto engines to exercise
        engines = ["ia32", "ipsecmb", "openssl"]

        # algorithm combinations, expressed for both VPP and scapy
        algos = [
            {
                "vpp-crypto": crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                "vpp-integ": integ_alg.IPSEC_API_INTEG_ALG_NONE,
                "scapy-crypto": "AES-GCM",
                "scapy-integ": "NULL",
                "key": key_16,
                "salt": 3333,
            },
            {
                "vpp-crypto": crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                "vpp-integ": integ_alg.IPSEC_API_INTEG_ALG_NONE,
                "scapy-crypto": "AES-GCM",
                "scapy-integ": "NULL",
                "key": key_24,
                "salt": 0,
            },
            {
                "vpp-crypto": crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
                "vpp-integ": integ_alg.IPSEC_API_INTEG_ALG_NONE,
                "scapy-crypto": "AES-GCM",
                "scapy-integ": "NULL",
                "key": key_32,
                "salt": 9999,
            },
            {
                "vpp-crypto": crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                "vpp-integ": integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
                "scapy-crypto": "AES-CBC",
                "scapy-integ": "HMAC-SHA1-96",
                "salt": 0,
                "key": key_16,
            },
            {
                "vpp-crypto": crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                "vpp-integ": integ_alg.IPSEC_API_INTEG_ALG_SHA_512_256,
                "scapy-crypto": "AES-CBC",
                "scapy-integ": "SHA2-512-256",
                "salt": 0,
                "key": key_24,
            },
            {
                "vpp-crypto": crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                "vpp-integ": integ_alg.IPSEC_API_INTEG_ALG_SHA_256_128,
                "scapy-crypto": "AES-CBC",
                "scapy-integ": "SHA2-256-128",
                "salt": 0,
                "key": key_32,
            },
            {
                "vpp-crypto": crypto_alg.IPSEC_API_CRYPTO_ALG_NONE,
                "vpp-integ": integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
                "scapy-crypto": "NULL",
                "scapy-integ": "HMAC-SHA1-96",
                "salt": 0,
                "key": key_32,
            },
        ]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            for algo in algos:
                # load the algorithm under test into the shared parameters
                p = self.ipv4_params
                p.auth_algo_vpp_id = algo["vpp-integ"]
                p.crypt_algo_vpp_id = algo["vpp-crypto"]
                p.crypt_algo = algo["scapy-crypto"]
                p.auth_algo = algo["scapy-integ"]
                p.crypt_key = algo["key"]
                p.salt = algo["salt"]

                # re-key the tunnel with it, then pass traffic
                self.rekey(p)
                self.verify_tun_44(p, count=127)
1154
1155
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface no Algos

    Protects a tunnel with SAs using neither encryption nor integrity and
    checks that nothing is sent through it.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Switch the shared IPv4 parameters to NULL crypto and NULL integrity."""
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params

        # no integrity algorithm and no integrity key
        params.auth_algo_vpp_id = (
            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        )
        params.auth_algo = "NULL"
        params.auth_key = []

        # no encryption algorithm and no encryption key
        params.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        params.crypt_algo = "NULL"
        params.crypt_key = []

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """IPSec SA with NULL algos"""
        params = self.ipv4_params

        # build the protected tunnel
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # traffic routed into the tunnel must not produce any output
        pkts = self.gen_pkts(
            self.pg1, src=self.pg1.remote_ip4, dst=params.remote_tun_if_host
        )
        self.send_and_assert_no_replies(self.pg1, pkts)

        # tear the tunnel down again
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1195
1196
@tag_fixme_vpp_workers
class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
    """IPsec IPv6 Multi Tunnel interface

    Configures ten protected tunnels, one per generated pg0 remote host,
    and verifies traffic and interface counters on each of them.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Derive ten parameter sets from ipv6_params and configure each one."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        # identifiers that get shifted by the tunnel index to stay unique
        shifted_fields = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for index in range(10):
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (index + 1)
            for field in shifted_fields:
                setattr(params, field, getattr(params, field) + index)
            params.tun_dst = self.pg0.remote_hosts[index].ip6

            self.multi_params.append(params)
            self.config_network(params)
            self.config_sa_tra(params)
            self.config_protect(params)

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces"""
        expected = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=expected)
            # rx and tx counters of each tunnel must match what was sent
            self.assertEqual(params.tun_if.get_rx_stats(), expected)
            self.assertEqual(params.tun_if.get_tx_stats(), expected)
1243
1244
class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests

    An ESP-protected GRE TEB tunnel over pg0 is bridged with pg1 in bridge
    domain 1; the packet generators/verifiers below are specialised for
    Ethernet-in-GRE payloads.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for every inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return count encrypted IP/GRE/Ether/IP/UDP packets for sw_intf.

        p, src and dst are accepted for interface compatibility but unused;
        the addresses are fixed by this test.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return count plain Ethernet frames addressed to omac.

        sw_intf, src and dst are accepted for interface compatibility but
        unused.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each received frame carries the expected inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with sa and check the GRE/Ethernet payload.

        On a failed check both the received and (when available) the
        decrypted packet are logged before the error is re-raised.
        """
        for rx in rxs:
            # pre-bind so the error path can tell whether decryption happened
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure must
                        # not mask the original verification error
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge domain."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the base class teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
1367
1368
class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests

    Same topology as the plain GRE TEB test, except that the bridge port on
    the pg1 side is a dot1q sub-interface (VLAN 11) with a pop-1 tag rewrite:
    frames on pg1 carry the tag, frames inside the tunnel do not.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for every inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return count encrypted IP/GRE/Ether/IP/UDP packets for sw_intf.

        The inner frame is untagged. p, src and dst are accepted for
        interface compatibility but unused.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return count VLAN 11 tagged Ethernet frames addressed to omac.

        sw_intf, src and dst are accepted for interface compatibility but
        unused.
        """
        return [
            Ether(dst=self.omac)
            / Dot1Q(vlan=11)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each received frame has the inner MAC, VLAN 11 and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with sa and check the untagged GRE payload.

        On a failed check both the received and (when available) the
        decrypted packet are logged before the error is re-raised.
        """
        for rx in rxs:
            # pre-bind so the error path can tell whether decryption happened
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                # the VLAN tag must have been popped before tunnelling
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure must
                        # not mask the original verification error
                        pass
                raise

    def setUp(self):
        """Create the VLAN sub-interface, SAs, protected tunnel and bridge."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # VLAN 11 sub-interface on pg1 with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index,
            vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11,
        )
        self.pg1_11.admin_up()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the VLAN sub-interface
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Undo the tunnel IPv4 config, run base teardown, drop the sub-if."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
1502
1503
class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - Tra tests

    Like the GRE TEB tunnel test, but the SAs are created without tunnel
    endpoint addresses and the scapy side is set up via config_tra_params.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for every inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return count encrypted IP/GRE/Ether/IP/UDP packets for sw_intf.

        p, src and dst are accepted for interface compatibility but unused;
        the addresses are fixed by this test.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return count plain Ethernet frames addressed to omac.

        sw_intf, src and dst are accepted for interface compatibility but
        unused.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each received frame carries the expected inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with sa and check the GRE/Ethernet payload.

        On a failed check both the received and (when available) the
        decrypted packet are logged before the error is re-raised.
        """
        for rx in rxs:
            # pre-bind so the error path can tell whether decryption happened
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure must
                        # not mask the original verification error
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge domain."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # both SAs are created without tunnel endpoint addresses
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the base class teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1620
1621
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB UDP ESP - Tra tests

    GRE TEB tunnel protected by SAs with the UDP-encap flag set; packets
    leaving VPP must carry UDP source port 5454 and destination port 4545.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC used for every inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return count encrypted IP/GRE/Ether/IP/UDP packets for sw_intf.

        p, src and dst are accepted for interface compatibility but unused;
        the addresses are fixed by this test.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return count plain Ethernet frames addressed to omac.

        sw_intf, src and dst are accepted for interface compatibility but
        unused.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each received frame carries the expected inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP encapsulation, then decrypt and check the payload.

        On a failed payload check both the received and (when available)
        the decrypted packet are logged before the error is re-raised.
        """
        for rx in rxs:
            # the outer UDP header must use the ports of the outbound SA
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # pre-bind so the error path can tell whether decryption happened
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure must
                        # not mask the original verification error
                        pass
                raise

    def setUp(self):
        """Create UDP-encap SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: UDP 5454 -> 4545
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=5454,
            udp_dst=4545,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: ports mirrored and the inbound flag added
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=(
                p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
            ),
            udp_src=4545,
            udp_dst=5454,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the base class teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1753
1754
class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TUN tests

    An IPv4 GRE tunnel between pg0's local and remote addresses is
    protected by two ESP SAs that are created with explicit tunnel
    endpoints, and 1.1.1.2/32 is routed via the tunnel.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames carrying ESP-encrypted IP/GRE/IP/UDP.

        ``p``, ``src`` and ``dst`` are not used here; the outer addresses
        come from pg0 and the inner ones from pg1.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are ignored; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Assert each received frame targets pg1's remote MAC and IPv4."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every frame with ``sa`` and check the GRE encapsulation.

        The decrypted packet must have valid checksums, run from pg0's
        local to pg0's remote address and carry GRE with an inner IP
        destination of 1.1.1.2.  On failure the offending frame (and the
        decrypted packet, when one was produced) is logged and the
        original exception is re-raised.
        """
        for rx in rxs:
            # Reset per frame so a failed decrypt never logs the packet
            # left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Extra diagnostics are best-effort and must not mask
                    # the failure re-raised below; unlike a bare except,
                    # this lets KeyboardInterrupt/SystemExit through.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug("Unable to dump decrypted packet")
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel, its protection and the route."""
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): nothing else in this class references this bridge
        # domain; it looks like a leftover - confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: tunnel endpoints pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: same endpoints, reversed
        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1.1.1.2 through the tunnel
        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 address, then run the inherited teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()
1865
1866
class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TRA tests

    An IPv4 GRE tunnel between pg0's local and remote addresses is
    protected by two ESP SAs created without tunnel endpoints, and
    1.1.1.2/32 is routed via the tunnel.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames carrying ESP-encrypted IP/GRE/IP/UDP.

        ``p``, ``src`` and ``dst`` are not used here; the outer addresses
        come from pg0 and the inner ones from pg1.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames whose GRE payload is not IP.

        The UDP header follows GRE directly, with no inner IP header.
        ``src`` and ``dst`` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are ignored; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Assert each received frame targets pg1's remote MAC and IPv4."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every frame with ``sa`` and check the GRE encapsulation.

        The decrypted packet must have valid checksums and carry GRE with
        an inner IP destination of 1.1.1.2.  On failure the offending
        frame (and the decrypted packet, when one was produced) is logged
        and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per frame so a failed decrypt never logs the packet
            # left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Extra diagnostics are best-effort and must not mask
                    # the failure re-raised below; unlike a bare except,
                    # this lets KeyboardInterrupt/SystemExit through.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug("Unable to dump decrypted packet")
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel, its protection and the route."""
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # both SAs are created without tunnel endpoint addresses
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1.1.1.2 through the tunnel
        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 address, then run the inherited teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()

    def test_gre_non_ip(self):
        """GRE with a non-IP payload is dropped and counted"""
        p = self.ipv4_params
        tx = self.gen_encrypt_non_ip_pkts(
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            # the generator ignores dst; pass the IPv4 address so the
            # call at least matches this test's address family
            dst=self.pg1.remote_ip4,
        )
        self.send_and_assert_no_replies(self.tun_if, tx)
        # one packet sent: expect exactly one hit on the node error
        # counter and on the inbound SA's own counter
        node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
        err = p.tun_sa_in.get_err("unsup_payload")
        self.assertEqual(err, 1)
1994
1995
class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
    """Ipsec GRE ESP - TRA tests

    IPv6 variant: a GRE tunnel between pg0's local and remote IPv6
    addresses is protected by two ESP SAs created without tunnel
    endpoints, and 1::2/128 is routed via the tunnel.
    """

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames carrying ESP-encrypted IPv6/GRE/IPv6/UDP.

        ``p``, ``src`` and ``dst`` are not used here; the outer addresses
        come from pg0 and the inner ones from pg1.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1::1 to 1::2.

        ``p``, ``src`` and ``dst`` are ignored; the addresses are fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst="1::2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Assert each received frame targets pg1's remote MAC and IPv6."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every frame with ``sa`` and check the GRE encapsulation.

        The decrypted packet must have valid checksums and carry GRE with
        an inner IPv6 destination of 1::2.  On failure the offending frame
        (and the decrypted packet, when one was produced) is logged and
        the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per frame so a failed decrypt never logs the packet
            # left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Extra diagnostics are best-effort and must not mask
                    # the failure re-raised below; unlike a bare except,
                    # this lets KeyboardInterrupt/SystemExit through.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug("Unable to dump decrypted packet")
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel, its protection and the route."""
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): nothing else in this class references this bridge
        # domain; it looks like a leftover - confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # both SAs are created without tunnel endpoint addresses
        p.tun_sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1::2 through the tunnel
        r = VppIpRoute(
            self,
            "1::2",
            128,
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
                )
            ],
        )
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 address, then run the inherited teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super().tearDown()
2108
2109
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """Ipsec mGRE ESP v4 TRA tests

    One multipoint GRE interface is created on pg0 and protected towards
    16 next-hops, each with its own SA pair, route and TEIB entry.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames carrying ESP-encrypted IP/GRE/IP/UDP.

        The outer source is ``p.tun_dst`` (the peer this parameter set
        belongs to); ``src`` and ``dst`` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=p.tun_dst, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1.1.1.1 to ``dst``.

        ``src`` is ignored; the source address is fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Assert each received frame targets pg1's remote MAC and IPv4."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every frame with ``sa`` and check the GRE encapsulation.

        The decrypted packet must have valid checksums and carry GRE with
        an inner IP destination of ``p.remote_tun_if_host``.  On failure
        the offending frame (and the decrypted packet, when one was
        produced) is logged and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per frame so a failed decrypt never logs the packet
            # left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Extra diagnostics are best-effort and must not mask
                    # the failure re-raised below; unlike a bare except,
                    # this lets KeyboardInterrupt/SystemExit through.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug("Unable to dump decrypted packet")
                raise

    def setUp(self):
        """Create the mGRE interface and per-next-hop SAs, routes and TEIBs.

        The per-peer parameter copies are collected in
        ``self.multi_params`` for the test to iterate over.
        """
        super().setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        # multipoint tunnel: the destination is left as 0.0.0.0
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: p.tun_if is still the shared mGRE interface
            p = copy.copy(self.ipv4_params)

            # offset every SA id/SPI by ii so each peer's values differ
            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            p.teib = VppTeib(
                self,
                p.tun_if,
                p.tun_if.remote_hosts[ii].ip4,
                self.pg0.remote_hosts[ii].ip4,
            ).add_vpp_config()
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 address, then run the inherited teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # without the TEIB entry traffic must drop; restoring the
            # entry must bring the tunnel back
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
2263
2264
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """Ipsec mGRE ESP v6 TRA tests

    One multipoint GRE interface is created on pg0 and protected towards
    16 IPv6 next-hops, each with its own SA pair, TEIB entry and route.
    """

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames carrying ESP-encrypted IPv6/GRE/IPv6/UDP.

        The outer source is ``p.tun_dst`` (the peer this parameter set
        belongs to); ``src`` and ``dst`` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from 1::1 to ``dst``.

        ``p`` and ``src`` are ignored; the source address is fixed.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Assert each received frame targets pg1's remote MAC and IPv6."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every frame with ``sa`` and check the GRE encapsulation.

        The decrypted packet must have valid checksums and carry GRE with
        an inner IPv6 destination of ``p.remote_tun_if_host``.  On failure
        the offending frame (and the decrypted packet, when one was
        produced) is logged and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per frame so a failed decrypt never logs the packet
            # left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Extra diagnostics are best-effort and must not mask
                    # the failure re-raised below; unlike a bare except,
                    # this lets KeyboardInterrupt/SystemExit through.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug("Unable to dump decrypted packet")
                raise

    def setUp(self):
        """Create the mGRE interface and per-next-hop SAs, TEIBs and routes.

        The per-peer parameter copies are collected in
        ``self.multi_params`` for the test to iterate over.
        """
        super().setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        # multipoint tunnel: the destination is left as ::
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip6,
            "::",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: p.tun_if is still the shared mGRE interface
            p = copy.copy(self.ipv6_params)

            # offset every SA id/SPI by ii so each peer's values differ
            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(
                self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
            ).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                128,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
            ).add_vpp_config()
            # NOTE(review): set again after config_tra_params(), which
            # presumably overwrites tun_dst - confirm before dropping
            # either assignment.
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 address, then run the inherited teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super().tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
2416
2417
@tag_fixme_vpp_workers
class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the inherited setup and use pg0 as the tunnel-facing port."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown; the test cleans up itself."""
        super().tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        N_PKTS = 127
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # first burst: v4 traffic through the protected tunnel
        self.verify_tun_44(params, count=N_PKTS)
        self.assertEqual(params.tun_if.get_rx_stats(), N_PKTS)
        self.assertEqual(params.tun_if.get_tx_stats(), N_PKTS)

        # second burst after clearing the SA counters; the interface
        # counters keep accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=N_PKTS)
        self.assertEqual(params.tun_if.get_rx_stats(), 2 * N_PKTS)
        self.assertEqual(params.tun_if.get_tx_stats(), 2 * N_PKTS)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: rekeyed.tun_if is the same interface object)
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b"X" + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        # third burst runs over the new SAs
        self.verify_tun_44(rekeyed, count=N_PKTS)
        self.assertEqual(params.tun_if.get_rx_stats(), 3 * N_PKTS)
        self.assertEqual(params.tun_if.get_tx_stats(), 3 * N_PKTS)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
2470
2471
@tag_fixme_vpp_workers
class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Configure the network, SAs and protection with UDP encap on."""
        super().setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        params.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo setUp's configuration, then run the inherited teardown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super().tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of every frame, then run the inherited checks."""
        # ensure encrypted packets are received with the default UDP ports
        for frame in rxs:
            udp = frame[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super().verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        N_PKTS = 127
        params = self.ipv4_params

        self.verify_tun_44(params, count=N_PKTS)
        self.assertEqual(params.tun_if.get_rx_stats(), N_PKTS)
        self.assertEqual(params.tun_if.get_tx_stats(), N_PKTS)

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        self.verify_keepalive(self.ipv4_params)
2514
2515
@tag_fixme_vpp_workers
class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited setup and use pg0 as the tunnel-facing port."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown; the test cleans up itself."""
        super().tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames carrying ESP-encrypted IP/IP/UDP.

        The outer IP header runs between ``sw_intf``'s remote and local
        addresses; the inner one uses ``src`` and ``dst``.  ``p`` is not
        used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text UDP frames from ``src`` to ``dst``."""
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src=src, dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Assert addresses and checksums of every decrypted frame."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every frame with ``sa`` and check both IP headers.

        The decrypted packet must have valid checksums, an outer header
        from pg0's local to pg0's remote address and an inner destination
        of ``p.remote_tun_if_host``.  On failure the offending frame (and
        the decrypted packet, when one was produced) is logged and the
        original exception is re-raised.
        """
        for rx in rxs:
            # Reset per frame so a failed decrypt never logs the packet
            # left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Extra diagnostics are best-effort and must not mask
                    # the failure re-raised below; unlike a bare except,
                    # this lets KeyboardInterrupt/SystemExit through.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug("Unable to dump decrypted packet")
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # also add output features on the tunnel and physical interface
        # so we test they still work
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        self.verify_tun_44(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: np.tun_if is the same interface object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # interface counters accumulate across the rekey
        self.verify_tun_44(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2623
2624
class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the base-class setup and use pg0 as the tunnel-side interface."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super().tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted frames carrying a bogus tunnel header.

        The encrypted payload is an IP-in-IP packet whose outer header is
        addressed to the fixed destination 5.5.5.5; the inner header uses the
        supplied `src` and `dst`. `p` is not used here.
        """
        frames = []
        for _ in range(count):
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            cleartext = (
                IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            frames.append(l2_hdr / sa.encrypt(cleartext))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header"""

        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # frames built by gen_encrypt_pkts above must not produce any reply
        bogus = self.gen_encrypt_pkts(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2675
2676
@tag_fixme_vpp_workers
class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base-class setup and use pg0 as the tunnel-side interface."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super().tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6

        Covers, in order: initial traffic, a simple rekey, an admin
        down/up bounce of the tunnel interface, and a three phase rekey in
        which the inbound SA list and the outbound SA are changed
        separately.
        """
        n_pkts = 127
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_66(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # rekey: second generation of SAs, described on a shallow copy of
        # the parameters (tun_if and tun_protect objects are shared)
        gen2 = copy.copy(params)
        gen2.crypt_key = b"X" + params.crypt_key[1:]
        gen2.scapy_tun_spi += 100
        gen2.scapy_tun_sa_id += 1
        gen2.vpp_tun_spi += 100
        gen2.vpp_tun_sa_id += 1
        gen2.tun_if.local_spi = params.vpp_tun_spi
        gen2.tun_if.remote_spi = params.scapy_tun_spi

        # install the new SAs and protection before dropping the old SAs
        self.config_sa_tra(gen2)
        self.config_protect(gen2)
        self.unconfig_sa(params)

        self.verify_tun_66(gen2, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), 2 * n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), 2 * n_pkts)

        # bounce the interface state: while the tunnel is admin-down the
        # traffic is dropped and counted against the 'disabled' error
        params.tun_if.admin_down()
        self.verify_drop_tun_66(gen2, count=n_pkts)
        disabled_counter = "/err/ipsec6-tun-input/disabled"
        self.assertEqual(n_pkts, self.statistics.get_err_counter(disabled_counter))
        params.tun_if.admin_up()
        self.verify_tun_66(gen2, count=n_pkts)

        # 3 phase rekey
        #  1) accept two input SAs [old, new]
        #  2) swap the output SA to [new]
        #  3) accept only the [new] input SA
        gen3 = copy.copy(gen2)
        gen3.crypt_key = b"Z" + params.crypt_key[1:]
        gen3.scapy_tun_spi += 100
        gen3.scapy_tun_sa_id += 1
        gen3.vpp_tun_spi += 100
        gen3.vpp_tun_sa_id += 1
        gen3.tun_if.local_spi = params.vpp_tun_spi
        gen3.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(gen3)

        # phase 1: output SA is still the old one, both input SAs accepted
        params.tun_protect.update_vpp_config(
            gen2.tun_sa_out, [gen2.tun_sa_in, gen3.tun_sa_in]
        )
        self.verify_tun_66(gen2, gen2, count=n_pkts)
        self.verify_tun_66(gen3, gen2, count=n_pkts)

        # phase 2: output SA becomes the new one, both input SAs accepted
        params.tun_protect.update_vpp_config(
            gen3.tun_sa_out, [gen2.tun_sa_in, gen3.tun_sa_in]
        )
        self.verify_tun_66(gen2, gen3, count=n_pkts)
        self.verify_tun_66(gen3, gen3, count=n_pkts)

        # phase 3: only the new input SA remains, the old one must drop
        params.tun_protect.update_vpp_config(gen3.tun_sa_out, [gen3.tun_sa_in])
        self.verify_tun_66(gen3, gen3, count=n_pkts)
        self.verify_drop_tun_rx_66(gen2, count=n_pkts)

        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts * 9)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts * 8)
        self.unconfig_sa(gen2)

        # teardown
        self.unconfig_protect(gen3)
        self.unconfig_sa(gen3)
        self.unconfig_network(params)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        n_pkts = 127
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_46(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2788
2789
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base-class setup and use pg0 as the tunnel-side interface."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super().tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted IPv6-in-IPv6 frames.

        The outer IPv6 header runs from `sw_intf.remote_ip6` to
        `sw_intf.local_ip6`; the inner header uses `src` and `dst`. The
        whole IPv6/IPv6/UDP/Raw packet is passed through `sa.encrypt`.
        `p` is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` cleartext IPv6/UDP frames from `src` to `dst`.

        `p` is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / UDP(sport=1166, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of each received cleartext packet."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check both IPv6 headers.

        On an IndexError or AssertionError the received packet, and the
        decrypted packet when decryption got that far, are logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure before/inside decrypt never logs
            # the decrypted packet of a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort only; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect

        Pass traffic over the protected tunnel, rekey onto a second SA pair
        and pass traffic again, checking the tunnel counters after each
        round.
        """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2889
2890
class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base-class setup and use pg0 as the tunnel-side interface."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super().tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted frames carrying a bogus tunnel header.

        The encrypted payload is an IPv6-in-IPv6 packet whose outer header
        is addressed to the fixed destination 5::5; the inner header uses
        the supplied `src` and `dst`. `p` is not used here.
        """
        frames = []
        for _ in range(count):
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            # the IP destination of the revealed packet does not match
            # that assigned to the tunnel
            cleartext = (
                IPv6(src=sw_intf.remote_ip6, dst="5::5")
                / IPv6(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            frames.append(l2_hdr / sa.encrypt(cleartext))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header"""

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # frames built by gen_encrypt_pkts6 above must not produce any reply
        bogus = self.gen_encrypt_pkts6(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2942
2943
class TemplateIpsecItf4(object):
    """IPsec Interface IPv4"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and install the outbound and inbound SAs for `p`.

        The outbound SA uses the VPP SA id/SPI with tunnel endpoints
        `src` -> `dst`; the inbound SA uses the scapy SA id/SPI with the
        endpoints reversed and the IS_INBOUND flag added. Both are stored on
        `p` as `tun_sa_out` and `tun_sa_in`.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # algorithm/key/protocol arguments shared by both SAs
        crypto = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *crypto, src, dst, flags=p.flags
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        inbound = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
        sa_in = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto,
            dst,
            src,
            flags=p.flags | inbound,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect `p.tun_if` with the SA pair stored on `p`."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p, instance=0xFFFFFFFF):
        """Create the IPsec interface and the host routes through it.

        The interface is stored as `p.tun_if`, brought up and given IPv4 and
        IPv6 configuration. A /32 route to `p.remote_tun_if_host` is stored
        as `p.route`; a /128 route to `p.remote_tun_if_host6` is added but
        not kept on `p`.
        """
        tun = VppIpsecInterface(self, instance=instance)
        p.tun_if = tun

        tun.add_vpp_config()
        tun.admin_up()
        tun.config_ip4()
        tun.config_ip6()

        v4_path = VppRoutePath(tun.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(
            tun.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove `p.route` and then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on `p`."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA, stored on `p`."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
3027
3028
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface IPv4"""

    def setUp(self):
        """Run the base-class setup and use pg0 as the tunnel-side interface."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super().tearDown()

    def test_tun_instance_44(self):
        """IPSEC interface IPv4 with user-specified instance"""
        p = self.ipv4_params
        self.config_network(p, instance=3)

        # only ipsec3 was requested, so ipsec0 must not exist
        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        output = self.vapi.cli("show interface ipsec3")
        self.assertNotIn("unknown", output)

        self.unconfig_network(p)

    def test_tun_44(self):
        """IPSEC interface IPv4

        Covers, in order: drops before any SA is configured, normal
        traffic, an admin down/up bounce, IPv6 payload over the IPv4 tunnel,
        moving the SA tunnel endpoints to pg2, and a rekey.
        """

        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        # set up the scapy-side parameters only: with no SA or protection
        # configured yet the traffic is expected to be dropped
        config_tun_params(
            p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
        )
        self.verify_tun_dropped_44(p, count=n_pkts)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface state
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v6 packet when its encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # update the SA tunnel
        config_tun_params(
            p, self.encryption_type, None, self.pg2.local_ip4, self.pg2.remote_ip4
        )
        p.tun_sa_in.update_vpp_config(
            is_tun=True, tun_src=self.pg2.remote_ip4, tun_dst=self.pg2.local_ip4
        )
        p.tun_sa_out.update_vpp_config(
            is_tun=True, tun_src=self.pg2.local_ip4, tun_dst=self.pg2.remote_ip4
        )
        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 5 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 4 * n_pkts)

        # reset the counters so the rekey round is checked from zero
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 127
        # work on a copy so the algorithm overrides do not touch
        # self.ipv4_params
        p = copy.copy(self.ipv4_params)

        p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.logger.info(self.vapi.cli("sh ipsec sa"))
        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        # every policer outcome transmits, so policing only counts packets
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_44(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3204
3205
class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface MPLSoIPv4"""

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base-class setup and use pg0 as the tunnel-side interface."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super().tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return `count` encrypted frames carrying MPLS (label 44) over IP.

        The MPLS/IP/UDP/Raw packet from `src` to `dst` is passed through
        `sa.encrypt`. `p` is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=44, ttl=3)
                / IP(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check its MPLS label and IP dst.

        On an IndexError or AssertionError the received packet, and the
        decrypted packet when decryption got that far, are logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure before/inside decrypt never logs
            # the decrypted packet of a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort only; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        ).add_vpp_config()
        # send traffic for the remote host into the tunnel with label 44
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3282
3283
class TemplateIpsecItf6(object):
    """IPsec Interface IPv6"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and install the outbound and inbound SAs for `p`.

        `p.tun_flags` and `p.hop_limit` are defaulted to None and 255 when
        the caller has not set them; they are applied to the outbound SA
        only. The outbound SA uses the VPP SA id/SPI with endpoints
        `src` -> `dst`; the inbound SA uses the scapy SA id/SPI with the
        endpoints reversed. Both are stored on `p` as `tun_sa_out` and
        `tun_sa_in`.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # fill in optional parameters the caller left unset
        for attr, default in (("tun_flags", None), ("hop_limit", 255)):
            if not hasattr(p, attr):
                setattr(p, attr, default)

        # algorithm/key/protocol arguments shared by both SAs
        crypto = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto,
            src,
            dst,
            flags=p.flags,
            tun_flags=p.tun_flags,
            hop_limit=p.hop_limit,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *crypto, dst, src, flags=p.flags
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect `p.tun_if` with the SA pair stored on `p`."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPsec interface and the host routes through it.

        The interface is stored as `p.tun_if`, brought up and given IPv4 and
        IPv6 configuration. A /32 route to `p.remote_tun_if_host4` is added
        but not kept on `p`; a /128 route to `p.remote_tun_if_host` is
        stored as `p.route`.
        """
        tun = VppIpsecInterface(self)
        p.tun_if = tun

        tun.add_vpp_config()
        tun.admin_up()
        tun.config_ip4()
        tun.config_ip6()

        v4_path = VppRoutePath(tun.remote_ip4, 0xFFFFFFFF)
        VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path]).add_vpp_config()

        v6_path = VppRoutePath(
            tun.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove `p.route` and then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on `p`."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA, stored on `p`."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
3374
3375
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface IPv6"""

    def setUp(self):
        """Run the base-class setup and use pg0 as the tunnel-side interface."""
        super().setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base-class teardown."""
        super().tearDown()

    def test_tun_66(self):
        """IPSEC interface IPv6

        Covers, in order: drops before any SA is configured, normal
        traffic with the copy-hop-limit encap flag, an admin down/up bounce,
        IPv4 payload over the IPv6 tunnel, and a rekey that switches to the
        copy-flow-label encap flag.
        """

        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        params = self.ipv6_params
        params.inner_hop_limit = 24
        params.outer_hop_limit = 23
        params.outer_flow_label = 243224
        params.tun_flags = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(params)
        # set up the scapy-side parameters only: with no SA or protection
        # configured yet the traffic is expected to be dropped
        config_tun_params(
            params,
            self.encryption_type,
            None,
            self.pg0.local_ip6,
            self.pg0.remote_ip6,
        )
        self.verify_drop_tun_66(params, count=n_pkts)
        self.config_sa_tun(params, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(params)

        self.verify_tun_66(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface state
        params.tun_if.admin_down()
        self.verify_drop_tun_66(params, count=n_pkts)
        params.tun_if.admin_up()
        self.verify_tun_66(params, count=n_pkts)

        self.assertEqual(params.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v4 packet when its encrypted
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), 3 * n_pkts)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # reset the counters so the rekey round is checked from zero
        self.vapi.cli("clear interfaces")

        # rekey: second SA pair described on a shallow copy of the
        # parameters, so tun_if is the same object in both parameter sets
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b"X" + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi
        rekeyed.inner_hop_limit = 24
        rekeyed.outer_hop_limit = 128
        rekeyed.inner_flow_label = 0xABCDE
        rekeyed.outer_flow_label = 0xABCDE
        rekeyed.hop_limit = 128
        rekeyed.tun_flags = (
            encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
        )

        # install the new SAs and protection before dropping the old SAs
        self.config_sa_tun(rekeyed, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_66(rekeyed, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)

    def test_tun_66_police(self):
        """IPSEC interface IPv6 with input policer"""
        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        params = self.ipv6_params
        params.inner_hop_limit = 24
        params.outer_hop_limit = 23
        params.outer_flow_label = 243224
        params.tun_flags = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(params)
        self.config_sa_tun(params, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(params)

        # every policer outcome transmits, so policing only counts packets
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # start policing received traffic on the tunnel interface
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_66(params, count=n_pkts)
        self.assertEqual(params.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(params.tun_if.get_tx_stats(), n_pkts)

        stats_policed = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats_policed["conform_packets"], 0)
        self.assertEqual(stats_policed["exceed_packets"], 0)
        self.assertGreater(stats_policed["violate_packets"], 0)

        # stop policing; further traffic must leave the policer stats alone
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_66(params, count=n_pkts)

        stats_after = policer.get_stats()
        self.assertEqual(stats_policed, stats_after)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
3516
3517
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """Ipsec P2MP ESP v4 tests"""

    # Graph node names and encryption protocol read by the shared
    # IpsecTun4 verification helpers.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` ESP-encrypted packets to inject on ``sw_intf``.

        The inner IPv4 header always goes from pg1's local address to pg1's
        remote address; the ``src`` and ``dst`` arguments are accepted for
        signature compatibility but are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` plain IPv4/UDP packets addressed to ``dst``.

        The IPv4 source is fixed to "1.1.1.1"; the ``src`` argument is not
        used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check each decrypted packet is addressed (L2 and L3) to pg1's peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check the outer header of each packet, decrypt it and check the inner one.

        The outer header must carry DSCP EF and the per-SA hop limit; the
        decrypted inner packet must have valid checksums and be destined to
        ``p.remote_tun_if_host``.  On failure the offending packet (and the
        decrypted packet, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before decryption never logs the
            # decrypted payload of a previous packet.
            pkt = None
            try:
                self.assertEqual(
                    rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
                )
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                e = pkt[IP]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure while
                        # formatting must not mask the real error.
                        pass
                raise

    def setUp(self):
        """Create one multipoint IPsec interface protected towards N_NHS peers.

        For every next-hop an outbound and an inbound SA, a tunnel protection
        and a /32 route via the tunnel are configured; the per-peer parameter
        copies are collected in ``self.multi_params``.
        """
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(
            self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        # config / unconfig / config: presumably exercises re-configuring the
        # interface's IPv4 setup before use - confirm intent.
        p.tun_if.config_ip4()
        p.tun_if.unconfig_ip4()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # A single any-to-any rule, attached to both the underlay interface
        # and the tunnel interface.
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # Shallow copy: each peer gets its own ids/SPIs but shares the
            # tunnel interface object created above.
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            # A distinct hop limit per peer lets verify_encrypted tell the
            # SAs apart on the wire.
            p.hop_limit = ii + 10
            p.tun_sa_out = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            # No tunnel interface is passed here, so the tunnel endpoints are
            # taken from the explicit src/dst addresses.
            config_tun_params(
                p,
                self.encryption_type,
                None,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
            )
            self.multi_params.append(p)

            p.via_tun_route = VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        """Remove the tunnel interface's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)

        # remove one tunnel protect, the rest should still work
        self.multi_params[0].tun_protect.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
        self.multi_params[0].via_tun_route.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)

        for p in self.multi_params[1:]:
            self.verify_tun_44(p, count=N_PKTS)

        # restore the removed protection and route; every peer works again
        self.multi_params[0].tun_protect.add_vpp_config()
        self.multi_params[0].via_tun_route.add_vpp_config()

        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
3695
3696
class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface MPLSoIPv6"""

    # Encrypt graph node name read by the shared IpsecTun6 helpers.
    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel's underlay interface."""
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base-class tearDown."""
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` ESP packets whose payload is MPLS (label 66) over IPv6."""
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=66, ttl=3)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet and check the MPLS label and inner IPv6 destination.

        On failure the offending packet (and the decrypted packet, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before decryption never logs the
            # decrypted payload of a previous packet.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # NOTE(review): this fallback re-parses the payload as
                    # IPv4, which looks copied from the v4 variant - confirm
                    # whether MPLS/IPv6 parsing was intended.
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort diagnostics only: a failure while
                        # formatting must not mask the real error.
                        pass
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self,
            66,
            1,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
        ).add_vpp_config()
        # Send traffic for the protected route into the tunnel with label 66
        # imposed, matching the label route added above.
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3777
3778
# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)