ipsec: Use .api declared error counters
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import (
14     TemplateIpsec,
15     IpsecTun4Tests,
16     IpsecTun6Tests,
17     IpsecTun4,
18     IpsecTun6,
19     IpsecTcpTests,
20     mk_scapy_crypt_key,
21     IpsecTun6HandoffTests,
22     IpsecTun4HandoffTests,
23     config_tun_params,
24 )
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
28     VppIpRoute,
29     VppRoutePath,
30     DpoProto,
31     VppMplsLabel,
32     VppMplsTable,
33     VppMplsRoute,
34     FibPathProto,
35 )
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
40 from util import ppp
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
45
46
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
    """Build the scapy tunnel-mode security associations for ``p``.

    The tunnel endpoints are recorded on ``p`` as ``tun_src``/``tun_dst``;
    they come from ``tun_if`` when it is truthy and from ``src``/``dst``
    otherwise.  Two scapy ``SecurityAssociation`` objects are then stored
    on ``p``:

    * ``p.scapy_tun_sa`` uses ``p.vpp_tun_spi`` and an outer IP header going
      from ``p.tun_dst`` to ``p.tun_src``.
    * ``p.vpp_tun_sa`` uses ``p.scapy_tun_spi`` and an outer IP header going
      from ``p.tun_src`` to ``p.tun_dst``.

    :param p: IPsec parameter object; its flags, keys, algorithms, SPIs,
        ``addr_type`` and ``nat_header`` are read and it is updated in place.
    :param encryption_type: protocol class passed to ``SecurityAssociation``.
    :param tun_if: object exposing ``local_ip`` and ``remote_ip``, or a
        falsy value to fall back to ``src``/``dst``.
    :param src: tunnel source used when ``tun_if`` is falsy.
    :param dst: tunnel destination used when ``tun_if`` is falsy.
    """
    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)
    scapy_key = mk_scapy_crypt_key(p)

    if tun_if:
        p.tun_dst = tun_if.remote_ip
        p.tun_src = tun_if.local_ip
    else:
        p.tun_dst = dst
        p.tun_src = src

    nat = p.nat_header
    if not nat or nat.dport == 4500:
        # no NAT-T header, or NAT-T on the default port: reuse it unchanged
        outbound_nat_header = nat
    else:
        # non-default port: swap the ports for the opposite direction and
        # register a UDP -> ESP layer binding for that destination port
        outbound_nat_header = UDP(sport=nat.dport, dport=nat.sport)
        bind_layers(UDP, ESP, dport=nat.dport)

    outer_ip = ip_class_by_addr_type[p.addr_type]
    # keyword arguments identical for both directions
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=scapy_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=use_esn,
    )

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        nat_t_header=outbound_nat_header,
        **shared
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        nat_t_header=nat,
        **shared
    )
93
94
def config_tra_params(p, encryption_type, tun_if):
    """Build the scapy security associations for ``p`` without an outer header.

    Records ``tun_if.remote_ip``/``tun_if.local_ip`` on ``p`` as
    ``tun_dst``/``tun_src`` and stores two scapy ``SecurityAssociation``
    objects on ``p``; neither is given a ``tunnel_header``:

    * ``p.scapy_tun_sa`` uses ``p.vpp_tun_spi``.
    * ``p.vpp_tun_sa`` uses ``p.scapy_tun_spi``.

    :param p: IPsec parameter object; its flags, keys, algorithms, SPIs and
        ``nat_header`` are read and it is updated in place.
    :param encryption_type: protocol class passed to ``SecurityAssociation``.
    :param tun_if: object exposing ``local_ip`` and ``remote_ip``.
    """
    # The address-family -> IP class map that used to be built here was never
    # read (no tunnel header is constructed), so it has been dropped.
    esn_en = bool(
        p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
    )
    crypt_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    if p.nat_header:
        is_default_port = p.nat_header.dport == 4500
    else:
        is_default_port = True

    if is_default_port:
        outbound_nat_header = p.nat_header
    else:
        # non-default NAT-T port: swap the ports for the opposite direction
        # and register a UDP -> ESP layer binding for that destination port
        outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
        bind_layers(UDP, ESP, dport=p.nat_header.dport)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=esn_en,
        nat_t_header=outbound_nat_header,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        esn_en=esn_en,
        nat_t_header=p.nat_header,
    )
135
136
class TemplateIpsec4TunProtect(object):
    """IPsec IPv4 Tunnel protect

    Mixin with helpers that create and remove an IPIP tunnel interface over
    ``pg0``, the pair of VPP SAs protecting it, and the routes pointing at
    it.  Everything created is stored on the parameter object ``p`` that is
    handed to each helper.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` without tunnel addresses."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm / key / protocol arguments common to both directions
        crypto_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *crypto_args, flags=p.flags
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *crypto_args, flags=p.flags
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` with tunnel addresses.

        The addresses come from ``self.tun_if``: local then remote for the
        outbound SA, remote then local for the inbound SA.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto_args,
            self.tun_if.local_addr[p.addr_type],
            self.tun_if.remote_addr[p.addr_type],
            flags=p.flags
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto_args,
            self.tun_if.remote_addr[p.addr_type],
            self.tun_if.local_addr[p.addr_type],
            flags=p.flags
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the outbound SA and the inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the tunnel interface and the host routes through it.

        The tunnel destination is ``p.tun_dst`` when ``p`` has that
        attribute, otherwise ``pg0``'s remote IPv4 address.  Only the IPv4
        host route is kept on ``p`` (as ``p.route``).
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip4

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        via_tun4 = VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_tun4])
        p.route.add_vpp_config()

        via_tun6 = VppRoutePath(
            p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        route6 = VppIpRoute(self, p.remote_tun_if_host6, 128, [via_tun6])
        route6.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        for vpp_obj in (p.route, p.tun_if):
            vpp_obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
251
252
class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        """Delegate class-level set-up to the parent classes."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the parent classes."""
        super().tearDownClass()

    def setUp(self):
        """Configure the tunnel, its SAs and the protection for ``ipv4_params``."""
        super().setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # network first, then the SAs, then bind them to the tunnel
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        """Delegate per-test tear-down to the parent classes."""
        super().tearDown()
