tests: replace pycodestyle with black
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import (
14     TemplateIpsec,
15     IpsecTun4Tests,
16     IpsecTun6Tests,
17     IpsecTun4,
18     IpsecTun6,
19     IpsecTcpTests,
20     mk_scapy_crypt_key,
21     IpsecTun6HandoffTests,
22     IpsecTun4HandoffTests,
23     config_tun_params,
24 )
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
28     VppIpRoute,
29     VppRoutePath,
30     DpoProto,
31     VppMplsLabel,
32     VppMplsTable,
33     VppMplsRoute,
34     FibPathProto,
35 )
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
40 from util import ppp
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
45
46
47 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
48     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
49     esn_en = bool(
50         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
51     )
52     crypt_key = mk_scapy_crypt_key(p)
53     if tun_if:
54         p.tun_dst = tun_if.remote_ip
55         p.tun_src = tun_if.local_ip
56     else:
57         p.tun_dst = dst
58         p.tun_src = src
59
60     if p.nat_header:
61         is_default_port = p.nat_header.dport == 4500
62     else:
63         is_default_port = True
64
65     if is_default_port:
66         outbound_nat_header = p.nat_header
67     else:
68         outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
69         bind_layers(UDP, ESP, dport=p.nat_header.dport)
70
71     p.scapy_tun_sa = SecurityAssociation(
72         encryption_type,
73         spi=p.vpp_tun_spi,
74         crypt_algo=p.crypt_algo,
75         crypt_key=crypt_key,
76         auth_algo=p.auth_algo,
77         auth_key=p.auth_key,
78         tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
79         nat_t_header=outbound_nat_header,
80         esn_en=esn_en,
81     )
82     p.vpp_tun_sa = SecurityAssociation(
83         encryption_type,
84         spi=p.scapy_tun_spi,
85         crypt_algo=p.crypt_algo,
86         crypt_key=crypt_key,
87         auth_algo=p.auth_algo,
88         auth_key=p.auth_key,
89         tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
90         nat_t_header=p.nat_header,
91         esn_en=esn_en,
92     )
93
94
95 def config_tra_params(p, encryption_type, tun_if):
96     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
97     esn_en = bool(
98         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
99     )
100     crypt_key = mk_scapy_crypt_key(p)
101     p.tun_dst = tun_if.remote_ip
102     p.tun_src = tun_if.local_ip
103
104     if p.nat_header:
105         is_default_port = p.nat_header.dport == 4500
106     else:
107         is_default_port = True
108
109     if is_default_port:
110         outbound_nat_header = p.nat_header
111     else:
112         outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
113         bind_layers(UDP, ESP, dport=p.nat_header.dport)
114
115     p.scapy_tun_sa = SecurityAssociation(
116         encryption_type,
117         spi=p.vpp_tun_spi,
118         crypt_algo=p.crypt_algo,
119         crypt_key=crypt_key,
120         auth_algo=p.auth_algo,
121         auth_key=p.auth_key,
122         esn_en=esn_en,
123         nat_t_header=outbound_nat_header,
124     )
125     p.vpp_tun_sa = SecurityAssociation(
126         encryption_type,
127         spi=p.scapy_tun_spi,
128         crypt_algo=p.crypt_algo,
129         crypt_key=crypt_key,
130         auth_algo=p.auth_algo,
131         auth_key=p.auth_key,
132         esn_en=esn_en,
133         nat_t_header=p.nat_header,
134     )
135
136
137 class TemplateIpsec4TunProtect(object):
138     """IPsec IPv4 Tunnel protect"""
139
140     encryption_type = ESP
141     tun4_encrypt_node_name = "esp4-encrypt-tun"
142     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
143     tun4_input_node = "ipsec4-tun-input"
144
145     def config_sa_tra(self, p):
146         config_tun_params(p, self.encryption_type, p.tun_if)
147
148         p.tun_sa_out = VppIpsecSA(
149             self,
150             p.scapy_tun_sa_id,
151             p.scapy_tun_spi,
152             p.auth_algo_vpp_id,
153             p.auth_key,
154             p.crypt_algo_vpp_id,
155             p.crypt_key,
156             self.vpp_esp_protocol,
157             flags=p.flags,
158         )
159         p.tun_sa_out.add_vpp_config()
160
161         p.tun_sa_in = VppIpsecSA(
162             self,
163             p.vpp_tun_sa_id,
164             p.vpp_tun_spi,
165             p.auth_algo_vpp_id,
166             p.auth_key,
167             p.crypt_algo_vpp_id,
168             p.crypt_key,
169             self.vpp_esp_protocol,
170             flags=p.flags,
171         )
172         p.tun_sa_in.add_vpp_config()
173
174     def config_sa_tun(self, p):
175         config_tun_params(p, self.encryption_type, p.tun_if)
176
177         p.tun_sa_out = VppIpsecSA(
178             self,
179             p.scapy_tun_sa_id,
180             p.scapy_tun_spi,
181             p.auth_algo_vpp_id,
182             p.auth_key,
183             p.crypt_algo_vpp_id,
184             p.crypt_key,
185             self.vpp_esp_protocol,
186             self.tun_if.local_addr[p.addr_type],
187             self.tun_if.remote_addr[p.addr_type],
188             flags=p.flags,
189         )
190         p.tun_sa_out.add_vpp_config()
191
192         p.tun_sa_in = VppIpsecSA(
193             self,
194             p.vpp_tun_sa_id,
195             p.vpp_tun_spi,
196             p.auth_algo_vpp_id,
197             p.auth_key,
198             p.crypt_algo_vpp_id,
199             p.crypt_key,
200             self.vpp_esp_protocol,
201             self.tun_if.remote_addr[p.addr_type],
202             self.tun_if.local_addr[p.addr_type],
203             flags=p.flags,
204         )
205         p.tun_sa_in.add_vpp_config()
206
207     def config_protect(self, p):
208         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
209         p.tun_protect.add_vpp_config()
210
211     def config_network(self, p):
212         if hasattr(p, "tun_dst"):
213             tun_dst = p.tun_dst
214         else:
215             tun_dst = self.pg0.remote_ip4
216         p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
217         p.tun_if.add_vpp_config()
218         p.tun_if.admin_up()
219         p.tun_if.config_ip4()
220         p.tun_if.config_ip6()
221
222         p.route = VppIpRoute(
223             self,
224             p.remote_tun_if_host,
225             32,
226             [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
227         )
228         p.route.add_vpp_config()
229         r = VppIpRoute(
230             self,
231             p.remote_tun_if_host6,
232             128,
233             [
234                 VppRoutePath(
235                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
236                 )
237             ],
238         )
239         r.add_vpp_config()
240
241     def unconfig_network(self, p):
242         p.route.remove_vpp_config()
243         p.tun_if.remove_vpp_config()
244
245     def unconfig_protect(self, p):
246         p.tun_protect.remove_vpp_config()
247
248     def unconfig_sa(self, p):
249         p.tun_sa_out.remove_vpp_config()
250         p.tun_sa_in.remove_vpp_config()
251
252
253 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
254     """IPsec tunnel interface tests"""
255
256     encryption_type = ESP
257
258     @classmethod
259     def setUpClass(cls):
260         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
261
262     @classmethod
263     def tearDownClass(cls):
264         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
265
266     def setUp(self):
267         super(TemplateIpsec4TunIfEsp, self).setUp()
268
269         self.tun_if = self.pg0
270
271         p = self.ipv4_params
272
273         self.config_network(p)
274         self.config_sa_tra(p)
275         self.config_protect(p)
276
277     def tearDown(self):
278         super(TemplateIpsec4TunIfEsp, self).tearDown()
279
280
281 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
282     """IPsec UDP tunnel interface tests"""
283
284     tun4_encrypt_node_name = "esp4-encrypt-tun"
285     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
286     encryption_type = ESP
287
288     @classmethod
289     def setUpClass(cls):
290         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
291
292     @classmethod
293     def tearDownClass(cls):
294         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
295
296     def verify_encrypted(self, p, sa, rxs):
297         for rx in rxs:
298             try:
299                 # ensure the UDP ports are correct before we decrypt
300                 # which strips them
301                 self.assertTrue(rx.haslayer(UDP))
302                 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
303                 self.assert_equal(rx[UDP].dport, 4500)
304
305                 pkt = sa.decrypt(rx[IP])
306                 if not pkt.haslayer(IP):
307                     pkt = IP(pkt[Raw].load)
308
309                 self.assert_packet_checksums_valid(pkt)
310                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
311                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
312             except (IndexError, AssertionError):
313                 self.logger.debug(ppp("Unexpected packet:", rx))
314                 try:
315                     self.logger.debug(ppp("Decrypted packet:", pkt))
316                 except:
317                     pass
318                 raise
319
320     def config_sa_tra(self, p):
321         config_tun_params(p, self.encryption_type, p.tun_if)
322
323         p.tun_sa_out = VppIpsecSA(
324             self,
325             p.scapy_tun_sa_id,
326             p.scapy_tun_spi,
327             p.auth_algo_vpp_id,
328             p.auth_key,
329             p.crypt_algo_vpp_id,
330             p.crypt_key,
331             self.vpp_esp_protocol,
332             flags=p.flags,
333             udp_src=p.nat_header.sport,
334             udp_dst=p.nat_header.dport,
335         )
336         p.tun_sa_out.add_vpp_config()
337
338         p.tun_sa_in = VppIpsecSA(
339             self,
340             p.vpp_tun_sa_id,
341             p.vpp_tun_spi,
342             p.auth_algo_vpp_id,
343             p.auth_key,
344             p.crypt_algo_vpp_id,
345             p.crypt_key,
346             self.vpp_esp_protocol,
347             flags=p.flags,
348             udp_src=p.nat_header.sport,
349             udp_dst=p.nat_header.dport,
350         )
351         p.tun_sa_in.add_vpp_config()
352
353     def setUp(self):
354         super(TemplateIpsec4TunIfEspUdp, self).setUp()
355
356         p = self.ipv4_params
357         p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
358         p.nat_header = UDP(sport=5454, dport=4500)
359
360         self.tun_if = self.pg0
361
362         self.config_network(p)
363         self.config_sa_tra(p)
364         self.config_protect(p)
365
366     def tearDown(self):
367         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
368
369
370 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
371     """Ipsec ESP - TUN tests"""
372
373     tun4_encrypt_node_name = "esp4-encrypt-tun"
374     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
375
376     def test_tun_basic64(self):
377         """ipsec 6o4 tunnel basic test"""
378         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
379
380         self.verify_tun_64(self.params[socket.AF_INET], count=1)
381
382     def test_tun_burst64(self):
383         """ipsec 6o4 tunnel basic test"""
384         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
385
386         self.verify_tun_64(self.params[socket.AF_INET], count=257)
387
388     def test_tun_basic_frag44(self):
389         """ipsec 4o4 tunnel frag basic test"""
390         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
391
392         p = self.ipv4_params
393
394         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
395         self.verify_tun_44(
396             self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
397         )
398         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
399
400
401 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
402     """Ipsec ESP UDP tests"""
403
404     tun4_input_node = "ipsec4-tun-input"
405
406     def setUp(self):
407         super(TestIpsec4TunIfEspUdp, self).setUp()
408
409     def test_keepalive(self):
410         """IPSEC NAT Keepalive"""
411         self.verify_keepalive(self.ipv4_params)
412
413
414 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
415     """Ipsec ESP UDP GCM tests"""
416
417     tun4_input_node = "ipsec4-tun-input"
418
419     def setUp(self):
420         super(TestIpsec4TunIfEspUdpGCM, self).setUp()
421         p = self.ipv4_params
422         p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
423         p.crypt_algo_vpp_id = (
424             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
425         )
426         p.crypt_algo = "AES-GCM"
427         p.auth_algo = "NULL"
428         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
429         p.salt = 0
430
431
432 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
433     """Ipsec ESP - TCP tests"""
434
435     pass
436
437
438 class TemplateIpsec6TunProtect(object):
439     """IPsec IPv6 Tunnel protect"""
440
441     def config_sa_tra(self, p):
442         config_tun_params(p, self.encryption_type, p.tun_if)
443
444         p.tun_sa_out = VppIpsecSA(
445             self,
446             p.scapy_tun_sa_id,
447             p.scapy_tun_spi,
448             p.auth_algo_vpp_id,
449             p.auth_key,
450             p.crypt_algo_vpp_id,
451             p.crypt_key,
452             self.vpp_esp_protocol,
453         )
454         p.tun_sa_out.add_vpp_config()
455
456         p.tun_sa_in = VppIpsecSA(
457             self,
458             p.vpp_tun_sa_id,
459             p.vpp_tun_spi,
460             p.auth_algo_vpp_id,
461             p.auth_key,
462             p.crypt_algo_vpp_id,
463             p.crypt_key,
464             self.vpp_esp_protocol,
465         )
466         p.tun_sa_in.add_vpp_config()
467
468     def config_sa_tun(self, p):
469         config_tun_params(p, self.encryption_type, p.tun_if)
470
471         p.tun_sa_out = VppIpsecSA(
472             self,
473             p.scapy_tun_sa_id,
474             p.scapy_tun_spi,
475             p.auth_algo_vpp_id,
476             p.auth_key,
477             p.crypt_algo_vpp_id,
478             p.crypt_key,
479             self.vpp_esp_protocol,
480             self.tun_if.local_addr[p.addr_type],
481             self.tun_if.remote_addr[p.addr_type],
482         )
483         p.tun_sa_out.add_vpp_config()
484
485         p.tun_sa_in = VppIpsecSA(
486             self,
487             p.vpp_tun_sa_id,
488             p.vpp_tun_spi,
489             p.auth_algo_vpp_id,
490             p.auth_key,
491             p.crypt_algo_vpp_id,
492             p.crypt_key,
493             self.vpp_esp_protocol,
494             self.tun_if.remote_addr[p.addr_type],
495             self.tun_if.local_addr[p.addr_type],
496         )
497         p.tun_sa_in.add_vpp_config()
498
499     def config_protect(self, p):
500         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
501         p.tun_protect.add_vpp_config()
502
503     def config_network(self, p):
504         if hasattr(p, "tun_dst"):
505             tun_dst = p.tun_dst
506         else:
507             tun_dst = self.pg0.remote_ip6
508         p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
509         p.tun_if.add_vpp_config()
510         p.tun_if.admin_up()
511         p.tun_if.config_ip6()
512         p.tun_if.config_ip4()
513
514         p.route = VppIpRoute(
515             self,
516             p.remote_tun_if_host,
517             128,
518             [
519                 VppRoutePath(
520                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
521                 )
522             ],
523         )
524         p.route.add_vpp_config()
525         r = VppIpRoute(
526             self,
527             p.remote_tun_if_host4,
528             32,
529             [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
530         )
531         r.add_vpp_config()
532
533     def unconfig_network(self, p):
534         p.route.remove_vpp_config()
535         p.tun_if.remove_vpp_config()
536
537     def unconfig_protect(self, p):
538         p.tun_protect.remove_vpp_config()
539
540     def unconfig_sa(self, p):
541         p.tun_sa_out.remove_vpp_config()
542         p.tun_sa_in.remove_vpp_config()
543
544
545 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
546     """IPsec tunnel interface tests"""
547
548     encryption_type = ESP
549
550     def setUp(self):
551         super(TemplateIpsec6TunIfEsp, self).setUp()
552
553         self.tun_if = self.pg0
554
555         p = self.ipv6_params
556         self.config_network(p)
557         self.config_sa_tra(p)
558         self.config_protect(p)
559
560     def tearDown(self):
561         super(TemplateIpsec6TunIfEsp, self).tearDown()
562
563
564 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
565     """Ipsec ESP - TUN tests"""
566
567     tun6_encrypt_node_name = "esp6-encrypt-tun"
568     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
569
570     def test_tun_basic46(self):
571         """ipsec 4o6 tunnel basic test"""
572         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
573         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
574
575     def test_tun_burst46(self):
576         """ipsec 4o6 tunnel burst test"""
577         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
578         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
579
580
581 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
582     """Ipsec ESP 6 Handoff tests"""
583
584     tun6_encrypt_node_name = "esp6-encrypt-tun"
585     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
586
587     def test_tun_handoff_66_police(self):
588         """ESP 6o6 tunnel with policer worker hand-off test"""
589         self.vapi.cli("clear errors")
590         self.vapi.cli("clear ipsec sa")
591
592         N_PKTS = 15
593         p = self.params[socket.AF_INET6]
594
595         action_tx = PolicerAction(
596             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
597         )
598         policer = VppPolicer(
599             self,
600             "pol1",
601             80,
602             0,
603             1000,
604             0,
605             conform_action=action_tx,
606             exceed_action=action_tx,
607             violate_action=action_tx,
608         )
609         policer.add_vpp_config()
610
611         # Start policing on tun
612         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
613
614         for pol_bind in [1, 0]:
615             policer.bind_vpp_config(pol_bind, True)
616
617             # inject alternately on worker 0 and 1.
618             for worker in [0, 1, 0, 1]:
619                 send_pkts = self.gen_encrypt_pkts6(
620                     p,
621                     p.scapy_tun_sa,
622                     self.tun_if,
623                     src=p.remote_tun_if_host,
624                     dst=self.pg1.remote_ip6,
625                     count=N_PKTS,
626                 )
627                 recv_pkts = self.send_and_expect(
628                     self.tun_if, send_pkts, self.pg1, worker=worker
629                 )
630                 self.verify_decrypted6(p, recv_pkts)
631                 self.logger.debug(self.vapi.cli("show trace max 100"))
632
633             stats = policer.get_stats()
634             stats0 = policer.get_stats(worker=0)
635             stats1 = policer.get_stats(worker=1)
636
637             if pol_bind == 1:
638                 # First pass: Worker 1, should have done all the policing
639                 self.assertEqual(stats, stats1)
640
641                 # Worker 0, should have handed everything off
642                 self.assertEqual(stats0["conform_packets"], 0)
643                 self.assertEqual(stats0["exceed_packets"], 0)
644                 self.assertEqual(stats0["violate_packets"], 0)
645             else:
646                 # Second pass: both workers should have policed equal amounts
647                 self.assertGreater(stats1["conform_packets"], 0)
648                 self.assertEqual(stats1["exceed_packets"], 0)
649                 self.assertGreater(stats1["violate_packets"], 0)
650
651                 self.assertGreater(stats0["conform_packets"], 0)
652                 self.assertEqual(stats0["exceed_packets"], 0)
653                 self.assertGreater(stats0["violate_packets"], 0)
654
655                 self.assertEqual(
656                     stats0["conform_packets"] + stats0["violate_packets"],
657                     stats1["conform_packets"] + stats1["violate_packets"],
658                 )
659
660         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
661         policer.remove_vpp_config()
662
663
664 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
665     """Ipsec ESP 4 Handoff tests"""
666
667     tun4_encrypt_node_name = "esp4-encrypt-tun"
668     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
669
670     def test_tun_handoff_44_police(self):
671         """ESP 4o4 tunnel with policer worker hand-off test"""
672         self.vapi.cli("clear errors")
673         self.vapi.cli("clear ipsec sa")
674
675         N_PKTS = 15
676         p = self.params[socket.AF_INET]
677
678         action_tx = PolicerAction(
679             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
680         )
681         policer = VppPolicer(
682             self,
683             "pol1",
684             80,
685             0,
686             1000,
687             0,
688             conform_action=action_tx,
689             exceed_action=action_tx,
690             violate_action=action_tx,
691         )
692         policer.add_vpp_config()
693
694         # Start policing on tun
695         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
696
697         for pol_bind in [1, 0]:
698             policer.bind_vpp_config(pol_bind, True)
699
700             # inject alternately on worker 0 and 1.
701             for worker in [0, 1, 0, 1]:
702                 send_pkts = self.gen_encrypt_pkts(
703                     p,
704                     p.scapy_tun_sa,
705                     self.tun_if,
706                     src=p.remote_tun_if_host,
707                     dst=self.pg1.remote_ip4,
708                     count=N_PKTS,
709                 )
710                 recv_pkts = self.send_and_expect(
711                     self.tun_if, send_pkts, self.pg1, worker=worker
712                 )
713                 self.verify_decrypted(p, recv_pkts)
714                 self.logger.debug(self.vapi.cli("show trace max 100"))
715
716             stats = policer.get_stats()
717             stats0 = policer.get_stats(worker=0)
718             stats1 = policer.get_stats(worker=1)
719
720             if pol_bind == 1:
721                 # First pass: Worker 1, should have done all the policing
722                 self.assertEqual(stats, stats1)
723
724                 # Worker 0, should have handed everything off
725                 self.assertEqual(stats0["conform_packets"], 0)
726                 self.assertEqual(stats0["exceed_packets"], 0)
727                 self.assertEqual(stats0["violate_packets"], 0)
728             else:
729                 # Second pass: both workers should have policed equal amounts
730                 self.assertGreater(stats1["conform_packets"], 0)
731                 self.assertEqual(stats1["exceed_packets"], 0)
732                 self.assertGreater(stats1["violate_packets"], 0)
733
734                 self.assertGreater(stats0["conform_packets"], 0)
735                 self.assertEqual(stats0["exceed_packets"], 0)
736                 self.assertGreater(stats0["violate_packets"], 0)
737
738                 self.assertEqual(
739                     stats0["conform_packets"] + stats0["violate_packets"],
740                     stats1["conform_packets"] + stats1["violate_packets"],
741                 )
742
743         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
744         policer.remove_vpp_config()
745
746
747 @tag_fixme_vpp_workers
748 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
749     """IPsec IPv4 Multi Tunnel interface"""
750
751     encryption_type = ESP
752     tun4_encrypt_node_name = "esp4-encrypt-tun"
753     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
754
755     def setUp(self):
756         super(TestIpsec4MultiTunIfEsp, self).setUp()
757
758         self.tun_if = self.pg0
759
760         self.multi_params = []
761         self.pg0.generate_remote_hosts(10)
762         self.pg0.configure_ipv4_neighbors()
763
764         for ii in range(10):
765             p = copy.copy(self.ipv4_params)
766
767             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
768             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
769             p.scapy_tun_spi = p.scapy_tun_spi + ii
770             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
771             p.vpp_tun_spi = p.vpp_tun_spi + ii
772
773             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
774             p.scapy_tra_spi = p.scapy_tra_spi + ii
775             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
776             p.vpp_tra_spi = p.vpp_tra_spi + ii
777             p.tun_dst = self.pg0.remote_hosts[ii].ip4
778
779             self.multi_params.append(p)
780             self.config_network(p)
781             self.config_sa_tra(p)
782             self.config_protect(p)
783
784     def tearDown(self):
785         super(TestIpsec4MultiTunIfEsp, self).tearDown()
786
787     def test_tun_44(self):
788         """Multiple IPSEC tunnel interfaces"""
789         for p in self.multi_params:
790             self.verify_tun_44(p, count=127)
791             self.assertEqual(p.tun_if.get_rx_stats(), 127)
792             self.assertEqual(p.tun_if.get_tx_stats(), 127)
793
794     def test_tun_rr_44(self):
795         """Round-robin packets acrros multiple interface"""
796         tx = []
797         for p in self.multi_params:
798             tx = tx + self.gen_encrypt_pkts(
799                 p,
800                 p.scapy_tun_sa,
801                 self.tun_if,
802                 src=p.remote_tun_if_host,
803                 dst=self.pg1.remote_ip4,
804             )
805         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
806
807         for rx, p in zip(rxs, self.multi_params):
808             self.verify_decrypted(p, [rx])
809
810         tx = []
811         for p in self.multi_params:
812             tx = tx + self.gen_pkts(
813                 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
814             )
815         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
816
817         for rx, p in zip(rxs, self.multi_params):
818             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
819
820
821 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
822     """IPsec IPv4 Tunnel interface all Algos"""
823
824     encryption_type = ESP
825     tun4_encrypt_node_name = "esp4-encrypt-tun"
826     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
827
828     def setUp(self):
829         super(TestIpsec4TunIfEspAll, self).setUp()
830
831         self.tun_if = self.pg0
832         p = self.ipv4_params
833
834         self.config_network(p)
835         self.config_sa_tra(p)
836         self.config_protect(p)
837
838     def tearDown(self):
839         p = self.ipv4_params
840         self.unconfig_protect(p)
841         self.unconfig_network(p)
842         self.unconfig_sa(p)
843
844         super(TestIpsec4TunIfEspAll, self).tearDown()
845
846     def rekey(self, p):
847         #
848         # change the key and the SPI
849         #
850         np = copy.copy(p)
851         p.crypt_key = b"X" + p.crypt_key[1:]
852         p.scapy_tun_spi += 1
853         p.scapy_tun_sa_id += 1
854         p.vpp_tun_spi += 1
855         p.vpp_tun_sa_id += 1
856         p.tun_if.local_spi = p.vpp_tun_spi
857         p.tun_if.remote_spi = p.scapy_tun_spi
858
859         config_tun_params(p, self.encryption_type, p.tun_if)
860
861         p.tun_sa_out = VppIpsecSA(
862             self,
863             p.scapy_tun_sa_id,
864             p.scapy_tun_spi,
865             p.auth_algo_vpp_id,
866             p.auth_key,
867             p.crypt_algo_vpp_id,
868             p.crypt_key,
869             self.vpp_esp_protocol,
870             flags=p.flags,
871             salt=p.salt,
872         )
873         p.tun_sa_in = VppIpsecSA(
874             self,
875             p.vpp_tun_sa_id,
876             p.vpp_tun_spi,
877             p.auth_algo_vpp_id,
878             p.auth_key,
879             p.crypt_algo_vpp_id,
880             p.crypt_key,
881             self.vpp_esp_protocol,
882             flags=p.flags,
883             salt=p.salt,
884         )
885         p.tun_sa_in.add_vpp_config()
886         p.tun_sa_out.add_vpp_config()
887
888         self.config_protect(p)
889         np.tun_sa_out.remove_vpp_config()
890         np.tun_sa_in.remove_vpp_config()
891         self.logger.info(self.vapi.cli("sh ipsec sa"))
892
893     def test_tun_44(self):
894         """IPSEC tunnel all algos"""
895
896         # foreach VPP crypto engine
897         engines = ["ia32", "ipsecmb", "openssl"]
898
899         # foreach crypto algorithm
900         algos = [
901             {
902                 "vpp-crypto": (
903                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
904                 ),
905                 "vpp-integ": (
906                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
907                 ),
908                 "scapy-crypto": "AES-GCM",
909                 "scapy-integ": "NULL",
910                 "key": b"JPjyOWBeVEQiMe7h",
911                 "salt": 3333,
912             },
913             {
914                 "vpp-crypto": (
915                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
916                 ),
917                 "vpp-integ": (
918                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
919                 ),
920                 "scapy-crypto": "AES-GCM",
921                 "scapy-integ": "NULL",
922                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
923                 "salt": 0,
924             },
925             {
926                 "vpp-crypto": (
927                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
928                 ),
929                 "vpp-integ": (
930                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
931                 ),
932                 "scapy-crypto": "AES-GCM",
933                 "scapy-integ": "NULL",
934                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
935                 "salt": 9999,
936             },
937             {
938                 "vpp-crypto": (
939                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
940                 ),
941                 "vpp-integ": (
942                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
943                 ),
944                 "scapy-crypto": "AES-CBC",
945                 "scapy-integ": "HMAC-SHA1-96",
946                 "salt": 0,
947                 "key": b"JPjyOWBeVEQiMe7h",
948             },
949             {
950                 "vpp-crypto": (
951                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
952                 ),
953                 "vpp-integ": (
954                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
955                 ),
956                 "scapy-crypto": "AES-CBC",
957                 "scapy-integ": "SHA2-512-256",
958                 "salt": 0,
959                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
960             },
961             {
962                 "vpp-crypto": (
963                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
964                 ),
965                 "vpp-integ": (
966                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
967                 ),
968                 "scapy-crypto": "AES-CBC",
969                 "scapy-integ": "SHA2-256-128",
970                 "salt": 0,
971                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
972             },
973             {
974                 "vpp-crypto": (
975                     VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
976                 ),
977                 "vpp-integ": (
978                     VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
979                 ),
980                 "scapy-crypto": "NULL",
981                 "scapy-integ": "HMAC-SHA1-96",
982                 "salt": 0,
983                 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
984             },
985         ]
986
987         for engine in engines:
988             self.vapi.cli("set crypto handler all %s" % engine)
989
990             #
991             # loop through each of the algorithms
992             #
993             for algo in algos:
994                 # with self.subTest(algo=algo['scapy']):
995
996                 p = self.ipv4_params
997                 p.auth_algo_vpp_id = algo["vpp-integ"]
998                 p.crypt_algo_vpp_id = algo["vpp-crypto"]
999                 p.crypt_algo = algo["scapy-crypto"]
1000                 p.auth_algo = algo["scapy-integ"]
1001                 p.crypt_key = algo["key"]
1002                 p.salt = algo["salt"]
1003
1004                 #
1005                 # rekey the tunnel
1006                 #
1007                 self.rekey(p)
1008                 self.verify_tun_44(p, count=127)
1009
1010
1011 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1012     """IPsec IPv4 Tunnel interface no Algos"""
1013
1014     encryption_type = ESP
1015     tun4_encrypt_node_name = "esp4-encrypt-tun"
1016     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1017
1018     def setUp(self):
1019         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1020
1021         self.tun_if = self.pg0
1022         p = self.ipv4_params
1023         p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1024         p.auth_algo = "NULL"
1025         p.auth_key = []
1026
1027         p.crypt_algo_vpp_id = (
1028             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1029         )
1030         p.crypt_algo = "NULL"
1031         p.crypt_key = []
1032
1033     def tearDown(self):
1034         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1035
1036     def test_tun_44(self):
1037         """IPSec SA with NULL algos"""
1038         p = self.ipv4_params
1039
1040         self.config_network(p)
1041         self.config_sa_tra(p)
1042         self.config_protect(p)
1043
1044         tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host)
1045         self.send_and_assert_no_replies(self.pg1, tx)
1046
1047         self.unconfig_protect(p)
1048         self.unconfig_sa(p)
1049         self.unconfig_network(p)
1050
1051
1052 @tag_fixme_vpp_workers
1053 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1054     """IPsec IPv6 Multi Tunnel interface"""
1055
1056     encryption_type = ESP
1057     tun6_encrypt_node_name = "esp6-encrypt-tun"
1058     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1059
1060     def setUp(self):
1061         super(TestIpsec6MultiTunIfEsp, self).setUp()
1062
1063         self.tun_if = self.pg0
1064
1065         self.multi_params = []
1066         self.pg0.generate_remote_hosts(10)
1067         self.pg0.configure_ipv6_neighbors()
1068
1069         for ii in range(10):
1070             p = copy.copy(self.ipv6_params)
1071
1072             p.remote_tun_if_host = "1111::%d" % (ii + 1)
1073             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1074             p.scapy_tun_spi = p.scapy_tun_spi + ii
1075             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1076             p.vpp_tun_spi = p.vpp_tun_spi + ii
1077
1078             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1079             p.scapy_tra_spi = p.scapy_tra_spi + ii
1080             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1081             p.vpp_tra_spi = p.vpp_tra_spi + ii
1082             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1083
1084             self.multi_params.append(p)
1085             self.config_network(p)
1086             self.config_sa_tra(p)
1087             self.config_protect(p)
1088
1089     def tearDown(self):
1090         super(TestIpsec6MultiTunIfEsp, self).tearDown()
1091
1092     def test_tun_66(self):
1093         """Multiple IPSEC tunnel interfaces"""
1094         for p in self.multi_params:
1095             self.verify_tun_66(p, count=127)
1096             self.assertEqual(p.tun_if.get_rx_stats(), 127)
1097             self.assertEqual(p.tun_if.get_tx_stats(), 127)
1098
1099
1100 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1101     """Ipsec GRE TEB ESP - TUN tests"""
1102
1103     tun4_encrypt_node_name = "esp4-encrypt-tun"
1104     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1105     encryption_type = ESP
1106     omac = "00:11:22:33:44:55"
1107
1108     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1109         return [
1110             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1111             / sa.encrypt(
1112                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1113                 / GRE()
1114                 / Ether(dst=self.omac)
1115                 / IP(src="1.1.1.1", dst="1.1.1.2")
1116                 / UDP(sport=1144, dport=2233)
1117                 / Raw(b"X" * payload_size)
1118             )
1119             for i in range(count)
1120         ]
1121
1122     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1123         return [
1124             Ether(dst=self.omac)
1125             / IP(src="1.1.1.1", dst="1.1.1.2")
1126             / UDP(sport=1144, dport=2233)
1127             / Raw(b"X" * payload_size)
1128             for i in range(count)
1129         ]
1130
1131     def verify_decrypted(self, p, rxs):
1132         for rx in rxs:
1133             self.assert_equal(rx[Ether].dst, self.omac)
1134             self.assert_equal(rx[IP].dst, "1.1.1.2")
1135
1136     def verify_encrypted(self, p, sa, rxs):
1137         for rx in rxs:
1138             try:
1139                 pkt = sa.decrypt(rx[IP])
1140                 if not pkt.haslayer(IP):
1141                     pkt = IP(pkt[Raw].load)
1142                 self.assert_packet_checksums_valid(pkt)
1143                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1144                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1145                 self.assertTrue(pkt.haslayer(GRE))
1146                 e = pkt[Ether]
1147                 self.assertEqual(e[Ether].dst, self.omac)
1148                 self.assertEqual(e[IP].dst, "1.1.1.2")
1149             except (IndexError, AssertionError):
1150                 self.logger.debug(ppp("Unexpected packet:", rx))
1151                 try:
1152                     self.logger.debug(ppp("Decrypted packet:", pkt))
1153                 except:
1154                     pass
1155                 raise
1156
1157     def setUp(self):
1158         super(TestIpsecGreTebIfEsp, self).setUp()
1159
1160         self.tun_if = self.pg0
1161
1162         p = self.ipv4_params
1163
1164         bd1 = VppBridgeDomain(self, 1)
1165         bd1.add_vpp_config()
1166
1167         p.tun_sa_out = VppIpsecSA(
1168             self,
1169             p.scapy_tun_sa_id,
1170             p.scapy_tun_spi,
1171             p.auth_algo_vpp_id,
1172             p.auth_key,
1173             p.crypt_algo_vpp_id,
1174             p.crypt_key,
1175             self.vpp_esp_protocol,
1176             self.pg0.local_ip4,
1177             self.pg0.remote_ip4,
1178         )
1179         p.tun_sa_out.add_vpp_config()
1180
1181         p.tun_sa_in = VppIpsecSA(
1182             self,
1183             p.vpp_tun_sa_id,
1184             p.vpp_tun_spi,
1185             p.auth_algo_vpp_id,
1186             p.auth_key,
1187             p.crypt_algo_vpp_id,
1188             p.crypt_key,
1189             self.vpp_esp_protocol,
1190             self.pg0.remote_ip4,
1191             self.pg0.local_ip4,
1192         )
1193         p.tun_sa_in.add_vpp_config()
1194
1195         p.tun_if = VppGreInterface(
1196             self,
1197             self.pg0.local_ip4,
1198             self.pg0.remote_ip4,
1199             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1200         )
1201         p.tun_if.add_vpp_config()
1202
1203         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1204
1205         p.tun_protect.add_vpp_config()
1206
1207         p.tun_if.admin_up()
1208         p.tun_if.config_ip4()
1209         config_tun_params(p, self.encryption_type, p.tun_if)
1210
1211         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1212         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1213
1214         self.vapi.cli("clear ipsec sa")
1215         self.vapi.cli("sh adj")
1216         self.vapi.cli("sh ipsec tun")
1217
1218     def tearDown(self):
1219         p = self.ipv4_params
1220         p.tun_if.unconfig_ip4()
1221         super(TestIpsecGreTebIfEsp, self).tearDown()
1222
1223
1224 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1225     """Ipsec GRE TEB ESP - TUN tests"""
1226
1227     tun4_encrypt_node_name = "esp4-encrypt-tun"
1228     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1229     encryption_type = ESP
1230     omac = "00:11:22:33:44:55"
1231
1232     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1233         return [
1234             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1235             / sa.encrypt(
1236                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1237                 / GRE()
1238                 / Ether(dst=self.omac)
1239                 / IP(src="1.1.1.1", dst="1.1.1.2")
1240                 / UDP(sport=1144, dport=2233)
1241                 / Raw(b"X" * payload_size)
1242             )
1243             for i in range(count)
1244         ]
1245
1246     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1247         return [
1248             Ether(dst=self.omac)
1249             / Dot1Q(vlan=11)
1250             / IP(src="1.1.1.1", dst="1.1.1.2")
1251             / UDP(sport=1144, dport=2233)
1252             / Raw(b"X" * payload_size)
1253             for i in range(count)
1254         ]
1255
1256     def verify_decrypted(self, p, rxs):
1257         for rx in rxs:
1258             self.assert_equal(rx[Ether].dst, self.omac)
1259             self.assert_equal(rx[Dot1Q].vlan, 11)
1260             self.assert_equal(rx[IP].dst, "1.1.1.2")
1261
1262     def verify_encrypted(self, p, sa, rxs):
1263         for rx in rxs:
1264             try:
1265                 pkt = sa.decrypt(rx[IP])
1266                 if not pkt.haslayer(IP):
1267                     pkt = IP(pkt[Raw].load)
1268                 self.assert_packet_checksums_valid(pkt)
1269                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1270                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1271                 self.assertTrue(pkt.haslayer(GRE))
1272                 e = pkt[Ether]
1273                 self.assertEqual(e[Ether].dst, self.omac)
1274                 self.assertFalse(e.haslayer(Dot1Q))
1275                 self.assertEqual(e[IP].dst, "1.1.1.2")
1276             except (IndexError, AssertionError):
1277                 self.logger.debug(ppp("Unexpected packet:", rx))
1278                 try:
1279                     self.logger.debug(ppp("Decrypted packet:", pkt))
1280                 except:
1281                     pass
1282                 raise
1283
1284     def setUp(self):
1285         super(TestIpsecGreTebVlanIfEsp, self).setUp()
1286
1287         self.tun_if = self.pg0
1288
1289         p = self.ipv4_params
1290
1291         bd1 = VppBridgeDomain(self, 1)
1292         bd1.add_vpp_config()
1293
1294         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1295         self.vapi.l2_interface_vlan_tag_rewrite(
1296             sw_if_index=self.pg1_11.sw_if_index,
1297             vtr_op=L2_VTR_OP.L2_POP_1,
1298             push_dot1q=11,
1299         )
1300         self.pg1_11.admin_up()
1301
1302         p.tun_sa_out = VppIpsecSA(
1303             self,
1304             p.scapy_tun_sa_id,
1305             p.scapy_tun_spi,
1306             p.auth_algo_vpp_id,
1307             p.auth_key,
1308             p.crypt_algo_vpp_id,
1309             p.crypt_key,
1310             self.vpp_esp_protocol,
1311             self.pg0.local_ip4,
1312             self.pg0.remote_ip4,
1313         )
1314         p.tun_sa_out.add_vpp_config()
1315
1316         p.tun_sa_in = VppIpsecSA(
1317             self,
1318             p.vpp_tun_sa_id,
1319             p.vpp_tun_spi,
1320             p.auth_algo_vpp_id,
1321             p.auth_key,
1322             p.crypt_algo_vpp_id,
1323             p.crypt_key,
1324             self.vpp_esp_protocol,
1325             self.pg0.remote_ip4,
1326             self.pg0.local_ip4,
1327         )
1328         p.tun_sa_in.add_vpp_config()
1329
1330         p.tun_if = VppGreInterface(
1331             self,
1332             self.pg0.local_ip4,
1333             self.pg0.remote_ip4,
1334             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1335         )
1336         p.tun_if.add_vpp_config()
1337
1338         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1339
1340         p.tun_protect.add_vpp_config()
1341
1342         p.tun_if.admin_up()
1343         p.tun_if.config_ip4()
1344         config_tun_params(p, self.encryption_type, p.tun_if)
1345
1346         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1347         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1348
1349         self.vapi.cli("clear ipsec sa")
1350
1351     def tearDown(self):
1352         p = self.ipv4_params
1353         p.tun_if.unconfig_ip4()
1354         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1355         self.pg1_11.admin_down()
1356         self.pg1_11.remove_vpp_config()
1357
1358
1359 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1360     """Ipsec GRE TEB ESP - Tra tests"""
1361
1362     tun4_encrypt_node_name = "esp4-encrypt-tun"
1363     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1364     encryption_type = ESP
1365     omac = "00:11:22:33:44:55"
1366
1367     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1368         return [
1369             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1370             / sa.encrypt(
1371                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1372                 / GRE()
1373                 / Ether(dst=self.omac)
1374                 / IP(src="1.1.1.1", dst="1.1.1.2")
1375                 / UDP(sport=1144, dport=2233)
1376                 / Raw(b"X" * payload_size)
1377             )
1378             for i in range(count)
1379         ]
1380
1381     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1382         return [
1383             Ether(dst=self.omac)
1384             / IP(src="1.1.1.1", dst="1.1.1.2")
1385             / UDP(sport=1144, dport=2233)
1386             / Raw(b"X" * payload_size)
1387             for i in range(count)
1388         ]
1389
1390     def verify_decrypted(self, p, rxs):
1391         for rx in rxs:
1392             self.assert_equal(rx[Ether].dst, self.omac)
1393             self.assert_equal(rx[IP].dst, "1.1.1.2")
1394
1395     def verify_encrypted(self, p, sa, rxs):
1396         for rx in rxs:
1397             try:
1398                 pkt = sa.decrypt(rx[IP])
1399                 if not pkt.haslayer(IP):
1400                     pkt = IP(pkt[Raw].load)
1401                 self.assert_packet_checksums_valid(pkt)
1402                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1403                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1404                 self.assertTrue(pkt.haslayer(GRE))
1405                 e = pkt[Ether]
1406                 self.assertEqual(e[Ether].dst, self.omac)
1407                 self.assertEqual(e[IP].dst, "1.1.1.2")
1408             except (IndexError, AssertionError):
1409                 self.logger.debug(ppp("Unexpected packet:", rx))
1410                 try:
1411                     self.logger.debug(ppp("Decrypted packet:", pkt))
1412                 except:
1413                     pass
1414                 raise
1415
1416     def setUp(self):
1417         super(TestIpsecGreTebIfEspTra, self).setUp()
1418
1419         self.tun_if = self.pg0
1420
1421         p = self.ipv4_params
1422
1423         bd1 = VppBridgeDomain(self, 1)
1424         bd1.add_vpp_config()
1425
1426         p.tun_sa_out = VppIpsecSA(
1427             self,
1428             p.scapy_tun_sa_id,
1429             p.scapy_tun_spi,
1430             p.auth_algo_vpp_id,
1431             p.auth_key,
1432             p.crypt_algo_vpp_id,
1433             p.crypt_key,
1434             self.vpp_esp_protocol,
1435         )
1436         p.tun_sa_out.add_vpp_config()
1437
1438         p.tun_sa_in = VppIpsecSA(
1439             self,
1440             p.vpp_tun_sa_id,
1441             p.vpp_tun_spi,
1442             p.auth_algo_vpp_id,
1443             p.auth_key,
1444             p.crypt_algo_vpp_id,
1445             p.crypt_key,
1446             self.vpp_esp_protocol,
1447         )
1448         p.tun_sa_in.add_vpp_config()
1449
1450         p.tun_if = VppGreInterface(
1451             self,
1452             self.pg0.local_ip4,
1453             self.pg0.remote_ip4,
1454             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1455         )
1456         p.tun_if.add_vpp_config()
1457
1458         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1459
1460         p.tun_protect.add_vpp_config()
1461
1462         p.tun_if.admin_up()
1463         p.tun_if.config_ip4()
1464         config_tra_params(p, self.encryption_type, p.tun_if)
1465
1466         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1467         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1468
1469         self.vapi.cli("clear ipsec sa")
1470
1471     def tearDown(self):
1472         p = self.ipv4_params
1473         p.tun_if.unconfig_ip4()
1474         super(TestIpsecGreTebIfEspTra, self).tearDown()
1475
1476
1477 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1478     """Ipsec GRE TEB UDP ESP - Tra tests"""
1479
1480     tun4_encrypt_node_name = "esp4-encrypt-tun"
1481     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1482     encryption_type = ESP
1483     omac = "00:11:22:33:44:55"
1484
1485     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1486         return [
1487             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1488             / sa.encrypt(
1489                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1490                 / GRE()
1491                 / Ether(dst=self.omac)
1492                 / IP(src="1.1.1.1", dst="1.1.1.2")
1493                 / UDP(sport=1144, dport=2233)
1494                 / Raw(b"X" * payload_size)
1495             )
1496             for i in range(count)
1497         ]
1498
1499     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1500         return [
1501             Ether(dst=self.omac)
1502             / IP(src="1.1.1.1", dst="1.1.1.2")
1503             / UDP(sport=1144, dport=2233)
1504             / Raw(b"X" * payload_size)
1505             for i in range(count)
1506         ]
1507
1508     def verify_decrypted(self, p, rxs):
1509         for rx in rxs:
1510             self.assert_equal(rx[Ether].dst, self.omac)
1511             self.assert_equal(rx[IP].dst, "1.1.1.2")
1512
1513     def verify_encrypted(self, p, sa, rxs):
1514         for rx in rxs:
1515             self.assertTrue(rx.haslayer(UDP))
1516             self.assertEqual(rx[UDP].dport, 4545)
1517             self.assertEqual(rx[UDP].sport, 5454)
1518             try:
1519                 pkt = sa.decrypt(rx[IP])
1520                 if not pkt.haslayer(IP):
1521                     pkt = IP(pkt[Raw].load)
1522                 self.assert_packet_checksums_valid(pkt)
1523                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1524                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1525                 self.assertTrue(pkt.haslayer(GRE))
1526                 e = pkt[Ether]
1527                 self.assertEqual(e[Ether].dst, self.omac)
1528                 self.assertEqual(e[IP].dst, "1.1.1.2")
1529             except (IndexError, AssertionError):
1530                 self.logger.debug(ppp("Unexpected packet:", rx))
1531                 try:
1532                     self.logger.debug(ppp("Decrypted packet:", pkt))
1533                 except:
1534                     pass
1535                 raise
1536
1537     def setUp(self):
1538         super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1539
1540         self.tun_if = self.pg0
1541
1542         p = self.ipv4_params
1543         p = self.ipv4_params
1544         p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1545         p.nat_header = UDP(sport=5454, dport=4545)
1546
1547         bd1 = VppBridgeDomain(self, 1)
1548         bd1.add_vpp_config()
1549
1550         p.tun_sa_out = VppIpsecSA(
1551             self,
1552             p.scapy_tun_sa_id,
1553             p.scapy_tun_spi,
1554             p.auth_algo_vpp_id,
1555             p.auth_key,
1556             p.crypt_algo_vpp_id,
1557             p.crypt_key,
1558             self.vpp_esp_protocol,
1559             flags=p.flags,
1560             udp_src=5454,
1561             udp_dst=4545,
1562         )
1563         p.tun_sa_out.add_vpp_config()
1564
1565         p.tun_sa_in = VppIpsecSA(
1566             self,
1567             p.vpp_tun_sa_id,
1568             p.vpp_tun_spi,
1569             p.auth_algo_vpp_id,
1570             p.auth_key,
1571             p.crypt_algo_vpp_id,
1572             p.crypt_key,
1573             self.vpp_esp_protocol,
1574             flags=(
1575                 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1576             ),
1577             udp_src=4545,
1578             udp_dst=5454,
1579         )
1580         p.tun_sa_in.add_vpp_config()
1581
1582         p.tun_if = VppGreInterface(
1583             self,
1584             self.pg0.local_ip4,
1585             self.pg0.remote_ip4,
1586             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1587         )
1588         p.tun_if.add_vpp_config()
1589
1590         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1591
1592         p.tun_protect.add_vpp_config()
1593
1594         p.tun_if.admin_up()
1595         p.tun_if.config_ip4()
1596         config_tra_params(p, self.encryption_type, p.tun_if)
1597
1598         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1599         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1600
1601         self.vapi.cli("clear ipsec sa")
1602         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1603
1604     def tearDown(self):
1605         p = self.ipv4_params
1606         p.tun_if.unconfig_ip4()
1607         super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1608
1609
1610 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1611     """Ipsec GRE ESP - TUN tests"""
1612
1613     tun4_encrypt_node_name = "esp4-encrypt-tun"
1614     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1615     encryption_type = ESP
1616
1617     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1618         return [
1619             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1620             / sa.encrypt(
1621                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1622                 / GRE()
1623                 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1624                 / UDP(sport=1144, dport=2233)
1625                 / Raw(b"X" * payload_size)
1626             )
1627             for i in range(count)
1628         ]
1629
1630     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1631         return [
1632             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1633             / IP(src="1.1.1.1", dst="1.1.1.2")
1634             / UDP(sport=1144, dport=2233)
1635             / Raw(b"X" * payload_size)
1636             for i in range(count)
1637         ]
1638
1639     def verify_decrypted(self, p, rxs):
1640         for rx in rxs:
1641             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1642             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1643
1644     def verify_encrypted(self, p, sa, rxs):
1645         for rx in rxs:
1646             try:
1647                 pkt = sa.decrypt(rx[IP])
1648                 if not pkt.haslayer(IP):
1649                     pkt = IP(pkt[Raw].load)
1650                 self.assert_packet_checksums_valid(pkt)
1651                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1652                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1653                 self.assertTrue(pkt.haslayer(GRE))
1654                 e = pkt[GRE]
1655                 self.assertEqual(e[IP].dst, "1.1.1.2")
1656             except (IndexError, AssertionError):
1657                 self.logger.debug(ppp("Unexpected packet:", rx))
1658                 try:
1659                     self.logger.debug(ppp("Decrypted packet:", pkt))
1660                 except:
1661                     pass
1662                 raise
1663
1664     def setUp(self):
1665         super(TestIpsecGreIfEsp, self).setUp()
1666
1667         self.tun_if = self.pg0
1668
1669         p = self.ipv4_params
1670
1671         bd1 = VppBridgeDomain(self, 1)
1672         bd1.add_vpp_config()
1673
1674         p.tun_sa_out = VppIpsecSA(
1675             self,
1676             p.scapy_tun_sa_id,
1677             p.scapy_tun_spi,
1678             p.auth_algo_vpp_id,
1679             p.auth_key,
1680             p.crypt_algo_vpp_id,
1681             p.crypt_key,
1682             self.vpp_esp_protocol,
1683             self.pg0.local_ip4,
1684             self.pg0.remote_ip4,
1685         )
1686         p.tun_sa_out.add_vpp_config()
1687
1688         p.tun_sa_in = VppIpsecSA(
1689             self,
1690             p.vpp_tun_sa_id,
1691             p.vpp_tun_spi,
1692             p.auth_algo_vpp_id,
1693             p.auth_key,
1694             p.crypt_algo_vpp_id,
1695             p.crypt_key,
1696             self.vpp_esp_protocol,
1697             self.pg0.remote_ip4,
1698             self.pg0.local_ip4,
1699         )
1700         p.tun_sa_in.add_vpp_config()
1701
1702         p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1703         p.tun_if.add_vpp_config()
1704
1705         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1706         p.tun_protect.add_vpp_config()
1707
1708         p.tun_if.admin_up()
1709         p.tun_if.config_ip4()
1710         config_tun_params(p, self.encryption_type, p.tun_if)
1711
1712         VppIpRoute(
1713             self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1714         ).add_vpp_config()
1715
1716     def tearDown(self):
1717         p = self.ipv4_params
1718         p.tun_if.unconfig_ip4()
1719         super(TestIpsecGreIfEsp, self).tearDown()
1720
1721
1722 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1723     """Ipsec GRE ESP - TRA tests"""
1724
1725     tun4_encrypt_node_name = "esp4-encrypt-tun"
1726     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1727     encryption_type = ESP
1728
1729     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1730         return [
1731             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1732             / sa.encrypt(
1733                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1734                 / GRE()
1735                 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1736                 / UDP(sport=1144, dport=2233)
1737                 / Raw(b"X" * payload_size)
1738             )
1739             for i in range(count)
1740         ]
1741
1742     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1743         return [
1744             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1745             / sa.encrypt(
1746                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1747                 / GRE()
1748                 / UDP(sport=1144, dport=2233)
1749                 / Raw(b"X" * payload_size)
1750             )
1751             for i in range(count)
1752         ]
1753
1754     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1755         return [
1756             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1757             / IP(src="1.1.1.1", dst="1.1.1.2")
1758             / UDP(sport=1144, dport=2233)
1759             / Raw(b"X" * payload_size)
1760             for i in range(count)
1761         ]
1762
1763     def verify_decrypted(self, p, rxs):
1764         for rx in rxs:
1765             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1766             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1767
1768     def verify_encrypted(self, p, sa, rxs):
1769         for rx in rxs:
1770             try:
1771                 pkt = sa.decrypt(rx[IP])
1772                 if not pkt.haslayer(IP):
1773                     pkt = IP(pkt[Raw].load)
1774                 self.assert_packet_checksums_valid(pkt)
1775                 self.assertTrue(pkt.haslayer(GRE))
1776                 e = pkt[GRE]
1777                 self.assertEqual(e[IP].dst, "1.1.1.2")
1778             except (IndexError, AssertionError):
1779                 self.logger.debug(ppp("Unexpected packet:", rx))
1780                 try:
1781                     self.logger.debug(ppp("Decrypted packet:", pkt))
1782                 except:
1783                     pass
1784                 raise
1785
1786     def setUp(self):
1787         super(TestIpsecGreIfEspTra, self).setUp()
1788
1789         self.tun_if = self.pg0
1790
1791         p = self.ipv4_params
1792
1793         p.tun_sa_out = VppIpsecSA(
1794             self,
1795             p.scapy_tun_sa_id,
1796             p.scapy_tun_spi,
1797             p.auth_algo_vpp_id,
1798             p.auth_key,
1799             p.crypt_algo_vpp_id,
1800             p.crypt_key,
1801             self.vpp_esp_protocol,
1802         )
1803         p.tun_sa_out.add_vpp_config()
1804
1805         p.tun_sa_in = VppIpsecSA(
1806             self,
1807             p.vpp_tun_sa_id,
1808             p.vpp_tun_spi,
1809             p.auth_algo_vpp_id,
1810             p.auth_key,
1811             p.crypt_algo_vpp_id,
1812             p.crypt_key,
1813             self.vpp_esp_protocol,
1814         )
1815         p.tun_sa_in.add_vpp_config()
1816
1817         p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1818         p.tun_if.add_vpp_config()
1819
1820         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1821         p.tun_protect.add_vpp_config()
1822
1823         p.tun_if.admin_up()
1824         p.tun_if.config_ip4()
1825         config_tra_params(p, self.encryption_type, p.tun_if)
1826
1827         VppIpRoute(
1828             self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1829         ).add_vpp_config()
1830
1831     def tearDown(self):
1832         p = self.ipv4_params
1833         p.tun_if.unconfig_ip4()
1834         super(TestIpsecGreIfEspTra, self).tearDown()
1835
1836     def test_gre_non_ip(self):
1837         p = self.ipv4_params
1838         tx = self.gen_encrypt_non_ip_pkts(
1839             p.scapy_tun_sa,
1840             self.tun_if,
1841             src=p.remote_tun_if_host,
1842             dst=self.pg1.remote_ip6,
1843         )
1844         self.send_and_assert_no_replies(self.tun_if, tx)
1845         node_name = "/err/%s/unsupported payload" % self.tun4_decrypt_node_name[0]
1846         self.assertEqual(1, self.statistics.get_err_counter(node_name))
1847
1848
1849 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
1850     """Ipsec GRE ESP - TRA tests"""
1851
1852     tun6_encrypt_node_name = "esp6-encrypt-tun"
1853     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1854     encryption_type = ESP
1855
1856     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1857         return [
1858             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1859             / sa.encrypt(
1860                 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
1861                 / GRE()
1862                 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
1863                 / UDP(sport=1144, dport=2233)
1864                 / Raw(b"X" * payload_size)
1865             )
1866             for i in range(count)
1867         ]
1868
1869     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
1870         return [
1871             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1872             / IPv6(src="1::1", dst="1::2")
1873             / UDP(sport=1144, dport=2233)
1874             / Raw(b"X" * payload_size)
1875             for i in range(count)
1876         ]
1877
1878     def verify_decrypted6(self, p, rxs):
1879         for rx in rxs:
1880             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1881             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1882
1883     def verify_encrypted6(self, p, sa, rxs):
1884         for rx in rxs:
1885             try:
1886                 pkt = sa.decrypt(rx[IPv6])
1887                 if not pkt.haslayer(IPv6):
1888                     pkt = IPv6(pkt[Raw].load)
1889                 self.assert_packet_checksums_valid(pkt)
1890                 self.assertTrue(pkt.haslayer(GRE))
1891                 e = pkt[GRE]
1892                 self.assertEqual(e[IPv6].dst, "1::2")
1893             except (IndexError, AssertionError):
1894                 self.logger.debug(ppp("Unexpected packet:", rx))
1895                 try:
1896                     self.logger.debug(ppp("Decrypted packet:", pkt))
1897                 except:
1898                     pass
1899                 raise
1900
1901     def setUp(self):
1902         super(TestIpsecGre6IfEspTra, self).setUp()
1903
1904         self.tun_if = self.pg0
1905
1906         p = self.ipv6_params
1907
1908         bd1 = VppBridgeDomain(self, 1)
1909         bd1.add_vpp_config()
1910
1911         p.tun_sa_out = VppIpsecSA(
1912             self,
1913             p.scapy_tun_sa_id,
1914             p.scapy_tun_spi,
1915             p.auth_algo_vpp_id,
1916             p.auth_key,
1917             p.crypt_algo_vpp_id,
1918             p.crypt_key,
1919             self.vpp_esp_protocol,
1920         )
1921         p.tun_sa_out.add_vpp_config()
1922
1923         p.tun_sa_in = VppIpsecSA(
1924             self,
1925             p.vpp_tun_sa_id,
1926             p.vpp_tun_spi,
1927             p.auth_algo_vpp_id,
1928             p.auth_key,
1929             p.crypt_algo_vpp_id,
1930             p.crypt_key,
1931             self.vpp_esp_protocol,
1932         )
1933         p.tun_sa_in.add_vpp_config()
1934
1935         p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
1936         p.tun_if.add_vpp_config()
1937
1938         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1939         p.tun_protect.add_vpp_config()
1940
1941         p.tun_if.admin_up()
1942         p.tun_if.config_ip6()
1943         config_tra_params(p, self.encryption_type, p.tun_if)
1944
1945         r = VppIpRoute(
1946             self,
1947             "1::2",
1948             128,
1949             [
1950                 VppRoutePath(
1951                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
1952                 )
1953             ],
1954         )
1955         r.add_vpp_config()
1956
1957     def tearDown(self):
1958         p = self.ipv6_params
1959         p.tun_if.unconfig_ip6()
1960         super(TestIpsecGre6IfEspTra, self).tearDown()
1961
1962
1963 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1964     """Ipsec mGRE ESP v4 TRA tests"""
1965
1966     tun4_encrypt_node_name = "esp4-encrypt-tun"
1967     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1968     encryption_type = ESP
1969
1970     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1971         return [
1972             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1973             / sa.encrypt(
1974                 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
1975                 / GRE()
1976                 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1977                 / UDP(sport=1144, dport=2233)
1978                 / Raw(b"X" * payload_size)
1979             )
1980             for i in range(count)
1981         ]
1982
1983     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1984         return [
1985             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1986             / IP(src="1.1.1.1", dst=dst)
1987             / UDP(sport=1144, dport=2233)
1988             / Raw(b"X" * payload_size)
1989             for i in range(count)
1990         ]
1991
1992     def verify_decrypted(self, p, rxs):
1993         for rx in rxs:
1994             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1995             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1996
1997     def verify_encrypted(self, p, sa, rxs):
1998         for rx in rxs:
1999             try:
2000                 pkt = sa.decrypt(rx[IP])
2001                 if not pkt.haslayer(IP):
2002                     pkt = IP(pkt[Raw].load)
2003                 self.assert_packet_checksums_valid(pkt)
2004                 self.assertTrue(pkt.haslayer(GRE))
2005                 e = pkt[GRE]
2006                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2007             except (IndexError, AssertionError):
2008                 self.logger.debug(ppp("Unexpected packet:", rx))
2009                 try:
2010                     self.logger.debug(ppp("Decrypted packet:", pkt))
2011                 except:
2012                     pass
2013                 raise
2014
2015     def setUp(self):
2016         super(TestIpsecMGreIfEspTra4, self).setUp()
2017
2018         N_NHS = 16
2019         self.tun_if = self.pg0
2020         p = self.ipv4_params
2021         p.tun_if = VppGreInterface(
2022             self,
2023             self.pg0.local_ip4,
2024             "0.0.0.0",
2025             mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2026         )
2027         p.tun_if.add_vpp_config()
2028         p.tun_if.admin_up()
2029         p.tun_if.config_ip4()
2030         p.tun_if.generate_remote_hosts(N_NHS)
2031         self.pg0.generate_remote_hosts(N_NHS)
2032         self.pg0.configure_ipv4_neighbors()
2033
2034         # setup some SAs for several next-hops on the interface
2035         self.multi_params = []
2036
2037         for ii in range(N_NHS):
2038             p = copy.copy(self.ipv4_params)
2039
2040             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2041             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2042             p.scapy_tun_spi = p.scapy_tun_spi + ii
2043             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2044             p.vpp_tun_spi = p.vpp_tun_spi + ii
2045
2046             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2047             p.scapy_tra_spi = p.scapy_tra_spi + ii
2048             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2049             p.vpp_tra_spi = p.vpp_tra_spi + ii
2050             p.tun_sa_out = VppIpsecSA(
2051                 self,
2052                 p.scapy_tun_sa_id,
2053                 p.scapy_tun_spi,
2054                 p.auth_algo_vpp_id,
2055                 p.auth_key,
2056                 p.crypt_algo_vpp_id,
2057                 p.crypt_key,
2058                 self.vpp_esp_protocol,
2059             )
2060             p.tun_sa_out.add_vpp_config()
2061
2062             p.tun_sa_in = VppIpsecSA(
2063                 self,
2064                 p.vpp_tun_sa_id,
2065                 p.vpp_tun_spi,
2066                 p.auth_algo_vpp_id,
2067                 p.auth_key,
2068                 p.crypt_algo_vpp_id,
2069                 p.crypt_key,
2070                 self.vpp_esp_protocol,
2071             )
2072             p.tun_sa_in.add_vpp_config()
2073
2074             p.tun_protect = VppIpsecTunProtect(
2075                 self,
2076                 p.tun_if,
2077                 p.tun_sa_out,
2078                 [p.tun_sa_in],
2079                 nh=p.tun_if.remote_hosts[ii].ip4,
2080             )
2081             p.tun_protect.add_vpp_config()
2082             config_tra_params(p, self.encryption_type, p.tun_if)
2083             self.multi_params.append(p)
2084
2085             VppIpRoute(
2086                 self,
2087                 p.remote_tun_if_host,
2088                 32,
2089                 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2090             ).add_vpp_config()
2091
2092             # in this v4 variant add the teibs after the protect
2093             p.teib = VppTeib(
2094                 self,
2095                 p.tun_if,
2096                 p.tun_if.remote_hosts[ii].ip4,
2097                 self.pg0.remote_hosts[ii].ip4,
2098             ).add_vpp_config()
2099             p.tun_dst = self.pg0.remote_hosts[ii].ip4
2100         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2101
2102     def tearDown(self):
2103         p = self.ipv4_params
2104         p.tun_if.unconfig_ip4()
2105         super(TestIpsecMGreIfEspTra4, self).tearDown()
2106
2107     def test_tun_44(self):
2108         """mGRE IPSEC 44"""
2109         N_PKTS = 63
2110         for p in self.multi_params:
2111             self.verify_tun_44(p, count=N_PKTS)
2112             p.teib.remove_vpp_config()
2113             self.verify_tun_dropped_44(p, count=N_PKTS)
2114             p.teib.add_vpp_config()
2115             self.verify_tun_44(p, count=N_PKTS)
2116
2117
2118 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2119     """Ipsec mGRE ESP v6 TRA tests"""
2120
2121     tun6_encrypt_node_name = "esp6-encrypt-tun"
2122     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2123     encryption_type = ESP
2124
2125     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2126         return [
2127             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2128             / sa.encrypt(
2129                 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2130                 / GRE()
2131                 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2132                 / UDP(sport=1144, dport=2233)
2133                 / Raw(b"X" * payload_size)
2134             )
2135             for i in range(count)
2136         ]
2137
2138     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2139         return [
2140             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2141             / IPv6(src="1::1", dst=dst)
2142             / UDP(sport=1144, dport=2233)
2143             / Raw(b"X" * payload_size)
2144             for i in range(count)
2145         ]
2146
2147     def verify_decrypted6(self, p, rxs):
2148         for rx in rxs:
2149             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2150             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2151
2152     def verify_encrypted6(self, p, sa, rxs):
2153         for rx in rxs:
2154             try:
2155                 pkt = sa.decrypt(rx[IPv6])
2156                 if not pkt.haslayer(IPv6):
2157                     pkt = IPv6(pkt[Raw].load)
2158                 self.assert_packet_checksums_valid(pkt)
2159                 self.assertTrue(pkt.haslayer(GRE))
2160                 e = pkt[GRE]
2161                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
2162             except (IndexError, AssertionError):
2163                 self.logger.debug(ppp("Unexpected packet:", rx))
2164                 try:
2165                     self.logger.debug(ppp("Decrypted packet:", pkt))
2166                 except:
2167                     pass
2168                 raise
2169
2170     def setUp(self):
2171         super(TestIpsecMGreIfEspTra6, self).setUp()
2172
2173         self.vapi.cli("set logging class ipsec level debug")
2174
2175         N_NHS = 16
2176         self.tun_if = self.pg0
2177         p = self.ipv6_params
2178         p.tun_if = VppGreInterface(
2179             self,
2180             self.pg0.local_ip6,
2181             "::",
2182             mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2183         )
2184         p.tun_if.add_vpp_config()
2185         p.tun_if.admin_up()
2186         p.tun_if.config_ip6()
2187         p.tun_if.generate_remote_hosts(N_NHS)
2188         self.pg0.generate_remote_hosts(N_NHS)
2189         self.pg0.configure_ipv6_neighbors()
2190
2191         # setup some SAs for several next-hops on the interface
2192         self.multi_params = []
2193
2194         for ii in range(N_NHS):
2195             p = copy.copy(self.ipv6_params)
2196
2197             p.remote_tun_if_host = "1::%d" % (ii + 1)
2198             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2199             p.scapy_tun_spi = p.scapy_tun_spi + ii
2200             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2201             p.vpp_tun_spi = p.vpp_tun_spi + ii
2202
2203             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2204             p.scapy_tra_spi = p.scapy_tra_spi + ii
2205             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2206             p.vpp_tra_spi = p.vpp_tra_spi + ii
2207             p.tun_sa_out = VppIpsecSA(
2208                 self,
2209                 p.scapy_tun_sa_id,
2210                 p.scapy_tun_spi,
2211                 p.auth_algo_vpp_id,
2212                 p.auth_key,
2213                 p.crypt_algo_vpp_id,
2214                 p.crypt_key,
2215                 self.vpp_esp_protocol,
2216             )
2217             p.tun_sa_out.add_vpp_config()
2218
2219             p.tun_sa_in = VppIpsecSA(
2220                 self,
2221                 p.vpp_tun_sa_id,
2222                 p.vpp_tun_spi,
2223                 p.auth_algo_vpp_id,
2224                 p.auth_key,
2225                 p.crypt_algo_vpp_id,
2226                 p.crypt_key,
2227                 self.vpp_esp_protocol,
2228             )
2229             p.tun_sa_in.add_vpp_config()
2230
2231             # in this v6 variant add the teibs first then the protection
2232             p.tun_dst = self.pg0.remote_hosts[ii].ip6
2233             VppTeib(
2234                 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
2235             ).add_vpp_config()
2236
2237             p.tun_protect = VppIpsecTunProtect(
2238                 self,
2239                 p.tun_if,
2240                 p.tun_sa_out,
2241                 [p.tun_sa_in],
2242                 nh=p.tun_if.remote_hosts[ii].ip6,
2243             )
2244             p.tun_protect.add_vpp_config()
2245             config_tra_params(p, self.encryption_type, p.tun_if)
2246             self.multi_params.append(p)
2247
2248             VppIpRoute(
2249                 self,
2250                 p.remote_tun_if_host,
2251                 128,
2252                 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
2253             ).add_vpp_config()
2254             p.tun_dst = self.pg0.remote_hosts[ii].ip6
2255
2256         self.logger.info(self.vapi.cli("sh log"))
2257         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2258         self.logger.info(self.vapi.cli("sh adj 41"))
2259
2260     def tearDown(self):
2261         p = self.ipv6_params
2262         p.tun_if.unconfig_ip6()
2263         super(TestIpsecMGreIfEspTra6, self).tearDown()
2264
2265     def test_tun_66(self):
2266         """mGRE IPSec 66"""
2267         for p in self.multi_params:
2268             self.verify_tun_66(p, count=63)
2269
2270
2271 @tag_fixme_vpp_workers
2272 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2273     """IPsec IPv4 Tunnel protect - transport mode"""
2274
2275     def setUp(self):
2276         super(TestIpsec4TunProtect, self).setUp()
2277
2278         self.tun_if = self.pg0
2279
2280     def tearDown(self):
2281         super(TestIpsec4TunProtect, self).tearDown()
2282
2283     def test_tun_44(self):
2284         """IPSEC tunnel protect"""
2285
2286         p = self.ipv4_params
2287
2288         self.config_network(p)
2289         self.config_sa_tra(p)
2290         self.config_protect(p)
2291
2292         self.verify_tun_44(p, count=127)
2293         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2294         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2295
2296         self.vapi.cli("clear ipsec sa")
2297         self.verify_tun_64(p, count=127)
2298         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2299         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2300
2301         # rekey - create new SAs and update the tunnel protection
2302         np = copy.copy(p)
2303         np.crypt_key = b"X" + p.crypt_key[1:]
2304         np.scapy_tun_spi += 100
2305         np.scapy_tun_sa_id += 1
2306         np.vpp_tun_spi += 100
2307         np.vpp_tun_sa_id += 1
2308         np.tun_if.local_spi = p.vpp_tun_spi
2309         np.tun_if.remote_spi = p.scapy_tun_spi
2310
2311         self.config_sa_tra(np)
2312         self.config_protect(np)
2313         self.unconfig_sa(p)
2314
2315         self.verify_tun_44(np, count=127)
2316         self.assertEqual(p.tun_if.get_rx_stats(), 381)
2317         self.assertEqual(p.tun_if.get_tx_stats(), 381)
2318
2319         # teardown
2320         self.unconfig_protect(np)
2321         self.unconfig_sa(np)
2322         self.unconfig_network(p)
2323
2324
2325 @tag_fixme_vpp_workers
2326 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2327     """IPsec IPv4 Tunnel protect - transport mode"""
2328
2329     def setUp(self):
2330         super(TestIpsec4TunProtectUdp, self).setUp()
2331
2332         self.tun_if = self.pg0
2333
2334         p = self.ipv4_params
2335         p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2336         p.nat_header = UDP(sport=4500, dport=4500)
2337         self.config_network(p)
2338         self.config_sa_tra(p)
2339         self.config_protect(p)
2340
2341     def tearDown(self):
2342         p = self.ipv4_params
2343         self.unconfig_protect(p)
2344         self.unconfig_sa(p)
2345         self.unconfig_network(p)
2346         super(TestIpsec4TunProtectUdp, self).tearDown()
2347
2348     def verify_encrypted(self, p, sa, rxs):
2349         # ensure encrypted packets are recieved with the default UDP ports
2350         for rx in rxs:
2351             self.assertEqual(rx[UDP].sport, 4500)
2352             self.assertEqual(rx[UDP].dport, 4500)
2353         super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2354
2355     def test_tun_44(self):
2356         """IPSEC UDP tunnel protect"""
2357
2358         p = self.ipv4_params
2359
2360         self.verify_tun_44(p, count=127)
2361         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2362         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2363
2364     def test_keepalive(self):
2365         """IPSEC NAT Keepalive"""
2366         self.verify_keepalive(self.ipv4_params)
2367
2368
2369 @tag_fixme_vpp_workers
2370 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2371     """IPsec IPv4 Tunnel protect - tunnel mode"""
2372
2373     encryption_type = ESP
2374     tun4_encrypt_node_name = "esp4-encrypt-tun"
2375     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2376
2377     def setUp(self):
2378         super(TestIpsec4TunProtectTun, self).setUp()
2379
2380         self.tun_if = self.pg0
2381
2382     def tearDown(self):
2383         super(TestIpsec4TunProtectTun, self).tearDown()
2384
2385     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2386         return [
2387             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2388             / sa.encrypt(
2389                 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2390                 / IP(src=src, dst=dst)
2391                 / UDP(sport=1144, dport=2233)
2392                 / Raw(b"X" * payload_size)
2393             )
2394             for i in range(count)
2395         ]
2396
2397     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2398         return [
2399             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2400             / IP(src=src, dst=dst)
2401             / UDP(sport=1144, dport=2233)
2402             / Raw(b"X" * payload_size)
2403             for i in range(count)
2404         ]
2405
2406     def verify_decrypted(self, p, rxs):
2407         for rx in rxs:
2408             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2409             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2410             self.assert_packet_checksums_valid(rx)
2411
2412     def verify_encrypted(self, p, sa, rxs):
2413         for rx in rxs:
2414             try:
2415                 pkt = sa.decrypt(rx[IP])
2416                 if not pkt.haslayer(IP):
2417                     pkt = IP(pkt[Raw].load)
2418                 self.assert_packet_checksums_valid(pkt)
2419                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2420                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2421                 inner = pkt[IP].payload
2422                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2423
2424             except (IndexError, AssertionError):
2425                 self.logger.debug(ppp("Unexpected packet:", rx))
2426                 try:
2427                     self.logger.debug(ppp("Decrypted packet:", pkt))
2428                 except:
2429                     pass
2430                 raise
2431
2432     def test_tun_44(self):
2433         """IPSEC tunnel protect"""
2434
2435         p = self.ipv4_params
2436
2437         self.config_network(p)
2438         self.config_sa_tun(p)
2439         self.config_protect(p)
2440
2441         # also add an output features on the tunnel and physical interface
2442         # so we test they still work
2443         r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2444         a = VppAcl(self, [r_all]).add_vpp_config()
2445
2446         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2447         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2448
2449         self.verify_tun_44(p, count=127)
2450
2451         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2452         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2453
2454         # rekey - create new SAs and update the tunnel protection
2455         np = copy.copy(p)
2456         np.crypt_key = b"X" + p.crypt_key[1:]
2457         np.scapy_tun_spi += 100
2458         np.scapy_tun_sa_id += 1
2459         np.vpp_tun_spi += 100
2460         np.vpp_tun_sa_id += 1
2461         np.tun_if.local_spi = p.vpp_tun_spi
2462         np.tun_if.remote_spi = p.scapy_tun_spi
2463
2464         self.config_sa_tun(np)
2465         self.config_protect(np)
2466         self.unconfig_sa(p)
2467
2468         self.verify_tun_44(np, count=127)
2469         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2470         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2471
2472         # teardown
2473         self.unconfig_protect(np)
2474         self.unconfig_sa(np)
2475         self.unconfig_network(p)
2476
2477
2478 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2479     """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2480
2481     encryption_type = ESP
2482     tun4_encrypt_node_name = "esp4-encrypt-tun"
2483     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2484
2485     def setUp(self):
2486         super(TestIpsec4TunProtectTunDrop, self).setUp()
2487
2488         self.tun_if = self.pg0
2489
2490     def tearDown(self):
2491         super(TestIpsec4TunProtectTunDrop, self).tearDown()
2492
2493     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2494         return [
2495             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2496             / sa.encrypt(
2497                 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2498                 / IP(src=src, dst=dst)
2499                 / UDP(sport=1144, dport=2233)
2500                 / Raw(b"X" * payload_size)
2501             )
2502             for i in range(count)
2503         ]
2504
2505     def test_tun_drop_44(self):
2506         """IPSEC tunnel protect bogus tunnel header"""
2507
2508         p = self.ipv4_params
2509
2510         self.config_network(p)
2511         self.config_sa_tun(p)
2512         self.config_protect(p)
2513
2514         tx = self.gen_encrypt_pkts(
2515             p,
2516             p.scapy_tun_sa,
2517             self.tun_if,
2518             src=p.remote_tun_if_host,
2519             dst=self.pg1.remote_ip4,
2520             count=63,
2521         )
2522         self.send_and_assert_no_replies(self.tun_if, tx)
2523
2524         # teardown
2525         self.unconfig_protect(p)
2526         self.unconfig_sa(p)
2527         self.unconfig_network(p)
2528
2529
2530 @tag_fixme_vpp_workers
2531 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2532     """IPsec IPv6 Tunnel protect - transport mode"""
2533
2534     encryption_type = ESP
2535     tun6_encrypt_node_name = "esp6-encrypt-tun"
2536     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2537
2538     def setUp(self):
2539         super(TestIpsec6TunProtect, self).setUp()
2540
2541         self.tun_if = self.pg0
2542
2543     def tearDown(self):
2544         super(TestIpsec6TunProtect, self).tearDown()
2545
2546     def test_tun_66(self):
2547         """IPSEC tunnel protect 6o6"""
2548
2549         p = self.ipv6_params
2550
2551         self.config_network(p)
2552         self.config_sa_tra(p)
2553         self.config_protect(p)
2554
2555         self.verify_tun_66(p, count=127)
2556         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2557         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2558
2559         # rekey - create new SAs and update the tunnel protection
2560         np = copy.copy(p)
2561         np.crypt_key = b"X" + p.crypt_key[1:]
2562         np.scapy_tun_spi += 100
2563         np.scapy_tun_sa_id += 1
2564         np.vpp_tun_spi += 100
2565         np.vpp_tun_sa_id += 1
2566         np.tun_if.local_spi = p.vpp_tun_spi
2567         np.tun_if.remote_spi = p.scapy_tun_spi
2568
2569         self.config_sa_tra(np)
2570         self.config_protect(np)
2571         self.unconfig_sa(p)
2572
2573         self.verify_tun_66(np, count=127)
2574         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2575         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2576
2577         # bounce the interface state
2578         p.tun_if.admin_down()
2579         self.verify_drop_tun_66(np, count=127)
2580         node = (
2581             "/err/ipsec6-tun-input/%s" % "ipsec packets received on disabled interface"
2582         )
2583         self.assertEqual(127, self.statistics.get_err_counter(node))
2584         p.tun_if.admin_up()
2585         self.verify_tun_66(np, count=127)
2586
2587         # 3 phase rekey
2588         #  1) add two input SAs [old, new]
2589         #  2) swap output SA to [new]
2590         #  3) use only [new] input SA
2591         np3 = copy.copy(np)
2592         np3.crypt_key = b"Z" + p.crypt_key[1:]
2593         np3.scapy_tun_spi += 100
2594         np3.scapy_tun_sa_id += 1
2595         np3.vpp_tun_spi += 100
2596         np3.vpp_tun_sa_id += 1
2597         np3.tun_if.local_spi = p.vpp_tun_spi
2598         np3.tun_if.remote_spi = p.scapy_tun_spi
2599
2600         self.config_sa_tra(np3)
2601
2602         # step 1;
2603         p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2604         self.verify_tun_66(np, np, count=127)
2605         self.verify_tun_66(np3, np, count=127)
2606
2607         # step 2;
2608         p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2609         self.verify_tun_66(np, np3, count=127)
2610         self.verify_tun_66(np3, np3, count=127)
2611
2612         # step 1;
2613         p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2614         self.verify_tun_66(np3, np3, count=127)
2615         self.verify_drop_tun_rx_66(np, count=127)
2616
2617         self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2618         self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2619         self.unconfig_sa(np)
2620
2621         # teardown
2622         self.unconfig_protect(np3)
2623         self.unconfig_sa(np3)
2624         self.unconfig_network(p)
2625
2626     def test_tun_46(self):
2627         """IPSEC tunnel protect 4o6"""
2628
2629         p = self.ipv6_params
2630
2631         self.config_network(p)
2632         self.config_sa_tra(p)
2633         self.config_protect(p)
2634
2635         self.verify_tun_46(p, count=127)
2636         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2637         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2638
2639         # teardown
2640         self.unconfig_protect(p)
2641         self.unconfig_sa(p)
2642         self.unconfig_network(p)
2643
2644
2645 @tag_fixme_vpp_workers
2646 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2647     """IPsec IPv6 Tunnel protect - tunnel mode"""
2648
2649     encryption_type = ESP
2650     tun6_encrypt_node_name = "esp6-encrypt-tun"
2651     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2652
2653     def setUp(self):
2654         super(TestIpsec6TunProtectTun, self).setUp()
2655
2656         self.tun_if = self.pg0
2657
2658     def tearDown(self):
2659         super(TestIpsec6TunProtectTun, self).tearDown()
2660
2661     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2662         return [
2663             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2664             / sa.encrypt(
2665                 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2666                 / IPv6(src=src, dst=dst)
2667                 / UDP(sport=1166, dport=2233)
2668                 / Raw(b"X" * payload_size)
2669             )
2670             for i in range(count)
2671         ]
2672
2673     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2674         return [
2675             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2676             / IPv6(src=src, dst=dst)
2677             / UDP(sport=1166, dport=2233)
2678             / Raw(b"X" * payload_size)
2679             for i in range(count)
2680         ]
2681
2682     def verify_decrypted6(self, p, rxs):
2683         for rx in rxs:
2684             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2685             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2686             self.assert_packet_checksums_valid(rx)
2687
2688     def verify_encrypted6(self, p, sa, rxs):
2689         for rx in rxs:
2690             try:
2691                 pkt = sa.decrypt(rx[IPv6])
2692                 if not pkt.haslayer(IPv6):
2693                     pkt = IPv6(pkt[Raw].load)
2694                 self.assert_packet_checksums_valid(pkt)
2695                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2696                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2697                 inner = pkt[IPv6].payload
2698                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2699
2700             except (IndexError, AssertionError):
2701                 self.logger.debug(ppp("Unexpected packet:", rx))
2702                 try:
2703                     self.logger.debug(ppp("Decrypted packet:", pkt))
2704                 except:
2705                     pass
2706                 raise
2707
2708     def test_tun_66(self):
2709         """IPSEC tunnel protect"""
2710
2711         p = self.ipv6_params
2712
2713         self.config_network(p)
2714         self.config_sa_tun(p)
2715         self.config_protect(p)
2716
2717         self.verify_tun_66(p, count=127)
2718
2719         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2720         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2721
2722         # rekey - create new SAs and update the tunnel protection
2723         np = copy.copy(p)
2724         np.crypt_key = b"X" + p.crypt_key[1:]
2725         np.scapy_tun_spi += 100
2726         np.scapy_tun_sa_id += 1
2727         np.vpp_tun_spi += 100
2728         np.vpp_tun_sa_id += 1
2729         np.tun_if.local_spi = p.vpp_tun_spi
2730         np.tun_if.remote_spi = p.scapy_tun_spi
2731
2732         self.config_sa_tun(np)
2733         self.config_protect(np)
2734         self.unconfig_sa(p)
2735
2736         self.verify_tun_66(np, count=127)
2737         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2738         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2739
2740         # teardown
2741         self.unconfig_protect(np)
2742         self.unconfig_sa(np)
2743         self.unconfig_network(p)
2744
2745
2746 class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2747     """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2748
2749     encryption_type = ESP
2750     tun6_encrypt_node_name = "esp6-encrypt-tun"
2751     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2752
2753     def setUp(self):
2754         super(TestIpsec6TunProtectTunDrop, self).setUp()
2755
2756         self.tun_if = self.pg0
2757
2758     def tearDown(self):
2759         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2760
2761     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2762         # the IP destination of the revelaed packet does not match
2763         # that assigned to the tunnel
2764         return [
2765             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2766             / sa.encrypt(
2767                 IPv6(src=sw_intf.remote_ip6, dst="5::5")
2768                 / IPv6(src=src, dst=dst)
2769                 / UDP(sport=1144, dport=2233)
2770                 / Raw(b"X" * payload_size)
2771             )
2772             for i in range(count)
2773         ]
2774
2775     def test_tun_drop_66(self):
2776         """IPSEC 6 tunnel protect bogus tunnel header"""
2777
2778         p = self.ipv6_params
2779
2780         self.config_network(p)
2781         self.config_sa_tun(p)
2782         self.config_protect(p)
2783
2784         tx = self.gen_encrypt_pkts6(
2785             p,
2786             p.scapy_tun_sa,
2787             self.tun_if,
2788             src=p.remote_tun_if_host,
2789             dst=self.pg1.remote_ip6,
2790             count=63,
2791         )
2792         self.send_and_assert_no_replies(self.tun_if, tx)
2793
2794         self.unconfig_protect(p)
2795         self.unconfig_sa(p)
2796         self.unconfig_network(p)
2797
2798
2799 class TemplateIpsecItf4(object):
2800     """IPsec Interface IPv4"""
2801
2802     encryption_type = ESP
2803     tun4_encrypt_node_name = "esp4-encrypt-tun"
2804     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2805     tun4_input_node = "ipsec4-tun-input"
2806
2807     def config_sa_tun(self, p, src, dst):
2808         config_tun_params(p, self.encryption_type, None, src, dst)
2809
2810         p.tun_sa_out = VppIpsecSA(
2811             self,
2812             p.scapy_tun_sa_id,
2813             p.scapy_tun_spi,
2814             p.auth_algo_vpp_id,
2815             p.auth_key,
2816             p.crypt_algo_vpp_id,
2817             p.crypt_key,
2818             self.vpp_esp_protocol,
2819             src,
2820             dst,
2821             flags=p.flags,
2822         )
2823         p.tun_sa_out.add_vpp_config()
2824
2825         p.tun_sa_in = VppIpsecSA(
2826             self,
2827             p.vpp_tun_sa_id,
2828             p.vpp_tun_spi,
2829             p.auth_algo_vpp_id,
2830             p.auth_key,
2831             p.crypt_algo_vpp_id,
2832             p.crypt_key,
2833             self.vpp_esp_protocol,
2834             dst,
2835             src,
2836             flags=p.flags,
2837         )
2838         p.tun_sa_in.add_vpp_config()
2839
2840     def config_protect(self, p):
2841         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2842         p.tun_protect.add_vpp_config()
2843
2844     def config_network(self, p, instance=0xFFFFFFFF):
2845         p.tun_if = VppIpsecInterface(self, instance=instance)
2846
2847         p.tun_if.add_vpp_config()
2848         p.tun_if.admin_up()
2849         p.tun_if.config_ip4()
2850         p.tun_if.config_ip6()
2851
2852         p.route = VppIpRoute(
2853             self,
2854             p.remote_tun_if_host,
2855             32,
2856             [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
2857         )
2858         p.route.add_vpp_config()
2859         r = VppIpRoute(
2860             self,
2861             p.remote_tun_if_host6,
2862             128,
2863             [
2864                 VppRoutePath(
2865                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
2866                 )
2867             ],
2868         )
2869         r.add_vpp_config()
2870
2871     def unconfig_network(self, p):
2872         p.route.remove_vpp_config()
2873         p.tun_if.remove_vpp_config()
2874
2875     def unconfig_protect(self, p):
2876         p.tun_protect.remove_vpp_config()
2877
2878     def unconfig_sa(self, p):
2879         p.tun_sa_out.remove_vpp_config()
2880         p.tun_sa_in.remove_vpp_config()
2881
2882
2883 @tag_fixme_vpp_workers
2884 class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
2885     """IPsec Interface IPv4"""
2886
2887     def setUp(self):
2888         super(TestIpsecItf4, self).setUp()
2889
2890         self.tun_if = self.pg0
2891
2892     def tearDown(self):
2893         super(TestIpsecItf4, self).tearDown()
2894
2895     def test_tun_instance_44(self):
2896         p = self.ipv4_params
2897         self.config_network(p, instance=3)
2898
2899         with self.assertRaises(CliFailedCommandError):
2900             self.vapi.cli("show interface ipsec0")
2901
2902         output = self.vapi.cli("show interface ipsec3")
2903         self.assertTrue("unknown" not in output)
2904
2905         self.unconfig_network(p)
2906
2907     def test_tun_44(self):
2908         """IPSEC interface IPv4"""
2909
2910         n_pkts = 127
2911         p = self.ipv4_params
2912
2913         self.config_network(p)
2914         config_tun_params(
2915             p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
2916         )
2917         self.verify_tun_dropped_44(p, count=n_pkts)
2918         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
2919         self.config_protect(p)
2920
2921         self.verify_tun_44(p, count=n_pkts)
2922         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2923         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2924
2925         p.tun_if.admin_down()
2926         self.verify_tun_dropped_44(p, count=n_pkts)
2927         p.tun_if.admin_up()
2928         self.verify_tun_44(p, count=n_pkts)
2929
2930         self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
2931         self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
2932
2933         # it's a v6 packet when its encrypted
2934         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2935
2936         self.verify_tun_64(p, count=n_pkts)
2937         self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
2938         self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
2939
2940         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2941
2942         self.vapi.cli("clear interfaces")
2943
2944         # rekey - create new SAs and update the tunnel protection
2945         np = copy.copy(p)
2946         np.crypt_key = b"X" + p.crypt_key[1:]
2947         np.scapy_tun_spi += 100
2948         np.scapy_tun_sa_id += 1
2949         np.vpp_tun_spi += 100
2950         np.vpp_tun_sa_id += 1
2951         np.tun_if.local_spi = p.vpp_tun_spi
2952         np.tun_if.remote_spi = p.scapy_tun_spi
2953
2954         self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
2955         self.config_protect(np)
2956         self.unconfig_sa(p)
2957
2958         self.verify_tun_44(np, count=n_pkts)
2959         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2960         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2961
2962         # teardown
2963         self.unconfig_protect(np)
2964         self.unconfig_sa(np)
2965         self.unconfig_network(p)
2966
2967     def test_tun_44_null(self):
2968         """IPSEC interface IPv4 NULL auth/crypto"""
2969
2970         n_pkts = 127
2971         p = copy.copy(self.ipv4_params)
2972
2973         p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
2974         p.crypt_algo_vpp_id = (
2975             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
2976         )
2977         p.crypt_algo = "NULL"
2978         p.auth_algo = "NULL"
2979
2980         self.config_network(p)
2981         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
2982         self.config_protect(p)
2983
2984         self.logger.info(self.vapi.cli("sh ipsec sa"))
2985         self.verify_tun_44(p, count=n_pkts)
2986
2987         # teardown
2988         self.unconfig_protect(p)
2989         self.unconfig_sa(p)
2990         self.unconfig_network(p)
2991
2992     def test_tun_44_police(self):
2993         """IPSEC interface IPv4 with input policer"""
2994         n_pkts = 127
2995         p = self.ipv4_params
2996
2997         self.config_network(p)
2998         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
2999         self.config_protect(p)
3000
3001         action_tx = PolicerAction(
3002             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3003         )
3004         policer = VppPolicer(
3005             self,
3006             "pol1",
3007             80,
3008             0,
3009             1000,
3010             0,
3011             conform_action=action_tx,
3012             exceed_action=action_tx,
3013             violate_action=action_tx,
3014         )
3015         policer.add_vpp_config()
3016
3017         # Start policing on tun
3018         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3019
3020         self.verify_tun_44(p, count=n_pkts)
3021         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3022         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3023
3024         stats = policer.get_stats()
3025
3026         # Single rate, 2 colour policer - expect conform, violate but no exceed
3027         self.assertGreater(stats["conform_packets"], 0)
3028         self.assertEqual(stats["exceed_packets"], 0)
3029         self.assertGreater(stats["violate_packets"], 0)
3030
3031         # Stop policing on tun
3032         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3033         self.verify_tun_44(p, count=n_pkts)
3034
3035         # No new policer stats
3036         statsnew = policer.get_stats()
3037         self.assertEqual(stats, statsnew)
3038
3039         # teardown
3040         policer.remove_vpp_config()
3041         self.unconfig_protect(p)
3042         self.unconfig_sa(p)
3043         self.unconfig_network(p)
3044
3045
3046 class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3047     """IPsec Interface MPLSoIPv4"""
3048
3049     tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
3050
3051     def setUp(self):
3052         super(TestIpsecItf4MPLS, self).setUp()
3053
3054         self.tun_if = self.pg0
3055
3056     def tearDown(self):
3057         super(TestIpsecItf4MPLS, self).tearDown()
3058
3059     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3060         return [
3061             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3062             / sa.encrypt(
3063                 MPLS(label=44, ttl=3)
3064                 / IP(src=src, dst=dst)
3065                 / UDP(sport=1166, dport=2233)
3066                 / Raw(b"X" * payload_size)
3067             )
3068             for i in range(count)
3069         ]
3070
3071     def verify_encrypted(self, p, sa, rxs):
3072         for rx in rxs:
3073             try:
3074                 pkt = sa.decrypt(rx[IP])
3075                 if not pkt.haslayer(IP):
3076                     pkt = IP(pkt[Raw].load)
3077                 self.assert_packet_checksums_valid(pkt)
3078                 self.assert_equal(pkt[MPLS].label, 44)
3079                 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
3080             except (IndexError, AssertionError):
3081                 self.logger.debug(ppp("Unexpected packet:", rx))
3082                 try:
3083                     self.logger.debug(ppp("Decrypted packet:", pkt))
3084                 except:
3085                     pass
3086                 raise
3087
3088     def test_tun_mpls_o_ip4(self):
3089         """IPSEC interface MPLS over IPv4"""
3090
3091         n_pkts = 127
3092         p = self.ipv4_params
3093         f = FibPathProto
3094
3095         tbl = VppMplsTable(self, 0)
3096         tbl.add_vpp_config()
3097
3098         self.config_network(p)
3099         # deag MPLS routes from the tunnel
3100         r4 = VppMplsRoute(
3101             self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
3102         ).add_vpp_config()
3103         p.route.modify(
3104             [
3105                 VppRoutePath(
3106                     p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
3107                 )
3108             ]
3109         )
3110         p.tun_if.enable_mpls()
3111
3112         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3113         self.config_protect(p)
3114
3115         self.verify_tun_44(p, count=n_pkts)
3116
3117         # cleanup
3118         p.tun_if.disable_mpls()
3119         self.unconfig_protect(p)
3120         self.unconfig_sa(p)
3121         self.unconfig_network(p)
3122
3123
3124 class TemplateIpsecItf6(object):
3125     """IPsec Interface IPv6"""
3126
3127     encryption_type = ESP
3128     tun6_encrypt_node_name = "esp6-encrypt-tun"
3129     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
3130     tun6_input_node = "ipsec6-tun-input"
3131
3132     def config_sa_tun(self, p, src, dst):
3133         config_tun_params(p, self.encryption_type, None, src, dst)
3134
3135         if not hasattr(p, "tun_flags"):
3136             p.tun_flags = None
3137         if not hasattr(p, "hop_limit"):
3138             p.hop_limit = 255
3139
3140         p.tun_sa_out = VppIpsecSA(
3141             self,
3142             p.scapy_tun_sa_id,
3143             p.scapy_tun_spi,
3144             p.auth_algo_vpp_id,
3145             p.auth_key,
3146             p.crypt_algo_vpp_id,
3147             p.crypt_key,
3148             self.vpp_esp_protocol,
3149             src,
3150             dst,
3151             flags=p.flags,
3152             tun_flags=p.tun_flags,
3153             hop_limit=p.hop_limit,
3154         )
3155         p.tun_sa_out.add_vpp_config()
3156
3157         p.tun_sa_in = VppIpsecSA(
3158             self,
3159             p.vpp_tun_sa_id,
3160             p.vpp_tun_spi,
3161             p.auth_algo_vpp_id,
3162             p.auth_key,
3163             p.crypt_algo_vpp_id,
3164             p.crypt_key,
3165             self.vpp_esp_protocol,
3166             dst,
3167             src,
3168             flags=p.flags,
3169         )
3170         p.tun_sa_in.add_vpp_config()
3171
3172     def config_protect(self, p):
3173         p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
3174         p.tun_protect.add_vpp_config()
3175
3176     def config_network(self, p):
3177         p.tun_if = VppIpsecInterface(self)
3178
3179         p.tun_if.add_vpp_config()
3180         p.tun_if.admin_up()
3181         p.tun_if.config_ip4()
3182         p.tun_if.config_ip6()
3183
3184         r = VppIpRoute(
3185             self,
3186             p.remote_tun_if_host4,
3187             32,
3188             [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3189         )
3190         r.add_vpp_config()
3191
3192         p.route = VppIpRoute(
3193             self,
3194             p.remote_tun_if_host,
3195             128,
3196             [
3197                 VppRoutePath(
3198                     p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3199                 )
3200             ],
3201         )
3202         p.route.add_vpp_config()
3203
3204     def unconfig_network(self, p):
3205         p.route.remove_vpp_config()
3206         p.tun_if.remove_vpp_config()
3207
3208     def unconfig_protect(self, p):
3209         p.tun_protect.remove_vpp_config()
3210
3211     def unconfig_sa(self, p):
3212         p.tun_sa_out.remove_vpp_config()
3213         p.tun_sa_in.remove_vpp_config()
3214
3215
3216 @tag_fixme_vpp_workers
3217 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3218     """IPsec Interface IPv6"""
3219
3220     def setUp(self):
3221         super(TestIpsecItf6, self).setUp()
3222
3223         self.tun_if = self.pg0
3224
3225     def tearDown(self):
3226         super(TestIpsecItf6, self).tearDown()
3227
3228     def test_tun_66(self):
3229         """IPSEC interface IPv6"""
3230
3231         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3232         n_pkts = 127
3233         p = self.ipv6_params
3234         p.inner_hop_limit = 24
3235         p.outer_hop_limit = 23
3236         p.outer_flow_label = 243224
3237         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3238
3239         self.config_network(p)
3240         config_tun_params(
3241             p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
3242         )
3243         self.verify_drop_tun_66(p, count=n_pkts)
3244         self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3245         self.config_protect(p)
3246
3247         self.verify_tun_66(p, count=n_pkts)
3248         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3249         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3250
3251         p.tun_if.admin_down()
3252         self.verify_drop_tun_66(p, count=n_pkts)
3253         p.tun_if.admin_up()
3254         self.verify_tun_66(p, count=n_pkts)
3255
3256         self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3257         self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3258
3259         # it's a v4 packet when its encrypted
3260         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3261
3262         self.verify_tun_46(p, count=n_pkts)
3263         self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3264         self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3265
3266         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
3267
3268         self.vapi.cli("clear interfaces")
3269
3270         # rekey - create new SAs and update the tunnel protection
3271         np = copy.copy(p)
3272         np.crypt_key = b"X" + p.crypt_key[1:]
3273         np.scapy_tun_spi += 100
3274         np.scapy_tun_sa_id += 1
3275         np.vpp_tun_spi += 100
3276         np.vpp_tun_sa_id += 1
3277         np.tun_if.local_spi = p.vpp_tun_spi
3278         np.tun_if.remote_spi = p.scapy_tun_spi
3279         np.inner_hop_limit = 24
3280         np.outer_hop_limit = 128
3281         np.inner_flow_label = 0xABCDE
3282         np.outer_flow_label = 0xABCDE
3283         np.hop_limit = 128
3284         np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3285
3286         self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3287         self.config_protect(np)
3288         self.unconfig_sa(p)
3289
3290         self.verify_tun_66(np, count=n_pkts)
3291         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3292         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3293
3294         # teardown
3295         self.unconfig_protect(np)
3296         self.unconfig_sa(np)
3297         self.unconfig_network(p)
3298
3299     def test_tun_66_police(self):
3300         """IPSEC interface IPv6 with input policer"""
3301         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3302         n_pkts = 127
3303         p = self.ipv6_params
3304         p.inner_hop_limit = 24
3305         p.outer_hop_limit = 23
3306         p.outer_flow_label = 243224
3307         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3308
3309         self.config_network(p)
3310         self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3311         self.config_protect(p)
3312
3313         action_tx = PolicerAction(
3314             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3315         )
3316         policer = VppPolicer(
3317             self,
3318             "pol1",
3319             80,
3320             0,
3321             1000,
3322             0,
3323             conform_action=action_tx,
3324             exceed_action=action_tx,
3325             violate_action=action_tx,
3326         )
3327         policer.add_vpp_config()
3328
3329         # Start policing on tun
3330         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3331
3332         self.verify_tun_66(p, count=n_pkts)
3333         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3334         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3335
3336         stats = policer.get_stats()
3337
3338         # Single rate, 2 colour policer - expect conform, violate but no exceed
3339         self.assertGreater(stats["conform_packets"], 0)
3340         self.assertEqual(stats["exceed_packets"], 0)
3341         self.assertGreater(stats["violate_packets"], 0)
3342
3343         # Stop policing on tun
3344         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3345         self.verify_tun_66(p, count=n_pkts)
3346
3347         # No new policer stats
3348         statsnew = policer.get_stats()
3349         self.assertEqual(stats, statsnew)
3350
3351         # teardown
3352         policer.remove_vpp_config()
3353         self.unconfig_protect(p)
3354         self.unconfig_sa(p)
3355         self.unconfig_network(p)
3356
3357
3358 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3359     """Ipsec P2MP ESP v4 tests"""
3360
3361     tun4_encrypt_node_name = "esp4-encrypt-tun"
3362     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3363     encryption_type = ESP
3364
3365     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3366         return [
3367             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3368             / sa.encrypt(
3369                 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
3370                 / UDP(sport=1144, dport=2233)
3371                 / Raw(b"X" * payload_size)
3372             )
3373             for i in range(count)
3374         ]
3375
3376     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
3377         return [
3378             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3379             / IP(src="1.1.1.1", dst=dst)
3380             / UDP(sport=1144, dport=2233)
3381             / Raw(b"X" * payload_size)
3382             for i in range(count)
3383         ]
3384
3385     def verify_decrypted(self, p, rxs):
3386         for rx in rxs:
3387             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3388             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3389
3390     def verify_encrypted(self, p, sa, rxs):
3391         for rx in rxs:
3392             try:
3393                 self.assertEqual(
3394                     rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
3395                 )
3396                 self.assertEqual(rx[IP].ttl, p.hop_limit)
3397                 pkt = sa.decrypt(rx[IP])
3398                 if not pkt.haslayer(IP):
3399                     pkt = IP(pkt[Raw].load)
3400                 self.assert_packet_checksums_valid(pkt)
3401                 e = pkt[IP]
3402                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3403             except (IndexError, AssertionError):
3404                 self.logger.debug(ppp("Unexpected packet:", rx))
3405                 try:
3406                     self.logger.debug(ppp("Decrypted packet:", pkt))
3407                 except:
3408                     pass
3409                 raise
3410
3411     def setUp(self):
3412         super(TestIpsecMIfEsp4, self).setUp()
3413
3414         N_NHS = 16
3415         self.tun_if = self.pg0
3416         p = self.ipv4_params
3417         p.tun_if = VppIpsecInterface(
3418             self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
3419         )
3420         p.tun_if.add_vpp_config()
3421         p.tun_if.admin_up()
3422         p.tun_if.config_ip4()
3423         p.tun_if.unconfig_ip4()
3424         p.tun_if.config_ip4()
3425         p.tun_if.generate_remote_hosts(N_NHS)
3426         self.pg0.generate_remote_hosts(N_NHS)
3427         self.pg0.configure_ipv4_neighbors()
3428
3429         r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
3430         a = VppAcl(self, [r_all]).add_vpp_config()
3431
3432         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3433         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3434
3435         # setup some SAs for several next-hops on the interface
3436         self.multi_params = []
3437
3438         for ii in range(N_NHS):
3439             p = copy.copy(self.ipv4_params)
3440
3441             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3442             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3443             p.scapy_tun_spi = p.scapy_tun_spi + ii
3444             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3445             p.vpp_tun_spi = p.vpp_tun_spi + ii
3446
3447             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3448             p.scapy_tra_spi = p.scapy_tra_spi + ii
3449             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3450             p.vpp_tra_spi = p.vpp_tra_spi + ii
3451             p.hop_limit = ii + 10
3452             p.tun_sa_out = VppIpsecSA(
3453                 self,
3454                 p.scapy_tun_sa_id,
3455                 p.scapy_tun_spi,
3456                 p.auth_algo_vpp_id,
3457                 p.auth_key,
3458                 p.crypt_algo_vpp_id,
3459                 p.crypt_key,
3460                 self.vpp_esp_protocol,
3461                 self.pg0.local_ip4,
3462                 self.pg0.remote_hosts[ii].ip4,
3463                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3464                 hop_limit=p.hop_limit,
3465             )
3466             p.tun_sa_out.add_vpp_config()
3467
3468             p.tun_sa_in = VppIpsecSA(
3469                 self,
3470                 p.vpp_tun_sa_id,
3471                 p.vpp_tun_spi,
3472                 p.auth_algo_vpp_id,
3473                 p.auth_key,
3474                 p.crypt_algo_vpp_id,
3475                 p.crypt_key,
3476                 self.vpp_esp_protocol,
3477                 self.pg0.remote_hosts[ii].ip4,
3478                 self.pg0.local_ip4,
3479                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3480                 hop_limit=p.hop_limit,
3481             )
3482             p.tun_sa_in.add_vpp_config()
3483
3484             p.tun_protect = VppIpsecTunProtect(
3485                 self,
3486                 p.tun_if,
3487                 p.tun_sa_out,
3488                 [p.tun_sa_in],
3489                 nh=p.tun_if.remote_hosts[ii].ip4,
3490             )
3491             p.tun_protect.add_vpp_config()
3492             config_tun_params(
3493                 p,
3494                 self.encryption_type,
3495                 None,
3496                 self.pg0.local_ip4,
3497                 self.pg0.remote_hosts[ii].ip4,
3498             )
3499             self.multi_params.append(p)
3500
3501             p.via_tun_route = VppIpRoute(
3502                 self,
3503                 p.remote_tun_if_host,
3504                 32,
3505                 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
3506             ).add_vpp_config()
3507
3508             p.tun_dst = self.pg0.remote_hosts[ii].ip4
3509
3510     def tearDown(self):
3511         p = self.ipv4_params
3512         p.tun_if.unconfig_ip4()
3513         super(TestIpsecMIfEsp4, self).tearDown()
3514
3515     def test_tun_44(self):
3516         """P2MP IPSEC 44"""
3517         N_PKTS = 63
3518         for p in self.multi_params:
3519             self.verify_tun_44(p, count=N_PKTS)
3520
3521         # remove one tunnel protect, the rest should still work
3522         self.multi_params[0].tun_protect.remove_vpp_config()
3523         self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3524         self.multi_params[0].via_tun_route.remove_vpp_config()
3525         self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3526
3527         for p in self.multi_params[1:]:
3528             self.verify_tun_44(p, count=N_PKTS)
3529
3530         self.multi_params[0].tun_protect.add_vpp_config()
3531         self.multi_params[0].via_tun_route.add_vpp_config()
3532
3533         for p in self.multi_params:
3534             self.verify_tun_44(p, count=N_PKTS)
3535
3536
3537 class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3538     """IPsec Interface MPLSoIPv6"""
3539
3540     tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3541
3542     def setUp(self):
3543         super(TestIpsecItf6MPLS, self).setUp()
3544
3545         self.tun_if = self.pg0
3546
3547     def tearDown(self):
3548         super(TestIpsecItf6MPLS, self).tearDown()
3549
3550     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3551         return [
3552             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3553             / sa.encrypt(
3554                 MPLS(label=66, ttl=3)
3555                 / IPv6(src=src, dst=dst)
3556                 / UDP(sport=1166, dport=2233)
3557                 / Raw(b"X" * payload_size)
3558             )
3559             for i in range(count)
3560         ]
3561
3562     def verify_encrypted6(self, p, sa, rxs):
3563         for rx in rxs:
3564             try:
3565                 pkt = sa.decrypt(rx[IPv6])
3566                 if not pkt.haslayer(IPv6):
3567                     pkt = IP(pkt[Raw].load)
3568                 self.assert_packet_checksums_valid(pkt)
3569                 self.assert_equal(pkt[MPLS].label, 66)
3570                 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3571             except (IndexError, AssertionError):
3572                 self.logger.debug(ppp("Unexpected packet:", rx))
3573                 try:
3574                     self.logger.debug(ppp("Decrypted packet:", pkt))
3575                 except:
3576                     pass
3577                 raise
3578
3579     def test_tun_mpls_o_ip6(self):
3580         """IPSEC interface MPLS over IPv6"""
3581
3582         n_pkts = 127
3583         p = self.ipv6_params
3584         f = FibPathProto
3585
3586         tbl = VppMplsTable(self, 0)
3587         tbl.add_vpp_config()
3588
3589         self.config_network(p)
3590         # deag MPLS routes from the tunnel
3591         r6 = VppMplsRoute(
3592             self,
3593             66,
3594             1,
3595             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3596             eos_proto=f.FIB_PATH_NH_PROTO_IP6,
3597         ).add_vpp_config()
3598         p.route.modify(
3599             [
3600                 VppRoutePath(
3601                     p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
3602                 )
3603             ]
3604         )
3605         p.tun_if.enable_mpls()
3606
3607         self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3608         self.config_protect(p)
3609
3610         self.verify_tun_66(p, count=n_pkts)
3611
3612         # cleanup
3613         p.tun_if.disable_mpls()
3614         self.unconfig_protect(p)
3615         self.unconfig_sa(p)
3616         self.unconfig_network(p)
3617
3618
3619 if __name__ == "__main__":
3620     unittest.main(testRunner=VppTestRunner)