crypto crypto-openssl: support hashing operations
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
15     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19     VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
24 from util import ppp
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
28 from vpp_policer import PolicerAction, VppPolicer
29
30
31 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
32     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
33     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
34                              IPSEC_API_SAD_FLAG_USE_ESN))
35     crypt_key = mk_scapy_crypt_key(p)
36     if tun_if:
37         p.tun_dst = tun_if.remote_ip
38         p.tun_src = tun_if.local_ip
39     else:
40         p.tun_dst = dst
41         p.tun_src = src
42
43     p.scapy_tun_sa = SecurityAssociation(
44         encryption_type, spi=p.vpp_tun_spi,
45         crypt_algo=p.crypt_algo,
46         crypt_key=crypt_key,
47         auth_algo=p.auth_algo, auth_key=p.auth_key,
48         tunnel_header=ip_class_by_addr_type[p.addr_type](
49             src=p.tun_dst,
50             dst=p.tun_src),
51         nat_t_header=p.nat_header,
52         esn_en=esn_en)
53     p.vpp_tun_sa = SecurityAssociation(
54         encryption_type, spi=p.scapy_tun_spi,
55         crypt_algo=p.crypt_algo,
56         crypt_key=crypt_key,
57         auth_algo=p.auth_algo, auth_key=p.auth_key,
58         tunnel_header=ip_class_by_addr_type[p.addr_type](
59             dst=p.tun_dst,
60             src=p.tun_src),
61         nat_t_header=p.nat_header,
62         esn_en=esn_en)
63
64
65 def config_tra_params(p, encryption_type, tun_if):
66     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
67     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
68                              IPSEC_API_SAD_FLAG_USE_ESN))
69     crypt_key = mk_scapy_crypt_key(p)
70     p.tun_dst = tun_if.remote_ip
71     p.tun_src = tun_if.local_ip
72     p.scapy_tun_sa = SecurityAssociation(
73         encryption_type, spi=p.vpp_tun_spi,
74         crypt_algo=p.crypt_algo,
75         crypt_key=crypt_key,
76         auth_algo=p.auth_algo, auth_key=p.auth_key,
77         esn_en=esn_en,
78         nat_t_header=p.nat_header)
79     p.vpp_tun_sa = SecurityAssociation(
80         encryption_type, spi=p.scapy_tun_spi,
81         crypt_algo=p.crypt_algo,
82         crypt_key=crypt_key,
83         auth_algo=p.auth_algo, auth_key=p.auth_key,
84         esn_en=esn_en,
85         nat_t_header=p.nat_header)
86
87
88 class TemplateIpsec4TunProtect(object):
89     """ IPsec IPv4 Tunnel protect """
90
91     encryption_type = ESP
92     tun4_encrypt_node_name = "esp4-encrypt-tun"
93     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
94     tun4_input_node = "ipsec4-tun-input"
95
96     def config_sa_tra(self, p):
97         config_tun_params(p, self.encryption_type, p.tun_if)
98
99         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
100                                   p.auth_algo_vpp_id, p.auth_key,
101                                   p.crypt_algo_vpp_id, p.crypt_key,
102                                   self.vpp_esp_protocol,
103                                   flags=p.flags)
104         p.tun_sa_out.add_vpp_config()
105
106         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
107                                  p.auth_algo_vpp_id, p.auth_key,
108                                  p.crypt_algo_vpp_id, p.crypt_key,
109                                  self.vpp_esp_protocol,
110                                  flags=p.flags)
111         p.tun_sa_in.add_vpp_config()
112
113     def config_sa_tun(self, p):
114         config_tun_params(p, self.encryption_type, p.tun_if)
115
116         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
117                                   p.auth_algo_vpp_id, p.auth_key,
118                                   p.crypt_algo_vpp_id, p.crypt_key,
119                                   self.vpp_esp_protocol,
120                                   self.tun_if.local_addr[p.addr_type],
121                                   self.tun_if.remote_addr[p.addr_type],
122                                   flags=p.flags)
123         p.tun_sa_out.add_vpp_config()
124
125         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
126                                  p.auth_algo_vpp_id, p.auth_key,
127                                  p.crypt_algo_vpp_id, p.crypt_key,
128                                  self.vpp_esp_protocol,
129                                  self.tun_if.remote_addr[p.addr_type],
130                                  self.tun_if.local_addr[p.addr_type],
131                                  flags=p.flags)
132         p.tun_sa_in.add_vpp_config()
133
134     def config_protect(self, p):
135         p.tun_protect = VppIpsecTunProtect(self,
136                                            p.tun_if,
137                                            p.tun_sa_out,
138                                            [p.tun_sa_in])
139         p.tun_protect.add_vpp_config()
140
141     def config_network(self, p):
142         if hasattr(p, 'tun_dst'):
143             tun_dst = p.tun_dst
144         else:
145             tun_dst = self.pg0.remote_ip4
146         p.tun_if = VppIpIpTunInterface(self, self.pg0,
147                                        self.pg0.local_ip4,
148                                        tun_dst)
149         p.tun_if.add_vpp_config()
150         p.tun_if.admin_up()
151         p.tun_if.config_ip4()
152         p.tun_if.config_ip6()
153
154         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
155                              [VppRoutePath(p.tun_if.remote_ip4,
156                                            0xffffffff)])
157         p.route.add_vpp_config()
158         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
159                        [VppRoutePath(p.tun_if.remote_ip6,
160                                      0xffffffff,
161                                      proto=DpoProto.DPO_PROTO_IP6)])
162         r.add_vpp_config()
163
164     def unconfig_network(self, p):
165         p.route.remove_vpp_config()
166         p.tun_if.remove_vpp_config()
167
168     def unconfig_protect(self, p):
169         p.tun_protect.remove_vpp_config()
170
171     def unconfig_sa(self, p):
172         p.tun_sa_out.remove_vpp_config()
173         p.tun_sa_in.remove_vpp_config()
174
175
176 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
177                              TemplateIpsec):
178     """ IPsec tunnel interface tests """
179
180     encryption_type = ESP
181
182     @classmethod
183     def setUpClass(cls):
184         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
185
186     @classmethod
187     def tearDownClass(cls):
188         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
189
190     def setUp(self):
191         super(TemplateIpsec4TunIfEsp, self).setUp()
192
193         self.tun_if = self.pg0
194
195         p = self.ipv4_params
196
197         self.config_network(p)
198         self.config_sa_tra(p)
199         self.config_protect(p)
200
201     def tearDown(self):
202         super(TemplateIpsec4TunIfEsp, self).tearDown()
203
204
205 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
206                                 TemplateIpsec):
207     """ IPsec UDP tunnel interface tests """
208
209     tun4_encrypt_node_name = "esp4-encrypt-tun"
210     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
211     encryption_type = ESP
212
213     @classmethod
214     def setUpClass(cls):
215         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
216
217     @classmethod
218     def tearDownClass(cls):
219         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
220
221     def verify_encrypted(self, p, sa, rxs):
222         for rx in rxs:
223             try:
224                 # ensure the UDP ports are correct before we decrypt
225                 # which strips them
226                 self.assertTrue(rx.haslayer(UDP))
227                 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
228                 self.assert_equal(rx[UDP].dport, 4500)
229
230                 pkt = sa.decrypt(rx[IP])
231                 if not pkt.haslayer(IP):
232                     pkt = IP(pkt[Raw].load)
233
234                 self.assert_packet_checksums_valid(pkt)
235                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
236                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
237             except (IndexError, AssertionError):
238                 self.logger.debug(ppp("Unexpected packet:", rx))
239                 try:
240                     self.logger.debug(ppp("Decrypted packet:", pkt))
241                 except:
242                     pass
243                 raise
244
245     def config_sa_tra(self, p):
246         config_tun_params(p, self.encryption_type, p.tun_if)
247
248         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
249                                   p.auth_algo_vpp_id, p.auth_key,
250                                   p.crypt_algo_vpp_id, p.crypt_key,
251                                   self.vpp_esp_protocol,
252                                   flags=p.flags,
253                                   udp_src=p.nat_header.sport,
254                                   udp_dst=p.nat_header.dport)
255         p.tun_sa_out.add_vpp_config()
256
257         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
258                                  p.auth_algo_vpp_id, p.auth_key,
259                                  p.crypt_algo_vpp_id, p.crypt_key,
260                                  self.vpp_esp_protocol,
261                                  flags=p.flags,
262                                  udp_src=p.nat_header.sport,
263                                  udp_dst=p.nat_header.dport)
264         p.tun_sa_in.add_vpp_config()
265
266     def setUp(self):
267         super(TemplateIpsec4TunIfEspUdp, self).setUp()
268
269         p = self.ipv4_params
270         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
271                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
272         p.nat_header = UDP(sport=5454, dport=4500)
273
274         self.tun_if = self.pg0
275
276         self.config_network(p)
277         self.config_sa_tra(p)
278         self.config_protect(p)
279
280     def tearDown(self):
281         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
282
283
284 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
285     """ Ipsec ESP - TUN tests """
286     tun4_encrypt_node_name = "esp4-encrypt-tun"
287     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
288
289     def test_tun_basic64(self):
290         """ ipsec 6o4 tunnel basic test """
291         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
292
293         self.verify_tun_64(self.params[socket.AF_INET], count=1)
294
295     def test_tun_burst64(self):
296         """ ipsec 6o4 tunnel basic test """
297         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
298
299         self.verify_tun_64(self.params[socket.AF_INET], count=257)
300
301     def test_tun_basic_frag44(self):
302         """ ipsec 4o4 tunnel frag basic test """
303         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
304
305         p = self.ipv4_params
306
307         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
308                                        [1500, 0, 0, 0])
309         self.verify_tun_44(self.params[socket.AF_INET],
310                            count=1, payload_size=1800, n_rx=2)
311         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
312                                        [9000, 0, 0, 0])
313
314
315 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
316     """ Ipsec ESP UDP tests """
317
318     tun4_input_node = "ipsec4-tun-input"
319
320     def setUp(self):
321         super(TestIpsec4TunIfEspUdp, self).setUp()
322
323     def test_keepalive(self):
324         """ IPSEC NAT Keepalive """
325         self.verify_keepalive(self.ipv4_params)
326
327
328 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
329     """ Ipsec ESP UDP GCM tests """
330
331     tun4_input_node = "ipsec4-tun-input"
332
333     def setUp(self):
334         super(TestIpsec4TunIfEspUdpGCM, self).setUp()
335         p = self.ipv4_params
336         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
337                               IPSEC_API_INTEG_ALG_NONE)
338         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
339                                IPSEC_API_CRYPTO_ALG_AES_GCM_256)
340         p.crypt_algo = "AES-GCM"
341         p.auth_algo = "NULL"
342         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
343         p.salt = 0
344
345
346 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
347     """ Ipsec ESP - TCP tests """
348     pass
349
350
351 class TemplateIpsec6TunProtect(object):
352     """ IPsec IPv6 Tunnel protect """
353
354     def config_sa_tra(self, p):
355         config_tun_params(p, self.encryption_type, p.tun_if)
356
357         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
358                                   p.auth_algo_vpp_id, p.auth_key,
359                                   p.crypt_algo_vpp_id, p.crypt_key,
360                                   self.vpp_esp_protocol)
361         p.tun_sa_out.add_vpp_config()
362
363         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
364                                  p.auth_algo_vpp_id, p.auth_key,
365                                  p.crypt_algo_vpp_id, p.crypt_key,
366                                  self.vpp_esp_protocol)
367         p.tun_sa_in.add_vpp_config()
368
369     def config_sa_tun(self, p):
370         config_tun_params(p, self.encryption_type, p.tun_if)
371
372         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
373                                   p.auth_algo_vpp_id, p.auth_key,
374                                   p.crypt_algo_vpp_id, p.crypt_key,
375                                   self.vpp_esp_protocol,
376                                   self.tun_if.local_addr[p.addr_type],
377                                   self.tun_if.remote_addr[p.addr_type])
378         p.tun_sa_out.add_vpp_config()
379
380         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
381                                  p.auth_algo_vpp_id, p.auth_key,
382                                  p.crypt_algo_vpp_id, p.crypt_key,
383                                  self.vpp_esp_protocol,
384                                  self.tun_if.remote_addr[p.addr_type],
385                                  self.tun_if.local_addr[p.addr_type])
386         p.tun_sa_in.add_vpp_config()
387
388     def config_protect(self, p):
389         p.tun_protect = VppIpsecTunProtect(self,
390                                            p.tun_if,
391                                            p.tun_sa_out,
392                                            [p.tun_sa_in])
393         p.tun_protect.add_vpp_config()
394
395     def config_network(self, p):
396         if hasattr(p, 'tun_dst'):
397             tun_dst = p.tun_dst
398         else:
399             tun_dst = self.pg0.remote_ip6
400         p.tun_if = VppIpIpTunInterface(self, self.pg0,
401                                        self.pg0.local_ip6,
402                                        tun_dst)
403         p.tun_if.add_vpp_config()
404         p.tun_if.admin_up()
405         p.tun_if.config_ip6()
406         p.tun_if.config_ip4()
407
408         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
409                              [VppRoutePath(p.tun_if.remote_ip6,
410                                            0xffffffff,
411                                            proto=DpoProto.DPO_PROTO_IP6)])
412         p.route.add_vpp_config()
413         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
414                        [VppRoutePath(p.tun_if.remote_ip4,
415                                      0xffffffff)])
416         r.add_vpp_config()
417
418     def unconfig_network(self, p):
419         p.route.remove_vpp_config()
420         p.tun_if.remove_vpp_config()
421
422     def unconfig_protect(self, p):
423         p.tun_protect.remove_vpp_config()
424
425     def unconfig_sa(self, p):
426         p.tun_sa_out.remove_vpp_config()
427         p.tun_sa_in.remove_vpp_config()
428
429
430 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
431                              TemplateIpsec):
432     """ IPsec tunnel interface tests """
433
434     encryption_type = ESP
435
436     def setUp(self):
437         super(TemplateIpsec6TunIfEsp, self).setUp()
438
439         self.tun_if = self.pg0
440
441         p = self.ipv6_params
442         self.config_network(p)
443         self.config_sa_tra(p)
444         self.config_protect(p)
445
446     def tearDown(self):
447         super(TemplateIpsec6TunIfEsp, self).tearDown()
448
449
450 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
451                           IpsecTun6Tests):
452     """ Ipsec ESP - TUN tests """
453     tun6_encrypt_node_name = "esp6-encrypt-tun"
454     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
455
456     def test_tun_basic46(self):
457         """ ipsec 4o6 tunnel basic test """
458         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
459         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
460
461     def test_tun_burst46(self):
462         """ ipsec 4o6 tunnel burst test """
463         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
464         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
465
466
467 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
468                                 IpsecTun6HandoffTests):
469     """ Ipsec ESP 6 Handoff tests """
470     tun6_encrypt_node_name = "esp6-encrypt-tun"
471     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
472
473     def test_tun_handoff_66_police(self):
474         """ ESP 6o6 tunnel with policer worker hand-off test """
475         self.vapi.cli("clear errors")
476         self.vapi.cli("clear ipsec sa")
477
478         N_PKTS = 15
479         p = self.params[socket.AF_INET6]
480
481         action_tx = PolicerAction(
482             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
483             0)
484         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
485                              conform_action=action_tx,
486                              exceed_action=action_tx,
487                              violate_action=action_tx)
488         policer.add_vpp_config()
489
490         # Start policing on tun
491         policer.apply_vpp_config(p.tun_if.sw_if_index, True)
492
493         for pol_bind in [1, 0]:
494             policer.bind_vpp_config(pol_bind, True)
495
496             # inject alternately on worker 0 and 1.
497             for worker in [0, 1, 0, 1]:
498                 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
499                                                    self.tun_if,
500                                                    src=p.remote_tun_if_host,
501                                                    dst=self.pg1.remote_ip6,
502                                                    count=N_PKTS)
503                 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
504                                                  self.pg1, worker=worker)
505                 self.verify_decrypted6(p, recv_pkts)
506                 self.logger.debug(self.vapi.cli("show trace max 100"))
507
508             stats = policer.get_stats()
509             stats0 = policer.get_stats(worker=0)
510             stats1 = policer.get_stats(worker=1)
511
512             if pol_bind == 1:
513                 # First pass: Worker 1, should have done all the policing
514                 self.assertEqual(stats, stats1)
515
516                 # Worker 0, should have handed everything off
517                 self.assertEqual(stats0['conform_packets'], 0)
518                 self.assertEqual(stats0['exceed_packets'], 0)
519                 self.assertEqual(stats0['violate_packets'], 0)
520             else:
521                 # Second pass: both workers should have policed equal amounts
522                 self.assertGreater(stats1['conform_packets'], 0)
523                 self.assertEqual(stats1['exceed_packets'], 0)
524                 self.assertGreater(stats1['violate_packets'], 0)
525
526                 self.assertGreater(stats0['conform_packets'], 0)
527                 self.assertEqual(stats0['exceed_packets'], 0)
528                 self.assertGreater(stats0['violate_packets'], 0)
529
530                 self.assertEqual(stats0['conform_packets'] +
531                                  stats0['violate_packets'],
532                                  stats1['conform_packets'] +
533                                  stats1['violate_packets'])
534
535         policer.apply_vpp_config(p.tun_if.sw_if_index, False)
536         policer.remove_vpp_config()
537
538
539 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
540                                 IpsecTun4HandoffTests):
541     """ Ipsec ESP 4 Handoff tests """
542     tun4_encrypt_node_name = "esp4-encrypt-tun"
543     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
544
545     def test_tun_handoff_44_police(self):
546         """ ESP 4o4 tunnel with policer worker hand-off test """
547         self.vapi.cli("clear errors")
548         self.vapi.cli("clear ipsec sa")
549
550         N_PKTS = 15
551         p = self.params[socket.AF_INET]
552
553         action_tx = PolicerAction(
554             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
555             0)
556         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
557                              conform_action=action_tx,
558                              exceed_action=action_tx,
559                              violate_action=action_tx)
560         policer.add_vpp_config()
561
562         # Start policing on tun
563         policer.apply_vpp_config(p.tun_if.sw_if_index, True)
564
565         for pol_bind in [1, 0]:
566             policer.bind_vpp_config(pol_bind, True)
567
568             # inject alternately on worker 0 and 1.
569             for worker in [0, 1, 0, 1]:
570                 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
571                                                   self.tun_if,
572                                                   src=p.remote_tun_if_host,
573                                                   dst=self.pg1.remote_ip4,
574                                                   count=N_PKTS)
575                 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
576                                                  self.pg1, worker=worker)
577                 self.verify_decrypted(p, recv_pkts)
578                 self.logger.debug(self.vapi.cli("show trace max 100"))
579
580             stats = policer.get_stats()
581             stats0 = policer.get_stats(worker=0)
582             stats1 = policer.get_stats(worker=1)
583
584             if pol_bind == 1:
585                 # First pass: Worker 1, should have done all the policing
586                 self.assertEqual(stats, stats1)
587
588                 # Worker 0, should have handed everything off
589                 self.assertEqual(stats0['conform_packets'], 0)
590                 self.assertEqual(stats0['exceed_packets'], 0)
591                 self.assertEqual(stats0['violate_packets'], 0)
592             else:
593                 # Second pass: both workers should have policed equal amounts
594                 self.assertGreater(stats1['conform_packets'], 0)
595                 self.assertEqual(stats1['exceed_packets'], 0)
596                 self.assertGreater(stats1['violate_packets'], 0)
597
598                 self.assertGreater(stats0['conform_packets'], 0)
599                 self.assertEqual(stats0['exceed_packets'], 0)
600                 self.assertGreater(stats0['violate_packets'], 0)
601
602                 self.assertEqual(stats0['conform_packets'] +
603                                  stats0['violate_packets'],
604                                  stats1['conform_packets'] +
605                                  stats1['violate_packets'])
606
607         policer.apply_vpp_config(p.tun_if.sw_if_index, False)
608         policer.remove_vpp_config()
609
610
611 @tag_fixme_vpp_workers
612 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
613                               TemplateIpsec,
614                               IpsecTun4):
615     """ IPsec IPv4 Multi Tunnel interface """
616
617     encryption_type = ESP
618     tun4_encrypt_node_name = "esp4-encrypt-tun"
619     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
620
621     def setUp(self):
622         super(TestIpsec4MultiTunIfEsp, self).setUp()
623
624         self.tun_if = self.pg0
625
626         self.multi_params = []
627         self.pg0.generate_remote_hosts(10)
628         self.pg0.configure_ipv4_neighbors()
629
630         for ii in range(10):
631             p = copy.copy(self.ipv4_params)
632
633             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
634             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
635             p.scapy_tun_spi = p.scapy_tun_spi + ii
636             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
637             p.vpp_tun_spi = p.vpp_tun_spi + ii
638
639             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
640             p.scapy_tra_spi = p.scapy_tra_spi + ii
641             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
642             p.vpp_tra_spi = p.vpp_tra_spi + ii
643             p.tun_dst = self.pg0.remote_hosts[ii].ip4
644
645             self.multi_params.append(p)
646             self.config_network(p)
647             self.config_sa_tra(p)
648             self.config_protect(p)
649
650     def tearDown(self):
651         super(TestIpsec4MultiTunIfEsp, self).tearDown()
652
653     def test_tun_44(self):
654         """Multiple IPSEC tunnel interfaces """
655         for p in self.multi_params:
656             self.verify_tun_44(p, count=127)
657             self.assertEqual(p.tun_if.get_rx_stats(), 127)
658             self.assertEqual(p.tun_if.get_tx_stats(), 127)
659
660     def test_tun_rr_44(self):
661         """ Round-robin packets acrros multiple interface """
662         tx = []
663         for p in self.multi_params:
664             tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
665                                             src=p.remote_tun_if_host,
666                                             dst=self.pg1.remote_ip4)
667         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
668
669         for rx, p in zip(rxs, self.multi_params):
670             self.verify_decrypted(p, [rx])
671
672         tx = []
673         for p in self.multi_params:
674             tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
675                                     dst=p.remote_tun_if_host)
676         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
677
678         for rx, p in zip(rxs, self.multi_params):
679             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
680
681
682 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
683                             TemplateIpsec,
684                             IpsecTun4):
685     """ IPsec IPv4 Tunnel interface all Algos """
686
687     encryption_type = ESP
688     tun4_encrypt_node_name = "esp4-encrypt-tun"
689     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
690
691     def setUp(self):
692         super(TestIpsec4TunIfEspAll, self).setUp()
693
694         self.tun_if = self.pg0
695         p = self.ipv4_params
696
697         self.config_network(p)
698         self.config_sa_tra(p)
699         self.config_protect(p)
700
701     def tearDown(self):
702         p = self.ipv4_params
703         self.unconfig_protect(p)
704         self.unconfig_network(p)
705         self.unconfig_sa(p)
706
707         super(TestIpsec4TunIfEspAll, self).tearDown()
708
709     def rekey(self, p):
710         #
711         # change the key and the SPI
712         #
713         np = copy.copy(p)
714         p.crypt_key = b'X' + p.crypt_key[1:]
715         p.scapy_tun_spi += 1
716         p.scapy_tun_sa_id += 1
717         p.vpp_tun_spi += 1
718         p.vpp_tun_sa_id += 1
719         p.tun_if.local_spi = p.vpp_tun_spi
720         p.tun_if.remote_spi = p.scapy_tun_spi
721
722         config_tun_params(p, self.encryption_type, p.tun_if)
723
724         p.tun_sa_out = VppIpsecSA(self,
725                                   p.scapy_tun_sa_id,
726                                   p.scapy_tun_spi,
727                                   p.auth_algo_vpp_id,
728                                   p.auth_key,
729                                   p.crypt_algo_vpp_id,
730                                   p.crypt_key,
731                                   self.vpp_esp_protocol,
732                                   flags=p.flags,
733                                   salt=p.salt)
734         p.tun_sa_in = VppIpsecSA(self,
735                                  p.vpp_tun_sa_id,
736                                  p.vpp_tun_spi,
737                                  p.auth_algo_vpp_id,
738                                  p.auth_key,
739                                  p.crypt_algo_vpp_id,
740                                  p.crypt_key,
741                                  self.vpp_esp_protocol,
742                                  flags=p.flags,
743                                  salt=p.salt)
744         p.tun_sa_in.add_vpp_config()
745         p.tun_sa_out.add_vpp_config()
746
747         self.config_protect(p)
748         np.tun_sa_out.remove_vpp_config()
749         np.tun_sa_in.remove_vpp_config()
750         self.logger.info(self.vapi.cli("sh ipsec sa"))
751
752     def test_tun_44(self):
753         """IPSEC tunnel all algos """
754
755         # foreach VPP crypto engine
756         engines = ["ia32", "ipsecmb", "openssl"]
757
758         # foreach crypto algorithm
759         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
760                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
761                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
762                                 IPSEC_API_INTEG_ALG_NONE),
763                   'scapy-crypto': "AES-GCM",
764                   'scapy-integ': "NULL",
765                   'key': b"JPjyOWBeVEQiMe7h",
766                   'salt': 3333},
767                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
768                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
769                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
770                                 IPSEC_API_INTEG_ALG_NONE),
771                   'scapy-crypto': "AES-GCM",
772                   'scapy-integ': "NULL",
773                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
774                   'salt': 0},
775                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
776                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
777                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
778                                 IPSEC_API_INTEG_ALG_NONE),
779                   'scapy-crypto': "AES-GCM",
780                   'scapy-integ': "NULL",
781                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
782                   'salt': 9999},
783                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
784                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
785                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
786                                 IPSEC_API_INTEG_ALG_SHA1_96),
787                   'scapy-crypto': "AES-CBC",
788                   'scapy-integ': "HMAC-SHA1-96",
789                   'salt': 0,
790                   'key': b"JPjyOWBeVEQiMe7h"},
791                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
792                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
793                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
794                                 IPSEC_API_INTEG_ALG_SHA_512_256),
795                   'scapy-crypto': "AES-CBC",
796                   'scapy-integ': "SHA2-512-256",
797                   'salt': 0,
798                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
799                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
800                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
801                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
802                                 IPSEC_API_INTEG_ALG_SHA_256_128),
803                   'scapy-crypto': "AES-CBC",
804                   'scapy-integ': "SHA2-256-128",
805                   'salt': 0,
806                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
807                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
808                                  IPSEC_API_CRYPTO_ALG_NONE),
809                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
810                                 IPSEC_API_INTEG_ALG_SHA1_96),
811                   'scapy-crypto': "NULL",
812                   'scapy-integ': "HMAC-SHA1-96",
813                   'salt': 0,
814                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
815
816         for engine in engines:
817             self.vapi.cli("set crypto handler all %s" % engine)
818
819             #
820             # loop through each of the algorithms
821             #
822             for algo in algos:
823                 # with self.subTest(algo=algo['scapy']):
824
825                 p = self.ipv4_params
826                 p.auth_algo_vpp_id = algo['vpp-integ']
827                 p.crypt_algo_vpp_id = algo['vpp-crypto']
828                 p.crypt_algo = algo['scapy-crypto']
829                 p.auth_algo = algo['scapy-integ']
830                 p.crypt_key = algo['key']
831                 p.salt = algo['salt']
832
833                 #
834                 # rekey the tunnel
835                 #
836                 self.rekey(p)
837                 self.verify_tun_44(p, count=127)
838
839
840 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
841                                TemplateIpsec,
842                                IpsecTun4):
843     """ IPsec IPv4 Tunnel interface no Algos """
844
845     encryption_type = ESP
846     tun4_encrypt_node_name = "esp4-encrypt-tun"
847     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
848
849     def setUp(self):
850         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
851
852         self.tun_if = self.pg0
853         p = self.ipv4_params
854         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
855                               IPSEC_API_INTEG_ALG_NONE)
856         p.auth_algo = 'NULL'
857         p.auth_key = []
858
859         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
860                                IPSEC_API_CRYPTO_ALG_NONE)
861         p.crypt_algo = 'NULL'
862         p.crypt_key = []
863
864     def tearDown(self):
865         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
866
867     def test_tun_44(self):
868         """ IPSec SA with NULL algos """
869         p = self.ipv4_params
870
871         self.config_network(p)
872         self.config_sa_tra(p)
873         self.config_protect(p)
874
875         tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
876                            dst=p.remote_tun_if_host)
877         self.send_and_assert_no_replies(self.pg1, tx)
878
879         self.unconfig_protect(p)
880         self.unconfig_sa(p)
881         self.unconfig_network(p)
882
883
884 @tag_fixme_vpp_workers
885 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
886                               TemplateIpsec,
887                               IpsecTun6):
888     """ IPsec IPv6 Multi Tunnel interface """
889
890     encryption_type = ESP
891     tun6_encrypt_node_name = "esp6-encrypt-tun"
892     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
893
894     def setUp(self):
895         super(TestIpsec6MultiTunIfEsp, self).setUp()
896
897         self.tun_if = self.pg0
898
899         self.multi_params = []
900         self.pg0.generate_remote_hosts(10)
901         self.pg0.configure_ipv6_neighbors()
902
903         for ii in range(10):
904             p = copy.copy(self.ipv6_params)
905
906             p.remote_tun_if_host = "1111::%d" % (ii + 1)
907             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
908             p.scapy_tun_spi = p.scapy_tun_spi + ii
909             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
910             p.vpp_tun_spi = p.vpp_tun_spi + ii
911
912             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
913             p.scapy_tra_spi = p.scapy_tra_spi + ii
914             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
915             p.vpp_tra_spi = p.vpp_tra_spi + ii
916             p.tun_dst = self.pg0.remote_hosts[ii].ip6
917
918             self.multi_params.append(p)
919             self.config_network(p)
920             self.config_sa_tra(p)
921             self.config_protect(p)
922
923     def tearDown(self):
924         super(TestIpsec6MultiTunIfEsp, self).tearDown()
925
926     def test_tun_66(self):
927         """Multiple IPSEC tunnel interfaces """
928         for p in self.multi_params:
929             self.verify_tun_66(p, count=127)
930             self.assertEqual(p.tun_if.get_rx_stats(), 127)
931             self.assertEqual(p.tun_if.get_tx_stats(), 127)
932
933
934 class TestIpsecGreTebIfEsp(TemplateIpsec,
935                            IpsecTun4Tests):
936     """ Ipsec GRE TEB ESP - TUN tests """
937     tun4_encrypt_node_name = "esp4-encrypt-tun"
938     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
939     encryption_type = ESP
940     omac = "00:11:22:33:44:55"
941
942     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
943                          payload_size=100):
944         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
945                 sa.encrypt(IP(src=self.pg0.remote_ip4,
946                               dst=self.pg0.local_ip4) /
947                            GRE() /
948                            Ether(dst=self.omac) /
949                            IP(src="1.1.1.1", dst="1.1.1.2") /
950                            UDP(sport=1144, dport=2233) /
951                            Raw(b'X' * payload_size))
952                 for i in range(count)]
953
954     def gen_pkts(self, sw_intf, src, dst, count=1,
955                  payload_size=100):
956         return [Ether(dst=self.omac) /
957                 IP(src="1.1.1.1", dst="1.1.1.2") /
958                 UDP(sport=1144, dport=2233) /
959                 Raw(b'X' * payload_size)
960                 for i in range(count)]
961
962     def verify_decrypted(self, p, rxs):
963         for rx in rxs:
964             self.assert_equal(rx[Ether].dst, self.omac)
965             self.assert_equal(rx[IP].dst, "1.1.1.2")
966
967     def verify_encrypted(self, p, sa, rxs):
968         for rx in rxs:
969             try:
970                 pkt = sa.decrypt(rx[IP])
971                 if not pkt.haslayer(IP):
972                     pkt = IP(pkt[Raw].load)
973                 self.assert_packet_checksums_valid(pkt)
974                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
975                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
976                 self.assertTrue(pkt.haslayer(GRE))
977                 e = pkt[Ether]
978                 self.assertEqual(e[Ether].dst, self.omac)
979                 self.assertEqual(e[IP].dst, "1.1.1.2")
980             except (IndexError, AssertionError):
981                 self.logger.debug(ppp("Unexpected packet:", rx))
982                 try:
983                     self.logger.debug(ppp("Decrypted packet:", pkt))
984                 except:
985                     pass
986                 raise
987
988     def setUp(self):
989         super(TestIpsecGreTebIfEsp, self).setUp()
990
991         self.tun_if = self.pg0
992
993         p = self.ipv4_params
994
995         bd1 = VppBridgeDomain(self, 1)
996         bd1.add_vpp_config()
997
998         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
999                                   p.auth_algo_vpp_id, p.auth_key,
1000                                   p.crypt_algo_vpp_id, p.crypt_key,
1001                                   self.vpp_esp_protocol,
1002                                   self.pg0.local_ip4,
1003                                   self.pg0.remote_ip4)
1004         p.tun_sa_out.add_vpp_config()
1005
1006         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1007                                  p.auth_algo_vpp_id, p.auth_key,
1008                                  p.crypt_algo_vpp_id, p.crypt_key,
1009                                  self.vpp_esp_protocol,
1010                                  self.pg0.remote_ip4,
1011                                  self.pg0.local_ip4)
1012         p.tun_sa_in.add_vpp_config()
1013
1014         p.tun_if = VppGreInterface(self,
1015                                    self.pg0.local_ip4,
1016                                    self.pg0.remote_ip4,
1017                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1018                                          GRE_API_TUNNEL_TYPE_TEB))
1019         p.tun_if.add_vpp_config()
1020
1021         p.tun_protect = VppIpsecTunProtect(self,
1022                                            p.tun_if,
1023                                            p.tun_sa_out,
1024                                            [p.tun_sa_in])
1025
1026         p.tun_protect.add_vpp_config()
1027
1028         p.tun_if.admin_up()
1029         p.tun_if.config_ip4()
1030         config_tun_params(p, self.encryption_type, p.tun_if)
1031
1032         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1033         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1034
1035         self.vapi.cli("clear ipsec sa")
1036         self.vapi.cli("sh adj")
1037         self.vapi.cli("sh ipsec tun")
1038
1039     def tearDown(self):
1040         p = self.ipv4_params
1041         p.tun_if.unconfig_ip4()
1042         super(TestIpsecGreTebIfEsp, self).tearDown()
1043
1044
1045 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
1046                                IpsecTun4Tests):
1047     """ Ipsec GRE TEB ESP - TUN tests """
1048     tun4_encrypt_node_name = "esp4-encrypt-tun"
1049     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1050     encryption_type = ESP
1051     omac = "00:11:22:33:44:55"
1052
1053     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1054                          payload_size=100):
1055         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1056                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1057                               dst=self.pg0.local_ip4) /
1058                            GRE() /
1059                            Ether(dst=self.omac) /
1060                            IP(src="1.1.1.1", dst="1.1.1.2") /
1061                            UDP(sport=1144, dport=2233) /
1062                            Raw(b'X' * payload_size))
1063                 for i in range(count)]
1064
1065     def gen_pkts(self, sw_intf, src, dst, count=1,
1066                  payload_size=100):
1067         return [Ether(dst=self.omac) /
1068                 Dot1Q(vlan=11) /
1069                 IP(src="1.1.1.1", dst="1.1.1.2") /
1070                 UDP(sport=1144, dport=2233) /
1071                 Raw(b'X' * payload_size)
1072                 for i in range(count)]
1073
1074     def verify_decrypted(self, p, rxs):
1075         for rx in rxs:
1076             self.assert_equal(rx[Ether].dst, self.omac)
1077             self.assert_equal(rx[Dot1Q].vlan, 11)
1078             self.assert_equal(rx[IP].dst, "1.1.1.2")
1079
1080     def verify_encrypted(self, p, sa, rxs):
1081         for rx in rxs:
1082             try:
1083                 pkt = sa.decrypt(rx[IP])
1084                 if not pkt.haslayer(IP):
1085                     pkt = IP(pkt[Raw].load)
1086                 self.assert_packet_checksums_valid(pkt)
1087                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1088                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1089                 self.assertTrue(pkt.haslayer(GRE))
1090                 e = pkt[Ether]
1091                 self.assertEqual(e[Ether].dst, self.omac)
1092                 self.assertFalse(e.haslayer(Dot1Q))
1093                 self.assertEqual(e[IP].dst, "1.1.1.2")
1094             except (IndexError, AssertionError):
1095                 self.logger.debug(ppp("Unexpected packet:", rx))
1096                 try:
1097                     self.logger.debug(ppp("Decrypted packet:", pkt))
1098                 except:
1099                     pass
1100                 raise
1101
1102     def setUp(self):
1103         super(TestIpsecGreTebVlanIfEsp, self).setUp()
1104
1105         self.tun_if = self.pg0
1106
1107         p = self.ipv4_params
1108
1109         bd1 = VppBridgeDomain(self, 1)
1110         bd1.add_vpp_config()
1111
1112         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1113         self.vapi.l2_interface_vlan_tag_rewrite(
1114             sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
1115             push_dot1q=11)
1116         self.pg1_11.admin_up()
1117
1118         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1119                                   p.auth_algo_vpp_id, p.auth_key,
1120                                   p.crypt_algo_vpp_id, p.crypt_key,
1121                                   self.vpp_esp_protocol,
1122                                   self.pg0.local_ip4,
1123                                   self.pg0.remote_ip4)
1124         p.tun_sa_out.add_vpp_config()
1125
1126         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1127                                  p.auth_algo_vpp_id, p.auth_key,
1128                                  p.crypt_algo_vpp_id, p.crypt_key,
1129                                  self.vpp_esp_protocol,
1130                                  self.pg0.remote_ip4,
1131                                  self.pg0.local_ip4)
1132         p.tun_sa_in.add_vpp_config()
1133
1134         p.tun_if = VppGreInterface(self,
1135                                    self.pg0.local_ip4,
1136                                    self.pg0.remote_ip4,
1137                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1138                                          GRE_API_TUNNEL_TYPE_TEB))
1139         p.tun_if.add_vpp_config()
1140
1141         p.tun_protect = VppIpsecTunProtect(self,
1142                                            p.tun_if,
1143                                            p.tun_sa_out,
1144                                            [p.tun_sa_in])
1145
1146         p.tun_protect.add_vpp_config()
1147
1148         p.tun_if.admin_up()
1149         p.tun_if.config_ip4()
1150         config_tun_params(p, self.encryption_type, p.tun_if)
1151
1152         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1153         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1154
1155         self.vapi.cli("clear ipsec sa")
1156
1157     def tearDown(self):
1158         p = self.ipv4_params
1159         p.tun_if.unconfig_ip4()
1160         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1161         self.pg1_11.admin_down()
1162         self.pg1_11.remove_vpp_config()
1163
1164
1165 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1166                               IpsecTun4Tests):
1167     """ Ipsec GRE TEB ESP - Tra tests """
1168     tun4_encrypt_node_name = "esp4-encrypt-tun"
1169     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1170     encryption_type = ESP
1171     omac = "00:11:22:33:44:55"
1172
1173     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1174                          payload_size=100):
1175         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1176                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1177                               dst=self.pg0.local_ip4) /
1178                            GRE() /
1179                            Ether(dst=self.omac) /
1180                            IP(src="1.1.1.1", dst="1.1.1.2") /
1181                            UDP(sport=1144, dport=2233) /
1182                            Raw(b'X' * payload_size))
1183                 for i in range(count)]
1184
1185     def gen_pkts(self, sw_intf, src, dst, count=1,
1186                  payload_size=100):
1187         return [Ether(dst=self.omac) /
1188                 IP(src="1.1.1.1", dst="1.1.1.2") /
1189                 UDP(sport=1144, dport=2233) /
1190                 Raw(b'X' * payload_size)
1191                 for i in range(count)]
1192
1193     def verify_decrypted(self, p, rxs):
1194         for rx in rxs:
1195             self.assert_equal(rx[Ether].dst, self.omac)
1196             self.assert_equal(rx[IP].dst, "1.1.1.2")
1197
1198     def verify_encrypted(self, p, sa, rxs):
1199         for rx in rxs:
1200             try:
1201                 pkt = sa.decrypt(rx[IP])
1202                 if not pkt.haslayer(IP):
1203                     pkt = IP(pkt[Raw].load)
1204                 self.assert_packet_checksums_valid(pkt)
1205                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1206                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1207                 self.assertTrue(pkt.haslayer(GRE))
1208                 e = pkt[Ether]
1209                 self.assertEqual(e[Ether].dst, self.omac)
1210                 self.assertEqual(e[IP].dst, "1.1.1.2")
1211             except (IndexError, AssertionError):
1212                 self.logger.debug(ppp("Unexpected packet:", rx))
1213                 try:
1214                     self.logger.debug(ppp("Decrypted packet:", pkt))
1215                 except:
1216                     pass
1217                 raise
1218
1219     def setUp(self):
1220         super(TestIpsecGreTebIfEspTra, self).setUp()
1221
1222         self.tun_if = self.pg0
1223
1224         p = self.ipv4_params
1225
1226         bd1 = VppBridgeDomain(self, 1)
1227         bd1.add_vpp_config()
1228
1229         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1230                                   p.auth_algo_vpp_id, p.auth_key,
1231                                   p.crypt_algo_vpp_id, p.crypt_key,
1232                                   self.vpp_esp_protocol)
1233         p.tun_sa_out.add_vpp_config()
1234
1235         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1236                                  p.auth_algo_vpp_id, p.auth_key,
1237                                  p.crypt_algo_vpp_id, p.crypt_key,
1238                                  self.vpp_esp_protocol)
1239         p.tun_sa_in.add_vpp_config()
1240
1241         p.tun_if = VppGreInterface(self,
1242                                    self.pg0.local_ip4,
1243                                    self.pg0.remote_ip4,
1244                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1245                                          GRE_API_TUNNEL_TYPE_TEB))
1246         p.tun_if.add_vpp_config()
1247
1248         p.tun_protect = VppIpsecTunProtect(self,
1249                                            p.tun_if,
1250                                            p.tun_sa_out,
1251                                            [p.tun_sa_in])
1252
1253         p.tun_protect.add_vpp_config()
1254
1255         p.tun_if.admin_up()
1256         p.tun_if.config_ip4()
1257         config_tra_params(p, self.encryption_type, p.tun_if)
1258
1259         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1260         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1261
1262         self.vapi.cli("clear ipsec sa")
1263
1264     def tearDown(self):
1265         p = self.ipv4_params
1266         p.tun_if.unconfig_ip4()
1267         super(TestIpsecGreTebIfEspTra, self).tearDown()
1268
1269
1270 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1271                                  IpsecTun4Tests):
1272     """ Ipsec GRE TEB UDP ESP - Tra tests """
1273     tun4_encrypt_node_name = "esp4-encrypt-tun"
1274     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1275     encryption_type = ESP
1276     omac = "00:11:22:33:44:55"
1277
1278     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1279                          payload_size=100):
1280         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1281                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1282                               dst=self.pg0.local_ip4) /
1283                            GRE() /
1284                            Ether(dst=self.omac) /
1285                            IP(src="1.1.1.1", dst="1.1.1.2") /
1286                            UDP(sport=1144, dport=2233) /
1287                            Raw(b'X' * payload_size))
1288                 for i in range(count)]
1289
1290     def gen_pkts(self, sw_intf, src, dst, count=1,
1291                  payload_size=100):
1292         return [Ether(dst=self.omac) /
1293                 IP(src="1.1.1.1", dst="1.1.1.2") /
1294                 UDP(sport=1144, dport=2233) /
1295                 Raw(b'X' * payload_size)
1296                 for i in range(count)]
1297
1298     def verify_decrypted(self, p, rxs):
1299         for rx in rxs:
1300             self.assert_equal(rx[Ether].dst, self.omac)
1301             self.assert_equal(rx[IP].dst, "1.1.1.2")
1302
1303     def verify_encrypted(self, p, sa, rxs):
1304         for rx in rxs:
1305             self.assertTrue(rx.haslayer(UDP))
1306             self.assertEqual(rx[UDP].dport, 4545)
1307             self.assertEqual(rx[UDP].sport, 5454)
1308             try:
1309                 pkt = sa.decrypt(rx[IP])
1310                 if not pkt.haslayer(IP):
1311                     pkt = IP(pkt[Raw].load)
1312                 self.assert_packet_checksums_valid(pkt)
1313                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1314                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1315                 self.assertTrue(pkt.haslayer(GRE))
1316                 e = pkt[Ether]
1317                 self.assertEqual(e[Ether].dst, self.omac)
1318                 self.assertEqual(e[IP].dst, "1.1.1.2")
1319             except (IndexError, AssertionError):
1320                 self.logger.debug(ppp("Unexpected packet:", rx))
1321                 try:
1322                     self.logger.debug(ppp("Decrypted packet:", pkt))
1323                 except:
1324                     pass
1325                 raise
1326
1327     def setUp(self):
1328         super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1329
1330         self.tun_if = self.pg0
1331
1332         p = self.ipv4_params
1333         p = self.ipv4_params
1334         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1335                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1336         p.nat_header = UDP(sport=5454, dport=4545)
1337
1338         bd1 = VppBridgeDomain(self, 1)
1339         bd1.add_vpp_config()
1340
1341         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1342                                   p.auth_algo_vpp_id, p.auth_key,
1343                                   p.crypt_algo_vpp_id, p.crypt_key,
1344                                   self.vpp_esp_protocol,
1345                                   flags=p.flags,
1346                                   udp_src=5454,
1347                                   udp_dst=4545)
1348         p.tun_sa_out.add_vpp_config()
1349
1350         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1351                                  p.auth_algo_vpp_id, p.auth_key,
1352                                  p.crypt_algo_vpp_id, p.crypt_key,
1353                                  self.vpp_esp_protocol,
1354                                  flags=(p.flags |
1355                                         VppEnum.vl_api_ipsec_sad_flags_t.
1356                                         IPSEC_API_SAD_FLAG_IS_INBOUND),
1357                                  udp_src=5454,
1358                                  udp_dst=4545)
1359         p.tun_sa_in.add_vpp_config()
1360
1361         p.tun_if = VppGreInterface(self,
1362                                    self.pg0.local_ip4,
1363                                    self.pg0.remote_ip4,
1364                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1365                                          GRE_API_TUNNEL_TYPE_TEB))
1366         p.tun_if.add_vpp_config()
1367
1368         p.tun_protect = VppIpsecTunProtect(self,
1369                                            p.tun_if,
1370                                            p.tun_sa_out,
1371                                            [p.tun_sa_in])
1372
1373         p.tun_protect.add_vpp_config()
1374
1375         p.tun_if.admin_up()
1376         p.tun_if.config_ip4()
1377         config_tra_params(p, self.encryption_type, p.tun_if)
1378
1379         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1380         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1381
1382         self.vapi.cli("clear ipsec sa")
1383         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1384
1385     def tearDown(self):
1386         p = self.ipv4_params
1387         p.tun_if.unconfig_ip4()
1388         super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1389
1390
1391 class TestIpsecGreIfEsp(TemplateIpsec,
1392                         IpsecTun4Tests):
1393     """ Ipsec GRE ESP - TUN tests """
1394     tun4_encrypt_node_name = "esp4-encrypt-tun"
1395     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1396     encryption_type = ESP
1397
1398     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1399                          payload_size=100):
1400         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1401                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1402                               dst=self.pg0.local_ip4) /
1403                            GRE() /
1404                            IP(src=self.pg1.local_ip4,
1405                               dst=self.pg1.remote_ip4) /
1406                            UDP(sport=1144, dport=2233) /
1407                            Raw(b'X' * payload_size))
1408                 for i in range(count)]
1409
1410     def gen_pkts(self, sw_intf, src, dst, count=1,
1411                  payload_size=100):
1412         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1413                 IP(src="1.1.1.1", dst="1.1.1.2") /
1414                 UDP(sport=1144, dport=2233) /
1415                 Raw(b'X' * payload_size)
1416                 for i in range(count)]
1417
1418     def verify_decrypted(self, p, rxs):
1419         for rx in rxs:
1420             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1421             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1422
1423     def verify_encrypted(self, p, sa, rxs):
1424         for rx in rxs:
1425             try:
1426                 pkt = sa.decrypt(rx[IP])
1427                 if not pkt.haslayer(IP):
1428                     pkt = IP(pkt[Raw].load)
1429                 self.assert_packet_checksums_valid(pkt)
1430                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1431                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1432                 self.assertTrue(pkt.haslayer(GRE))
1433                 e = pkt[GRE]
1434                 self.assertEqual(e[IP].dst, "1.1.1.2")
1435             except (IndexError, AssertionError):
1436                 self.logger.debug(ppp("Unexpected packet:", rx))
1437                 try:
1438                     self.logger.debug(ppp("Decrypted packet:", pkt))
1439                 except:
1440                     pass
1441                 raise
1442
1443     def setUp(self):
1444         super(TestIpsecGreIfEsp, self).setUp()
1445
1446         self.tun_if = self.pg0
1447
1448         p = self.ipv4_params
1449
1450         bd1 = VppBridgeDomain(self, 1)
1451         bd1.add_vpp_config()
1452
1453         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1454                                   p.auth_algo_vpp_id, p.auth_key,
1455                                   p.crypt_algo_vpp_id, p.crypt_key,
1456                                   self.vpp_esp_protocol,
1457                                   self.pg0.local_ip4,
1458                                   self.pg0.remote_ip4)
1459         p.tun_sa_out.add_vpp_config()
1460
1461         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1462                                  p.auth_algo_vpp_id, p.auth_key,
1463                                  p.crypt_algo_vpp_id, p.crypt_key,
1464                                  self.vpp_esp_protocol,
1465                                  self.pg0.remote_ip4,
1466                                  self.pg0.local_ip4)
1467         p.tun_sa_in.add_vpp_config()
1468
1469         p.tun_if = VppGreInterface(self,
1470                                    self.pg0.local_ip4,
1471                                    self.pg0.remote_ip4)
1472         p.tun_if.add_vpp_config()
1473
1474         p.tun_protect = VppIpsecTunProtect(self,
1475                                            p.tun_if,
1476                                            p.tun_sa_out,
1477                                            [p.tun_sa_in])
1478         p.tun_protect.add_vpp_config()
1479
1480         p.tun_if.admin_up()
1481         p.tun_if.config_ip4()
1482         config_tun_params(p, self.encryption_type, p.tun_if)
1483
1484         VppIpRoute(self, "1.1.1.2", 32,
1485                    [VppRoutePath(p.tun_if.remote_ip4,
1486                                  0xffffffff)]).add_vpp_config()
1487
1488     def tearDown(self):
1489         p = self.ipv4_params
1490         p.tun_if.unconfig_ip4()
1491         super(TestIpsecGreIfEsp, self).tearDown()
1492
1493
1494 class TestIpsecGreIfEspTra(TemplateIpsec,
1495                            IpsecTun4Tests):
1496     """ Ipsec GRE ESP - TRA tests """
1497     tun4_encrypt_node_name = "esp4-encrypt-tun"
1498     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1499     encryption_type = ESP
1500
1501     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1502                          payload_size=100):
1503         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1504                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1505                               dst=self.pg0.local_ip4) /
1506                            GRE() /
1507                            IP(src=self.pg1.local_ip4,
1508                               dst=self.pg1.remote_ip4) /
1509                            UDP(sport=1144, dport=2233) /
1510                            Raw(b'X' * payload_size))
1511                 for i in range(count)]
1512
1513     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1514                                 payload_size=100):
1515         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1516                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1517                               dst=self.pg0.local_ip4) /
1518                            GRE() /
1519                            UDP(sport=1144, dport=2233) /
1520                            Raw(b'X' * payload_size))
1521                 for i in range(count)]
1522
1523     def gen_pkts(self, sw_intf, src, dst, count=1,
1524                  payload_size=100):
1525         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1526                 IP(src="1.1.1.1", dst="1.1.1.2") /
1527                 UDP(sport=1144, dport=2233) /
1528                 Raw(b'X' * payload_size)
1529                 for i in range(count)]
1530
1531     def verify_decrypted(self, p, rxs):
1532         for rx in rxs:
1533             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1534             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1535
1536     def verify_encrypted(self, p, sa, rxs):
1537         for rx in rxs:
1538             try:
1539                 pkt = sa.decrypt(rx[IP])
1540                 if not pkt.haslayer(IP):
1541                     pkt = IP(pkt[Raw].load)
1542                 self.assert_packet_checksums_valid(pkt)
1543                 self.assertTrue(pkt.haslayer(GRE))
1544                 e = pkt[GRE]
1545                 self.assertEqual(e[IP].dst, "1.1.1.2")
1546             except (IndexError, AssertionError):
1547                 self.logger.debug(ppp("Unexpected packet:", rx))
1548                 try:
1549                     self.logger.debug(ppp("Decrypted packet:", pkt))
1550                 except:
1551                     pass
1552                 raise
1553
1554     def setUp(self):
1555         super(TestIpsecGreIfEspTra, self).setUp()
1556
1557         self.tun_if = self.pg0
1558
1559         p = self.ipv4_params
1560
1561         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1562                                   p.auth_algo_vpp_id, p.auth_key,
1563                                   p.crypt_algo_vpp_id, p.crypt_key,
1564                                   self.vpp_esp_protocol)
1565         p.tun_sa_out.add_vpp_config()
1566
1567         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1568                                  p.auth_algo_vpp_id, p.auth_key,
1569                                  p.crypt_algo_vpp_id, p.crypt_key,
1570                                  self.vpp_esp_protocol)
1571         p.tun_sa_in.add_vpp_config()
1572
1573         p.tun_if = VppGreInterface(self,
1574                                    self.pg0.local_ip4,
1575                                    self.pg0.remote_ip4)
1576         p.tun_if.add_vpp_config()
1577
1578         p.tun_protect = VppIpsecTunProtect(self,
1579                                            p.tun_if,
1580                                            p.tun_sa_out,
1581                                            [p.tun_sa_in])
1582         p.tun_protect.add_vpp_config()
1583
1584         p.tun_if.admin_up()
1585         p.tun_if.config_ip4()
1586         config_tra_params(p, self.encryption_type, p.tun_if)
1587
1588         VppIpRoute(self, "1.1.1.2", 32,
1589                    [VppRoutePath(p.tun_if.remote_ip4,
1590                                  0xffffffff)]).add_vpp_config()
1591
1592     def tearDown(self):
1593         p = self.ipv4_params
1594         p.tun_if.unconfig_ip4()
1595         super(TestIpsecGreIfEspTra, self).tearDown()
1596
1597     def test_gre_non_ip(self):
1598         p = self.ipv4_params
1599         tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1600                                           src=p.remote_tun_if_host,
1601                                           dst=self.pg1.remote_ip6)
1602         self.send_and_assert_no_replies(self.tun_if, tx)
1603         node_name = ('/err/%s/unsupported payload' %
1604                      self.tun4_decrypt_node_name[0])
1605         self.assertEqual(1, self.statistics.get_err_counter(node_name))
1606
1607
1608 class TestIpsecGre6IfEspTra(TemplateIpsec,
1609                             IpsecTun6Tests):
1610     """ Ipsec GRE ESP - TRA tests """
1611     tun6_encrypt_node_name = "esp6-encrypt-tun"
1612     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1613     encryption_type = ESP
1614
1615     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1616                           payload_size=100):
1617         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1618                 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1619                                 dst=self.pg0.local_ip6) /
1620                            GRE() /
1621                            IPv6(src=self.pg1.local_ip6,
1622                                 dst=self.pg1.remote_ip6) /
1623                            UDP(sport=1144, dport=2233) /
1624                            Raw(b'X' * payload_size))
1625                 for i in range(count)]
1626
1627     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1628                   payload_size=100):
1629         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1630                 IPv6(src="1::1", dst="1::2") /
1631                 UDP(sport=1144, dport=2233) /
1632                 Raw(b'X' * payload_size)
1633                 for i in range(count)]
1634
1635     def verify_decrypted6(self, p, rxs):
1636         for rx in rxs:
1637             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1638             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1639
1640     def verify_encrypted6(self, p, sa, rxs):
1641         for rx in rxs:
1642             try:
1643                 pkt = sa.decrypt(rx[IPv6])
1644                 if not pkt.haslayer(IPv6):
1645                     pkt = IPv6(pkt[Raw].load)
1646                 self.assert_packet_checksums_valid(pkt)
1647                 self.assertTrue(pkt.haslayer(GRE))
1648                 e = pkt[GRE]
1649                 self.assertEqual(e[IPv6].dst, "1::2")
1650             except (IndexError, AssertionError):
1651                 self.logger.debug(ppp("Unexpected packet:", rx))
1652                 try:
1653                     self.logger.debug(ppp("Decrypted packet:", pkt))
1654                 except:
1655                     pass
1656                 raise
1657
1658     def setUp(self):
1659         super(TestIpsecGre6IfEspTra, self).setUp()
1660
1661         self.tun_if = self.pg0
1662
1663         p = self.ipv6_params
1664
1665         bd1 = VppBridgeDomain(self, 1)
1666         bd1.add_vpp_config()
1667
1668         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1669                                   p.auth_algo_vpp_id, p.auth_key,
1670                                   p.crypt_algo_vpp_id, p.crypt_key,
1671                                   self.vpp_esp_protocol)
1672         p.tun_sa_out.add_vpp_config()
1673
1674         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1675                                  p.auth_algo_vpp_id, p.auth_key,
1676                                  p.crypt_algo_vpp_id, p.crypt_key,
1677                                  self.vpp_esp_protocol)
1678         p.tun_sa_in.add_vpp_config()
1679
1680         p.tun_if = VppGreInterface(self,
1681                                    self.pg0.local_ip6,
1682                                    self.pg0.remote_ip6)
1683         p.tun_if.add_vpp_config()
1684
1685         p.tun_protect = VppIpsecTunProtect(self,
1686                                            p.tun_if,
1687                                            p.tun_sa_out,
1688                                            [p.tun_sa_in])
1689         p.tun_protect.add_vpp_config()
1690
1691         p.tun_if.admin_up()
1692         p.tun_if.config_ip6()
1693         config_tra_params(p, self.encryption_type, p.tun_if)
1694
1695         r = VppIpRoute(self, "1::2", 128,
1696                        [VppRoutePath(p.tun_if.remote_ip6,
1697                                      0xffffffff,
1698                                      proto=DpoProto.DPO_PROTO_IP6)])
1699         r.add_vpp_config()
1700
1701     def tearDown(self):
1702         p = self.ipv6_params
1703         p.tun_if.unconfig_ip6()
1704         super(TestIpsecGre6IfEspTra, self).tearDown()
1705
1706
1707 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1708     """ Ipsec mGRE ESP v4 TRA tests """
1709     tun4_encrypt_node_name = "esp4-encrypt-tun"
1710     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1711     encryption_type = ESP
1712
1713     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1714                          payload_size=100):
1715         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1716                 sa.encrypt(IP(src=p.tun_dst,
1717                               dst=self.pg0.local_ip4) /
1718                            GRE() /
1719                            IP(src=self.pg1.local_ip4,
1720                               dst=self.pg1.remote_ip4) /
1721                            UDP(sport=1144, dport=2233) /
1722                            Raw(b'X' * payload_size))
1723                 for i in range(count)]
1724
1725     def gen_pkts(self, sw_intf, src, dst, count=1,
1726                  payload_size=100):
1727         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1728                 IP(src="1.1.1.1", dst=dst) /
1729                 UDP(sport=1144, dport=2233) /
1730                 Raw(b'X' * payload_size)
1731                 for i in range(count)]
1732
1733     def verify_decrypted(self, p, rxs):
1734         for rx in rxs:
1735             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1736             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1737
1738     def verify_encrypted(self, p, sa, rxs):
1739         for rx in rxs:
1740             try:
1741                 pkt = sa.decrypt(rx[IP])
1742                 if not pkt.haslayer(IP):
1743                     pkt = IP(pkt[Raw].load)
1744                 self.assert_packet_checksums_valid(pkt)
1745                 self.assertTrue(pkt.haslayer(GRE))
1746                 e = pkt[GRE]
1747                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1748             except (IndexError, AssertionError):
1749                 self.logger.debug(ppp("Unexpected packet:", rx))
1750                 try:
1751                     self.logger.debug(ppp("Decrypted packet:", pkt))
1752                 except:
1753                     pass
1754                 raise
1755
1756     def setUp(self):
1757         super(TestIpsecMGreIfEspTra4, self).setUp()
1758
1759         N_NHS = 16
1760         self.tun_if = self.pg0
1761         p = self.ipv4_params
1762         p.tun_if = VppGreInterface(self,
1763                                    self.pg0.local_ip4,
1764                                    "0.0.0.0",
1765                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1766                                          TUNNEL_API_MODE_MP))
1767         p.tun_if.add_vpp_config()
1768         p.tun_if.admin_up()
1769         p.tun_if.config_ip4()
1770         p.tun_if.generate_remote_hosts(N_NHS)
1771         self.pg0.generate_remote_hosts(N_NHS)
1772         self.pg0.configure_ipv4_neighbors()
1773
1774         # setup some SAs for several next-hops on the interface
1775         self.multi_params = []
1776
1777         for ii in range(N_NHS):
1778             p = copy.copy(self.ipv4_params)
1779
1780             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1781             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1782             p.scapy_tun_spi = p.scapy_tun_spi + ii
1783             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1784             p.vpp_tun_spi = p.vpp_tun_spi + ii
1785
1786             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1787             p.scapy_tra_spi = p.scapy_tra_spi + ii
1788             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1789             p.vpp_tra_spi = p.vpp_tra_spi + ii
1790             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1791                                       p.auth_algo_vpp_id, p.auth_key,
1792                                       p.crypt_algo_vpp_id, p.crypt_key,
1793                                       self.vpp_esp_protocol)
1794             p.tun_sa_out.add_vpp_config()
1795
1796             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1797                                      p.auth_algo_vpp_id, p.auth_key,
1798                                      p.crypt_algo_vpp_id, p.crypt_key,
1799                                      self.vpp_esp_protocol)
1800             p.tun_sa_in.add_vpp_config()
1801
1802             p.tun_protect = VppIpsecTunProtect(
1803                 self,
1804                 p.tun_if,
1805                 p.tun_sa_out,
1806                 [p.tun_sa_in],
1807                 nh=p.tun_if.remote_hosts[ii].ip4)
1808             p.tun_protect.add_vpp_config()
1809             config_tra_params(p, self.encryption_type, p.tun_if)
1810             self.multi_params.append(p)
1811
1812             VppIpRoute(self, p.remote_tun_if_host, 32,
1813                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1814                                      p.tun_if.sw_if_index)]).add_vpp_config()
1815
1816             # in this v4 variant add the teibs after the protect
1817             p.teib = VppTeib(self, p.tun_if,
1818                              p.tun_if.remote_hosts[ii].ip4,
1819                              self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1820             p.tun_dst = self.pg0.remote_hosts[ii].ip4
1821         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1822
1823     def tearDown(self):
1824         p = self.ipv4_params
1825         p.tun_if.unconfig_ip4()
1826         super(TestIpsecMGreIfEspTra4, self).tearDown()
1827
1828     def test_tun_44(self):
1829         """mGRE IPSEC 44"""
1830         N_PKTS = 63
1831         for p in self.multi_params:
1832             self.verify_tun_44(p, count=N_PKTS)
1833             p.teib.remove_vpp_config()
1834             self.verify_tun_dropped_44(p, count=N_PKTS)
1835             p.teib.add_vpp_config()
1836             self.verify_tun_44(p, count=N_PKTS)
1837
1838
1839 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1840     """ Ipsec mGRE ESP v6 TRA tests """
1841     tun6_encrypt_node_name = "esp6-encrypt-tun"
1842     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1843     encryption_type = ESP
1844
1845     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1846                           payload_size=100):
1847         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1848                 sa.encrypt(IPv6(src=p.tun_dst,
1849                                 dst=self.pg0.local_ip6) /
1850                            GRE() /
1851                            IPv6(src=self.pg1.local_ip6,
1852                                 dst=self.pg1.remote_ip6) /
1853                            UDP(sport=1144, dport=2233) /
1854                            Raw(b'X' * payload_size))
1855                 for i in range(count)]
1856
1857     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1858                   payload_size=100):
1859         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1860                 IPv6(src="1::1", dst=dst) /
1861                 UDP(sport=1144, dport=2233) /
1862                 Raw(b'X' * payload_size)
1863                 for i in range(count)]
1864
1865     def verify_decrypted6(self, p, rxs):
1866         for rx in rxs:
1867             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1868             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1869
1870     def verify_encrypted6(self, p, sa, rxs):
1871         for rx in rxs:
1872             try:
1873                 pkt = sa.decrypt(rx[IPv6])
1874                 if not pkt.haslayer(IPv6):
1875                     pkt = IPv6(pkt[Raw].load)
1876                 self.assert_packet_checksums_valid(pkt)
1877                 self.assertTrue(pkt.haslayer(GRE))
1878                 e = pkt[GRE]
1879                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1880             except (IndexError, AssertionError):
1881                 self.logger.debug(ppp("Unexpected packet:", rx))
1882                 try:
1883                     self.logger.debug(ppp("Decrypted packet:", pkt))
1884                 except:
1885                     pass
1886                 raise
1887
1888     def setUp(self):
1889         super(TestIpsecMGreIfEspTra6, self).setUp()
1890
1891         self.vapi.cli("set logging class ipsec level debug")
1892
1893         N_NHS = 16
1894         self.tun_if = self.pg0
1895         p = self.ipv6_params
1896         p.tun_if = VppGreInterface(self,
1897                                    self.pg0.local_ip6,
1898                                    "::",
1899                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1900                                          TUNNEL_API_MODE_MP))
1901         p.tun_if.add_vpp_config()
1902         p.tun_if.admin_up()
1903         p.tun_if.config_ip6()
1904         p.tun_if.generate_remote_hosts(N_NHS)
1905         self.pg0.generate_remote_hosts(N_NHS)
1906         self.pg0.configure_ipv6_neighbors()
1907
1908         # setup some SAs for several next-hops on the interface
1909         self.multi_params = []
1910
1911         for ii in range(N_NHS):
1912             p = copy.copy(self.ipv6_params)
1913
1914             p.remote_tun_if_host = "1::%d" % (ii + 1)
1915             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1916             p.scapy_tun_spi = p.scapy_tun_spi + ii
1917             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1918             p.vpp_tun_spi = p.vpp_tun_spi + ii
1919
1920             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1921             p.scapy_tra_spi = p.scapy_tra_spi + ii
1922             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1923             p.vpp_tra_spi = p.vpp_tra_spi + ii
1924             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1925                                       p.auth_algo_vpp_id, p.auth_key,
1926                                       p.crypt_algo_vpp_id, p.crypt_key,
1927                                       self.vpp_esp_protocol)
1928             p.tun_sa_out.add_vpp_config()
1929
1930             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1931                                      p.auth_algo_vpp_id, p.auth_key,
1932                                      p.crypt_algo_vpp_id, p.crypt_key,
1933                                      self.vpp_esp_protocol)
1934             p.tun_sa_in.add_vpp_config()
1935
1936             # in this v6 variant add the teibs first then the protection
1937             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1938             VppTeib(self, p.tun_if,
1939                     p.tun_if.remote_hosts[ii].ip6,
1940                     p.tun_dst).add_vpp_config()
1941
1942             p.tun_protect = VppIpsecTunProtect(
1943                 self,
1944                 p.tun_if,
1945                 p.tun_sa_out,
1946                 [p.tun_sa_in],
1947                 nh=p.tun_if.remote_hosts[ii].ip6)
1948             p.tun_protect.add_vpp_config()
1949             config_tra_params(p, self.encryption_type, p.tun_if)
1950             self.multi_params.append(p)
1951
1952             VppIpRoute(self, p.remote_tun_if_host, 128,
1953                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1954                                      p.tun_if.sw_if_index)]).add_vpp_config()
1955             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1956
1957         self.logger.info(self.vapi.cli("sh log"))
1958         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1959         self.logger.info(self.vapi.cli("sh adj 41"))
1960
1961     def tearDown(self):
1962         p = self.ipv6_params
1963         p.tun_if.unconfig_ip6()
1964         super(TestIpsecMGreIfEspTra6, self).tearDown()
1965
1966     def test_tun_66(self):
1967         """mGRE IPSec 66"""
1968         for p in self.multi_params:
1969             self.verify_tun_66(p, count=63)
1970
1971
1972 @tag_fixme_vpp_workers
1973 class TestIpsec4TunProtect(TemplateIpsec,
1974                            TemplateIpsec4TunProtect,
1975                            IpsecTun4):
1976     """ IPsec IPv4 Tunnel protect - transport mode"""
1977
1978     def setUp(self):
1979         super(TestIpsec4TunProtect, self).setUp()
1980
1981         self.tun_if = self.pg0
1982
1983     def tearDown(self):
1984         super(TestIpsec4TunProtect, self).tearDown()
1985
1986     def test_tun_44(self):
1987         """IPSEC tunnel protect"""
1988
1989         p = self.ipv4_params
1990
1991         self.config_network(p)
1992         self.config_sa_tra(p)
1993         self.config_protect(p)
1994
1995         self.verify_tun_44(p, count=127)
1996         self.assertEqual(p.tun_if.get_rx_stats(), 127)
1997         self.assertEqual(p.tun_if.get_tx_stats(), 127)
1998
1999         self.vapi.cli("clear ipsec sa")
2000         self.verify_tun_64(p, count=127)
2001         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2002         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2003
2004         # rekey - create new SAs and update the tunnel protection
2005         np = copy.copy(p)
2006         np.crypt_key = b'X' + p.crypt_key[1:]
2007         np.scapy_tun_spi += 100
2008         np.scapy_tun_sa_id += 1
2009         np.vpp_tun_spi += 100
2010         np.vpp_tun_sa_id += 1
2011         np.tun_if.local_spi = p.vpp_tun_spi
2012         np.tun_if.remote_spi = p.scapy_tun_spi
2013
2014         self.config_sa_tra(np)
2015         self.config_protect(np)
2016         self.unconfig_sa(p)
2017
2018         self.verify_tun_44(np, count=127)
2019         self.assertEqual(p.tun_if.get_rx_stats(), 381)
2020         self.assertEqual(p.tun_if.get_tx_stats(), 381)
2021
2022         # teardown
2023         self.unconfig_protect(np)
2024         self.unconfig_sa(np)
2025         self.unconfig_network(p)
2026
2027
2028 @tag_fixme_vpp_workers
2029 class TestIpsec4TunProtectUdp(TemplateIpsec,
2030                               TemplateIpsec4TunProtect,
2031                               IpsecTun4):
2032     """ IPsec IPv4 Tunnel protect - transport mode"""
2033
2034     def setUp(self):
2035         super(TestIpsec4TunProtectUdp, self).setUp()
2036
2037         self.tun_if = self.pg0
2038
2039         p = self.ipv4_params
2040         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
2041                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
2042         p.nat_header = UDP(sport=4500, dport=4500)
2043         self.config_network(p)
2044         self.config_sa_tra(p)
2045         self.config_protect(p)
2046
2047     def tearDown(self):
2048         p = self.ipv4_params
2049         self.unconfig_protect(p)
2050         self.unconfig_sa(p)
2051         self.unconfig_network(p)
2052         super(TestIpsec4TunProtectUdp, self).tearDown()
2053
2054     def verify_encrypted(self, p, sa, rxs):
2055         # ensure encrypted packets are recieved with the default UDP ports
2056         for rx in rxs:
2057             self.assertEqual(rx[UDP].sport, 4500)
2058             self.assertEqual(rx[UDP].dport, 4500)
2059         super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2060
2061     def test_tun_44(self):
2062         """IPSEC UDP tunnel protect"""
2063
2064         p = self.ipv4_params
2065
2066         self.verify_tun_44(p, count=127)
2067         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2068         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2069
2070     def test_keepalive(self):
2071         """ IPSEC NAT Keepalive """
2072         self.verify_keepalive(self.ipv4_params)
2073
2074
2075 @tag_fixme_vpp_workers
2076 class TestIpsec4TunProtectTun(TemplateIpsec,
2077                               TemplateIpsec4TunProtect,
2078                               IpsecTun4):
2079     """ IPsec IPv4 Tunnel protect - tunnel mode"""
2080
2081     encryption_type = ESP
2082     tun4_encrypt_node_name = "esp4-encrypt-tun"
2083     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2084
2085     def setUp(self):
2086         super(TestIpsec4TunProtectTun, self).setUp()
2087
2088         self.tun_if = self.pg0
2089
2090     def tearDown(self):
2091         super(TestIpsec4TunProtectTun, self).tearDown()
2092
2093     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2094                          payload_size=100):
2095         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2096                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2097                               dst=sw_intf.local_ip4) /
2098                            IP(src=src, dst=dst) /
2099                            UDP(sport=1144, dport=2233) /
2100                            Raw(b'X' * payload_size))
2101                 for i in range(count)]
2102
2103     def gen_pkts(self, sw_intf, src, dst, count=1,
2104                  payload_size=100):
2105         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2106                 IP(src=src, dst=dst) /
2107                 UDP(sport=1144, dport=2233) /
2108                 Raw(b'X' * payload_size)
2109                 for i in range(count)]
2110
2111     def verify_decrypted(self, p, rxs):
2112         for rx in rxs:
2113             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2114             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2115             self.assert_packet_checksums_valid(rx)
2116
2117     def verify_encrypted(self, p, sa, rxs):
2118         for rx in rxs:
2119             try:
2120                 pkt = sa.decrypt(rx[IP])
2121                 if not pkt.haslayer(IP):
2122                     pkt = IP(pkt[Raw].load)
2123                 self.assert_packet_checksums_valid(pkt)
2124                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2125                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2126                 inner = pkt[IP].payload
2127                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2128
2129             except (IndexError, AssertionError):
2130                 self.logger.debug(ppp("Unexpected packet:", rx))
2131                 try:
2132                     self.logger.debug(ppp("Decrypted packet:", pkt))
2133                 except:
2134                     pass
2135                 raise
2136
2137     def test_tun_44(self):
2138         """IPSEC tunnel protect """
2139
2140         p = self.ipv4_params
2141
2142         self.config_network(p)
2143         self.config_sa_tun(p)
2144         self.config_protect(p)
2145
2146         # also add an output features on the tunnel and physical interface
2147         # so we test they still work
2148         r_all = AclRule(True,
2149                         src_prefix="0.0.0.0/0",
2150                         dst_prefix="0.0.0.0/0",
2151                         proto=0)
2152         a = VppAcl(self, [r_all]).add_vpp_config()
2153
2154         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2155         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2156
2157         self.verify_tun_44(p, count=127)
2158
2159         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2160         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2161
2162         # rekey - create new SAs and update the tunnel protection
2163         np = copy.copy(p)
2164         np.crypt_key = b'X' + p.crypt_key[1:]
2165         np.scapy_tun_spi += 100
2166         np.scapy_tun_sa_id += 1
2167         np.vpp_tun_spi += 100
2168         np.vpp_tun_sa_id += 1
2169         np.tun_if.local_spi = p.vpp_tun_spi
2170         np.tun_if.remote_spi = p.scapy_tun_spi
2171
2172         self.config_sa_tun(np)
2173         self.config_protect(np)
2174         self.unconfig_sa(p)
2175
2176         self.verify_tun_44(np, count=127)
2177         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2178         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2179
2180         # teardown
2181         self.unconfig_protect(np)
2182         self.unconfig_sa(np)
2183         self.unconfig_network(p)
2184
2185
2186 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2187                                   TemplateIpsec4TunProtect,
2188                                   IpsecTun4):
2189     """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2190
2191     encryption_type = ESP
2192     tun4_encrypt_node_name = "esp4-encrypt-tun"
2193     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2194
2195     def setUp(self):
2196         super(TestIpsec4TunProtectTunDrop, self).setUp()
2197
2198         self.tun_if = self.pg0
2199
2200     def tearDown(self):
2201         super(TestIpsec4TunProtectTunDrop, self).tearDown()
2202
2203     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2204                          payload_size=100):
2205         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2206                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2207                               dst="5.5.5.5") /
2208                            IP(src=src, dst=dst) /
2209                            UDP(sport=1144, dport=2233) /
2210                            Raw(b'X' * payload_size))
2211                 for i in range(count)]
2212
2213     def test_tun_drop_44(self):
2214         """IPSEC tunnel protect bogus tunnel header """
2215
2216         p = self.ipv4_params
2217
2218         self.config_network(p)
2219         self.config_sa_tun(p)
2220         self.config_protect(p)
2221
2222         tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2223                                    src=p.remote_tun_if_host,
2224                                    dst=self.pg1.remote_ip4,
2225                                    count=63)
2226         self.send_and_assert_no_replies(self.tun_if, tx)
2227
2228         # teardown
2229         self.unconfig_protect(p)
2230         self.unconfig_sa(p)
2231         self.unconfig_network(p)
2232
2233
2234 @tag_fixme_vpp_workers
2235 class TestIpsec6TunProtect(TemplateIpsec,
2236                            TemplateIpsec6TunProtect,
2237                            IpsecTun6):
2238     """ IPsec IPv6 Tunnel protect - transport mode"""
2239
2240     encryption_type = ESP
2241     tun6_encrypt_node_name = "esp6-encrypt-tun"
2242     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2243
2244     def setUp(self):
2245         super(TestIpsec6TunProtect, self).setUp()
2246
2247         self.tun_if = self.pg0
2248
2249     def tearDown(self):
2250         super(TestIpsec6TunProtect, self).tearDown()
2251
2252     def test_tun_66(self):
2253         """IPSEC tunnel protect 6o6"""
2254
2255         p = self.ipv6_params
2256
2257         self.config_network(p)
2258         self.config_sa_tra(p)
2259         self.config_protect(p)
2260
2261         self.verify_tun_66(p, count=127)
2262         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2263         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2264
2265         # rekey - create new SAs and update the tunnel protection
2266         np = copy.copy(p)
2267         np.crypt_key = b'X' + p.crypt_key[1:]
2268         np.scapy_tun_spi += 100
2269         np.scapy_tun_sa_id += 1
2270         np.vpp_tun_spi += 100
2271         np.vpp_tun_sa_id += 1
2272         np.tun_if.local_spi = p.vpp_tun_spi
2273         np.tun_if.remote_spi = p.scapy_tun_spi
2274
2275         self.config_sa_tra(np)
2276         self.config_protect(np)
2277         self.unconfig_sa(p)
2278
2279         self.verify_tun_66(np, count=127)
2280         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2281         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2282
2283         # bounce the interface state
2284         p.tun_if.admin_down()
2285         self.verify_drop_tun_66(np, count=127)
2286         node = ('/err/ipsec6-tun-input/%s' %
2287                 'ipsec packets received on disabled interface')
2288         self.assertEqual(127, self.statistics.get_err_counter(node))
2289         p.tun_if.admin_up()
2290         self.verify_tun_66(np, count=127)
2291
2292         # 3 phase rekey
2293         #  1) add two input SAs [old, new]
2294         #  2) swap output SA to [new]
2295         #  3) use only [new] input SA
2296         np3 = copy.copy(np)
2297         np3.crypt_key = b'Z' + p.crypt_key[1:]
2298         np3.scapy_tun_spi += 100
2299         np3.scapy_tun_sa_id += 1
2300         np3.vpp_tun_spi += 100
2301         np3.vpp_tun_sa_id += 1
2302         np3.tun_if.local_spi = p.vpp_tun_spi
2303         np3.tun_if.remote_spi = p.scapy_tun_spi
2304
2305         self.config_sa_tra(np3)
2306
2307         # step 1;
2308         p.tun_protect.update_vpp_config(np.tun_sa_out,
2309                                         [np.tun_sa_in, np3.tun_sa_in])
2310         self.verify_tun_66(np, np, count=127)
2311         self.verify_tun_66(np3, np, count=127)
2312
2313         # step 2;
2314         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2315                                         [np.tun_sa_in, np3.tun_sa_in])
2316         self.verify_tun_66(np, np3, count=127)
2317         self.verify_tun_66(np3, np3, count=127)
2318
2319         # step 1;
2320         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2321                                         [np3.tun_sa_in])
2322         self.verify_tun_66(np3, np3, count=127)
2323         self.verify_drop_tun_66(np, count=127)
2324
2325         self.assertEqual(p.tun_if.get_rx_stats(), 127*9)
2326         self.assertEqual(p.tun_if.get_tx_stats(), 127*8)
2327         self.unconfig_sa(np)
2328
2329         # teardown
2330         self.unconfig_protect(np3)
2331         self.unconfig_sa(np3)
2332         self.unconfig_network(p)
2333
2334     def test_tun_46(self):
2335         """IPSEC tunnel protect 4o6"""
2336
2337         p = self.ipv6_params
2338
2339         self.config_network(p)
2340         self.config_sa_tra(p)
2341         self.config_protect(p)
2342
2343         self.verify_tun_46(p, count=127)
2344         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2345         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2346
2347         # teardown
2348         self.unconfig_protect(p)
2349         self.unconfig_sa(p)
2350         self.unconfig_network(p)
2351
2352
2353 @tag_fixme_vpp_workers
2354 class TestIpsec6TunProtectTun(TemplateIpsec,
2355                               TemplateIpsec6TunProtect,
2356                               IpsecTun6):
2357     """ IPsec IPv6 Tunnel protect - tunnel mode"""
2358
2359     encryption_type = ESP
2360     tun6_encrypt_node_name = "esp6-encrypt-tun"
2361     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2362
2363     def setUp(self):
2364         super(TestIpsec6TunProtectTun, self).setUp()
2365
2366         self.tun_if = self.pg0
2367
2368     def tearDown(self):
2369         super(TestIpsec6TunProtectTun, self).tearDown()
2370
2371     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2372                           payload_size=100):
2373         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2374                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2375                                 dst=sw_intf.local_ip6) /
2376                            IPv6(src=src, dst=dst) /
2377                            UDP(sport=1166, dport=2233) /
2378                            Raw(b'X' * payload_size))
2379                 for i in range(count)]
2380
2381     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
2382                   payload_size=100):
2383         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2384                 IPv6(src=src, dst=dst) /
2385                 UDP(sport=1166, dport=2233) /
2386                 Raw(b'X' * payload_size)
2387                 for i in range(count)]
2388
2389     def verify_decrypted6(self, p, rxs):
2390         for rx in rxs:
2391             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2392             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2393             self.assert_packet_checksums_valid(rx)
2394
2395     def verify_encrypted6(self, p, sa, rxs):
2396         for rx in rxs:
2397             try:
2398                 pkt = sa.decrypt(rx[IPv6])
2399                 if not pkt.haslayer(IPv6):
2400                     pkt = IPv6(pkt[Raw].load)
2401                 self.assert_packet_checksums_valid(pkt)
2402                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2403                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2404                 inner = pkt[IPv6].payload
2405                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2406
2407             except (IndexError, AssertionError):
2408                 self.logger.debug(ppp("Unexpected packet:", rx))
2409                 try:
2410                     self.logger.debug(ppp("Decrypted packet:", pkt))
2411                 except:
2412                     pass
2413                 raise
2414
2415     def test_tun_66(self):
2416         """IPSEC tunnel protect """
2417
2418         p = self.ipv6_params
2419
2420         self.config_network(p)
2421         self.config_sa_tun(p)
2422         self.config_protect(p)
2423
2424         self.verify_tun_66(p, count=127)
2425
2426         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2427         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2428
2429         # rekey - create new SAs and update the tunnel protection
2430         np = copy.copy(p)
2431         np.crypt_key = b'X' + p.crypt_key[1:]
2432         np.scapy_tun_spi += 100
2433         np.scapy_tun_sa_id += 1
2434         np.vpp_tun_spi += 100
2435         np.vpp_tun_sa_id += 1
2436         np.tun_if.local_spi = p.vpp_tun_spi
2437         np.tun_if.remote_spi = p.scapy_tun_spi
2438
2439         self.config_sa_tun(np)
2440         self.config_protect(np)
2441         self.unconfig_sa(p)
2442
2443         self.verify_tun_66(np, count=127)
2444         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2445         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2446
2447         # teardown
2448         self.unconfig_protect(np)
2449         self.unconfig_sa(np)
2450         self.unconfig_network(p)
2451
2452
2453 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2454                                   TemplateIpsec6TunProtect,
2455                                   IpsecTun6):
2456     """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2457
2458     encryption_type = ESP
2459     tun6_encrypt_node_name = "esp6-encrypt-tun"
2460     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2461
2462     def setUp(self):
2463         super(TestIpsec6TunProtectTunDrop, self).setUp()
2464
2465         self.tun_if = self.pg0
2466
2467     def tearDown(self):
2468         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2469
2470     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2471                           payload_size=100):
2472         # the IP destination of the revelaed packet does not match
2473         # that assigned to the tunnel
2474         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2475                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2476                                 dst="5::5") /
2477                            IPv6(src=src, dst=dst) /
2478                            UDP(sport=1144, dport=2233) /
2479                            Raw(b'X' * payload_size))
2480                 for i in range(count)]
2481
2482     def test_tun_drop_66(self):
2483         """IPSEC 6 tunnel protect bogus tunnel header """
2484
2485         p = self.ipv6_params
2486
2487         self.config_network(p)
2488         self.config_sa_tun(p)
2489         self.config_protect(p)
2490
2491         tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2492                                     src=p.remote_tun_if_host,
2493                                     dst=self.pg1.remote_ip6,
2494                                     count=63)
2495         self.send_and_assert_no_replies(self.tun_if, tx)
2496
2497         self.unconfig_protect(p)
2498         self.unconfig_sa(p)
2499         self.unconfig_network(p)
2500
2501
2502 class TemplateIpsecItf4(object):
2503     """ IPsec Interface IPv4 """
2504
2505     encryption_type = ESP
2506     tun4_encrypt_node_name = "esp4-encrypt-tun"
2507     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2508     tun4_input_node = "ipsec4-tun-input"
2509
2510     def config_sa_tun(self, p, src, dst):
2511         config_tun_params(p, self.encryption_type, None, src, dst)
2512
2513         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2514                                   p.auth_algo_vpp_id, p.auth_key,
2515                                   p.crypt_algo_vpp_id, p.crypt_key,
2516                                   self.vpp_esp_protocol,
2517                                   src, dst,
2518                                   flags=p.flags)
2519         p.tun_sa_out.add_vpp_config()
2520
2521         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2522                                  p.auth_algo_vpp_id, p.auth_key,
2523                                  p.crypt_algo_vpp_id, p.crypt_key,
2524                                  self.vpp_esp_protocol,
2525                                  dst, src,
2526                                  flags=p.flags)
2527         p.tun_sa_in.add_vpp_config()
2528
2529     def config_protect(self, p):
2530         p.tun_protect = VppIpsecTunProtect(self,
2531                                            p.tun_if,
2532                                            p.tun_sa_out,
2533                                            [p.tun_sa_in])
2534         p.tun_protect.add_vpp_config()
2535
2536     def config_network(self, p, instance=0xffffffff):
2537         p.tun_if = VppIpsecInterface(self, instance=instance)
2538
2539         p.tun_if.add_vpp_config()
2540         p.tun_if.admin_up()
2541         p.tun_if.config_ip4()
2542         p.tun_if.config_ip6()
2543
2544         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2545                              [VppRoutePath(p.tun_if.remote_ip4,
2546                                            0xffffffff)])
2547         p.route.add_vpp_config()
2548         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2549                        [VppRoutePath(p.tun_if.remote_ip6,
2550                                      0xffffffff,
2551                                      proto=DpoProto.DPO_PROTO_IP6)])
2552         r.add_vpp_config()
2553
2554     def unconfig_network(self, p):
2555         p.route.remove_vpp_config()
2556         p.tun_if.remove_vpp_config()
2557
2558     def unconfig_protect(self, p):
2559         p.tun_protect.remove_vpp_config()
2560
2561     def unconfig_sa(self, p):
2562         p.tun_sa_out.remove_vpp_config()
2563         p.tun_sa_in.remove_vpp_config()
2564
2565
2566 @tag_fixme_vpp_workers
2567 class TestIpsecItf4(TemplateIpsec,
2568                     TemplateIpsecItf4,
2569                     IpsecTun4):
2570     """ IPsec Interface IPv4 """
2571
2572     def setUp(self):
2573         super(TestIpsecItf4, self).setUp()
2574
2575         self.tun_if = self.pg0
2576
2577     def tearDown(self):
2578         super(TestIpsecItf4, self).tearDown()
2579
2580     def test_tun_instance_44(self):
2581         p = self.ipv4_params
2582         self.config_network(p, instance=3)
2583
2584         with self.assertRaises(CliFailedCommandError):
2585             self.vapi.cli("show interface ipsec0")
2586
2587         output = self.vapi.cli("show interface ipsec3")
2588         self.assertTrue("unknown" not in output)
2589
2590         self.unconfig_network(p)
2591
2592     def test_tun_44(self):
2593         """IPSEC interface IPv4"""
2594
2595         n_pkts = 127
2596         p = self.ipv4_params
2597
2598         self.config_network(p)
2599         self.config_sa_tun(p,
2600                            self.pg0.local_ip4,
2601                            self.pg0.remote_ip4)
2602         self.config_protect(p)
2603
2604         self.verify_tun_44(p, count=n_pkts)
2605         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2606         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2607
2608         p.tun_if.admin_down()
2609         self.verify_tun_dropped_44(p, count=n_pkts)
2610         p.tun_if.admin_up()
2611         self.verify_tun_44(p, count=n_pkts)
2612
2613         self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
2614         self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
2615
2616         # it's a v6 packet when its encrypted
2617         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2618
2619         self.verify_tun_64(p, count=n_pkts)
2620         self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
2621         self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
2622
2623         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2624
2625         self.vapi.cli("clear interfaces")
2626
2627         # rekey - create new SAs and update the tunnel protection
2628         np = copy.copy(p)
2629         np.crypt_key = b'X' + p.crypt_key[1:]
2630         np.scapy_tun_spi += 100
2631         np.scapy_tun_sa_id += 1
2632         np.vpp_tun_spi += 100
2633         np.vpp_tun_sa_id += 1
2634         np.tun_if.local_spi = p.vpp_tun_spi
2635         np.tun_if.remote_spi = p.scapy_tun_spi
2636
2637         self.config_sa_tun(np,
2638                            self.pg0.local_ip4,
2639                            self.pg0.remote_ip4)
2640         self.config_protect(np)
2641         self.unconfig_sa(p)
2642
2643         self.verify_tun_44(np, count=n_pkts)
2644         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2645         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2646
2647         # teardown
2648         self.unconfig_protect(np)
2649         self.unconfig_sa(np)
2650         self.unconfig_network(p)
2651
2652     def test_tun_44_null(self):
2653         """IPSEC interface IPv4 NULL auth/crypto"""
2654
2655         n_pkts = 127
2656         p = copy.copy(self.ipv4_params)
2657
2658         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2659                               IPSEC_API_INTEG_ALG_NONE)
2660         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2661                                IPSEC_API_CRYPTO_ALG_NONE)
2662         p.crypt_algo = "NULL"
2663         p.auth_algo = "NULL"
2664
2665         self.config_network(p)
2666         self.config_sa_tun(p,
2667                            self.pg0.local_ip4,
2668                            self.pg0.remote_ip4)
2669         self.config_protect(p)
2670
2671         self.verify_tun_44(p, count=n_pkts)
2672
2673         # teardown
2674         self.unconfig_protect(p)
2675         self.unconfig_sa(p)
2676         self.unconfig_network(p)
2677
2678     def test_tun_44_police(self):
2679         """IPSEC interface IPv4 with input policer"""
2680         n_pkts = 127
2681         p = self.ipv4_params
2682
2683         self.config_network(p)
2684         self.config_sa_tun(p,
2685                            self.pg0.local_ip4,
2686                            self.pg0.remote_ip4)
2687         self.config_protect(p)
2688
2689         action_tx = PolicerAction(
2690             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
2691             0)
2692         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
2693                              conform_action=action_tx,
2694                              exceed_action=action_tx,
2695                              violate_action=action_tx)
2696         policer.add_vpp_config()
2697
2698         # Start policing on tun
2699         policer.apply_vpp_config(p.tun_if.sw_if_index, True)
2700
2701         self.verify_tun_44(p, count=n_pkts)
2702         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2703         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2704
2705         stats = policer.get_stats()
2706
2707         # Single rate, 2 colour policer - expect conform, violate but no exceed
2708         self.assertGreater(stats['conform_packets'], 0)
2709         self.assertEqual(stats['exceed_packets'], 0)
2710         self.assertGreater(stats['violate_packets'], 0)
2711
2712         # Stop policing on tun
2713         policer.apply_vpp_config(p.tun_if.sw_if_index, False)
2714         self.verify_tun_44(p, count=n_pkts)
2715
2716         # No new policer stats
2717         statsnew = policer.get_stats()
2718         self.assertEqual(stats, statsnew)
2719
2720         # teardown
2721         policer.remove_vpp_config()
2722         self.unconfig_protect(p)
2723         self.unconfig_sa(p)
2724         self.unconfig_network(p)
2725
2726
2727 class TestIpsecItf4MPLS(TemplateIpsec,
2728                         TemplateIpsecItf4,
2729                         IpsecTun4):
2730     """ IPsec Interface MPLSoIPv4 """
2731
2732     tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
2733
2734     def setUp(self):
2735         super(TestIpsecItf4MPLS, self).setUp()
2736
2737         self.tun_if = self.pg0
2738
2739     def tearDown(self):
2740         super(TestIpsecItf4MPLS, self).tearDown()
2741
2742     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2743                          payload_size=100):
2744         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2745                 sa.encrypt(MPLS(label=44, ttl=3) /
2746                            IP(src=src, dst=dst) /
2747                            UDP(sport=1166, dport=2233) /
2748                            Raw(b'X' * payload_size))
2749                 for i in range(count)]
2750
2751     def verify_encrypted(self, p, sa, rxs):
2752         for rx in rxs:
2753             try:
2754                 pkt = sa.decrypt(rx[IP])
2755                 if not pkt.haslayer(IP):
2756                     pkt = IP(pkt[Raw].load)
2757                 self.assert_packet_checksums_valid(pkt)
2758                 self.assert_equal(pkt[MPLS].label, 44)
2759                 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
2760             except (IndexError, AssertionError):
2761                 self.logger.debug(ppp("Unexpected packet:", rx))
2762                 try:
2763                     self.logger.debug(ppp("Decrypted packet:", pkt))
2764                 except:
2765                     pass
2766                 raise
2767
2768     def test_tun_mpls_o_ip4(self):
2769         """IPSEC interface MPLS over IPv4"""
2770
2771         n_pkts = 127
2772         p = self.ipv4_params
2773         f = FibPathProto
2774
2775         tbl = VppMplsTable(self, 0)
2776         tbl.add_vpp_config()
2777
2778         self.config_network(p)
2779         # deag MPLS routes from the tunnel
2780         r4 = VppMplsRoute(self, 44, 1,
2781                           [VppRoutePath(
2782                               self.pg1.remote_ip4,
2783                               self.pg1.sw_if_index)]).add_vpp_config()
2784         p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
2785                                      p.tun_if.sw_if_index,
2786                                      labels=[VppMplsLabel(44)])])
2787         p.tun_if.enable_mpls()
2788
2789         self.config_sa_tun(p,
2790                            self.pg0.local_ip4,
2791                            self.pg0.remote_ip4)
2792         self.config_protect(p)
2793
2794         self.verify_tun_44(p, count=n_pkts)
2795
2796         # cleanup
2797         p.tun_if.disable_mpls()
2798         self.unconfig_protect(p)
2799         self.unconfig_sa(p)
2800         self.unconfig_network(p)
2801
2802
2803 class TemplateIpsecItf6(object):
2804     """ IPsec Interface IPv6 """
2805
2806     encryption_type = ESP
2807     tun6_encrypt_node_name = "esp6-encrypt-tun"
2808     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2809     tun6_input_node = "ipsec6-tun-input"
2810
2811     def config_sa_tun(self, p, src, dst):
2812         config_tun_params(p, self.encryption_type, None, src, dst)
2813
2814         if not hasattr(p, 'tun_flags'):
2815             p.tun_flags = None
2816         if not hasattr(p, 'hop_limit'):
2817             p.hop_limit = 255
2818
2819         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2820                                   p.auth_algo_vpp_id, p.auth_key,
2821                                   p.crypt_algo_vpp_id, p.crypt_key,
2822                                   self.vpp_esp_protocol,
2823                                   src, dst,
2824                                   flags=p.flags,
2825                                   tun_flags=p.tun_flags,
2826                                   hop_limit=p.hop_limit)
2827         p.tun_sa_out.add_vpp_config()
2828
2829         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2830                                  p.auth_algo_vpp_id, p.auth_key,
2831                                  p.crypt_algo_vpp_id, p.crypt_key,
2832                                  self.vpp_esp_protocol,
2833                                  dst, src,
2834                                  flags=p.flags)
2835         p.tun_sa_in.add_vpp_config()
2836
2837     def config_protect(self, p):
2838         p.tun_protect = VppIpsecTunProtect(self,
2839                                            p.tun_if,
2840                                            p.tun_sa_out,
2841                                            [p.tun_sa_in])
2842         p.tun_protect.add_vpp_config()
2843
2844     def config_network(self, p):
2845         p.tun_if = VppIpsecInterface(self)
2846
2847         p.tun_if.add_vpp_config()
2848         p.tun_if.admin_up()
2849         p.tun_if.config_ip4()
2850         p.tun_if.config_ip6()
2851
2852         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2853                        [VppRoutePath(p.tun_if.remote_ip4,
2854                                      0xffffffff)])
2855         r.add_vpp_config()
2856
2857         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2858                              [VppRoutePath(p.tun_if.remote_ip6,
2859                                            0xffffffff,
2860                                            proto=DpoProto.DPO_PROTO_IP6)])
2861         p.route.add_vpp_config()
2862
2863     def unconfig_network(self, p):
2864         p.route.remove_vpp_config()
2865         p.tun_if.remove_vpp_config()
2866
2867     def unconfig_protect(self, p):
2868         p.tun_protect.remove_vpp_config()
2869
2870     def unconfig_sa(self, p):
2871         p.tun_sa_out.remove_vpp_config()
2872         p.tun_sa_in.remove_vpp_config()
2873
2874
2875 @tag_fixme_vpp_workers
2876 class TestIpsecItf6(TemplateIpsec,
2877                     TemplateIpsecItf6,
2878                     IpsecTun6):
2879     """ IPsec Interface IPv6 """
2880
2881     def setUp(self):
2882         super(TestIpsecItf6, self).setUp()
2883
2884         self.tun_if = self.pg0
2885
2886     def tearDown(self):
2887         super(TestIpsecItf6, self).tearDown()
2888
2889     def test_tun_44(self):
2890         """IPSEC interface IPv6"""
2891
2892         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2893         n_pkts = 127
2894         p = self.ipv6_params
2895         p.inner_hop_limit = 24
2896         p.outer_hop_limit = 23
2897         p.outer_flow_label = 243224
2898         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2899
2900         self.config_network(p)
2901         self.config_sa_tun(p,
2902                            self.pg0.local_ip6,
2903                            self.pg0.remote_ip6)
2904         self.config_protect(p)
2905
2906         self.verify_tun_66(p, count=n_pkts)
2907         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2908         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2909
2910         p.tun_if.admin_down()
2911         self.verify_drop_tun_66(p, count=n_pkts)
2912         p.tun_if.admin_up()
2913         self.verify_tun_66(p, count=n_pkts)
2914
2915         self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
2916         self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
2917
2918         # it's a v4 packet when its encrypted
2919         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2920
2921         self.verify_tun_46(p, count=n_pkts)
2922         self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
2923         self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
2924
2925         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2926
2927         self.vapi.cli("clear interfaces")
2928
2929         # rekey - create new SAs and update the tunnel protection
2930         np = copy.copy(p)
2931         np.crypt_key = b'X' + p.crypt_key[1:]
2932         np.scapy_tun_spi += 100
2933         np.scapy_tun_sa_id += 1
2934         np.vpp_tun_spi += 100
2935         np.vpp_tun_sa_id += 1
2936         np.tun_if.local_spi = p.vpp_tun_spi
2937         np.tun_if.remote_spi = p.scapy_tun_spi
2938         np.inner_hop_limit = 24
2939         np.outer_hop_limit = 128
2940         np.inner_flow_label = 0xabcde
2941         np.outer_flow_label = 0xabcde
2942         np.hop_limit = 128
2943         np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
2944
2945         self.config_sa_tun(np,
2946                            self.pg0.local_ip6,
2947                            self.pg0.remote_ip6)
2948         self.config_protect(np)
2949         self.unconfig_sa(p)
2950
2951         self.verify_tun_66(np, count=n_pkts)
2952         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2953         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2954
2955         # teardown
2956         self.unconfig_protect(np)
2957         self.unconfig_sa(np)
2958         self.unconfig_network(p)
2959
2960     def test_tun_66_police(self):
2961         """IPSEC interface IPv6 with input policer"""
2962         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2963         n_pkts = 127
2964         p = self.ipv6_params
2965         p.inner_hop_limit = 24
2966         p.outer_hop_limit = 23
2967         p.outer_flow_label = 243224
2968         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2969
2970         self.config_network(p)
2971         self.config_sa_tun(p,
2972                            self.pg0.local_ip6,
2973                            self.pg0.remote_ip6)
2974         self.config_protect(p)
2975
2976         action_tx = PolicerAction(
2977             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
2978             0)
2979         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
2980                              conform_action=action_tx,
2981                              exceed_action=action_tx,
2982                              violate_action=action_tx)
2983         policer.add_vpp_config()
2984
2985         # Start policing on tun
2986         policer.apply_vpp_config(p.tun_if.sw_if_index, True)
2987
2988         self.verify_tun_66(p, count=n_pkts)
2989         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2990         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2991
2992         stats = policer.get_stats()
2993
2994         # Single rate, 2 colour policer - expect conform, violate but no exceed
2995         self.assertGreater(stats['conform_packets'], 0)
2996         self.assertEqual(stats['exceed_packets'], 0)
2997         self.assertGreater(stats['violate_packets'], 0)
2998
2999         # Stop policing on tun
3000         policer.apply_vpp_config(p.tun_if.sw_if_index, False)
3001         self.verify_tun_66(p, count=n_pkts)
3002
3003         # No new policer stats
3004         statsnew = policer.get_stats()
3005         self.assertEqual(stats, statsnew)
3006
3007         # teardown
3008         policer.remove_vpp_config()
3009         self.unconfig_protect(p)
3010         self.unconfig_sa(p)
3011         self.unconfig_network(p)
3012
3013
3014 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3015     """ Ipsec P2MP ESP v4 tests """
3016     tun4_encrypt_node_name = "esp4-encrypt-tun"
3017     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3018     encryption_type = ESP
3019
3020     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
3021                          payload_size=100):
3022         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3023                 sa.encrypt(IP(src=self.pg1.local_ip4,
3024                               dst=self.pg1.remote_ip4) /
3025                            UDP(sport=1144, dport=2233) /
3026                            Raw(b'X' * payload_size))
3027                 for i in range(count)]
3028
3029     def gen_pkts(self, sw_intf, src, dst, count=1,
3030                  payload_size=100):
3031         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3032                 IP(src="1.1.1.1", dst=dst) /
3033                 UDP(sport=1144, dport=2233) /
3034                 Raw(b'X' * payload_size)
3035                 for i in range(count)]
3036
3037     def verify_decrypted(self, p, rxs):
3038         for rx in rxs:
3039             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3040             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3041
3042     def verify_encrypted(self, p, sa, rxs):
3043         for rx in rxs:
3044             try:
3045                 self.assertEqual(rx[IP].tos,
3046                                  VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
3047                 self.assertEqual(rx[IP].ttl, p.hop_limit)
3048                 pkt = sa.decrypt(rx[IP])
3049                 if not pkt.haslayer(IP):
3050                     pkt = IP(pkt[Raw].load)
3051                 self.assert_packet_checksums_valid(pkt)
3052                 e = pkt[IP]
3053                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3054             except (IndexError, AssertionError):
3055                 self.logger.debug(ppp("Unexpected packet:", rx))
3056                 try:
3057                     self.logger.debug(ppp("Decrypted packet:", pkt))
3058                 except:
3059                     pass
3060                 raise
3061
3062     def setUp(self):
3063         super(TestIpsecMIfEsp4, self).setUp()
3064
3065         N_NHS = 16
3066         self.tun_if = self.pg0
3067         p = self.ipv4_params
3068         p.tun_if = VppIpsecInterface(self,
3069                                      mode=(VppEnum.vl_api_tunnel_mode_t.
3070                                            TUNNEL_API_MODE_MP))
3071         p.tun_if.add_vpp_config()
3072         p.tun_if.admin_up()
3073         p.tun_if.config_ip4()
3074         p.tun_if.unconfig_ip4()
3075         p.tun_if.config_ip4()
3076         p.tun_if.generate_remote_hosts(N_NHS)
3077         self.pg0.generate_remote_hosts(N_NHS)
3078         self.pg0.configure_ipv4_neighbors()
3079
3080         # setup some SAs for several next-hops on the interface
3081         self.multi_params = []
3082
3083         for ii in range(N_NHS):
3084             p = copy.copy(self.ipv4_params)
3085
3086             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3087             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3088             p.scapy_tun_spi = p.scapy_tun_spi + ii
3089             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3090             p.vpp_tun_spi = p.vpp_tun_spi + ii
3091
3092             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3093             p.scapy_tra_spi = p.scapy_tra_spi + ii
3094             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3095             p.vpp_tra_spi = p.vpp_tra_spi + ii
3096             p.hop_limit = ii+10
3097             p.tun_sa_out = VppIpsecSA(
3098                 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
3099                 p.auth_algo_vpp_id, p.auth_key,
3100                 p.crypt_algo_vpp_id, p.crypt_key,
3101                 self.vpp_esp_protocol,
3102                 self.pg0.local_ip4,
3103                 self.pg0.remote_hosts[ii].ip4,
3104                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3105                 hop_limit=p.hop_limit)
3106             p.tun_sa_out.add_vpp_config()
3107
3108             p.tun_sa_in = VppIpsecSA(
3109                 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
3110                 p.auth_algo_vpp_id, p.auth_key,
3111                 p.crypt_algo_vpp_id, p.crypt_key,
3112                 self.vpp_esp_protocol,
3113                 self.pg0.remote_hosts[ii].ip4,
3114                 self.pg0.local_ip4,
3115                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3116                 hop_limit=p.hop_limit)
3117             p.tun_sa_in.add_vpp_config()
3118
3119             p.tun_protect = VppIpsecTunProtect(
3120                 self,
3121                 p.tun_if,
3122                 p.tun_sa_out,
3123                 [p.tun_sa_in],
3124                 nh=p.tun_if.remote_hosts[ii].ip4)
3125             p.tun_protect.add_vpp_config()
3126             config_tun_params(p, self.encryption_type, None,
3127                               self.pg0.local_ip4,
3128                               self.pg0.remote_hosts[ii].ip4)
3129             self.multi_params.append(p)
3130
3131             VppIpRoute(self, p.remote_tun_if_host, 32,
3132                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
3133                                      p.tun_if.sw_if_index)]).add_vpp_config()
3134
3135             p.tun_dst = self.pg0.remote_hosts[ii].ip4
3136
3137     def tearDown(self):
3138         p = self.ipv4_params
3139         p.tun_if.unconfig_ip4()
3140         super(TestIpsecMIfEsp4, self).tearDown()
3141
3142     def test_tun_44(self):
3143         """P2MP IPSEC 44"""
3144         N_PKTS = 63
3145         for p in self.multi_params:
3146             self.verify_tun_44(p, count=N_PKTS)
3147
3148
3149 class TestIpsecItf6MPLS(TemplateIpsec,
3150                         TemplateIpsecItf6,
3151                         IpsecTun6):
3152     """ IPsec Interface MPLSoIPv6 """
3153
3154     tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3155
3156     def setUp(self):
3157         super(TestIpsecItf6MPLS, self).setUp()
3158
3159         self.tun_if = self.pg0
3160
3161     def tearDown(self):
3162         super(TestIpsecItf6MPLS, self).tearDown()
3163
3164     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
3165                           payload_size=100):
3166         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3167                 sa.encrypt(MPLS(label=66, ttl=3) /
3168                            IPv6(src=src, dst=dst) /
3169                            UDP(sport=1166, dport=2233) /
3170                            Raw(b'X' * payload_size))
3171                 for i in range(count)]
3172
3173     def verify_encrypted6(self, p, sa, rxs):
3174         for rx in rxs:
3175             try:
3176                 pkt = sa.decrypt(rx[IPv6])
3177                 if not pkt.haslayer(IPv6):
3178                     pkt = IP(pkt[Raw].load)
3179                 self.assert_packet_checksums_valid(pkt)
3180                 self.assert_equal(pkt[MPLS].label, 66)
3181                 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3182             except (IndexError, AssertionError):
3183                 self.logger.debug(ppp("Unexpected packet:", rx))
3184                 try:
3185                     self.logger.debug(ppp("Decrypted packet:", pkt))
3186                 except:
3187                     pass
3188                 raise
3189
3190     def test_tun_mpls_o_ip6(self):
3191         """IPSEC interface MPLS over IPv6"""
3192
3193         n_pkts = 127
3194         p = self.ipv6_params
3195         f = FibPathProto
3196
3197         tbl = VppMplsTable(self, 0)
3198         tbl.add_vpp_config()
3199
3200         self.config_network(p)
3201         # deag MPLS routes from the tunnel
3202         r6 = VppMplsRoute(self, 66, 1,
3203                           [VppRoutePath(
3204                               self.pg1.remote_ip6,
3205                               self.pg1.sw_if_index)],
3206                           eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
3207         p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
3208                                      p.tun_if.sw_if_index,
3209                                      labels=[VppMplsLabel(66)])])
3210         p.tun_if.enable_mpls()
3211
3212         self.config_sa_tun(p,
3213                            self.pg0.local_ip6,
3214                            self.pg0.remote_ip6)
3215         self.config_protect(p)
3216
3217         self.verify_tun_66(p, count=n_pkts)
3218
3219         # cleanup
3220         p.tun_if.disable_mpls()
3221         self.unconfig_protect(p)
3222         self.unconfig_sa(p)
3223         self.unconfig_network(p)
3224
3225
3226 if __name__ == '__main__':
3227     unittest.main(testRunner=VppTestRunner)