279
280
class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
    """IPsec UDP tunnel interface tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        """Delegate class-level set-up to the parent classes."""
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the parent classes."""
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP encapsulation of each packet, then its decrypted payload.

        :param p: IPsec parameter object; ``p.nat_header.sport`` is the
            expected UDP source port.
        :param sa: scapy security association used to decrypt.
        :param rxs: received packets to verify.
        :raises IndexError, AssertionError: re-raised after logging the
            offending packet (and the decrypted packet when there is one).
        """
        for rx in rxs:
            # Reset per packet: a failure before decryption must not log a
            # packet left over from an earlier iteration, nor touch an
            # unbound name.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # diagnostics are best-effort; never let them mask
                        # the verification failure re-raised below
                        pass
                raise

    def config_sa_tra(self, p):
        """Create both SAs, passing the UDP ports taken from ``p.nat_header``."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=p.nat_header.sport,
            udp_dst=p.nat_header.dport,
        )
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        """Enable UDP encapsulation on ``ipv4_params`` and configure the tunnel."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        """Delegate per-test tear-down to the parent classes."""
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
368
369
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """Ipsec ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_basic64(self):
        """ipsec 6o4 tunnel basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ipsec 6o4 tunnel burst test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ipsec 4o4 tunnel frag basic test"""
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # Lower the tunnel MTU so the 1800-byte payload is expected as two
        # received packets (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
        try:
            self.verify_tun_44(
                self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
            )
        finally:
            # put the MTU back even when the verification fails
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
399
400
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Delegate per-test set-up to the UDP template."""
        super().setUp()

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        params = self.ipv4_params
        self.verify_keepalive(params)
412
413
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP GCM tests"""

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the UDP template set-up, then set AES-GCM-256 on ``ipv4_params``."""
        super().setUp()

        # NOTE(review): TemplateIpsec4TunIfEspUdp.setUp() has already called
        # config_sa_tra() by this point, so the values assigned below look
        # like they arrive too late to affect the SAs it created -- confirm
        # whether this test really exercises AES-GCM.
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        params = self.ipv4_params
        params.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        params.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        params.crypt_algo = "AES-GCM"
        params.auth_algo = "NULL"
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0
430
431
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """Ipsec ESP - TCP tests"""

    # No body of its own: the test cases come from IpsecTcpTests and the
    # tunnel set-up from TemplateIpsec4TunIfEsp.
436
437
class TemplateIpsec6TunProtect(object):
    """IPsec IPv6 Tunnel protect

    Mixin with helpers that create and remove an IPIP tunnel interface over
    ``pg0`` (IPv6 endpoints), the pair of VPP SAs protecting it, and the
    routes pointing at it.  Everything created is stored on the parameter
    object ``p`` that is handed to each helper.
    """

    def config_sa_tra(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` without tunnel addresses."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm / key / protocol arguments common to both directions
        crypto_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, *crypto_args)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, *crypto_args)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create ``p.tun_sa_out``/``p.tun_sa_in`` with tunnel addresses.

        The addresses come from ``self.tun_if``: local then remote for the
        outbound SA, remote then local for the inbound SA.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto_args,
            self.tun_if.local_addr[p.addr_type],
            self.tun_if.remote_addr[p.addr_type]
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto_args,
            self.tun_if.remote_addr[p.addr_type],
            self.tun_if.local_addr[p.addr_type]
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the outbound SA and the inbound SA."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the tunnel interface and the host routes through it.

        The tunnel destination is ``p.tun_dst`` when ``p`` has that
        attribute, otherwise ``pg0``'s remote IPv6 address.  Only the IPv6
        host route is kept on ``p`` (as ``p.route``).
        """
        tun_dst = p.tun_dst if hasattr(p, "tun_dst") else self.pg0.remote_ip6

        tunnel = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        via_tun6 = VppRoutePath(
            p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [via_tun6])
        p.route.add_vpp_config()

        via_tun4 = VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)
        route4 = VppIpRoute(self, p.remote_tun_if_host4, 32, [via_tun4])
        route4.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        for vpp_obj in (p.route, p.tun_if):
            vpp_obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
543
544
class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
    """IPsec tunnel interface tests"""

    encryption_type = ESP

    def setUp(self):
        """Configure the tunnel, its SAs and the protection for ``ipv6_params``."""
        super().setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # network first, then the SAs, then bind them to the tunnel
        for configure in (
            self.config_network,
            self.config_sa_tra,
            self.config_protect,
        ):
            configure(params)

    def tearDown(self):
        """Delegate per-test tear-down to the parent classes."""
        super().tearDown()
562
563
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """Ipsec ESP - TUN tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_basic46(self):
        """ipsec 4o6 tunnel basic test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # a single packet
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ipsec 4o6 tunnel burst test"""
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # a 257-packet burst
        self.verify_tun_46(v6_params, count=257)
579
580
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """Ipsec ESP 6 Handoff tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def test_tun_handoff_66_police(self):
        """ESP 6o6 tunnel with policer worker hand-off test"""
        for cli_cmd in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(cli_cmd)

        burst_size = 15
        params = self.params[socket.AF_INET6]

        # every policer outcome transmits the packet
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, True)

        for pol_bind in (1, 0):
            policer.bind_vpp_config(pol_bind, True)

            # inject alternately on worker 0 and 1.
            for worker in (0, 1, 0, 1):
                encrypted = self.gen_encrypt_pkts6(
                    params,
                    params.scapy_tun_sa,
                    self.tun_if,
                    src=params.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=burst_size,
                )
                received = self.send_and_expect(
                    self.tun_if, encrypted, self.pg1, worker=worker
                )
                self.verify_decrypted6(params, received)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            total = policer.get_stats()
            on_worker0 = policer.get_stats(worker=0)
            on_worker1 = policer.get_stats(worker=1)

            if pol_bind != 1:
                # Second pass: both workers should have policed equal amounts
                for per_worker in (on_worker1, on_worker0):
                    self.assertGreater(per_worker["conform_packets"], 0)
                    self.assertEqual(per_worker["exceed_packets"], 0)
                    self.assertGreater(per_worker["violate_packets"], 0)

                self.assertEqual(
                    on_worker0["conform_packets"] + on_worker0["violate_packets"],
                    on_worker1["conform_packets"] + on_worker1["violate_packets"],
                )
                continue

            # First pass: Worker 1, should have done all the policing
            self.assertEqual(total, on_worker1)

            # Worker 0, should have handed everything off
            for counter in ("conform_packets", "exceed_packets", "violate_packets"):
                self.assertEqual(on_worker0[counter], 0)

        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        policer.remove_vpp_config()
662
663
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """Ipsec ESP 4 Handoff tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def test_tun_handoff_44_police(self):
        """ESP 4o4 tunnel with policer worker hand-off test"""
        for cli_cmd in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(cli_cmd)

        burst_size = 15
        params = self.params[socket.AF_INET]

        # every policer outcome transmits the packet
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=transmit,
            exceed_action=transmit,
            violate_action=transmit,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, True)

        for pol_bind in (1, 0):
            policer.bind_vpp_config(pol_bind, True)

            # inject alternately on worker 0 and 1.
            for worker in (0, 1, 0, 1):
                encrypted = self.gen_encrypt_pkts(
                    params,
                    params.scapy_tun_sa,
                    self.tun_if,
                    src=params.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    count=burst_size,
                )
                received = self.send_and_expect(
                    self.tun_if, encrypted, self.pg1, worker=worker
                )
                self.verify_decrypted(params, received)
                self.logger.debug(self.vapi.cli("show trace max 100"))

            total = policer.get_stats()
            on_worker0 = policer.get_stats(worker=0)
            on_worker1 = policer.get_stats(worker=1)

            if pol_bind != 1:
                # Second pass: both workers should have policed equal amounts
                for per_worker in (on_worker1, on_worker0):
                    self.assertGreater(per_worker["conform_packets"], 0)
                    self.assertEqual(per_worker["exceed_packets"], 0)
                    self.assertGreater(per_worker["violate_packets"], 0)

                self.assertEqual(
                    on_worker0["conform_packets"] + on_worker0["violate_packets"],
                    on_worker1["conform_packets"] + on_worker1["violate_packets"],
                )
                continue

            # First pass: Worker 1, should have done all the policing
            self.assertEqual(total, on_worker1)

            # Worker 0, should have handed everything off
            for counter in ("conform_packets", "exceed_packets", "violate_packets"):
                self.assertEqual(on_worker0[counter], 0)

        policer.apply_vpp_config(params.tun_if.sw_if_index, Dir.RX, False)
        policer.remove_vpp_config()
745
746
@tag_fixme_vpp_workers
class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Multi Tunnel interface"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Create ten protected tunnels, each with its own parameter copy.

        Copy ``ii`` of ``ipv4_params`` gets its SA ids and SPIs offset by
        ``ii``, the remote host ``1.1.1.<ii + 1>`` and the ``ii``-th remote
        host of ``pg0`` as tunnel destination.  The copies are collected in
        ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        n_tunnels = 10
        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(n_tunnels):
            # shallow copy so every tunnel carries its own ids and SPIs
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            self.multi_params.append(p)
            self.config_network(p)
            self.config_sa_tra(p)
            self.config_protect(p)

    def tearDown(self):
        """Delegate per-test tear-down to the parent classes."""
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces"""
        n_pkts = 127
        for p in self.multi_params:
            self.verify_tun_44(p, count=n_pkts)
            # each tunnel interface must have counted the same number of
            # packets in both directions
            self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
            self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

    def test_tun_rr_44(self):
        """Round-robin packets across multiple interfaces"""
        # decrypt direction: encrypted packets for every tunnel in turn
        tx = []
        for p in self.multi_params:
            tx.extend(
                self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                )
            )
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # encrypt direction: plain packets towards every tunnel's host
        tx = []
        for p in self.multi_params:
            tx.extend(
                self.gen_pkts(
                    self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
                )
            )
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
818             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
819
820
821 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
822     """IPsec IPv4 Tunnel interface all Algos"""
823
824     encryption_type = ESP
825     tun4_encrypt_node_name = "esp4-encrypt-tun"
826     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
827
828     def setUp(self):
829         super(TestIpsec4TunIfEspAll, self).setUp()
830
831         self.tun_if = self.pg0
832         p = self.ipv4_params
833
834         self.config_network(p)
835         self.config_sa_tra(p)
836         self.config_protect(p)
837
838     def tearDown(self):
839         p = self.ipv4_params
840         self.unconfig_protect(p)
841         self.unconfig_network(p)
842         self.unconfig_sa(p)
843
844         super(TestIpsec4TunIfEspAll, self).tearDown()
845
846     def rekey(self, p):
847         #
848         # change the key and the SPI
849         #
850         np = copy.copy(p)
851         p.crypt_key = b"X" + p.crypt_key[1:]
852         p.scapy_tun_spi += 1
853         p.scapy_tun_sa_id += 1
854         p.vpp_tun_spi += 1
855         p.vpp_tun_sa_id += 1
856         p.tun_if.local_spi = p.vpp_tun_spi
857         p.tun_if.remote_spi = p.scapy_tun_spi
858
859         config_tun_params(p, self.encryption_type, p.tun_if)
860
861         p.tun_sa_out = VppIpsecSA(
862             self,
863             p.scapy_tun_sa_id,
864             p.scapy_tun_spi,
865             p.auth_algo_vpp_id,
866             p.auth_key,
867             p.crypt_algo_vpp_id,
868             p.crypt_key,
869             self.vpp_esp_protocol,
870             flags=p.flags,
871             salt=p.salt,
872         )
873         p.tun_sa_in = VppIpsecSA(
874             self,
875             p.vpp_tun_sa_id,
876             p.vpp_tun_spi,
877             p.auth_algo_vpp_id,
878             p.auth_key,
879             p.crypt_algo_vpp_id,
880             p.crypt_key,
881             self.vpp_esp_protocol,
882             flags=p.flags,
883             salt=p.salt,
884         )
885         p.tun_sa_in.add_vpp_config()
886         p.tun_sa_out.add_vpp_config()
887
888         self.config_protect(p)
889         np.tun_sa_out.remove_vpp_config()
890         np.tun_sa_in.remove_vpp_config()
891         self.logger.info(self.vapi.cli("sh ipsec sa"))
892
893     def test_tun_44(self):
894         """IPSEC tunnel all algos"""
895
896         # foreach VPP crypto engine
897         engines = ["ia32", "ipsecmb", "openssl"]
898
899         # foreach crypto algorithm
900         algos = [
901             {
902                 "vpp-crypto": (
903                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
904                 ),
905                 "vpp-integ": (
906                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
907                 ),
908                 "scapy-crypto": "AES-GCM",
909                 "scapy-integ": "NULL",
910                 "key": b"JPjyOWBeVEQiMe7h",
911                 "salt": 3333,
912             },
913             {
914                 "vpp-crypto": (
915                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
916                 ),
917                 "vpp-integ": (
918                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
919                 ),
920                 "scapy-crypto": "AES-GCM",
921                 "scapy-integ": "NULL",
922                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
923                 "salt": 0,
924             },
925             {
926                 "vpp-crypto": (
927                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
928                 ),
929                 "vpp-integ": (
930                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
931                 ),
932                 "scapy-crypto": "AES-GCM",
933                 "scapy-integ": "NULL",
934                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
935                 "salt": 9999,
936             },
937             {
938                 "vpp-crypto": (
939                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
940                 ),
941                 "vpp-integ": (
942                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
943                 ),
944                 "scapy-crypto": "AES-CBC",
945                 "scapy-integ": "HMAC-SHA1-96",
946                 "salt": 0,
947                 "key": b"JPjyOWBeVEQiMe7h",
948             },
949             {
950                 "vpp-crypto": (
951                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
952                 ),
953                 "vpp-integ": (
954                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
955                 ),
956                 "scapy-crypto": "AES-CBC",
957                 "scapy-integ": "SHA2-512-256",
958                 "salt": 0,
959                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
960             },
961             {
962                 "vpp-crypto": (
963                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
964                 ),
965                 "vpp-integ": (
966                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
967                 ),
968                 "scapy-crypto": "AES-CBC",
969                 "scapy-integ": "SHA2-256-128",
970                 "salt": 0,
971                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
972             },
973             {
974                 "vpp-crypto": (
975                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
976                 ),
977                 "vpp-integ": (
978                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
979                 ),
980                 "scapy-crypto": "NULL",
981                 "scapy-integ": "HMAC-SHA1-96",
982                 "salt": 0,
983                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
984             },
985         ]
986
987         for engine in engines:
988             self.vapi.cli("set crypto handler all %s" % engine)
989
990             #
991             # loop through each of the algorithms
992             #
993             for algo in algos:
994                 # with self.subTest(algo=algo['scapy']):
995
996                 p = self.ipv4_params
997                 p.auth_algo_vpp_id = algo["vpp-integ"]
998                 p.crypt_algo_vpp_id = algo["vpp-crypto"]
999                 p.crypt_algo = algo["scapy-crypto"]
1000                 p.auth_algo = algo["scapy-integ"]
1001                 p.crypt_key = algo["key"]
1002                 p.salt = algo["salt"]
1003
1004                 #
1005                 # rekey the tunnel
1006                 #
1007                 self.rekey(p)
1008                 self.verify_tun_44(p, count=127)
1009
1010
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
    """IPsec IPv4 Tunnel interface no Algos"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Select NULL integrity and NULL encryption, with empty keys."""
        super().setUp()
        self.tun_if = self.pg0

        params = self.ipv4_params

        # integrity: none
        integ_none = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        params.auth_algo_vpp_id = integ_none
        params.auth_algo = "NULL"
        params.auth_key = []

        # encryption: none
        crypto_none = VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        params.crypt_algo_vpp_id = crypto_none
        params.crypt_algo = "NULL"
        params.crypt_key = []

    def tearDown(self):
        """Nothing class-specific to undo; defer to the parent classes."""
        super().tearDown()

    def test_tun_44(self):
        """IPSec SA with NULL algos"""
        params = self.ipv4_params

        # bring up network, SAs and tunnel protection, in that order
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # traffic sent towards the tunnel must produce no replies
        pkts = self.gen_pkts(
            self.pg1, src=self.pg1.remote_ip4, dst=params.remote_tun_if_host
        )
        self.send_and_assert_no_replies(self.pg1, pkts)

        # tear down in the reverse order
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1050
1051
@tag_fixme_vpp_workers
class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
    """IPsec IPv6 Multi Tunnel interface"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Configure ten protected tunnels, one per generated pg0 remote host."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        tunnel_count = 10
        self.pg0.generate_remote_hosts(tunnel_count)
        self.pg0.configure_ipv6_neighbors()

        # identifiers that are shifted by the tunnel index so every tunnel
        # gets its own SA ids and SPIs
        shifted_attrs = (
            "scapy_tun_sa_id",
            "scapy_tun_spi",
            "vpp_tun_sa_id",
            "vpp_tun_spi",
            "scapy_tra_sa_id",
            "scapy_tra_spi",
            "vpp_tra_sa_id",
            "vpp_tra_spi",
        )

        for index in range(tunnel_count):
            # each tunnel works on its own shallow copy of the base parameters
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (index + 1)
            for attr in shifted_attrs:
                setattr(params, attr, getattr(params, attr) + index)
            params.tun_dst = self.pg0.remote_hosts[index].ip6

            self.multi_params.append(params)
            self.config_network(params)
            self.config_sa_tra(params)
            self.config_protect(params)

    def tearDown(self):
        """Nothing class-specific to undo; defer to the parent classes."""
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces"""
        pkt_count = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=pkt_count)
            # every tunnel must have counted exactly the packets sent through it
            self.assertEqual(params.tun_if.get_rx_stats(), pkt_count)
            self.assertEqual(params.tun_if.get_tx_stats(), pkt_count)
1098
1099
class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged inner Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is an Ethernet header (``sw_intf`` remote MAC to local MAC)
        followed by ``sa.encrypt()`` of IP(pg0 remote -> pg0 local) / GRE /
        Ether(dst=omac) / IP 1.1.1.1 -> 1.1.1.2 / UDP / payload.
        ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check the destination MAC and IP of every decrypted frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet with ``sa`` and validate its content.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption got that far) is logged at debug level and the
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create BD 1, both SAs and a protected TEB GRE tunnel bridged to pg1."""
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: tunnel endpoints pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: tunnel endpoints pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel and pg1 together
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()
1222
1223
class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - TUN tests"""

    # NOTE(review): the docstring above is identical to TestIpsecGreTebIfEsp's;
    # it is left untouched here in case the test runner reports it as the
    # suite name - confirm before rewording.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged inner Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is an Ethernet header (``sw_intf`` remote MAC to local MAC)
        followed by ``sa.encrypt()`` of IP(pg0 remote -> pg0 local) / GRE /
        Ether(dst=omac) / IP 1.1.1.1 -> 1.1.1.2 / UDP / payload (no VLAN tag).
        ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text frames tagged with VLAN 11.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(dst=self.omac)
            / Dot1Q(vlan=11)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check destination MAC, VLAN 11 tag and destination IP of each frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet with ``sa`` and validate its content.

        The inner frame must not carry a Dot1Q tag. On
        IndexError/AssertionError the received packet (and the decrypted one,
        when decryption got that far) is logged at debug level and the
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create BD 1, a VLAN 11 sub-interface on pg1, both SAs and a
        protected TEB GRE tunnel bridged to that sub-interface."""
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # VLAN 11 sub-interface on pg1 with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index,
            vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11,
        )
        self.pg1_11.admin_up()

        # outbound SA: tunnel endpoints pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: tunnel endpoints pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel and the VLAN sub-interface together
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, run the parent tearDown, then
        bring down and remove the VLAN sub-interface."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
1356         self.pg1_11.remove_vpp_config()
1357
1358
class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB ESP - Tra tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged inner Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is an Ethernet header (``sw_intf`` remote MAC to local MAC)
        followed by ``sa.encrypt()`` of IP(pg0 remote -> pg0 local) / GRE /
        Ether(dst=omac) / IP 1.1.1.1 -> 1.1.1.2 / UDP / payload.
        ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check the destination MAC and IP of every decrypted frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet with ``sa`` and validate its content.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption got that far) is logged at debug level and the
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create BD 1, both SAs (no tunnel endpoints given) and a protected
        TEB GRE tunnel bridged to pg1."""
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # unlike the TUN variants, no tunnel addresses are passed to the SAs
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel and pg1 together
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()
1475
1476
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE TEB UDP ESP - Tra tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP
    # destination MAC of the bridged inner Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is an Ethernet header (``sw_intf`` remote MAC to local MAC)
        followed by ``sa.encrypt()`` of IP(pg0 remote -> pg0 local) / GRE /
        Ether(dst=omac) / IP 1.1.1.1 -> 1.1.1.2 / UDP / payload.
        ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / Ether(dst=self.omac)
                / IP(src="1.1.1.1", dst="1.1.1.2")
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(dst=self.omac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check the destination MAC and IP of every decrypted frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP encapsulation ports, then decrypt and validate.

        The UDP checks run outside the logging handler. On
        IndexError/AssertionError during decryption or validation the
        received packet (and the decrypted one, when decryption got that far)
        is logged at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # ports must match the outbound SA configured in setUp
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # reset per packet so a failure never logs the previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create BD 1, UDP-encapsulated SAs and a protected TEB GRE tunnel
        bridged to pg1."""
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: UDP source 5454, destination 4545
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags,
            udp_src=5454,
            udp_dst=4545,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: ports mirrored, and flagged as inbound
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            flags=(
                p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
            ),
            udp_src=4545,
            udp_dst=5454,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
        )
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel and pg1 together
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()
1608
1609
class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TUN tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is an Ethernet header (``sw_intf`` remote MAC to local MAC)
        followed by ``sa.encrypt()`` of IP(pg0 remote -> pg0 local) / GRE /
        IP(pg1 local -> pg1 remote) / UDP / payload.
        ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` clear-text frames 1.1.1.1 -> 1.1.1.2 for ``sw_intf``.

        ``src`` and ``dst`` are accepted but not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every decrypted packet is addressed to the pg1 remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet with ``sa`` and validate its content.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption got that far) is logged at debug level and the
        exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs the previous packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create both SAs, a protected GRE tunnel over pg0 and a host route
        for 1.1.1.2 through it."""
        super().setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): this bridge domain is created but no port is added to
        # it in this class - confirm whether it is still needed.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: tunnel endpoints pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA: tunnel endpoints pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            self.pg0.remote_ip4,
            self.pg0.local_ip4,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # send 1.1.1.2/32 via the tunnel's remote address
        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super().tearDown()
1720
1721
class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
    """Ipsec GRE ESP - TRA tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted GRE frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / GRE / IP / UDP / Raw). The outer
        IP header goes from pg0's remote to pg0's local address; the inner IP
        header goes from pg1's local to pg1's remote address. `p`, `src` and
        `dst` are accepted but not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted GRE frames whose GRE payload is not IP.

        Same outer headers as gen_encrypt_pkts(), but GRE is followed directly
        by UDP / Raw with no inner IP header. `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
                / GRE()
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` clear-text frames from 1.1.1.1 to 1.1.1.2.

        The addresses are fixed; `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IPv4."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with `sa` and check it carries GRE / IP to 1.1.1.2.

        On IndexError or AssertionError the received (and, when available, the
        decrypted) packet is logged at debug level and the error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # the decrypted payload came back undissected; re-parse it as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: `pkt` may be unbound if the
                    # failure happened before decryption completed. Catching
                    # Exception (not a bare except) lets KeyboardInterrupt and
                    # SystemExit propagate.
                    pass
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel over pg0, its protection and a route."""
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # outbound SA uses the scapy-side SA id / SPI
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA uses the VPP-side SA id / SPI
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1.1.1.2 (the gen_pkts() destination) via the tunnel peer
        VppIpRoute(
            self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        """IPSEC GRE non-IP payload is dropped"""
        p = self.ipv4_params
        # `src`/`dst` are ignored by gen_encrypt_non_ip_pkts(); an IPv4 address
        # is passed for consistency with the rest of this IPv4 suite.
        tx = self.gen_encrypt_non_ip_pkts(
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
        )
        self.send_and_assert_no_replies(self.tun_if, tx)
        # the single packet sent must be counted by the first decrypt node
        node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1847
1848
class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
    """Ipsec GRE ESP - TRA tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted GRE frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IPv6 / GRE / IPv6 / UDP / Raw). The
        outer IPv6 header goes from pg0's remote to pg0's local address; the
        inner one goes from pg1's local to pg1's remote address. `p`, `src`
        and `dst` are accepted but not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` clear-text frames from 1::1 to 1::2.

        The addresses are fixed; `p`, `src` and `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst="1::2")
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IPv6."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet with `sa` and check it carries GRE / IPv6 to 1::2.

        On IndexError or AssertionError the received (and, when available, the
        decrypted) packet is logged at debug level and the error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # the decrypted payload came back undissected; re-parse it as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: `pkt` may be unbound if the
                    # failure happened before decryption completed. Catching
                    # Exception (not a bare except) lets KeyboardInterrupt and
                    # SystemExit propagate.
                    pass
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel over pg0, its protection and a route."""
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): this bridge domain is created but not referenced again
        # in this class - confirm whether it is still needed.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA uses the scapy-side SA id / SPI
        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_out.add_vpp_config()

        # inbound SA uses the VPP-side SA id / SPI
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1::2 (the gen_pkts6() destination) via the tunnel peer
        r = VppIpRoute(
            self,
            "1::2",
            128,
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
                )
            ],
        )
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 configuration, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1961
1962
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """Ipsec mGRE ESP v4 TRA tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted GRE frames sourced from `p.tun_dst`.

        Each frame is Ether / sa.encrypt(IP / GRE / IP / UDP / Raw). The outer
        IP header goes from `p.tun_dst` to pg0's local address; the inner one
        goes from pg1's local to pg1's remote address. `src` and `dst` are not
        used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=p.tun_dst, dst=self.pg0.local_ip4)
                / GRE()
                / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` clear-text frames from 1.1.1.1 to `dst`.

        The source address is fixed; `src` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IPv4."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with `sa` and check the GRE payload's IP dst.

        The inner destination must equal `p.remote_tun_if_host`. On IndexError
        or AssertionError the received (and, when available, the decrypted)
        packet is logged at debug level and the error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # the decrypted payload came back undissected; re-parse it as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: `pkt` may be unbound if the
                    # failure happened before decryption completed. Catching
                    # Exception (not a bare except) lets KeyboardInterrupt and
                    # SystemExit propagate.
                    pass
                raise

    def setUp(self):
        """Create one multipoint GRE tunnel and per-next-hop SAs/protection.

        For each of N_NHS next-hops a copy of the IPv4 parameters is made with
        shifted SA ids / SPIs, then SAs, tunnel protection, a host route and a
        TEIB entry are configured. The copies are kept in `self.multi_params`.
        """
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: the tunnel interface object is shared by all copies
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            p.teib = VppTeib(
                self,
                p.tun_if,
                p.tun_if.remote_hosts[ii].ip4,
                self.pg0.remote_hosts[ii].ip4,
            ).add_vpp_config()
            # set after config_tra_params(); gen_encrypt_pkts() uses it as outer src
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 configuration, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # without the TEIB entry traffic must be dropped ...
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            # ... and must flow again once the entry is restored
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
2116
2117
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """Ipsec mGRE ESP v6 TRA tests"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted GRE frames sourced from `p.tun_dst`.

        Each frame is Ether / sa.encrypt(IPv6 / GRE / IPv6 / UDP / Raw). The
        outer IPv6 header goes from `p.tun_dst` to pg0's local address; the
        inner one goes from pg1's local to pg1's remote address. `src` and
        `dst` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
                / GRE()
                / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` clear-text frames from 1::1 to `dst`.

        The source address is fixed; `p` and `src` are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src="1::1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check every received packet is addressed to pg1's remote MAC and IPv6."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet with `sa` and check the GRE payload's IPv6 dst.

        The inner destination must equal `p.remote_tun_if_host`. On IndexError
        or AssertionError the received (and, when available, the decrypted)
        packet is logged at debug level and the error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # the decrypted payload came back undissected; re-parse it as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: `pkt` may be unbound if the
                    # failure happened before decryption completed. Catching
                    # Exception (not a bare except) lets KeyboardInterrupt and
                    # SystemExit propagate.
                    pass
                raise

    def setUp(self):
        """Create one multipoint GRE tunnel and per-next-hop SAs/protection.

        For each of N_NHS next-hops a copy of the IPv6 parameters is made with
        shifted SA ids / SPIs, then SAs, a TEIB entry, tunnel protection and a
        host route are configured. The copies are kept in `self.multi_params`.
        """
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        p.tun_if = VppGreInterface(
            self,
            self.pg0.local_ip6,
            "::",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: the tunnel interface object is shared by all copies
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
            )
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(
                self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
            ).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6,
            )
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(
                self,
                p.remote_tun_if_host,
                128,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
            ).add_vpp_config()
            # NOTE(review): repeated after config_tra_params(), presumably
            # because that helper resets p.tun_dst - confirm before removing.
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        # dump state to the test log to aid debugging
        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 configuration, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
2269
2270
@tag_fixme_vpp_workers
class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the base setUp and use pg0 as the interface carrying the tunnel."""
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base tearDown; the test cleans up after itself."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        burst = 127
        base = self.ipv4_params

        # bring up network, transport-mode SAs and the tunnel protection
        for configure in (self.config_network, self.config_sa_tra, self.config_protect):
            configure(base)

        # first burst: IPv4 over the protected tunnel
        self.verify_tun_44(base, count=burst)
        self.assertEqual(base.tun_if.get_rx_stats(), burst)
        self.assertEqual(base.tun_if.get_tx_stats(), burst)

        # second burst, after clearing the SAs via the CLI
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(base, count=burst)
        self.assertEqual(base.tun_if.get_rx_stats(), 2 * burst)
        self.assertEqual(base.tun_if.get_tx_stats(), 2 * burst)

        # rekey: derive new parameters from the old ones, install the new SAs
        # and protection, then drop the old SAs
        rekeyed = copy.copy(base)
        rekeyed.crypt_key = b"X" + base.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        # the copy is shallow, so this writes to the shared tunnel interface
        rekeyed.tun_if.local_spi = base.vpp_tun_spi
        rekeyed.tun_if.remote_spi = base.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(base)

        # third burst: traffic flows with the new keys, counters keep growing
        self.verify_tun_44(rekeyed, count=burst)
        self.assertEqual(base.tun_if.get_rx_stats(), 3 * burst)
        self.assertEqual(base.tun_if.get_tx_stats(), 3 * burst)

        # cleanup
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(base)
2323
2324
@tag_fixme_vpp_workers
class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Configure network, SAs and protection with UDP encapsulation on."""
        super(TestIpsec4TunProtectUdp, self).setUp()
        self.tun_if = self.pg0

        params = self.ipv4_params
        # enable UDP encapsulation with source and destination port 4500
        params.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)

        for configure in (self.config_network, self.config_sa_tra, self.config_protect):
            configure(params)

    def tearDown(self):
        """Undo setUp's configuration in reverse order, then run the base tearDown."""
        params = self.ipv4_params
        for unconfigure in (
            self.unconfig_protect,
            self.unconfig_sa,
            self.unconfig_network,
        ):
            unconfigure(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of each packet, then run the inherited checks."""
        # encrypted packets must be received with the default UDP ports
        for received in rxs:
            udp = received[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        burst = 127
        params = self.ipv4_params

        self.verify_tun_44(params, count=burst)
        self.assertEqual(params.tun_if.get_rx_stats(), burst)
        self.assertEqual(params.tun_if.get_tx_stats(), burst)

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        self.verify_keepalive(self.ipv4_params)
2367
2368
@tag_fixme_vpp_workers
class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the base setUp and use pg0 as the interface carrying the tunnel."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base tearDown; the test cleans up after itself."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted IP-in-IP frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / IP / UDP / Raw). The outer IP
        header goes from `sw_intf`'s remote to its local address; the inner
        one goes from `src` to `dst`. `p` is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` clear-text IP/UDP frames from `src` to `dst`."""
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src=src, dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of every decrypted packet.

        The destination must be pg1's remote IPv4 address and the source must
        be `p.remote_tun_if_host`.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet with `sa` and check outer and inner IP headers.

        The outer header must go from pg0's local to pg0's remote address and
        the inner destination must equal `p.remote_tun_if_host`. On IndexError
        or AssertionError the received (and, when available, the decrypted)
        packet is logged at debug level and the error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # the decrypted payload came back undissected; re-parse it as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort logging only: `pkt` may be unbound if the
                    # failure happened before decryption completed. Catching
                    # Exception (not a bare except) lets KeyboardInterrupt and
                    # SystemExit propagate.
                    pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # also add an output features on the tunnel and physical interface
        # so we test they still work
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        self.verify_tun_44(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        # the copy is shallow, so this writes to the shared tunnel interface
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        # counters are cumulative across both bursts
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2476
2477
class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
    """IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]

    def setUp(self):
        """Run the base setUp and use pg0 as the interface carrying the tunnel."""
        super(TestIpsec4TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base tearDown; the test cleans up after itself."""
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build `count` encrypted IP-in-IP frames with outer dst 5.5.5.5.

        The outer IP header goes from `sw_intf`'s remote address to the fixed
        address 5.5.5.5; the inner one goes from `src` to `dst`. `p` is not
        used.
        """
        frames = []
        for _ in range(count):
            tunnelled = (
                IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
                / IP(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            l2_header = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            # encrypt once per frame, as each call produces a fresh ESP packet
            frames.append(l2_header / sa.encrypt(tunnelled))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header"""
        params = self.ipv4_params

        # bring up network, tunnel-mode SAs and the tunnel protection
        for configure in (self.config_network, self.config_sa_tun, self.config_protect):
            configure(params)

        # frames built by gen_encrypt_pkts() above must produce no output
        bogus_frames = self.gen_encrypt_pkts(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus_frames)

        # cleanup, in reverse order of configuration
        for unconfigure in (
            self.unconfig_protect,
            self.unconfig_sa,
            self.unconfig_network,
        ):
            unconfigure(params)
2528
2529
@tag_fixme_vpp_workers
class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base setUp and use pg0 as the interface carrying the tunnel."""
        super(TestIpsec6TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the base tearDown; the tests clean up after themselves."""
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""
        burst = 127
        base = self.ipv6_params

        # bring up network, transport-mode SAs and the tunnel protection
        for configure in (self.config_network, self.config_sa_tra, self.config_protect):
            configure(base)

        self.verify_tun_66(base, count=burst)
        self.assertEqual(base.tun_if.get_rx_stats(), burst)
        self.assertEqual(base.tun_if.get_tx_stats(), burst)

        # simple rekey: new SAs and protection first, then drop the old SAs
        rekey1 = copy.copy(base)
        rekey1.crypt_key = b"X" + base.crypt_key[1:]
        rekey1.scapy_tun_spi += 100
        rekey1.scapy_tun_sa_id += 1
        rekey1.vpp_tun_spi += 100
        rekey1.vpp_tun_sa_id += 1
        # the copy is shallow, so this writes to the shared tunnel interface
        rekey1.tun_if.local_spi = base.vpp_tun_spi
        rekey1.tun_if.remote_spi = base.scapy_tun_spi

        self.config_sa_tra(rekey1)
        self.config_protect(rekey1)
        self.unconfig_sa(base)

        self.verify_tun_66(rekey1, count=burst)
        self.assertEqual(base.tun_if.get_rx_stats(), 2 * burst)
        self.assertEqual(base.tun_if.get_tx_stats(), 2 * burst)

        # take the tunnel down: traffic is dropped and counted as "disabled",
        # then bring it back up and check traffic flows again
        base.tun_if.admin_down()
        self.verify_drop_tun_66(rekey1, count=burst)
        disabled_counter = "/err/ipsec6-tun-input/disabled"
        self.assertEqual(burst, self.statistics.get_err_counter(disabled_counter))
        base.tun_if.admin_up()
        self.verify_tun_66(rekey1, count=burst)

        # three-phase rekey:
        #   1) accept both old and new input SAs
        #   2) switch the output SA to the new one
        #   3) accept only the new input SA
        rekey2 = copy.copy(rekey1)
        rekey2.crypt_key = b"Z" + base.crypt_key[1:]
        rekey2.scapy_tun_spi += 100
        rekey2.scapy_tun_sa_id += 1
        rekey2.vpp_tun_spi += 100
        rekey2.vpp_tun_sa_id += 1
        rekey2.tun_if.local_spi = base.vpp_tun_spi
        rekey2.tun_if.remote_spi = base.scapy_tun_spi

        self.config_sa_tra(rekey2)

        # phase 1: old output SA, old + new input SAs
        base.tun_protect.update_vpp_config(
            rekey1.tun_sa_out, [rekey1.tun_sa_in, rekey2.tun_sa_in]
        )
        self.verify_tun_66(rekey1, rekey1, count=burst)
        self.verify_tun_66(rekey2, rekey1, count=burst)

        # phase 2: new output SA, old + new input SAs
        base.tun_protect.update_vpp_config(
            rekey2.tun_sa_out, [rekey1.tun_sa_in, rekey2.tun_sa_in]
        )
        self.verify_tun_66(rekey1, rekey2, count=burst)
        self.verify_tun_66(rekey2, rekey2, count=burst)

        # phase 3: new output SA, new input SA only; old-key traffic is dropped
        base.tun_protect.update_vpp_config(rekey2.tun_sa_out, [rekey2.tun_sa_in])
        self.verify_tun_66(rekey2, rekey2, count=burst)
        self.verify_drop_tun_rx_66(rekey1, count=burst)

        self.assertEqual(base.tun_if.get_rx_stats(), burst * 9)
        self.assertEqual(base.tun_if.get_tx_stats(), burst * 8)
        self.unconfig_sa(rekey1)

        # cleanup
        self.unconfig_protect(rekey2)
        self.unconfig_sa(rekey2)
        self.unconfig_network(base)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""
        burst = 127
        params = self.ipv6_params

        # bring up network, transport-mode SAs and the tunnel protection
        for configure in (self.config_network, self.config_sa_tra, self.config_protect):
            configure(params)

        self.verify_tun_46(params, count=burst)
        self.assertEqual(params.tun_if.get_rx_stats(), burst)
        self.assertEqual(params.tun_if.get_tx_stats(), burst)

        # cleanup, in reverse order of configuration
        for unconfigure in (
            self.unconfig_protect,
            self.unconfig_sa,
            self.unconfig_network,
        ):
            unconfigure(params)
2641
2642
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        # pg0 is the interface the encrypted traffic is exchanged on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is an Ethernet header followed by the result of
        ``sa.encrypt()`` applied to an IPv6 header addressed between
        ``sw_intf``'s remote and local addresses, carrying an inner
        IPv6/UDP packet from ``src`` to ``dst`` with ``payload_size`` bytes
        of payload.  ``p`` is not used by this override.
        """
        # sa.encrypt() is called once per frame, so a fresh packet is
        # encrypted for every element of the list
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain IPv6/UDP frames from ``src`` to ``dst``.

        ``p`` is not used by this override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / UDP(sport=1166, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of the packets in ``rxs``.

        Every packet must be destined to pg1's remote IPv6 address, be
        sourced from ``p.remote_tun_if_host`` and carry valid checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet in ``rxs`` with ``sa`` and check its headers.

        The decrypted packet must be addressed from pg0's local to pg0's
        remote IPv6 address and its payload must contain a nested IPv6
        header destined to ``p.remote_tun_if_host``.

        On an ``IndexError`` or ``AssertionError`` the received packet (and
        the decrypted one, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per frame so that a failure before/inside decrypt() never
            # logs the packet decrypted for a previous frame
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a failure to render
                        # the packet must not mask the real test failure
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        # NOTE: copy.copy() is shallow, so np.tun_if is the same interface
        # object as p.tun_if
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        # the old SAs are removed only once the protection uses the new ones
        self.unconfig_sa(p)

        # counters keep accumulating across the rekey: 2 x 127
        self.verify_tun_66(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2742
2743
class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super().setUp()
        # encrypted traffic is injected on pg0
        self.tun_if = self.pg0

    def tearDown(self):
        super().tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames carrying a bogus tunnel header.

        ``p`` is not used by this override.
        """
        # the IP destination of the revealed packet does not match
        # that assigned to the tunnel
        bogus_tunnel_dst = "5::5"

        frames = []
        for _ in range(count):
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            revealed = (
                IPv6(src=sw_intf.remote_ip6, dst=bogus_tunnel_dst)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            # one encrypt() call per frame, exactly like a per-item build
            frames.append(l2_hdr / sa.encrypt(revealed))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header"""

        params = self.ipv6_params

        for setup in (self.config_network, self.config_sa_tun, self.config_protect):
            setup(params)

        # none of the frames with the mismatching tunnel header may produce
        # a reply
        bogus_frames = self.gen_encrypt_pkts6(
            params,
            params.scapy_tun_sa,
            self.tun_if,
            src=params.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=63,
        )
        self.send_and_assert_no_replies(self.tun_if, bogus_frames)

        for teardown in (
            self.unconfig_protect,
            self.unconfig_sa,
            self.unconfig_network,
        ):
            teardown(params)
2795
2796
class TemplateIpsecItf4(object):
    """IPsec Interface IPv4"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and install the SA pair of ``p`` with endpoints ``src``/``dst``.

        The SAs are stored on ``p`` as ``tun_sa_out`` (endpoints src, dst)
        and ``tun_sa_in`` (endpoints dst, src).
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # algorithm/key/protocol arguments are identical for both SAs
        crypto_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto_args,
            src,
            dst,
            flags=p.flags,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        # same material, tunnel endpoints swapped
        sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto_args,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SA pair stored on ``p``."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p, instance=0xFFFFFFFF):
        """Create the ipsec interface for ``p`` and route the remote hosts via it.

        The IPv4 route is kept in ``p.route``; the IPv6 route is installed
        but not stored on ``p``.
        """
        tunnel = VppIpsecInterface(self, instance=instance)
        p.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route via the tunnel's remote address
        path_v4 = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [path_v4])
        p.route.add_vpp_config()

        # IPv6 host route via the tunnel's remote address
        path_v6 = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        VppIpRoute(self, p.remote_tun_if_host6, 128, [path_v6]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the ipsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
2879
2880
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface IPv4"""

    def setUp(self):
        super(TestIpsecItf4, self).setUp()

        # pg0 is the interface the encrypted traffic is exchanged on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf4, self).tearDown()

    # Create the ipsec interface with an explicit instance number (3) and
    # check through the CLI that "ipsec3" is known while "ipsec0" is not.
    def test_tun_instance_44(self):
        p = self.ipv4_params
        self.config_network(p, instance=3)

        # no interface with instance 0 was created: the CLI must fail
        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        output = self.vapi.cli("show interface ipsec3")
        self.assertTrue("unknown" not in output)

        self.unconfig_network(p)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        # set up the scapy-side parameters on p before any SA exists in VPP
        config_tun_params(
            p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
        )
        # interface exists but is not protected yet: traffic must be dropped
        self.verify_tun_dropped_44(p, count=n_pkts)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface: dropped while down, forwarded again once up
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        # the expected rx count includes the batch sent while the interface
        # was down (3 batches), the expected tx count does not (2 batches)
        self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v6 packet when it's encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)

        # restore the node name used for the v4 checks that follow
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # the assertions after the rekey expect the interface counters to
        # start again from zero
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # NOTE: copy.copy() is shallow, so np.tun_if is the same interface
        # object as p.tun_if
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(np)
        # the old SAs are removed only once the protection uses the new ones
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 127
        # work on a copy so the algorithm overrides below are not written to
        # self.ipv4_params itself
        p = copy.copy(self.ipv4_params)

        # select the NONE integrity/crypto algorithms on the VPP side and the
        # matching "NULL" names used for the scapy side
        p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.logger.info(self.vapi.cli("sh ipsec sa"))
        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        # every policer outcome transmits, so policing only counts packets
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        # NOTE(review): the positional values are presumably the rates and
        # burst sizes of the policer - confirm against VppPolicer's signature
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_44(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3042
3043
class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface MPLSoIPv4"""

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        super(TestIpsecItf4MPLS, self).setUp()

        # pg0 is the interface the encrypted traffic is exchanged on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames whose payload is MPLS-labelled.

        Each frame is an Ethernet header followed by ``sa.encrypt()`` of an
        MPLS header (label 44, ttl 3) over an IPv4/UDP packet from ``src`` to
        ``dst``.  ``p`` is not used by this override.
        """
        # sa.encrypt() is called once per frame
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=44, ttl=3)
                / IP(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet in ``rxs`` with ``sa`` and check label and address.

        The decrypted packet must have valid checksums, carry MPLS label 44
        and be destined to ``p.remote_tun_if_host``.

        On an ``IndexError`` or ``AssertionError`` the received packet (and
        the decrypted one, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per frame so that a failure before/inside decrypt() never
            # logs the packet decrypted for a previous frame
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a failure to render
                        # the packet must not mask the real test failure
                        pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        ).add_vpp_config()
        # traffic towards the remote host leaves via the tunnel with label 44
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3120
3121
class TemplateIpsecItf6(object):
    """IPsec Interface IPv6"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and install the SA pair of ``p`` with endpoints ``src``/``dst``.

        ``p.tun_flags`` and ``p.hop_limit`` are given defaults (None / 255)
        when absent; they are passed to the ``tun_sa_out`` SA only.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # fill in the optional tunnel attributes the caller did not set
        for attr, default in (("tun_flags", None), ("hop_limit", 255)):
            if not hasattr(p, attr):
                setattr(p, attr, default)

        # algorithm/key/protocol arguments are identical for both SAs
        crypto_args = (
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
        )

        sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            *crypto_args,
            src,
            dst,
            flags=p.flags,
            tun_flags=p.tun_flags,
            hop_limit=p.hop_limit,
        )
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        # same material, tunnel endpoints swapped, no tunnel flags/hop limit
        sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            *crypto_args,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SA pair stored on ``p``."""
        protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the ipsec interface for ``p`` and route the remote hosts via it.

        The IPv6 route is kept in ``p.route``; the IPv4 route is installed
        but not stored on ``p``.
        """
        tunnel = VppIpsecInterface(self)
        p.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route via the tunnel's remote address
        path_v4 = VppRoutePath(tunnel.remote_ip4, 0xFFFFFFFF)
        VppIpRoute(self, p.remote_tun_if_host4, 32, [path_v4]).add_vpp_config()

        # IPv6 host route via the tunnel's remote address
        path_v6 = VppRoutePath(
            tunnel.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
        )
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [path_v6])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the ipsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
3212
3213
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface IPv6"""

    def setUp(self):
        super(TestIpsecItf6, self).setUp()

        # pg0 is the interface the encrypted traffic is exchanged on
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf6, self).tearDown()

    def test_tun_66(self):
        """IPSEC interface IPv6"""

        tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        # NOTE(review): the hop-limit / flow-label values are presumably the
        # expectations checked by the verify helpers - confirm in
        # template_ipsec
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        # set up the scapy-side parameters on p before any SA exists in VPP
        config_tun_params(
            p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
        )
        # interface exists but is not protected yet: traffic must be dropped
        self.verify_drop_tun_66(p, count=n_pkts)
        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # bounce the interface: dropped while down, forwarded again once up
        p.tun_if.admin_down()
        self.verify_drop_tun_66(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_66(p, count=n_pkts)

        # the expected rx count includes the batch sent while the interface
        # was down (3 batches), the expected tx count does not (2 batches)
        self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v4 packet when it's encrypted
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)

        # restore the node name used for the v6 checks that follow
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # the assertions after the rekey expect the interface counters to
        # start again from zero
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # NOTE: copy.copy() is shallow, so np.tun_if is the same interface
        # object as p.tun_if
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi
        # the new SAs use a fixed hop limit and copy the flow label instead
        # of copying the hop limit
        np.inner_hop_limit = 24
        np.outer_hop_limit = 128
        np.inner_flow_label = 0xABCDE
        np.outer_flow_label = 0xABCDE
        np.hop_limit = 128
        np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL

        self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(np)
        # the old SAs are removed only once the protection uses the new ones
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_66_police(self):
        """IPSEC interface IPv6 with input policer"""
        tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        # every policer outcome transmits, so policing only counts packets
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        # NOTE(review): the positional values are presumably the rates and
        # burst sizes of the policer - confirm against VppPolicer's signature
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_66(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3354
3355
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """Ipsec P2MP ESP v4 tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        The encrypted packet always goes from pg1's local to pg1's remote
        IPv4 address; ``p``, ``src`` and ``dst`` are not used by this
        override.
        """
        # sa.encrypt() is called once per frame
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain IPv4/UDP frames destined to ``dst``.

        The source address is fixed to 1.1.1.1; ``src`` is not used by this
        override.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that every packet in ``rxs`` is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check the outer header of each packet in ``rxs`` and its decryption.

        The outer IPv4 header must carry the EF DSCP value and a TTL equal to
        ``p.hop_limit``; the packet decrypted with ``sa`` must have valid
        checksums and be destined to ``p.remote_tun_if_host``.

        On an ``IndexError`` or ``AssertionError`` the received packet (and
        the decrypted one, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per frame: the outer-header assertions can fail before
            # decrypt() runs, and the packet decrypted for a previous frame
            # must not be logged in that case
            pkt = None
            try:
                self.assertEqual(
                    rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
                )
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                e = pkt[IP]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a failure to render
                        # the packet must not mask the real test failure
                        pass
                raise

    def setUp(self):
        """Create one multipoint ipsec interface protected per next-hop.

        For each of the N_NHS next-hops a copy of ``self.ipv4_params`` with
        its own SA ids, SPIs, hop limit, SA pair, tunnel protection and host
        route is appended to ``self.multi_params``.
        """
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(
            self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        # configure, unconfigure and configure the address again before use
        p.tun_if.config_ip4()
        p.tun_if.unconfig_ip4()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # an any/any ACL rule applied to both pg0 and the ipsec interface
        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every per-next-hop params object shares the one
            # ipsec interface stored in tun_if
            p = copy.copy(self.ipv4_params)

            # give each next-hop its own host address, SA ids and SPIs
            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            # a distinct hop limit per next-hop; verify_encrypted() checks it
            p.hop_limit = ii + 10
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_in.add_vpp_config()

            # protection is per next-hop on the shared interface
            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tun_params(
                p,
                self.encryption_type,
                None,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
            )
            self.multi_params.append(p)

            # host route for this next-hop's remote host via the tunnel
            p.via_tun_route = VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        # remove the interface address before the base-class teardown runs
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)

        # remove one tunnel protect, the rest should still work
        self.multi_params[0].tun_protect.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
        # still dropped once the route via the tunnel is gone as well
        self.multi_params[0].via_tun_route.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)

        for p in self.multi_params[1:]:
            self.verify_tun_44(p, count=N_PKTS)

        # restore the first next-hop; all of them must work again
        self.multi_params[0].tun_protect.add_vpp_config()
        self.multi_params[0].via_tun_route.add_vpp_config()

        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
3533
3534
class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface MPLSoIPv6"""

    # Name of the encrypt graph node used for this test class; consumed by
    # the shared IPv6 tunnel test templates.
    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        super(TestIpsecItf6MPLS, self).setUp()

        # the tunnel traffic is sent/received on pg0
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Build ``count`` ESP packets carrying MPLS(label 66)/IPv6/UDP.

        Each packet is an Ethernet frame from ``sw_intf.remote_mac`` to
        ``sw_intf.local_mac`` whose payload is encrypted with ``sa``.
        ``p`` is accepted for signature compatibility and is not used here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=66, ttl=3)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check its content.

        Each decrypted packet must have valid checksums, MPLS label 66 and
        an IPv6 destination equal to ``p.remote_tun_if_host``. On an
        IndexError or AssertionError the offending packet is logged at
        debug level and the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so the failure log below can never show a
            # packet decrypted in an earlier iteration (or hit an unbound
            # name when the very first decrypt fails).
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Best-effort dump only: a failure to format the
                        # packet must not mask the original error.
                        # Deliberately not a bare except, so that
                        # KeyboardInterrupt/SystemExit still propagate.
                        pass
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self,
            66,
            1,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
        ).add_vpp_config()
        # send traffic for the tunnel route out of the tunnel with label 66
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3615
3616
if __name__ == "__main__":
    # Direct execution of this file: run its tests with the VPP test runner.
    unittest.main(testRunner=VppTestRunner)