ikev2: support responder hostname
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
15     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19     VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
24 from util import ppp
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
28 from vpp_policer import PolicerAction, VppPolicer
29
30
31 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
32     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
33     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
34                              IPSEC_API_SAD_FLAG_USE_ESN))
35     crypt_key = mk_scapy_crypt_key(p)
36     if tun_if:
37         p.tun_dst = tun_if.remote_ip
38         p.tun_src = tun_if.local_ip
39     else:
40         p.tun_dst = dst
41         p.tun_src = src
42
43     p.scapy_tun_sa = SecurityAssociation(
44         encryption_type, spi=p.vpp_tun_spi,
45         crypt_algo=p.crypt_algo,
46         crypt_key=crypt_key,
47         auth_algo=p.auth_algo, auth_key=p.auth_key,
48         tunnel_header=ip_class_by_addr_type[p.addr_type](
49             src=p.tun_dst,
50             dst=p.tun_src),
51         nat_t_header=p.nat_header,
52         esn_en=esn_en)
53     p.vpp_tun_sa = SecurityAssociation(
54         encryption_type, spi=p.scapy_tun_spi,
55         crypt_algo=p.crypt_algo,
56         crypt_key=crypt_key,
57         auth_algo=p.auth_algo, auth_key=p.auth_key,
58         tunnel_header=ip_class_by_addr_type[p.addr_type](
59             dst=p.tun_dst,
60             src=p.tun_src),
61         nat_t_header=p.nat_header,
62         esn_en=esn_en)
63
64
65 def config_tra_params(p, encryption_type, tun_if):
66     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
67     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
68                              IPSEC_API_SAD_FLAG_USE_ESN))
69     crypt_key = mk_scapy_crypt_key(p)
70     p.tun_dst = tun_if.remote_ip
71     p.tun_src = tun_if.local_ip
72     p.scapy_tun_sa = SecurityAssociation(
73         encryption_type, spi=p.vpp_tun_spi,
74         crypt_algo=p.crypt_algo,
75         crypt_key=crypt_key,
76         auth_algo=p.auth_algo, auth_key=p.auth_key,
77         esn_en=esn_en,
78         nat_t_header=p.nat_header)
79     p.vpp_tun_sa = SecurityAssociation(
80         encryption_type, spi=p.scapy_tun_spi,
81         crypt_algo=p.crypt_algo,
82         crypt_key=crypt_key,
83         auth_algo=p.auth_algo, auth_key=p.auth_key,
84         esn_en=esn_en,
85         nat_t_header=p.nat_header)
86
87
88 class TemplateIpsec4TunProtect(object):
89     """ IPsec IPv4 Tunnel protect """
90
91     encryption_type = ESP
92     tun4_encrypt_node_name = "esp4-encrypt-tun"
93     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
94     tun4_input_node = "ipsec4-tun-input"
95
96     def config_sa_tra(self, p):
97         config_tun_params(p, self.encryption_type, p.tun_if)
98
99         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
100                                   p.auth_algo_vpp_id, p.auth_key,
101                                   p.crypt_algo_vpp_id, p.crypt_key,
102                                   self.vpp_esp_protocol,
103                                   flags=p.flags)
104         p.tun_sa_out.add_vpp_config()
105
106         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
107                                  p.auth_algo_vpp_id, p.auth_key,
108                                  p.crypt_algo_vpp_id, p.crypt_key,
109                                  self.vpp_esp_protocol,
110                                  flags=p.flags)
111         p.tun_sa_in.add_vpp_config()
112
113     def config_sa_tun(self, p):
114         config_tun_params(p, self.encryption_type, p.tun_if)
115
116         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
117                                   p.auth_algo_vpp_id, p.auth_key,
118                                   p.crypt_algo_vpp_id, p.crypt_key,
119                                   self.vpp_esp_protocol,
120                                   self.tun_if.local_addr[p.addr_type],
121                                   self.tun_if.remote_addr[p.addr_type],
122                                   flags=p.flags)
123         p.tun_sa_out.add_vpp_config()
124
125         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
126                                  p.auth_algo_vpp_id, p.auth_key,
127                                  p.crypt_algo_vpp_id, p.crypt_key,
128                                  self.vpp_esp_protocol,
129                                  self.tun_if.remote_addr[p.addr_type],
130                                  self.tun_if.local_addr[p.addr_type],
131                                  flags=p.flags)
132         p.tun_sa_in.add_vpp_config()
133
134     def config_protect(self, p):
135         p.tun_protect = VppIpsecTunProtect(self,
136                                            p.tun_if,
137                                            p.tun_sa_out,
138                                            [p.tun_sa_in])
139         p.tun_protect.add_vpp_config()
140
141     def config_network(self, p):
142         if hasattr(p, 'tun_dst'):
143             tun_dst = p.tun_dst
144         else:
145             tun_dst = self.pg0.remote_ip4
146         p.tun_if = VppIpIpTunInterface(self, self.pg0,
147                                        self.pg0.local_ip4,
148                                        tun_dst)
149         p.tun_if.add_vpp_config()
150         p.tun_if.admin_up()
151         p.tun_if.config_ip4()
152         p.tun_if.config_ip6()
153
154         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
155                              [VppRoutePath(p.tun_if.remote_ip4,
156                                            0xffffffff)])
157         p.route.add_vpp_config()
158         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
159                        [VppRoutePath(p.tun_if.remote_ip6,
160                                      0xffffffff,
161                                      proto=DpoProto.DPO_PROTO_IP6)])
162         r.add_vpp_config()
163
164     def unconfig_network(self, p):
165         p.route.remove_vpp_config()
166         p.tun_if.remove_vpp_config()
167
168     def unconfig_protect(self, p):
169         p.tun_protect.remove_vpp_config()
170
171     def unconfig_sa(self, p):
172         p.tun_sa_out.remove_vpp_config()
173         p.tun_sa_in.remove_vpp_config()
174
175
176 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
177                              TemplateIpsec):
178     """ IPsec tunnel interface tests """
179
180     encryption_type = ESP
181
182     @classmethod
183     def setUpClass(cls):
184         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
185
186     @classmethod
187     def tearDownClass(cls):
188         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
189
190     def setUp(self):
191         super(TemplateIpsec4TunIfEsp, self).setUp()
192
193         self.tun_if = self.pg0
194
195         p = self.ipv4_params
196
197         self.config_network(p)
198         self.config_sa_tra(p)
199         self.config_protect(p)
200
201     def tearDown(self):
202         super(TemplateIpsec4TunIfEsp, self).tearDown()
203
204
205 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
206                                 TemplateIpsec):
207     """ IPsec UDP tunnel interface tests """
208
209     tun4_encrypt_node_name = "esp4-encrypt-tun"
210     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
211     encryption_type = ESP
212
213     @classmethod
214     def setUpClass(cls):
215         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
216
217     @classmethod
218     def tearDownClass(cls):
219         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
220
221     def verify_encrypted(self, p, sa, rxs):
222         for rx in rxs:
223             try:
224                 # ensure the UDP ports are correct before we decrypt
225                 # which strips them
226                 self.assertTrue(rx.haslayer(UDP))
227                 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
228                 self.assert_equal(rx[UDP].dport, 4500)
229
230                 pkt = sa.decrypt(rx[IP])
231                 if not pkt.haslayer(IP):
232                     pkt = IP(pkt[Raw].load)
233
234                 self.assert_packet_checksums_valid(pkt)
235                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
236                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
237             except (IndexError, AssertionError):
238                 self.logger.debug(ppp("Unexpected packet:", rx))
239                 try:
240                     self.logger.debug(ppp("Decrypted packet:", pkt))
241                 except:
242                     pass
243                 raise
244
245     def config_sa_tra(self, p):
246         config_tun_params(p, self.encryption_type, p.tun_if)
247
248         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
249                                   p.auth_algo_vpp_id, p.auth_key,
250                                   p.crypt_algo_vpp_id, p.crypt_key,
251                                   self.vpp_esp_protocol,
252                                   flags=p.flags,
253                                   udp_src=p.nat_header.sport,
254                                   udp_dst=p.nat_header.dport)
255         p.tun_sa_out.add_vpp_config()
256
257         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
258                                  p.auth_algo_vpp_id, p.auth_key,
259                                  p.crypt_algo_vpp_id, p.crypt_key,
260                                  self.vpp_esp_protocol,
261                                  flags=p.flags,
262                                  udp_src=p.nat_header.sport,
263                                  udp_dst=p.nat_header.dport)
264         p.tun_sa_in.add_vpp_config()
265
266     def setUp(self):
267         super(TemplateIpsec4TunIfEspUdp, self).setUp()
268
269         p = self.ipv4_params
270         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
271                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
272         p.nat_header = UDP(sport=5454, dport=4500)
273
274         self.tun_if = self.pg0
275
276         self.config_network(p)
277         self.config_sa_tra(p)
278         self.config_protect(p)
279
280     def tearDown(self):
281         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
282
283
284 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
285     """ Ipsec ESP - TUN tests """
286     tun4_encrypt_node_name = "esp4-encrypt-tun"
287     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
288
289     def test_tun_basic64(self):
290         """ ipsec 6o4 tunnel basic test """
291         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
292
293         self.verify_tun_64(self.params[socket.AF_INET], count=1)
294
295     def test_tun_burst64(self):
296         """ ipsec 6o4 tunnel basic test """
297         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
298
299         self.verify_tun_64(self.params[socket.AF_INET], count=257)
300
301     def test_tun_basic_frag44(self):
302         """ ipsec 4o4 tunnel frag basic test """
303         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
304
305         p = self.ipv4_params
306
307         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
308                                        [1500, 0, 0, 0])
309         self.verify_tun_44(self.params[socket.AF_INET],
310                            count=1, payload_size=1800, n_rx=2)
311         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
312                                        [9000, 0, 0, 0])
313
314
315 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
316     """ Ipsec ESP UDP tests """
317
318     tun4_input_node = "ipsec4-tun-input"
319
320     def setUp(self):
321         super(TestIpsec4TunIfEspUdp, self).setUp()
322
323     def test_keepalive(self):
324         """ IPSEC NAT Keepalive """
325         self.verify_keepalive(self.ipv4_params)
326
327
328 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
329     """ Ipsec ESP UDP GCM tests """
330
331     tun4_input_node = "ipsec4-tun-input"
332
333     def setUp(self):
334         super(TestIpsec4TunIfEspUdpGCM, self).setUp()
335         p = self.ipv4_params
336         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
337                               IPSEC_API_INTEG_ALG_NONE)
338         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
339                                IPSEC_API_CRYPTO_ALG_AES_GCM_256)
340         p.crypt_algo = "AES-GCM"
341         p.auth_algo = "NULL"
342         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
343         p.salt = 0
344
345
346 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
347     """ Ipsec ESP - TCP tests """
348     pass
349
350
351 class TemplateIpsec6TunProtect(object):
352     """ IPsec IPv6 Tunnel protect """
353
354     def config_sa_tra(self, p):
355         config_tun_params(p, self.encryption_type, p.tun_if)
356
357         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
358                                   p.auth_algo_vpp_id, p.auth_key,
359                                   p.crypt_algo_vpp_id, p.crypt_key,
360                                   self.vpp_esp_protocol)
361         p.tun_sa_out.add_vpp_config()
362
363         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
364                                  p.auth_algo_vpp_id, p.auth_key,
365                                  p.crypt_algo_vpp_id, p.crypt_key,
366                                  self.vpp_esp_protocol)
367         p.tun_sa_in.add_vpp_config()
368
369     def config_sa_tun(self, p):
370         config_tun_params(p, self.encryption_type, p.tun_if)
371
372         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
373                                   p.auth_algo_vpp_id, p.auth_key,
374                                   p.crypt_algo_vpp_id, p.crypt_key,
375                                   self.vpp_esp_protocol,
376                                   self.tun_if.local_addr[p.addr_type],
377                                   self.tun_if.remote_addr[p.addr_type])
378         p.tun_sa_out.add_vpp_config()
379
380         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
381                                  p.auth_algo_vpp_id, p.auth_key,
382                                  p.crypt_algo_vpp_id, p.crypt_key,
383                                  self.vpp_esp_protocol,
384                                  self.tun_if.remote_addr[p.addr_type],
385                                  self.tun_if.local_addr[p.addr_type])
386         p.tun_sa_in.add_vpp_config()
387
388     def config_protect(self, p):
389         p.tun_protect = VppIpsecTunProtect(self,
390                                            p.tun_if,
391                                            p.tun_sa_out,
392                                            [p.tun_sa_in])
393         p.tun_protect.add_vpp_config()
394
395     def config_network(self, p):
396         if hasattr(p, 'tun_dst'):
397             tun_dst = p.tun_dst
398         else:
399             tun_dst = self.pg0.remote_ip6
400         p.tun_if = VppIpIpTunInterface(self, self.pg0,
401                                        self.pg0.local_ip6,
402                                        tun_dst)
403         p.tun_if.add_vpp_config()
404         p.tun_if.admin_up()
405         p.tun_if.config_ip6()
406         p.tun_if.config_ip4()
407
408         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
409                              [VppRoutePath(p.tun_if.remote_ip6,
410                                            0xffffffff,
411                                            proto=DpoProto.DPO_PROTO_IP6)])
412         p.route.add_vpp_config()
413         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
414                        [VppRoutePath(p.tun_if.remote_ip4,
415                                      0xffffffff)])
416         r.add_vpp_config()
417
418     def unconfig_network(self, p):
419         p.route.remove_vpp_config()
420         p.tun_if.remove_vpp_config()
421
422     def unconfig_protect(self, p):
423         p.tun_protect.remove_vpp_config()
424
425     def unconfig_sa(self, p):
426         p.tun_sa_out.remove_vpp_config()
427         p.tun_sa_in.remove_vpp_config()
428
429
430 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
431                              TemplateIpsec):
432     """ IPsec tunnel interface tests """
433
434     encryption_type = ESP
435
436     def setUp(self):
437         super(TemplateIpsec6TunIfEsp, self).setUp()
438
439         self.tun_if = self.pg0
440
441         p = self.ipv6_params
442         self.config_network(p)
443         self.config_sa_tra(p)
444         self.config_protect(p)
445
446     def tearDown(self):
447         super(TemplateIpsec6TunIfEsp, self).tearDown()
448
449
450 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
451                           IpsecTun6Tests):
452     """ Ipsec ESP - TUN tests """
453     tun6_encrypt_node_name = "esp6-encrypt-tun"
454     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
455
456     def test_tun_basic46(self):
457         """ ipsec 4o6 tunnel basic test """
458         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
459         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
460
461     def test_tun_burst46(self):
462         """ ipsec 4o6 tunnel burst test """
463         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
464         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
465
466
467 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
468                                 IpsecTun6HandoffTests):
469     """ Ipsec ESP 6 Handoff tests """
470     tun6_encrypt_node_name = "esp6-encrypt-tun"
471     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
472
473     def test_tun_handoff_66_police(self):
474         """ ESP 6o6 tunnel with policer worker hand-off test """
475         self.vapi.cli("clear errors")
476         self.vapi.cli("clear ipsec sa")
477
478         N_PKTS = 15
479         p = self.params[socket.AF_INET6]
480
481         action_tx = PolicerAction(
482             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
483             0)
484         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
485                              conform_action=action_tx,
486                              exceed_action=action_tx,
487                              violate_action=action_tx)
488         policer.add_vpp_config()
489
490         # Start policing on tun
491         policer.apply_vpp_config(p.tun_if.sw_if_index, True)
492
493         for pol_bind in [1, 0]:
494             policer.bind_vpp_config(pol_bind, True)
495
496             # inject alternately on worker 0 and 1.
497             for worker in [0, 1, 0, 1]:
498                 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
499                                                    self.tun_if,
500                                                    src=p.remote_tun_if_host,
501                                                    dst=self.pg1.remote_ip6,
502                                                    count=N_PKTS)
503                 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
504                                                  self.pg1, worker=worker)
505                 self.verify_decrypted6(p, recv_pkts)
506                 self.logger.debug(self.vapi.cli("show trace max 100"))
507
508             stats = policer.get_stats()
509             stats0 = policer.get_stats(worker=0)
510             stats1 = policer.get_stats(worker=1)
511
512             if pol_bind == 1:
513                 # First pass: Worker 1, should have done all the policing
514                 self.assertEqual(stats, stats1)
515
516                 # Worker 0, should have handed everything off
517                 self.assertEqual(stats0['conform_packets'], 0)
518                 self.assertEqual(stats0['exceed_packets'], 0)
519                 self.assertEqual(stats0['violate_packets'], 0)
520             else:
521                 # Second pass: both workers should have policed equal amounts
522                 self.assertGreater(stats1['conform_packets'], 0)
523                 self.assertEqual(stats1['exceed_packets'], 0)
524                 self.assertGreater(stats1['violate_packets'], 0)
525
526                 self.assertGreater(stats0['conform_packets'], 0)
527                 self.assertEqual(stats0['exceed_packets'], 0)
528                 self.assertGreater(stats0['violate_packets'], 0)
529
530                 self.assertEqual(stats0['conform_packets'] +
531                                  stats0['violate_packets'],
532                                  stats1['conform_packets'] +
533                                  stats1['violate_packets'])
534
535         policer.apply_vpp_config(p.tun_if.sw_if_index, False)
536         policer.remove_vpp_config()
537
538
539 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
540                                 IpsecTun4HandoffTests):
541     """ Ipsec ESP 4 Handoff tests """
542     tun4_encrypt_node_name = "esp4-encrypt-tun"
543     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
544
545     def test_tun_handoff_44_police(self):
546         """ ESP 4o4 tunnel with policer worker hand-off test """
547         self.vapi.cli("clear errors")
548         self.vapi.cli("clear ipsec sa")
549
550         N_PKTS = 15
551         p = self.params[socket.AF_INET]
552
553         action_tx = PolicerAction(
554             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
555             0)
556         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
557                              conform_action=action_tx,
558                              exceed_action=action_tx,
559                              violate_action=action_tx)
560         policer.add_vpp_config()
561
562         # Start policing on tun
563         policer.apply_vpp_config(p.tun_if.sw_if_index, True)
564
565         for pol_bind in [1, 0]:
566             policer.bind_vpp_config(pol_bind, True)
567
568             # inject alternately on worker 0 and 1.
569             for worker in [0, 1, 0, 1]:
570                 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
571                                                   self.tun_if,
572                                                   src=p.remote_tun_if_host,
573                                                   dst=self.pg1.remote_ip4,
574                                                   count=N_PKTS)
575                 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
576                                                  self.pg1, worker=worker)
577                 self.verify_decrypted(p, recv_pkts)
578                 self.logger.debug(self.vapi.cli("show trace max 100"))
579
580             stats = policer.get_stats()
581             stats0 = policer.get_stats(worker=0)
582             stats1 = policer.get_stats(worker=1)
583
584             if pol_bind == 1:
585                 # First pass: Worker 1, should have done all the policing
586                 self.assertEqual(stats, stats1)
587
588                 # Worker 0, should have handed everything off
589                 self.assertEqual(stats0['conform_packets'], 0)
590                 self.assertEqual(stats0['exceed_packets'], 0)
591                 self.assertEqual(stats0['violate_packets'], 0)
592             else:
593                 # Second pass: both workers should have policed equal amounts
594                 self.assertGreater(stats1['conform_packets'], 0)
595                 self.assertEqual(stats1['exceed_packets'], 0)
596                 self.assertGreater(stats1['violate_packets'], 0)
597
598                 self.assertGreater(stats0['conform_packets'], 0)
599                 self.assertEqual(stats0['exceed_packets'], 0)
600                 self.assertGreater(stats0['violate_packets'], 0)
601
602                 self.assertEqual(stats0['conform_packets'] +
603                                  stats0['violate_packets'],
604                                  stats1['conform_packets'] +
605                                  stats1['violate_packets'])
606
607         policer.apply_vpp_config(p.tun_if.sw_if_index, False)
608         policer.remove_vpp_config()
609
610
611 @tag_fixme_vpp_workers
612 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
613                               TemplateIpsec,
614                               IpsecTun4):
615     """ IPsec IPv4 Multi Tunnel interface """
616
617     encryption_type = ESP
618     tun4_encrypt_node_name = "esp4-encrypt-tun"
619     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
620
621     def setUp(self):
622         super(TestIpsec4MultiTunIfEsp, self).setUp()
623
624         self.tun_if = self.pg0
625
626         self.multi_params = []
627         self.pg0.generate_remote_hosts(10)
628         self.pg0.configure_ipv4_neighbors()
629
630         for ii in range(10):
631             p = copy.copy(self.ipv4_params)
632
633             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
634             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
635             p.scapy_tun_spi = p.scapy_tun_spi + ii
636             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
637             p.vpp_tun_spi = p.vpp_tun_spi + ii
638
639             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
640             p.scapy_tra_spi = p.scapy_tra_spi + ii
641             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
642             p.vpp_tra_spi = p.vpp_tra_spi + ii
643             p.tun_dst = self.pg0.remote_hosts[ii].ip4
644
645             self.multi_params.append(p)
646             self.config_network(p)
647             self.config_sa_tra(p)
648             self.config_protect(p)
649
650     def tearDown(self):
651         super(TestIpsec4MultiTunIfEsp, self).tearDown()
652
653     def test_tun_44(self):
654         """Multiple IPSEC tunnel interfaces """
655         for p in self.multi_params:
656             self.verify_tun_44(p, count=127)
657             c = p.tun_if.get_rx_stats()
658             self.assertEqual(c['packets'], 127)
659             c = p.tun_if.get_tx_stats()
660             self.assertEqual(c['packets'], 127)
661
662     def test_tun_rr_44(self):
663         """ Round-robin packets acrros multiple interface """
664         tx = []
665         for p in self.multi_params:
666             tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
667                                             src=p.remote_tun_if_host,
668                                             dst=self.pg1.remote_ip4)
669         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
670
671         for rx, p in zip(rxs, self.multi_params):
672             self.verify_decrypted(p, [rx])
673
674         tx = []
675         for p in self.multi_params:
676             tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
677                                     dst=p.remote_tun_if_host)
678         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
679
680         for rx, p in zip(rxs, self.multi_params):
681             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
682
683
684 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
685                             TemplateIpsec,
686                             IpsecTun4):
687     """ IPsec IPv4 Tunnel interface all Algos """
688
689     encryption_type = ESP
690     tun4_encrypt_node_name = "esp4-encrypt-tun"
691     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
692
693     def setUp(self):
694         super(TestIpsec4TunIfEspAll, self).setUp()
695
696         self.tun_if = self.pg0
697         p = self.ipv4_params
698
699         self.config_network(p)
700         self.config_sa_tra(p)
701         self.config_protect(p)
702
703     def tearDown(self):
704         p = self.ipv4_params
705         self.unconfig_protect(p)
706         self.unconfig_network(p)
707         self.unconfig_sa(p)
708
709         super(TestIpsec4TunIfEspAll, self).tearDown()
710
711     def rekey(self, p):
712         #
713         # change the key and the SPI
714         #
715         np = copy.copy(p)
716         p.crypt_key = b'X' + p.crypt_key[1:]
717         p.scapy_tun_spi += 1
718         p.scapy_tun_sa_id += 1
719         p.vpp_tun_spi += 1
720         p.vpp_tun_sa_id += 1
721         p.tun_if.local_spi = p.vpp_tun_spi
722         p.tun_if.remote_spi = p.scapy_tun_spi
723
724         config_tun_params(p, self.encryption_type, p.tun_if)
725
726         p.tun_sa_out = VppIpsecSA(self,
727                                   p.scapy_tun_sa_id,
728                                   p.scapy_tun_spi,
729                                   p.auth_algo_vpp_id,
730                                   p.auth_key,
731                                   p.crypt_algo_vpp_id,
732                                   p.crypt_key,
733                                   self.vpp_esp_protocol,
734                                   flags=p.flags,
735                                   salt=p.salt)
736         p.tun_sa_in = VppIpsecSA(self,
737                                  p.vpp_tun_sa_id,
738                                  p.vpp_tun_spi,
739                                  p.auth_algo_vpp_id,
740                                  p.auth_key,
741                                  p.crypt_algo_vpp_id,
742                                  p.crypt_key,
743                                  self.vpp_esp_protocol,
744                                  flags=p.flags,
745                                  salt=p.salt)
746         p.tun_sa_in.add_vpp_config()
747         p.tun_sa_out.add_vpp_config()
748
749         self.config_protect(p)
750         np.tun_sa_out.remove_vpp_config()
751         np.tun_sa_in.remove_vpp_config()
752         self.logger.info(self.vapi.cli("sh ipsec sa"))
753
754     def test_tun_44(self):
755         """IPSEC tunnel all algos """
756
757         # foreach VPP crypto engine
758         engines = ["ia32", "ipsecmb", "openssl"]
759
760         # foreach crypto algorithm
761         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
762                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
763                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
764                                 IPSEC_API_INTEG_ALG_NONE),
765                   'scapy-crypto': "AES-GCM",
766                   'scapy-integ': "NULL",
767                   'key': b"JPjyOWBeVEQiMe7h",
768                   'salt': 3333},
769                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
770                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
771                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
772                                 IPSEC_API_INTEG_ALG_NONE),
773                   'scapy-crypto': "AES-GCM",
774                   'scapy-integ': "NULL",
775                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
776                   'salt': 0},
777                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
778                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
779                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
780                                 IPSEC_API_INTEG_ALG_NONE),
781                   'scapy-crypto': "AES-GCM",
782                   'scapy-integ': "NULL",
783                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
784                   'salt': 9999},
785                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
786                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
787                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
788                                 IPSEC_API_INTEG_ALG_SHA1_96),
789                   'scapy-crypto': "AES-CBC",
790                   'scapy-integ': "HMAC-SHA1-96",
791                   'salt': 0,
792                   'key': b"JPjyOWBeVEQiMe7h"},
793                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
794                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
795                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
796                                 IPSEC_API_INTEG_ALG_SHA_512_256),
797                   'scapy-crypto': "AES-CBC",
798                   'scapy-integ': "SHA2-512-256",
799                   'salt': 0,
800                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
801                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
802                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
803                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
804                                 IPSEC_API_INTEG_ALG_SHA_256_128),
805                   'scapy-crypto': "AES-CBC",
806                   'scapy-integ': "SHA2-256-128",
807                   'salt': 0,
808                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
809                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
810                                  IPSEC_API_CRYPTO_ALG_NONE),
811                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
812                                 IPSEC_API_INTEG_ALG_SHA1_96),
813                   'scapy-crypto': "NULL",
814                   'scapy-integ': "HMAC-SHA1-96",
815                   'salt': 0,
816                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
817
818         for engine in engines:
819             self.vapi.cli("set crypto handler all %s" % engine)
820
821             #
822             # loop through each of the algorithms
823             #
824             for algo in algos:
825                 # with self.subTest(algo=algo['scapy']):
826
827                 p = self.ipv4_params
828                 p.auth_algo_vpp_id = algo['vpp-integ']
829                 p.crypt_algo_vpp_id = algo['vpp-crypto']
830                 p.crypt_algo = algo['scapy-crypto']
831                 p.auth_algo = algo['scapy-integ']
832                 p.crypt_key = algo['key']
833                 p.salt = algo['salt']
834
835                 #
836                 # rekey the tunnel
837                 #
838                 self.rekey(p)
839                 self.verify_tun_44(p, count=127)
840
841
842 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
843                                TemplateIpsec,
844                                IpsecTun4):
845     """ IPsec IPv4 Tunnel interface no Algos """
846
847     encryption_type = ESP
848     tun4_encrypt_node_name = "esp4-encrypt-tun"
849     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
850
851     def setUp(self):
852         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
853
854         self.tun_if = self.pg0
855         p = self.ipv4_params
856         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
857                               IPSEC_API_INTEG_ALG_NONE)
858         p.auth_algo = 'NULL'
859         p.auth_key = []
860
861         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
862                                IPSEC_API_CRYPTO_ALG_NONE)
863         p.crypt_algo = 'NULL'
864         p.crypt_key = []
865
866     def tearDown(self):
867         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
868
869     def test_tun_44(self):
870         """ IPSec SA with NULL algos """
871         p = self.ipv4_params
872
873         self.config_network(p)
874         self.config_sa_tra(p)
875         self.config_protect(p)
876
877         tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
878                            dst=p.remote_tun_if_host)
879         self.send_and_assert_no_replies(self.pg1, tx)
880
881         self.unconfig_protect(p)
882         self.unconfig_sa(p)
883         self.unconfig_network(p)
884
885
886 @tag_fixme_vpp_workers
887 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
888                               TemplateIpsec,
889                               IpsecTun6):
890     """ IPsec IPv6 Multi Tunnel interface """
891
892     encryption_type = ESP
893     tun6_encrypt_node_name = "esp6-encrypt-tun"
894     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
895
896     def setUp(self):
897         super(TestIpsec6MultiTunIfEsp, self).setUp()
898
899         self.tun_if = self.pg0
900
901         self.multi_params = []
902         self.pg0.generate_remote_hosts(10)
903         self.pg0.configure_ipv6_neighbors()
904
905         for ii in range(10):
906             p = copy.copy(self.ipv6_params)
907
908             p.remote_tun_if_host = "1111::%d" % (ii + 1)
909             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
910             p.scapy_tun_spi = p.scapy_tun_spi + ii
911             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
912             p.vpp_tun_spi = p.vpp_tun_spi + ii
913
914             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
915             p.scapy_tra_spi = p.scapy_tra_spi + ii
916             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
917             p.vpp_tra_spi = p.vpp_tra_spi + ii
918             p.tun_dst = self.pg0.remote_hosts[ii].ip6
919
920             self.multi_params.append(p)
921             self.config_network(p)
922             self.config_sa_tra(p)
923             self.config_protect(p)
924
925     def tearDown(self):
926         super(TestIpsec6MultiTunIfEsp, self).tearDown()
927
928     def test_tun_66(self):
929         """Multiple IPSEC tunnel interfaces """
930         for p in self.multi_params:
931             self.verify_tun_66(p, count=127)
932             c = p.tun_if.get_rx_stats()
933             self.assertEqual(c['packets'], 127)
934             c = p.tun_if.get_tx_stats()
935             self.assertEqual(c['packets'], 127)
936
937
938 class TestIpsecGreTebIfEsp(TemplateIpsec,
939                            IpsecTun4Tests):
940     """ Ipsec GRE TEB ESP - TUN tests """
941     tun4_encrypt_node_name = "esp4-encrypt-tun"
942     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
943     encryption_type = ESP
944     omac = "00:11:22:33:44:55"
945
946     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
947                          payload_size=100):
948         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
949                 sa.encrypt(IP(src=self.pg0.remote_ip4,
950                               dst=self.pg0.local_ip4) /
951                            GRE() /
952                            Ether(dst=self.omac) /
953                            IP(src="1.1.1.1", dst="1.1.1.2") /
954                            UDP(sport=1144, dport=2233) /
955                            Raw(b'X' * payload_size))
956                 for i in range(count)]
957
958     def gen_pkts(self, sw_intf, src, dst, count=1,
959                  payload_size=100):
960         return [Ether(dst=self.omac) /
961                 IP(src="1.1.1.1", dst="1.1.1.2") /
962                 UDP(sport=1144, dport=2233) /
963                 Raw(b'X' * payload_size)
964                 for i in range(count)]
965
966     def verify_decrypted(self, p, rxs):
967         for rx in rxs:
968             self.assert_equal(rx[Ether].dst, self.omac)
969             self.assert_equal(rx[IP].dst, "1.1.1.2")
970
971     def verify_encrypted(self, p, sa, rxs):
972         for rx in rxs:
973             try:
974                 pkt = sa.decrypt(rx[IP])
975                 if not pkt.haslayer(IP):
976                     pkt = IP(pkt[Raw].load)
977                 self.assert_packet_checksums_valid(pkt)
978                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
979                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
980                 self.assertTrue(pkt.haslayer(GRE))
981                 e = pkt[Ether]
982                 self.assertEqual(e[Ether].dst, self.omac)
983                 self.assertEqual(e[IP].dst, "1.1.1.2")
984             except (IndexError, AssertionError):
985                 self.logger.debug(ppp("Unexpected packet:", rx))
986                 try:
987                     self.logger.debug(ppp("Decrypted packet:", pkt))
988                 except:
989                     pass
990                 raise
991
992     def setUp(self):
993         super(TestIpsecGreTebIfEsp, self).setUp()
994
995         self.tun_if = self.pg0
996
997         p = self.ipv4_params
998
999         bd1 = VppBridgeDomain(self, 1)
1000         bd1.add_vpp_config()
1001
1002         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1003                                   p.auth_algo_vpp_id, p.auth_key,
1004                                   p.crypt_algo_vpp_id, p.crypt_key,
1005                                   self.vpp_esp_protocol,
1006                                   self.pg0.local_ip4,
1007                                   self.pg0.remote_ip4)
1008         p.tun_sa_out.add_vpp_config()
1009
1010         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1011                                  p.auth_algo_vpp_id, p.auth_key,
1012                                  p.crypt_algo_vpp_id, p.crypt_key,
1013                                  self.vpp_esp_protocol,
1014                                  self.pg0.remote_ip4,
1015                                  self.pg0.local_ip4)
1016         p.tun_sa_in.add_vpp_config()
1017
1018         p.tun_if = VppGreInterface(self,
1019                                    self.pg0.local_ip4,
1020                                    self.pg0.remote_ip4,
1021                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1022                                          GRE_API_TUNNEL_TYPE_TEB))
1023         p.tun_if.add_vpp_config()
1024
1025         p.tun_protect = VppIpsecTunProtect(self,
1026                                            p.tun_if,
1027                                            p.tun_sa_out,
1028                                            [p.tun_sa_in])
1029
1030         p.tun_protect.add_vpp_config()
1031
1032         p.tun_if.admin_up()
1033         p.tun_if.config_ip4()
1034         config_tun_params(p, self.encryption_type, p.tun_if)
1035
1036         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1037         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1038
1039         self.vapi.cli("clear ipsec sa")
1040         self.vapi.cli("sh adj")
1041         self.vapi.cli("sh ipsec tun")
1042
1043     def tearDown(self):
1044         p = self.ipv4_params
1045         p.tun_if.unconfig_ip4()
1046         super(TestIpsecGreTebIfEsp, self).tearDown()
1047
1048
1049 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
1050                                IpsecTun4Tests):
1051     """ Ipsec GRE TEB ESP - TUN tests """
1052     tun4_encrypt_node_name = "esp4-encrypt-tun"
1053     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1054     encryption_type = ESP
1055     omac = "00:11:22:33:44:55"
1056
1057     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1058                          payload_size=100):
1059         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1060                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1061                               dst=self.pg0.local_ip4) /
1062                            GRE() /
1063                            Ether(dst=self.omac) /
1064                            IP(src="1.1.1.1", dst="1.1.1.2") /
1065                            UDP(sport=1144, dport=2233) /
1066                            Raw(b'X' * payload_size))
1067                 for i in range(count)]
1068
1069     def gen_pkts(self, sw_intf, src, dst, count=1,
1070                  payload_size=100):
1071         return [Ether(dst=self.omac) /
1072                 Dot1Q(vlan=11) /
1073                 IP(src="1.1.1.1", dst="1.1.1.2") /
1074                 UDP(sport=1144, dport=2233) /
1075                 Raw(b'X' * payload_size)
1076                 for i in range(count)]
1077
1078     def verify_decrypted(self, p, rxs):
1079         for rx in rxs:
1080             self.assert_equal(rx[Ether].dst, self.omac)
1081             self.assert_equal(rx[Dot1Q].vlan, 11)
1082             self.assert_equal(rx[IP].dst, "1.1.1.2")
1083
1084     def verify_encrypted(self, p, sa, rxs):
1085         for rx in rxs:
1086             try:
1087                 pkt = sa.decrypt(rx[IP])
1088                 if not pkt.haslayer(IP):
1089                     pkt = IP(pkt[Raw].load)
1090                 self.assert_packet_checksums_valid(pkt)
1091                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1092                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1093                 self.assertTrue(pkt.haslayer(GRE))
1094                 e = pkt[Ether]
1095                 self.assertEqual(e[Ether].dst, self.omac)
1096                 self.assertFalse(e.haslayer(Dot1Q))
1097                 self.assertEqual(e[IP].dst, "1.1.1.2")
1098             except (IndexError, AssertionError):
1099                 self.logger.debug(ppp("Unexpected packet:", rx))
1100                 try:
1101                     self.logger.debug(ppp("Decrypted packet:", pkt))
1102                 except:
1103                     pass
1104                 raise
1105
1106     def setUp(self):
1107         super(TestIpsecGreTebVlanIfEsp, self).setUp()
1108
1109         self.tun_if = self.pg0
1110
1111         p = self.ipv4_params
1112
1113         bd1 = VppBridgeDomain(self, 1)
1114         bd1.add_vpp_config()
1115
1116         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1117         self.vapi.l2_interface_vlan_tag_rewrite(
1118             sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
1119             push_dot1q=11)
1120         self.pg1_11.admin_up()
1121
1122         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1123                                   p.auth_algo_vpp_id, p.auth_key,
1124                                   p.crypt_algo_vpp_id, p.crypt_key,
1125                                   self.vpp_esp_protocol,
1126                                   self.pg0.local_ip4,
1127                                   self.pg0.remote_ip4)
1128         p.tun_sa_out.add_vpp_config()
1129
1130         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1131                                  p.auth_algo_vpp_id, p.auth_key,
1132                                  p.crypt_algo_vpp_id, p.crypt_key,
1133                                  self.vpp_esp_protocol,
1134                                  self.pg0.remote_ip4,
1135                                  self.pg0.local_ip4)
1136         p.tun_sa_in.add_vpp_config()
1137
1138         p.tun_if = VppGreInterface(self,
1139                                    self.pg0.local_ip4,
1140                                    self.pg0.remote_ip4,
1141                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1142                                          GRE_API_TUNNEL_TYPE_TEB))
1143         p.tun_if.add_vpp_config()
1144
1145         p.tun_protect = VppIpsecTunProtect(self,
1146                                            p.tun_if,
1147                                            p.tun_sa_out,
1148                                            [p.tun_sa_in])
1149
1150         p.tun_protect.add_vpp_config()
1151
1152         p.tun_if.admin_up()
1153         p.tun_if.config_ip4()
1154         config_tun_params(p, self.encryption_type, p.tun_if)
1155
1156         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1157         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1158
1159         self.vapi.cli("clear ipsec sa")
1160
1161     def tearDown(self):
1162         p = self.ipv4_params
1163         p.tun_if.unconfig_ip4()
1164         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1165         self.pg1_11.admin_down()
1166         self.pg1_11.remove_vpp_config()
1167
1168
1169 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1170                               IpsecTun4Tests):
1171     """ Ipsec GRE TEB ESP - Tra tests """
1172     tun4_encrypt_node_name = "esp4-encrypt-tun"
1173     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1174     encryption_type = ESP
1175     omac = "00:11:22:33:44:55"
1176
1177     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1178                          payload_size=100):
1179         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1180                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1181                               dst=self.pg0.local_ip4) /
1182                            GRE() /
1183                            Ether(dst=self.omac) /
1184                            IP(src="1.1.1.1", dst="1.1.1.2") /
1185                            UDP(sport=1144, dport=2233) /
1186                            Raw(b'X' * payload_size))
1187                 for i in range(count)]
1188
1189     def gen_pkts(self, sw_intf, src, dst, count=1,
1190                  payload_size=100):
1191         return [Ether(dst=self.omac) /
1192                 IP(src="1.1.1.1", dst="1.1.1.2") /
1193                 UDP(sport=1144, dport=2233) /
1194                 Raw(b'X' * payload_size)
1195                 for i in range(count)]
1196
1197     def verify_decrypted(self, p, rxs):
1198         for rx in rxs:
1199             self.assert_equal(rx[Ether].dst, self.omac)
1200             self.assert_equal(rx[IP].dst, "1.1.1.2")
1201
1202     def verify_encrypted(self, p, sa, rxs):
1203         for rx in rxs:
1204             try:
1205                 pkt = sa.decrypt(rx[IP])
1206                 if not pkt.haslayer(IP):
1207                     pkt = IP(pkt[Raw].load)
1208                 self.assert_packet_checksums_valid(pkt)
1209                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1210                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1211                 self.assertTrue(pkt.haslayer(GRE))
1212                 e = pkt[Ether]
1213                 self.assertEqual(e[Ether].dst, self.omac)
1214                 self.assertEqual(e[IP].dst, "1.1.1.2")
1215             except (IndexError, AssertionError):
1216                 self.logger.debug(ppp("Unexpected packet:", rx))
1217                 try:
1218                     self.logger.debug(ppp("Decrypted packet:", pkt))
1219                 except:
1220                     pass
1221                 raise
1222
1223     def setUp(self):
1224         super(TestIpsecGreTebIfEspTra, self).setUp()
1225
1226         self.tun_if = self.pg0
1227
1228         p = self.ipv4_params
1229
1230         bd1 = VppBridgeDomain(self, 1)
1231         bd1.add_vpp_config()
1232
1233         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1234                                   p.auth_algo_vpp_id, p.auth_key,
1235                                   p.crypt_algo_vpp_id, p.crypt_key,
1236                                   self.vpp_esp_protocol)
1237         p.tun_sa_out.add_vpp_config()
1238
1239         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1240                                  p.auth_algo_vpp_id, p.auth_key,
1241                                  p.crypt_algo_vpp_id, p.crypt_key,
1242                                  self.vpp_esp_protocol)
1243         p.tun_sa_in.add_vpp_config()
1244
1245         p.tun_if = VppGreInterface(self,
1246                                    self.pg0.local_ip4,
1247                                    self.pg0.remote_ip4,
1248                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1249                                          GRE_API_TUNNEL_TYPE_TEB))
1250         p.tun_if.add_vpp_config()
1251
1252         p.tun_protect = VppIpsecTunProtect(self,
1253                                            p.tun_if,
1254                                            p.tun_sa_out,
1255                                            [p.tun_sa_in])
1256
1257         p.tun_protect.add_vpp_config()
1258
1259         p.tun_if.admin_up()
1260         p.tun_if.config_ip4()
1261         config_tra_params(p, self.encryption_type, p.tun_if)
1262
1263         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1264         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1265
1266         self.vapi.cli("clear ipsec sa")
1267
1268     def tearDown(self):
1269         p = self.ipv4_params
1270         p.tun_if.unconfig_ip4()
1271         super(TestIpsecGreTebIfEspTra, self).tearDown()
1272
1273
1274 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1275                                  IpsecTun4Tests):
1276     """ Ipsec GRE TEB UDP ESP - Tra tests """
1277     tun4_encrypt_node_name = "esp4-encrypt-tun"
1278     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1279     encryption_type = ESP
1280     omac = "00:11:22:33:44:55"
1281
1282     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1283                          payload_size=100):
1284         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1285                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1286                               dst=self.pg0.local_ip4) /
1287                            GRE() /
1288                            Ether(dst=self.omac) /
1289                            IP(src="1.1.1.1", dst="1.1.1.2") /
1290                            UDP(sport=1144, dport=2233) /
1291                            Raw(b'X' * payload_size))
1292                 for i in range(count)]
1293
1294     def gen_pkts(self, sw_intf, src, dst, count=1,
1295                  payload_size=100):
1296         return [Ether(dst=self.omac) /
1297                 IP(src="1.1.1.1", dst="1.1.1.2") /
1298                 UDP(sport=1144, dport=2233) /
1299                 Raw(b'X' * payload_size)
1300                 for i in range(count)]
1301
1302     def verify_decrypted(self, p, rxs):
1303         for rx in rxs:
1304             self.assert_equal(rx[Ether].dst, self.omac)
1305             self.assert_equal(rx[IP].dst, "1.1.1.2")
1306
1307     def verify_encrypted(self, p, sa, rxs):
1308         for rx in rxs:
1309             self.assertTrue(rx.haslayer(UDP))
1310             self.assertEqual(rx[UDP].dport, 4545)
1311             self.assertEqual(rx[UDP].sport, 5454)
1312             try:
1313                 pkt = sa.decrypt(rx[IP])
1314                 if not pkt.haslayer(IP):
1315                     pkt = IP(pkt[Raw].load)
1316                 self.assert_packet_checksums_valid(pkt)
1317                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1318                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1319                 self.assertTrue(pkt.haslayer(GRE))
1320                 e = pkt[Ether]
1321                 self.assertEqual(e[Ether].dst, self.omac)
1322                 self.assertEqual(e[IP].dst, "1.1.1.2")
1323             except (IndexError, AssertionError):
1324                 self.logger.debug(ppp("Unexpected packet:", rx))
1325                 try:
1326                     self.logger.debug(ppp("Decrypted packet:", pkt))
1327                 except:
1328                     pass
1329                 raise
1330
1331     def setUp(self):
1332         super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1333
1334         self.tun_if = self.pg0
1335
1336         p = self.ipv4_params
1337         p = self.ipv4_params
1338         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1339                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1340         p.nat_header = UDP(sport=5454, dport=4545)
1341
1342         bd1 = VppBridgeDomain(self, 1)
1343         bd1.add_vpp_config()
1344
1345         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1346                                   p.auth_algo_vpp_id, p.auth_key,
1347                                   p.crypt_algo_vpp_id, p.crypt_key,
1348                                   self.vpp_esp_protocol,
1349                                   flags=p.flags,
1350                                   udp_src=5454,
1351                                   udp_dst=4545)
1352         p.tun_sa_out.add_vpp_config()
1353
1354         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1355                                  p.auth_algo_vpp_id, p.auth_key,
1356                                  p.crypt_algo_vpp_id, p.crypt_key,
1357                                  self.vpp_esp_protocol,
1358                                  flags=(p.flags |
1359                                         VppEnum.vl_api_ipsec_sad_flags_t.
1360                                         IPSEC_API_SAD_FLAG_IS_INBOUND),
1361                                  udp_src=5454,
1362                                  udp_dst=4545)
1363         p.tun_sa_in.add_vpp_config()
1364
1365         p.tun_if = VppGreInterface(self,
1366                                    self.pg0.local_ip4,
1367                                    self.pg0.remote_ip4,
1368                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1369                                          GRE_API_TUNNEL_TYPE_TEB))
1370         p.tun_if.add_vpp_config()
1371
1372         p.tun_protect = VppIpsecTunProtect(self,
1373                                            p.tun_if,
1374                                            p.tun_sa_out,
1375                                            [p.tun_sa_in])
1376
1377         p.tun_protect.add_vpp_config()
1378
1379         p.tun_if.admin_up()
1380         p.tun_if.config_ip4()
1381         config_tra_params(p, self.encryption_type, p.tun_if)
1382
1383         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1384         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1385
1386         self.vapi.cli("clear ipsec sa")
1387         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1388
1389     def tearDown(self):
1390         p = self.ipv4_params
1391         p.tun_if.unconfig_ip4()
1392         super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1393
1394
1395 class TestIpsecGreIfEsp(TemplateIpsec,
1396                         IpsecTun4Tests):
1397     """ Ipsec GRE ESP - TUN tests """
1398     tun4_encrypt_node_name = "esp4-encrypt-tun"
1399     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1400     encryption_type = ESP
1401
1402     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1403                          payload_size=100):
1404         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1405                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1406                               dst=self.pg0.local_ip4) /
1407                            GRE() /
1408                            IP(src=self.pg1.local_ip4,
1409                               dst=self.pg1.remote_ip4) /
1410                            UDP(sport=1144, dport=2233) /
1411                            Raw(b'X' * payload_size))
1412                 for i in range(count)]
1413
1414     def gen_pkts(self, sw_intf, src, dst, count=1,
1415                  payload_size=100):
1416         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1417                 IP(src="1.1.1.1", dst="1.1.1.2") /
1418                 UDP(sport=1144, dport=2233) /
1419                 Raw(b'X' * payload_size)
1420                 for i in range(count)]
1421
1422     def verify_decrypted(self, p, rxs):
1423         for rx in rxs:
1424             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1425             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1426
1427     def verify_encrypted(self, p, sa, rxs):
1428         for rx in rxs:
1429             try:
1430                 pkt = sa.decrypt(rx[IP])
1431                 if not pkt.haslayer(IP):
1432                     pkt = IP(pkt[Raw].load)
1433                 self.assert_packet_checksums_valid(pkt)
1434                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1435                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1436                 self.assertTrue(pkt.haslayer(GRE))
1437                 e = pkt[GRE]
1438                 self.assertEqual(e[IP].dst, "1.1.1.2")
1439             except (IndexError, AssertionError):
1440                 self.logger.debug(ppp("Unexpected packet:", rx))
1441                 try:
1442                     self.logger.debug(ppp("Decrypted packet:", pkt))
1443                 except:
1444                     pass
1445                 raise
1446
1447     def setUp(self):
1448         super(TestIpsecGreIfEsp, self).setUp()
1449
1450         self.tun_if = self.pg0
1451
1452         p = self.ipv4_params
1453
1454         bd1 = VppBridgeDomain(self, 1)
1455         bd1.add_vpp_config()
1456
1457         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1458                                   p.auth_algo_vpp_id, p.auth_key,
1459                                   p.crypt_algo_vpp_id, p.crypt_key,
1460                                   self.vpp_esp_protocol,
1461                                   self.pg0.local_ip4,
1462                                   self.pg0.remote_ip4)
1463         p.tun_sa_out.add_vpp_config()
1464
1465         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1466                                  p.auth_algo_vpp_id, p.auth_key,
1467                                  p.crypt_algo_vpp_id, p.crypt_key,
1468                                  self.vpp_esp_protocol,
1469                                  self.pg0.remote_ip4,
1470                                  self.pg0.local_ip4)
1471         p.tun_sa_in.add_vpp_config()
1472
1473         p.tun_if = VppGreInterface(self,
1474                                    self.pg0.local_ip4,
1475                                    self.pg0.remote_ip4)
1476         p.tun_if.add_vpp_config()
1477
1478         p.tun_protect = VppIpsecTunProtect(self,
1479                                            p.tun_if,
1480                                            p.tun_sa_out,
1481                                            [p.tun_sa_in])
1482         p.tun_protect.add_vpp_config()
1483
1484         p.tun_if.admin_up()
1485         p.tun_if.config_ip4()
1486         config_tun_params(p, self.encryption_type, p.tun_if)
1487
1488         VppIpRoute(self, "1.1.1.2", 32,
1489                    [VppRoutePath(p.tun_if.remote_ip4,
1490                                  0xffffffff)]).add_vpp_config()
1491
1492     def tearDown(self):
1493         p = self.ipv4_params
1494         p.tun_if.unconfig_ip4()
1495         super(TestIpsecGreIfEsp, self).tearDown()
1496
1497
1498 class TestIpsecGreIfEspTra(TemplateIpsec,
1499                            IpsecTun4Tests):
1500     """ Ipsec GRE ESP - TRA tests """
1501     tun4_encrypt_node_name = "esp4-encrypt-tun"
1502     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1503     encryption_type = ESP
1504
1505     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1506                          payload_size=100):
1507         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1508                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1509                               dst=self.pg0.local_ip4) /
1510                            GRE() /
1511                            IP(src=self.pg1.local_ip4,
1512                               dst=self.pg1.remote_ip4) /
1513                            UDP(sport=1144, dport=2233) /
1514                            Raw(b'X' * payload_size))
1515                 for i in range(count)]
1516
1517     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1518                                 payload_size=100):
1519         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1520                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1521                               dst=self.pg0.local_ip4) /
1522                            GRE() /
1523                            UDP(sport=1144, dport=2233) /
1524                            Raw(b'X' * payload_size))
1525                 for i in range(count)]
1526
1527     def gen_pkts(self, sw_intf, src, dst, count=1,
1528                  payload_size=100):
1529         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1530                 IP(src="1.1.1.1", dst="1.1.1.2") /
1531                 UDP(sport=1144, dport=2233) /
1532                 Raw(b'X' * payload_size)
1533                 for i in range(count)]
1534
1535     def verify_decrypted(self, p, rxs):
1536         for rx in rxs:
1537             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1538             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1539
1540     def verify_encrypted(self, p, sa, rxs):
1541         for rx in rxs:
1542             try:
1543                 pkt = sa.decrypt(rx[IP])
1544                 if not pkt.haslayer(IP):
1545                     pkt = IP(pkt[Raw].load)
1546                 self.assert_packet_checksums_valid(pkt)
1547                 self.assertTrue(pkt.haslayer(GRE))
1548                 e = pkt[GRE]
1549                 self.assertEqual(e[IP].dst, "1.1.1.2")
1550             except (IndexError, AssertionError):
1551                 self.logger.debug(ppp("Unexpected packet:", rx))
1552                 try:
1553                     self.logger.debug(ppp("Decrypted packet:", pkt))
1554                 except:
1555                     pass
1556                 raise
1557
1558     def setUp(self):
1559         super(TestIpsecGreIfEspTra, self).setUp()
1560
1561         self.tun_if = self.pg0
1562
1563         p = self.ipv4_params
1564
1565         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1566                                   p.auth_algo_vpp_id, p.auth_key,
1567                                   p.crypt_algo_vpp_id, p.crypt_key,
1568                                   self.vpp_esp_protocol)
1569         p.tun_sa_out.add_vpp_config()
1570
1571         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1572                                  p.auth_algo_vpp_id, p.auth_key,
1573                                  p.crypt_algo_vpp_id, p.crypt_key,
1574                                  self.vpp_esp_protocol)
1575         p.tun_sa_in.add_vpp_config()
1576
1577         p.tun_if = VppGreInterface(self,
1578                                    self.pg0.local_ip4,
1579                                    self.pg0.remote_ip4)
1580         p.tun_if.add_vpp_config()
1581
1582         p.tun_protect = VppIpsecTunProtect(self,
1583                                            p.tun_if,
1584                                            p.tun_sa_out,
1585                                            [p.tun_sa_in])
1586         p.tun_protect.add_vpp_config()
1587
1588         p.tun_if.admin_up()
1589         p.tun_if.config_ip4()
1590         config_tra_params(p, self.encryption_type, p.tun_if)
1591
1592         VppIpRoute(self, "1.1.1.2", 32,
1593                    [VppRoutePath(p.tun_if.remote_ip4,
1594                                  0xffffffff)]).add_vpp_config()
1595
1596     def tearDown(self):
1597         p = self.ipv4_params
1598         p.tun_if.unconfig_ip4()
1599         super(TestIpsecGreIfEspTra, self).tearDown()
1600
1601     def test_gre_non_ip(self):
1602         p = self.ipv4_params
1603         tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1604                                           src=p.remote_tun_if_host,
1605                                           dst=self.pg1.remote_ip6)
1606         self.send_and_assert_no_replies(self.tun_if, tx)
1607         node_name = ('/err/%s/unsupported payload' %
1608                      self.tun4_decrypt_node_name[0])
1609         self.assertEqual(1, self.statistics.get_err_counter(node_name))
1610
1611
1612 class TestIpsecGre6IfEspTra(TemplateIpsec,
1613                             IpsecTun6Tests):
1614     """ Ipsec GRE ESP - TRA tests """
1615     tun6_encrypt_node_name = "esp6-encrypt-tun"
1616     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1617     encryption_type = ESP
1618
1619     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1620                           payload_size=100):
1621         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1622                 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1623                                 dst=self.pg0.local_ip6) /
1624                            GRE() /
1625                            IPv6(src=self.pg1.local_ip6,
1626                                 dst=self.pg1.remote_ip6) /
1627                            UDP(sport=1144, dport=2233) /
1628                            Raw(b'X' * payload_size))
1629                 for i in range(count)]
1630
1631     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1632                   payload_size=100):
1633         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1634                 IPv6(src="1::1", dst="1::2") /
1635                 UDP(sport=1144, dport=2233) /
1636                 Raw(b'X' * payload_size)
1637                 for i in range(count)]
1638
1639     def verify_decrypted6(self, p, rxs):
1640         for rx in rxs:
1641             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1642             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1643
1644     def verify_encrypted6(self, p, sa, rxs):
1645         for rx in rxs:
1646             try:
1647                 pkt = sa.decrypt(rx[IPv6])
1648                 if not pkt.haslayer(IPv6):
1649                     pkt = IPv6(pkt[Raw].load)
1650                 self.assert_packet_checksums_valid(pkt)
1651                 self.assertTrue(pkt.haslayer(GRE))
1652                 e = pkt[GRE]
1653                 self.assertEqual(e[IPv6].dst, "1::2")
1654             except (IndexError, AssertionError):
1655                 self.logger.debug(ppp("Unexpected packet:", rx))
1656                 try:
1657                     self.logger.debug(ppp("Decrypted packet:", pkt))
1658                 except:
1659                     pass
1660                 raise
1661
1662     def setUp(self):
1663         super(TestIpsecGre6IfEspTra, self).setUp()
1664
1665         self.tun_if = self.pg0
1666
1667         p = self.ipv6_params
1668
1669         bd1 = VppBridgeDomain(self, 1)
1670         bd1.add_vpp_config()
1671
1672         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1673                                   p.auth_algo_vpp_id, p.auth_key,
1674                                   p.crypt_algo_vpp_id, p.crypt_key,
1675                                   self.vpp_esp_protocol)
1676         p.tun_sa_out.add_vpp_config()
1677
1678         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1679                                  p.auth_algo_vpp_id, p.auth_key,
1680                                  p.crypt_algo_vpp_id, p.crypt_key,
1681                                  self.vpp_esp_protocol)
1682         p.tun_sa_in.add_vpp_config()
1683
1684         p.tun_if = VppGreInterface(self,
1685                                    self.pg0.local_ip6,
1686                                    self.pg0.remote_ip6)
1687         p.tun_if.add_vpp_config()
1688
1689         p.tun_protect = VppIpsecTunProtect(self,
1690                                            p.tun_if,
1691                                            p.tun_sa_out,
1692                                            [p.tun_sa_in])
1693         p.tun_protect.add_vpp_config()
1694
1695         p.tun_if.admin_up()
1696         p.tun_if.config_ip6()
1697         config_tra_params(p, self.encryption_type, p.tun_if)
1698
1699         r = VppIpRoute(self, "1::2", 128,
1700                        [VppRoutePath(p.tun_if.remote_ip6,
1701                                      0xffffffff,
1702                                      proto=DpoProto.DPO_PROTO_IP6)])
1703         r.add_vpp_config()
1704
1705     def tearDown(self):
1706         p = self.ipv6_params
1707         p.tun_if.unconfig_ip6()
1708         super(TestIpsecGre6IfEspTra, self).tearDown()
1709
1710
1711 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1712     """ Ipsec mGRE ESP v4 TRA tests """
1713     tun4_encrypt_node_name = "esp4-encrypt-tun"
1714     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1715     encryption_type = ESP
1716
1717     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1718                          payload_size=100):
1719         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1720                 sa.encrypt(IP(src=p.tun_dst,
1721                               dst=self.pg0.local_ip4) /
1722                            GRE() /
1723                            IP(src=self.pg1.local_ip4,
1724                               dst=self.pg1.remote_ip4) /
1725                            UDP(sport=1144, dport=2233) /
1726                            Raw(b'X' * payload_size))
1727                 for i in range(count)]
1728
1729     def gen_pkts(self, sw_intf, src, dst, count=1,
1730                  payload_size=100):
1731         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1732                 IP(src="1.1.1.1", dst=dst) /
1733                 UDP(sport=1144, dport=2233) /
1734                 Raw(b'X' * payload_size)
1735                 for i in range(count)]
1736
1737     def verify_decrypted(self, p, rxs):
1738         for rx in rxs:
1739             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1740             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1741
1742     def verify_encrypted(self, p, sa, rxs):
1743         for rx in rxs:
1744             try:
1745                 pkt = sa.decrypt(rx[IP])
1746                 if not pkt.haslayer(IP):
1747                     pkt = IP(pkt[Raw].load)
1748                 self.assert_packet_checksums_valid(pkt)
1749                 self.assertTrue(pkt.haslayer(GRE))
1750                 e = pkt[GRE]
1751                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1752             except (IndexError, AssertionError):
1753                 self.logger.debug(ppp("Unexpected packet:", rx))
1754                 try:
1755                     self.logger.debug(ppp("Decrypted packet:", pkt))
1756                 except:
1757                     pass
1758                 raise
1759
1760     def setUp(self):
1761         super(TestIpsecMGreIfEspTra4, self).setUp()
1762
1763         N_NHS = 16
1764         self.tun_if = self.pg0
1765         p = self.ipv4_params
1766         p.tun_if = VppGreInterface(self,
1767                                    self.pg0.local_ip4,
1768                                    "0.0.0.0",
1769                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1770                                          TUNNEL_API_MODE_MP))
1771         p.tun_if.add_vpp_config()
1772         p.tun_if.admin_up()
1773         p.tun_if.config_ip4()
1774         p.tun_if.generate_remote_hosts(N_NHS)
1775         self.pg0.generate_remote_hosts(N_NHS)
1776         self.pg0.configure_ipv4_neighbors()
1777
1778         # setup some SAs for several next-hops on the interface
1779         self.multi_params = []
1780
1781         for ii in range(N_NHS):
1782             p = copy.copy(self.ipv4_params)
1783
1784             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1785             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1786             p.scapy_tun_spi = p.scapy_tun_spi + ii
1787             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1788             p.vpp_tun_spi = p.vpp_tun_spi + ii
1789
1790             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1791             p.scapy_tra_spi = p.scapy_tra_spi + ii
1792             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1793             p.vpp_tra_spi = p.vpp_tra_spi + ii
1794             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1795                                       p.auth_algo_vpp_id, p.auth_key,
1796                                       p.crypt_algo_vpp_id, p.crypt_key,
1797                                       self.vpp_esp_protocol)
1798             p.tun_sa_out.add_vpp_config()
1799
1800             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1801                                      p.auth_algo_vpp_id, p.auth_key,
1802                                      p.crypt_algo_vpp_id, p.crypt_key,
1803                                      self.vpp_esp_protocol)
1804             p.tun_sa_in.add_vpp_config()
1805
1806             p.tun_protect = VppIpsecTunProtect(
1807                 self,
1808                 p.tun_if,
1809                 p.tun_sa_out,
1810                 [p.tun_sa_in],
1811                 nh=p.tun_if.remote_hosts[ii].ip4)
1812             p.tun_protect.add_vpp_config()
1813             config_tra_params(p, self.encryption_type, p.tun_if)
1814             self.multi_params.append(p)
1815
1816             VppIpRoute(self, p.remote_tun_if_host, 32,
1817                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1818                                      p.tun_if.sw_if_index)]).add_vpp_config()
1819
1820             # in this v4 variant add the teibs after the protect
1821             p.teib = VppTeib(self, p.tun_if,
1822                              p.tun_if.remote_hosts[ii].ip4,
1823                              self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1824             p.tun_dst = self.pg0.remote_hosts[ii].ip4
1825         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1826
1827     def tearDown(self):
1828         p = self.ipv4_params
1829         p.tun_if.unconfig_ip4()
1830         super(TestIpsecMGreIfEspTra4, self).tearDown()
1831
1832     def test_tun_44(self):
1833         """mGRE IPSEC 44"""
1834         N_PKTS = 63
1835         for p in self.multi_params:
1836             self.verify_tun_44(p, count=N_PKTS)
1837             p.teib.remove_vpp_config()
1838             self.verify_tun_dropped_44(p, count=N_PKTS)
1839             p.teib.add_vpp_config()
1840             self.verify_tun_44(p, count=N_PKTS)
1841
1842
1843 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1844     """ Ipsec mGRE ESP v6 TRA tests """
1845     tun6_encrypt_node_name = "esp6-encrypt-tun"
1846     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1847     encryption_type = ESP
1848
1849     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1850                           payload_size=100):
1851         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1852                 sa.encrypt(IPv6(src=p.tun_dst,
1853                                 dst=self.pg0.local_ip6) /
1854                            GRE() /
1855                            IPv6(src=self.pg1.local_ip6,
1856                                 dst=self.pg1.remote_ip6) /
1857                            UDP(sport=1144, dport=2233) /
1858                            Raw(b'X' * payload_size))
1859                 for i in range(count)]
1860
1861     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1862                   payload_size=100):
1863         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1864                 IPv6(src="1::1", dst=dst) /
1865                 UDP(sport=1144, dport=2233) /
1866                 Raw(b'X' * payload_size)
1867                 for i in range(count)]
1868
1869     def verify_decrypted6(self, p, rxs):
1870         for rx in rxs:
1871             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1872             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1873
1874     def verify_encrypted6(self, p, sa, rxs):
1875         for rx in rxs:
1876             try:
1877                 pkt = sa.decrypt(rx[IPv6])
1878                 if not pkt.haslayer(IPv6):
1879                     pkt = IPv6(pkt[Raw].load)
1880                 self.assert_packet_checksums_valid(pkt)
1881                 self.assertTrue(pkt.haslayer(GRE))
1882                 e = pkt[GRE]
1883                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1884             except (IndexError, AssertionError):
1885                 self.logger.debug(ppp("Unexpected packet:", rx))
1886                 try:
1887                     self.logger.debug(ppp("Decrypted packet:", pkt))
1888                 except:
1889                     pass
1890                 raise
1891
1892     def setUp(self):
1893         super(TestIpsecMGreIfEspTra6, self).setUp()
1894
1895         self.vapi.cli("set logging class ipsec level debug")
1896
1897         N_NHS = 16
1898         self.tun_if = self.pg0
1899         p = self.ipv6_params
1900         p.tun_if = VppGreInterface(self,
1901                                    self.pg0.local_ip6,
1902                                    "::",
1903                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1904                                          TUNNEL_API_MODE_MP))
1905         p.tun_if.add_vpp_config()
1906         p.tun_if.admin_up()
1907         p.tun_if.config_ip6()
1908         p.tun_if.generate_remote_hosts(N_NHS)
1909         self.pg0.generate_remote_hosts(N_NHS)
1910         self.pg0.configure_ipv6_neighbors()
1911
1912         # setup some SAs for several next-hops on the interface
1913         self.multi_params = []
1914
1915         for ii in range(N_NHS):
1916             p = copy.copy(self.ipv6_params)
1917
1918             p.remote_tun_if_host = "1::%d" % (ii + 1)
1919             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1920             p.scapy_tun_spi = p.scapy_tun_spi + ii
1921             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1922             p.vpp_tun_spi = p.vpp_tun_spi + ii
1923
1924             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1925             p.scapy_tra_spi = p.scapy_tra_spi + ii
1926             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1927             p.vpp_tra_spi = p.vpp_tra_spi + ii
1928             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1929                                       p.auth_algo_vpp_id, p.auth_key,
1930                                       p.crypt_algo_vpp_id, p.crypt_key,
1931                                       self.vpp_esp_protocol)
1932             p.tun_sa_out.add_vpp_config()
1933
1934             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1935                                      p.auth_algo_vpp_id, p.auth_key,
1936                                      p.crypt_algo_vpp_id, p.crypt_key,
1937                                      self.vpp_esp_protocol)
1938             p.tun_sa_in.add_vpp_config()
1939
1940             # in this v6 variant add the teibs first then the protection
1941             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1942             VppTeib(self, p.tun_if,
1943                     p.tun_if.remote_hosts[ii].ip6,
1944                     p.tun_dst).add_vpp_config()
1945
1946             p.tun_protect = VppIpsecTunProtect(
1947                 self,
1948                 p.tun_if,
1949                 p.tun_sa_out,
1950                 [p.tun_sa_in],
1951                 nh=p.tun_if.remote_hosts[ii].ip6)
1952             p.tun_protect.add_vpp_config()
1953             config_tra_params(p, self.encryption_type, p.tun_if)
1954             self.multi_params.append(p)
1955
1956             VppIpRoute(self, p.remote_tun_if_host, 128,
1957                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1958                                      p.tun_if.sw_if_index)]).add_vpp_config()
1959             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1960
1961         self.logger.info(self.vapi.cli("sh log"))
1962         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1963         self.logger.info(self.vapi.cli("sh adj 41"))
1964
1965     def tearDown(self):
1966         p = self.ipv6_params
1967         p.tun_if.unconfig_ip6()
1968         super(TestIpsecMGreIfEspTra6, self).tearDown()
1969
1970     def test_tun_66(self):
1971         """mGRE IPSec 66"""
1972         for p in self.multi_params:
1973             self.verify_tun_66(p, count=63)
1974
1975
1976 @tag_fixme_vpp_workers
1977 class TestIpsec4TunProtect(TemplateIpsec,
1978                            TemplateIpsec4TunProtect,
1979                            IpsecTun4):
1980     """ IPsec IPv4 Tunnel protect - transport mode"""
1981
1982     def setUp(self):
1983         super(TestIpsec4TunProtect, self).setUp()
1984
1985         self.tun_if = self.pg0
1986
1987     def tearDown(self):
1988         super(TestIpsec4TunProtect, self).tearDown()
1989
1990     def test_tun_44(self):
1991         """IPSEC tunnel protect"""
1992
1993         p = self.ipv4_params
1994
1995         self.config_network(p)
1996         self.config_sa_tra(p)
1997         self.config_protect(p)
1998
1999         self.verify_tun_44(p, count=127)
2000         c = p.tun_if.get_rx_stats()
2001         self.assertEqual(c['packets'], 127)
2002         c = p.tun_if.get_tx_stats()
2003         self.assertEqual(c['packets'], 127)
2004
2005         self.vapi.cli("clear ipsec sa")
2006         self.verify_tun_64(p, count=127)
2007         c = p.tun_if.get_rx_stats()
2008         self.assertEqual(c['packets'], 254)
2009         c = p.tun_if.get_tx_stats()
2010         self.assertEqual(c['packets'], 254)
2011
2012         # rekey - create new SAs and update the tunnel protection
2013         np = copy.copy(p)
2014         np.crypt_key = b'X' + p.crypt_key[1:]
2015         np.scapy_tun_spi += 100
2016         np.scapy_tun_sa_id += 1
2017         np.vpp_tun_spi += 100
2018         np.vpp_tun_sa_id += 1
2019         np.tun_if.local_spi = p.vpp_tun_spi
2020         np.tun_if.remote_spi = p.scapy_tun_spi
2021
2022         self.config_sa_tra(np)
2023         self.config_protect(np)
2024         self.unconfig_sa(p)
2025
2026         self.verify_tun_44(np, count=127)
2027         c = p.tun_if.get_rx_stats()
2028         self.assertEqual(c['packets'], 381)
2029         c = p.tun_if.get_tx_stats()
2030         self.assertEqual(c['packets'], 381)
2031
2032         # teardown
2033         self.unconfig_protect(np)
2034         self.unconfig_sa(np)
2035         self.unconfig_network(p)
2036
2037
2038 @tag_fixme_vpp_workers
2039 class TestIpsec4TunProtectUdp(TemplateIpsec,
2040                               TemplateIpsec4TunProtect,
2041                               IpsecTun4):
2042     """ IPsec IPv4 Tunnel protect - transport mode"""
2043
2044     def setUp(self):
2045         super(TestIpsec4TunProtectUdp, self).setUp()
2046
2047         self.tun_if = self.pg0
2048
2049         p = self.ipv4_params
2050         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
2051                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
2052         p.nat_header = UDP(sport=4500, dport=4500)
2053         self.config_network(p)
2054         self.config_sa_tra(p)
2055         self.config_protect(p)
2056
2057     def tearDown(self):
2058         p = self.ipv4_params
2059         self.unconfig_protect(p)
2060         self.unconfig_sa(p)
2061         self.unconfig_network(p)
2062         super(TestIpsec4TunProtectUdp, self).tearDown()
2063
2064     def verify_encrypted(self, p, sa, rxs):
2065         # ensure encrypted packets are recieved with the default UDP ports
2066         for rx in rxs:
2067             self.assertEqual(rx[UDP].sport, 4500)
2068             self.assertEqual(rx[UDP].dport, 4500)
2069         super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2070
2071     def test_tun_44(self):
2072         """IPSEC UDP tunnel protect"""
2073
2074         p = self.ipv4_params
2075
2076         self.verify_tun_44(p, count=127)
2077         c = p.tun_if.get_rx_stats()
2078         self.assertEqual(c['packets'], 127)
2079         c = p.tun_if.get_tx_stats()
2080         self.assertEqual(c['packets'], 127)
2081
2082     def test_keepalive(self):
2083         """ IPSEC NAT Keepalive """
2084         self.verify_keepalive(self.ipv4_params)
2085
2086
2087 @tag_fixme_vpp_workers
2088 class TestIpsec4TunProtectTun(TemplateIpsec,
2089                               TemplateIpsec4TunProtect,
2090                               IpsecTun4):
2091     """ IPsec IPv4 Tunnel protect - tunnel mode"""
2092
2093     encryption_type = ESP
2094     tun4_encrypt_node_name = "esp4-encrypt-tun"
2095     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2096
2097     def setUp(self):
2098         super(TestIpsec4TunProtectTun, self).setUp()
2099
2100         self.tun_if = self.pg0
2101
2102     def tearDown(self):
2103         super(TestIpsec4TunProtectTun, self).tearDown()
2104
2105     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2106                          payload_size=100):
2107         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2108                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2109                               dst=sw_intf.local_ip4) /
2110                            IP(src=src, dst=dst) /
2111                            UDP(sport=1144, dport=2233) /
2112                            Raw(b'X' * payload_size))
2113                 for i in range(count)]
2114
2115     def gen_pkts(self, sw_intf, src, dst, count=1,
2116                  payload_size=100):
2117         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2118                 IP(src=src, dst=dst) /
2119                 UDP(sport=1144, dport=2233) /
2120                 Raw(b'X' * payload_size)
2121                 for i in range(count)]
2122
2123     def verify_decrypted(self, p, rxs):
2124         for rx in rxs:
2125             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2126             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2127             self.assert_packet_checksums_valid(rx)
2128
2129     def verify_encrypted(self, p, sa, rxs):
2130         for rx in rxs:
2131             try:
2132                 pkt = sa.decrypt(rx[IP])
2133                 if not pkt.haslayer(IP):
2134                     pkt = IP(pkt[Raw].load)
2135                 self.assert_packet_checksums_valid(pkt)
2136                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2137                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2138                 inner = pkt[IP].payload
2139                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2140
2141             except (IndexError, AssertionError):
2142                 self.logger.debug(ppp("Unexpected packet:", rx))
2143                 try:
2144                     self.logger.debug(ppp("Decrypted packet:", pkt))
2145                 except:
2146                     pass
2147                 raise
2148
2149     def test_tun_44(self):
2150         """IPSEC tunnel protect """
2151
2152         p = self.ipv4_params
2153
2154         self.config_network(p)
2155         self.config_sa_tun(p)
2156         self.config_protect(p)
2157
2158         # also add an output features on the tunnel and physical interface
2159         # so we test they still work
2160         r_all = AclRule(True,
2161                         src_prefix="0.0.0.0/0",
2162                         dst_prefix="0.0.0.0/0",
2163                         proto=0)
2164         a = VppAcl(self, [r_all]).add_vpp_config()
2165
2166         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2167         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2168
2169         self.verify_tun_44(p, count=127)
2170
2171         c = p.tun_if.get_rx_stats()
2172         self.assertEqual(c['packets'], 127)
2173         c = p.tun_if.get_tx_stats()
2174         self.assertEqual(c['packets'], 127)
2175
2176         # rekey - create new SAs and update the tunnel protection
2177         np = copy.copy(p)
2178         np.crypt_key = b'X' + p.crypt_key[1:]
2179         np.scapy_tun_spi += 100
2180         np.scapy_tun_sa_id += 1
2181         np.vpp_tun_spi += 100
2182         np.vpp_tun_sa_id += 1
2183         np.tun_if.local_spi = p.vpp_tun_spi
2184         np.tun_if.remote_spi = p.scapy_tun_spi
2185
2186         self.config_sa_tun(np)
2187         self.config_protect(np)
2188         self.unconfig_sa(p)
2189
2190         self.verify_tun_44(np, count=127)
2191         c = p.tun_if.get_rx_stats()
2192         self.assertEqual(c['packets'], 254)
2193         c = p.tun_if.get_tx_stats()
2194         self.assertEqual(c['packets'], 254)
2195
2196         # teardown
2197         self.unconfig_protect(np)
2198         self.unconfig_sa(np)
2199         self.unconfig_network(p)
2200
2201
2202 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2203                                   TemplateIpsec4TunProtect,
2204                                   IpsecTun4):
2205     """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2206
2207     encryption_type = ESP
2208     tun4_encrypt_node_name = "esp4-encrypt-tun"
2209     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2210
2211     def setUp(self):
2212         super(TestIpsec4TunProtectTunDrop, self).setUp()
2213
2214         self.tun_if = self.pg0
2215
2216     def tearDown(self):
2217         super(TestIpsec4TunProtectTunDrop, self).tearDown()
2218
2219     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2220                          payload_size=100):
2221         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2222                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2223                               dst="5.5.5.5") /
2224                            IP(src=src, dst=dst) /
2225                            UDP(sport=1144, dport=2233) /
2226                            Raw(b'X' * payload_size))
2227                 for i in range(count)]
2228
2229     def test_tun_drop_44(self):
2230         """IPSEC tunnel protect bogus tunnel header """
2231
2232         p = self.ipv4_params
2233
2234         self.config_network(p)
2235         self.config_sa_tun(p)
2236         self.config_protect(p)
2237
2238         tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2239                                    src=p.remote_tun_if_host,
2240                                    dst=self.pg1.remote_ip4,
2241                                    count=63)
2242         self.send_and_assert_no_replies(self.tun_if, tx)
2243
2244         # teardown
2245         self.unconfig_protect(p)
2246         self.unconfig_sa(p)
2247         self.unconfig_network(p)
2248
2249
2250 @tag_fixme_vpp_workers
2251 class TestIpsec6TunProtect(TemplateIpsec,
2252                            TemplateIpsec6TunProtect,
2253                            IpsecTun6):
2254     """ IPsec IPv6 Tunnel protect - transport mode"""
2255
2256     encryption_type = ESP
2257     tun6_encrypt_node_name = "esp6-encrypt-tun"
2258     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2259
2260     def setUp(self):
2261         super(TestIpsec6TunProtect, self).setUp()
2262
2263         self.tun_if = self.pg0
2264
2265     def tearDown(self):
2266         super(TestIpsec6TunProtect, self).tearDown()
2267
2268     def test_tun_66(self):
2269         """IPSEC tunnel protect 6o6"""
2270
2271         p = self.ipv6_params
2272
2273         self.config_network(p)
2274         self.config_sa_tra(p)
2275         self.config_protect(p)
2276
2277         self.verify_tun_66(p, count=127)
2278         c = p.tun_if.get_rx_stats()
2279         self.assertEqual(c['packets'], 127)
2280         c = p.tun_if.get_tx_stats()
2281         self.assertEqual(c['packets'], 127)
2282
2283         # rekey - create new SAs and update the tunnel protection
2284         np = copy.copy(p)
2285         np.crypt_key = b'X' + p.crypt_key[1:]
2286         np.scapy_tun_spi += 100
2287         np.scapy_tun_sa_id += 1
2288         np.vpp_tun_spi += 100
2289         np.vpp_tun_sa_id += 1
2290         np.tun_if.local_spi = p.vpp_tun_spi
2291         np.tun_if.remote_spi = p.scapy_tun_spi
2292
2293         self.config_sa_tra(np)
2294         self.config_protect(np)
2295         self.unconfig_sa(p)
2296
2297         self.verify_tun_66(np, count=127)
2298         c = p.tun_if.get_rx_stats()
2299         self.assertEqual(c['packets'], 254)
2300         c = p.tun_if.get_tx_stats()
2301         self.assertEqual(c['packets'], 254)
2302
2303         # bounce the interface state
2304         p.tun_if.admin_down()
2305         self.verify_drop_tun_66(np, count=127)
2306         node = ('/err/ipsec6-tun-input/%s' %
2307                 'ipsec packets received on disabled interface')
2308         self.assertEqual(127, self.statistics.get_err_counter(node))
2309         p.tun_if.admin_up()
2310         self.verify_tun_66(np, count=127)
2311
2312         # 3 phase rekey
2313         #  1) add two input SAs [old, new]
2314         #  2) swap output SA to [new]
2315         #  3) use only [new] input SA
2316         np3 = copy.copy(np)
2317         np3.crypt_key = b'Z' + p.crypt_key[1:]
2318         np3.scapy_tun_spi += 100
2319         np3.scapy_tun_sa_id += 1
2320         np3.vpp_tun_spi += 100
2321         np3.vpp_tun_sa_id += 1
2322         np3.tun_if.local_spi = p.vpp_tun_spi
2323         np3.tun_if.remote_spi = p.scapy_tun_spi
2324
2325         self.config_sa_tra(np3)
2326
2327         # step 1;
2328         p.tun_protect.update_vpp_config(np.tun_sa_out,
2329                                         [np.tun_sa_in, np3.tun_sa_in])
2330         self.verify_tun_66(np, np, count=127)
2331         self.verify_tun_66(np3, np, count=127)
2332
2333         # step 2;
2334         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2335                                         [np.tun_sa_in, np3.tun_sa_in])
2336         self.verify_tun_66(np, np3, count=127)
2337         self.verify_tun_66(np3, np3, count=127)
2338
2339         # step 1;
2340         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2341                                         [np3.tun_sa_in])
2342         self.verify_tun_66(np3, np3, count=127)
2343         self.verify_drop_tun_66(np, count=127)
2344
2345         c = p.tun_if.get_rx_stats()
2346         self.assertEqual(c['packets'], 127*9)
2347         c = p.tun_if.get_tx_stats()
2348         self.assertEqual(c['packets'], 127*8)
2349         self.unconfig_sa(np)
2350
2351         # teardown
2352         self.unconfig_protect(np3)
2353         self.unconfig_sa(np3)
2354         self.unconfig_network(p)
2355
2356     def test_tun_46(self):
2357         """IPSEC tunnel protect 4o6"""
2358
2359         p = self.ipv6_params
2360
2361         self.config_network(p)
2362         self.config_sa_tra(p)
2363         self.config_protect(p)
2364
2365         self.verify_tun_46(p, count=127)
2366         c = p.tun_if.get_rx_stats()
2367         self.assertEqual(c['packets'], 127)
2368         c = p.tun_if.get_tx_stats()
2369         self.assertEqual(c['packets'], 127)
2370
2371         # teardown
2372         self.unconfig_protect(p)
2373         self.unconfig_sa(p)
2374         self.unconfig_network(p)
2375
2376
2377 @tag_fixme_vpp_workers
2378 class TestIpsec6TunProtectTun(TemplateIpsec,
2379                               TemplateIpsec6TunProtect,
2380                               IpsecTun6):
2381     """ IPsec IPv6 Tunnel protect - tunnel mode"""
2382
2383     encryption_type = ESP
2384     tun6_encrypt_node_name = "esp6-encrypt-tun"
2385     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2386
2387     def setUp(self):
2388         super(TestIpsec6TunProtectTun, self).setUp()
2389
2390         self.tun_if = self.pg0
2391
2392     def tearDown(self):
2393         super(TestIpsec6TunProtectTun, self).tearDown()
2394
2395     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2396                           payload_size=100):
2397         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2398                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2399                                 dst=sw_intf.local_ip6) /
2400                            IPv6(src=src, dst=dst) /
2401                            UDP(sport=1166, dport=2233) /
2402                            Raw(b'X' * payload_size))
2403                 for i in range(count)]
2404
2405     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
2406                   payload_size=100):
2407         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2408                 IPv6(src=src, dst=dst) /
2409                 UDP(sport=1166, dport=2233) /
2410                 Raw(b'X' * payload_size)
2411                 for i in range(count)]
2412
2413     def verify_decrypted6(self, p, rxs):
2414         for rx in rxs:
2415             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2416             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2417             self.assert_packet_checksums_valid(rx)
2418
2419     def verify_encrypted6(self, p, sa, rxs):
2420         for rx in rxs:
2421             try:
2422                 pkt = sa.decrypt(rx[IPv6])
2423                 if not pkt.haslayer(IPv6):
2424                     pkt = IPv6(pkt[Raw].load)
2425                 self.assert_packet_checksums_valid(pkt)
2426                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2427                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2428                 inner = pkt[IPv6].payload
2429                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2430
2431             except (IndexError, AssertionError):
2432                 self.logger.debug(ppp("Unexpected packet:", rx))
2433                 try:
2434                     self.logger.debug(ppp("Decrypted packet:", pkt))
2435                 except:
2436                     pass
2437                 raise
2438
2439     def test_tun_66(self):
2440         """IPSEC tunnel protect """
2441
2442         p = self.ipv6_params
2443
2444         self.config_network(p)
2445         self.config_sa_tun(p)
2446         self.config_protect(p)
2447
2448         self.verify_tun_66(p, count=127)
2449
2450         c = p.tun_if.get_rx_stats()
2451         self.assertEqual(c['packets'], 127)
2452         c = p.tun_if.get_tx_stats()
2453         self.assertEqual(c['packets'], 127)
2454
2455         # rekey - create new SAs and update the tunnel protection
2456         np = copy.copy(p)
2457         np.crypt_key = b'X' + p.crypt_key[1:]
2458         np.scapy_tun_spi += 100
2459         np.scapy_tun_sa_id += 1
2460         np.vpp_tun_spi += 100
2461         np.vpp_tun_sa_id += 1
2462         np.tun_if.local_spi = p.vpp_tun_spi
2463         np.tun_if.remote_spi = p.scapy_tun_spi
2464
2465         self.config_sa_tun(np)
2466         self.config_protect(np)
2467         self.unconfig_sa(p)
2468
2469         self.verify_tun_66(np, count=127)
2470         c = p.tun_if.get_rx_stats()
2471         self.assertEqual(c['packets'], 254)
2472         c = p.tun_if.get_tx_stats()
2473         self.assertEqual(c['packets'], 254)
2474
2475         # teardown
2476         self.unconfig_protect(np)
2477         self.unconfig_sa(np)
2478         self.unconfig_network(p)
2479
2480
2481 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2482                                   TemplateIpsec6TunProtect,
2483                                   IpsecTun6):
2484     """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2485
2486     encryption_type = ESP
2487     tun6_encrypt_node_name = "esp6-encrypt-tun"
2488     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2489
2490     def setUp(self):
2491         super(TestIpsec6TunProtectTunDrop, self).setUp()
2492
2493         self.tun_if = self.pg0
2494
2495     def tearDown(self):
2496         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2497
2498     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2499                           payload_size=100):
2500         # the IP destination of the revelaed packet does not match
2501         # that assigned to the tunnel
2502         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2503                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2504                                 dst="5::5") /
2505                            IPv6(src=src, dst=dst) /
2506                            UDP(sport=1144, dport=2233) /
2507                            Raw(b'X' * payload_size))
2508                 for i in range(count)]
2509
2510     def test_tun_drop_66(self):
2511         """IPSEC 6 tunnel protect bogus tunnel header """
2512
2513         p = self.ipv6_params
2514
2515         self.config_network(p)
2516         self.config_sa_tun(p)
2517         self.config_protect(p)
2518
2519         tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2520                                     src=p.remote_tun_if_host,
2521                                     dst=self.pg1.remote_ip6,
2522                                     count=63)
2523         self.send_and_assert_no_replies(self.tun_if, tx)
2524
2525         self.unconfig_protect(p)
2526         self.unconfig_sa(p)
2527         self.unconfig_network(p)
2528
2529
2530 class TemplateIpsecItf4(object):
2531     """ IPsec Interface IPv4 """
2532
2533     encryption_type = ESP
2534     tun4_encrypt_node_name = "esp4-encrypt-tun"
2535     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2536     tun4_input_node = "ipsec4-tun-input"
2537
2538     def config_sa_tun(self, p, src, dst):
2539         config_tun_params(p, self.encryption_type, None, src, dst)
2540
2541         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2542                                   p.auth_algo_vpp_id, p.auth_key,
2543                                   p.crypt_algo_vpp_id, p.crypt_key,
2544                                   self.vpp_esp_protocol,
2545                                   src, dst,
2546                                   flags=p.flags)
2547         p.tun_sa_out.add_vpp_config()
2548
2549         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2550                                  p.auth_algo_vpp_id, p.auth_key,
2551                                  p.crypt_algo_vpp_id, p.crypt_key,
2552                                  self.vpp_esp_protocol,
2553                                  dst, src,
2554                                  flags=p.flags)
2555         p.tun_sa_in.add_vpp_config()
2556
2557     def config_protect(self, p):
2558         p.tun_protect = VppIpsecTunProtect(self,
2559                                            p.tun_if,
2560                                            p.tun_sa_out,
2561                                            [p.tun_sa_in])
2562         p.tun_protect.add_vpp_config()
2563
2564     def config_network(self, p, instance=0xffffffff):
2565         p.tun_if = VppIpsecInterface(self, instance=instance)
2566
2567         p.tun_if.add_vpp_config()
2568         p.tun_if.admin_up()
2569         p.tun_if.config_ip4()
2570         p.tun_if.config_ip6()
2571
2572         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2573                              [VppRoutePath(p.tun_if.remote_ip4,
2574                                            0xffffffff)])
2575         p.route.add_vpp_config()
2576         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2577                        [VppRoutePath(p.tun_if.remote_ip6,
2578                                      0xffffffff,
2579                                      proto=DpoProto.DPO_PROTO_IP6)])
2580         r.add_vpp_config()
2581
2582     def unconfig_network(self, p):
2583         p.route.remove_vpp_config()
2584         p.tun_if.remove_vpp_config()
2585
2586     def unconfig_protect(self, p):
2587         p.tun_protect.remove_vpp_config()
2588
2589     def unconfig_sa(self, p):
2590         p.tun_sa_out.remove_vpp_config()
2591         p.tun_sa_in.remove_vpp_config()
2592
2593
2594 @tag_fixme_vpp_workers
2595 class TestIpsecItf4(TemplateIpsec,
2596                     TemplateIpsecItf4,
2597                     IpsecTun4):
2598     """ IPsec Interface IPv4 """
2599
2600     def setUp(self):
2601         super(TestIpsecItf4, self).setUp()
2602
2603         self.tun_if = self.pg0
2604
2605     def tearDown(self):
2606         super(TestIpsecItf4, self).tearDown()
2607
2608     def test_tun_instance_44(self):
2609         p = self.ipv4_params
2610         self.config_network(p, instance=3)
2611
2612         with self.assertRaises(CliFailedCommandError):
2613             self.vapi.cli("show interface ipsec0")
2614
2615         output = self.vapi.cli("show interface ipsec3")
2616         self.assertTrue("unknown" not in output)
2617
2618         self.unconfig_network(p)
2619
2620     def test_tun_44(self):
2621         """IPSEC interface IPv4"""
2622
2623         n_pkts = 127
2624         p = self.ipv4_params
2625
2626         self.config_network(p)
2627         self.config_sa_tun(p,
2628                            self.pg0.local_ip4,
2629                            self.pg0.remote_ip4)
2630         self.config_protect(p)
2631
2632         self.verify_tun_44(p, count=n_pkts)
2633         c = p.tun_if.get_rx_stats()
2634         self.assertEqual(c['packets'], n_pkts)
2635         c = p.tun_if.get_tx_stats()
2636         self.assertEqual(c['packets'], n_pkts)
2637
2638         p.tun_if.admin_down()
2639         self.verify_tun_dropped_44(p, count=n_pkts)
2640         p.tun_if.admin_up()
2641         self.verify_tun_44(p, count=n_pkts)
2642
2643         c = p.tun_if.get_rx_stats()
2644         self.assertEqual(c['packets'], 3*n_pkts)
2645         c = p.tun_if.get_tx_stats()
2646         self.assertEqual(c['packets'], 2*n_pkts)
2647
2648         # it's a v6 packet when its encrypted
2649         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2650
2651         self.verify_tun_64(p, count=n_pkts)
2652         c = p.tun_if.get_rx_stats()
2653         self.assertEqual(c['packets'], 4*n_pkts)
2654         c = p.tun_if.get_tx_stats()
2655         self.assertEqual(c['packets'], 3*n_pkts)
2656
2657         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2658
2659         self.vapi.cli("clear interfaces")
2660
2661         # rekey - create new SAs and update the tunnel protection
2662         np = copy.copy(p)
2663         np.crypt_key = b'X' + p.crypt_key[1:]
2664         np.scapy_tun_spi += 100
2665         np.scapy_tun_sa_id += 1
2666         np.vpp_tun_spi += 100
2667         np.vpp_tun_sa_id += 1
2668         np.tun_if.local_spi = p.vpp_tun_spi
2669         np.tun_if.remote_spi = p.scapy_tun_spi
2670
2671         self.config_sa_tun(np,
2672                            self.pg0.local_ip4,
2673                            self.pg0.remote_ip4)
2674         self.config_protect(np)
2675         self.unconfig_sa(p)
2676
2677         self.verify_tun_44(np, count=n_pkts)
2678         c = p.tun_if.get_rx_stats()
2679         self.assertEqual(c['packets'], n_pkts)
2680         c = p.tun_if.get_tx_stats()
2681         self.assertEqual(c['packets'], n_pkts)
2682
2683         # teardown
2684         self.unconfig_protect(np)
2685         self.unconfig_sa(np)
2686         self.unconfig_network(p)
2687
2688     def test_tun_44_null(self):
2689         """IPSEC interface IPv4 NULL auth/crypto"""
2690
2691         n_pkts = 127
2692         p = copy.copy(self.ipv4_params)
2693
2694         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2695                               IPSEC_API_INTEG_ALG_NONE)
2696         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2697                                IPSEC_API_CRYPTO_ALG_NONE)
2698         p.crypt_algo = "NULL"
2699         p.auth_algo = "NULL"
2700
2701         self.config_network(p)
2702         self.config_sa_tun(p,
2703                            self.pg0.local_ip4,
2704                            self.pg0.remote_ip4)
2705         self.config_protect(p)
2706
2707         self.verify_tun_44(p, count=n_pkts)
2708
2709         # teardown
2710         self.unconfig_protect(p)
2711         self.unconfig_sa(p)
2712         self.unconfig_network(p)
2713
2714     def test_tun_44_police(self):
2715         """IPSEC interface IPv4 with input policer"""
2716         n_pkts = 127
2717         p = self.ipv4_params
2718
2719         self.config_network(p)
2720         self.config_sa_tun(p,
2721                            self.pg0.local_ip4,
2722                            self.pg0.remote_ip4)
2723         self.config_protect(p)
2724
2725         action_tx = PolicerAction(
2726             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
2727             0)
2728         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
2729                              conform_action=action_tx,
2730                              exceed_action=action_tx,
2731                              violate_action=action_tx)
2732         policer.add_vpp_config()
2733
2734         # Start policing on tun
2735         policer.apply_vpp_config(p.tun_if.sw_if_index, True)
2736
2737         self.verify_tun_44(p, count=n_pkts)
2738         c = p.tun_if.get_rx_stats()
2739         self.assertEqual(c['packets'], n_pkts)
2740         c = p.tun_if.get_tx_stats()
2741         self.assertEqual(c['packets'], n_pkts)
2742
2743         stats = policer.get_stats()
2744
2745         # Single rate, 2 colour policer - expect conform, violate but no exceed
2746         self.assertGreater(stats['conform_packets'], 0)
2747         self.assertEqual(stats['exceed_packets'], 0)
2748         self.assertGreater(stats['violate_packets'], 0)
2749
2750         # Stop policing on tun
2751         policer.apply_vpp_config(p.tun_if.sw_if_index, False)
2752         self.verify_tun_44(p, count=n_pkts)
2753
2754         # No new policer stats
2755         statsnew = policer.get_stats()
2756         self.assertEqual(stats, statsnew)
2757
2758         # teardown
2759         policer.remove_vpp_config()
2760         self.unconfig_protect(p)
2761         self.unconfig_sa(p)
2762         self.unconfig_network(p)
2763
2764
2765 class TestIpsecItf4MPLS(TemplateIpsec,
2766                         TemplateIpsecItf4,
2767                         IpsecTun4):
2768     """ IPsec Interface MPLSoIPv4 """
2769
2770     tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
2771
2772     def setUp(self):
2773         super(TestIpsecItf4MPLS, self).setUp()
2774
2775         self.tun_if = self.pg0
2776
2777     def tearDown(self):
2778         super(TestIpsecItf4MPLS, self).tearDown()
2779
2780     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2781                          payload_size=100):
2782         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2783                 sa.encrypt(MPLS(label=44, ttl=3) /
2784                            IP(src=src, dst=dst) /
2785                            UDP(sport=1166, dport=2233) /
2786                            Raw(b'X' * payload_size))
2787                 for i in range(count)]
2788
2789     def verify_encrypted(self, p, sa, rxs):
2790         for rx in rxs:
2791             try:
2792                 pkt = sa.decrypt(rx[IP])
2793                 if not pkt.haslayer(IP):
2794                     pkt = IP(pkt[Raw].load)
2795                 self.assert_packet_checksums_valid(pkt)
2796                 self.assert_equal(pkt[MPLS].label, 44)
2797                 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
2798             except (IndexError, AssertionError):
2799                 self.logger.debug(ppp("Unexpected packet:", rx))
2800                 try:
2801                     self.logger.debug(ppp("Decrypted packet:", pkt))
2802                 except:
2803                     pass
2804                 raise
2805
2806     def test_tun_mpls_o_ip4(self):
2807         """IPSEC interface MPLS over IPv4"""
2808
2809         n_pkts = 127
2810         p = self.ipv4_params
2811         f = FibPathProto
2812
2813         tbl = VppMplsTable(self, 0)
2814         tbl.add_vpp_config()
2815
2816         self.config_network(p)
2817         # deag MPLS routes from the tunnel
2818         r4 = VppMplsRoute(self, 44, 1,
2819                           [VppRoutePath(
2820                               self.pg1.remote_ip4,
2821                               self.pg1.sw_if_index)]).add_vpp_config()
2822         p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
2823                                      p.tun_if.sw_if_index,
2824                                      labels=[VppMplsLabel(44)])])
2825         p.tun_if.enable_mpls()
2826
2827         self.config_sa_tun(p,
2828                            self.pg0.local_ip4,
2829                            self.pg0.remote_ip4)
2830         self.config_protect(p)
2831
2832         self.verify_tun_44(p, count=n_pkts)
2833
2834         # cleanup
2835         p.tun_if.disable_mpls()
2836         self.unconfig_protect(p)
2837         self.unconfig_sa(p)
2838         self.unconfig_network(p)
2839
2840
2841 class TemplateIpsecItf6(object):
2842     """ IPsec Interface IPv6 """
2843
2844     encryption_type = ESP
2845     tun6_encrypt_node_name = "esp6-encrypt-tun"
2846     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2847     tun6_input_node = "ipsec6-tun-input"
2848
2849     def config_sa_tun(self, p, src, dst):
2850         config_tun_params(p, self.encryption_type, None, src, dst)
2851
2852         if not hasattr(p, 'tun_flags'):
2853             p.tun_flags = None
2854         if not hasattr(p, 'hop_limit'):
2855             p.hop_limit = 255
2856
2857         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2858                                   p.auth_algo_vpp_id, p.auth_key,
2859                                   p.crypt_algo_vpp_id, p.crypt_key,
2860                                   self.vpp_esp_protocol,
2861                                   src, dst,
2862                                   flags=p.flags,
2863                                   tun_flags=p.tun_flags,
2864                                   hop_limit=p.hop_limit)
2865         p.tun_sa_out.add_vpp_config()
2866
2867         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2868                                  p.auth_algo_vpp_id, p.auth_key,
2869                                  p.crypt_algo_vpp_id, p.crypt_key,
2870                                  self.vpp_esp_protocol,
2871                                  dst, src,
2872                                  flags=p.flags)
2873         p.tun_sa_in.add_vpp_config()
2874
2875     def config_protect(self, p):
2876         p.tun_protect = VppIpsecTunProtect(self,
2877                                            p.tun_if,
2878                                            p.tun_sa_out,
2879                                            [p.tun_sa_in])
2880         p.tun_protect.add_vpp_config()
2881
2882     def config_network(self, p):
2883         p.tun_if = VppIpsecInterface(self)
2884
2885         p.tun_if.add_vpp_config()
2886         p.tun_if.admin_up()
2887         p.tun_if.config_ip4()
2888         p.tun_if.config_ip6()
2889
2890         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2891                        [VppRoutePath(p.tun_if.remote_ip4,
2892                                      0xffffffff)])
2893         r.add_vpp_config()
2894
2895         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2896                              [VppRoutePath(p.tun_if.remote_ip6,
2897                                            0xffffffff,
2898                                            proto=DpoProto.DPO_PROTO_IP6)])
2899         p.route.add_vpp_config()
2900
2901     def unconfig_network(self, p):
2902         p.route.remove_vpp_config()
2903         p.tun_if.remove_vpp_config()
2904
2905     def unconfig_protect(self, p):
2906         p.tun_protect.remove_vpp_config()
2907
2908     def unconfig_sa(self, p):
2909         p.tun_sa_out.remove_vpp_config()
2910         p.tun_sa_in.remove_vpp_config()
2911
2912
2913 @tag_fixme_vpp_workers
2914 class TestIpsecItf6(TemplateIpsec,
2915                     TemplateIpsecItf6,
2916                     IpsecTun6):
2917     """ IPsec Interface IPv6 """
2918
2919     def setUp(self):
2920         super(TestIpsecItf6, self).setUp()
2921
2922         self.tun_if = self.pg0
2923
2924     def tearDown(self):
2925         super(TestIpsecItf6, self).tearDown()
2926
2927     def test_tun_44(self):
2928         """IPSEC interface IPv6"""
2929
2930         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2931         n_pkts = 127
2932         p = self.ipv6_params
2933         p.inner_hop_limit = 24
2934         p.outer_hop_limit = 23
2935         p.outer_flow_label = 243224
2936         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2937
2938         self.config_network(p)
2939         self.config_sa_tun(p,
2940                            self.pg0.local_ip6,
2941                            self.pg0.remote_ip6)
2942         self.config_protect(p)
2943
2944         self.verify_tun_66(p, count=n_pkts)
2945         c = p.tun_if.get_rx_stats()
2946         self.assertEqual(c['packets'], n_pkts)
2947         c = p.tun_if.get_tx_stats()
2948         self.assertEqual(c['packets'], n_pkts)
2949
2950         p.tun_if.admin_down()
2951         self.verify_drop_tun_66(p, count=n_pkts)
2952         p.tun_if.admin_up()
2953         self.verify_tun_66(p, count=n_pkts)
2954
2955         c = p.tun_if.get_rx_stats()
2956         self.assertEqual(c['packets'], 3*n_pkts)
2957         c = p.tun_if.get_tx_stats()
2958         self.assertEqual(c['packets'], 2*n_pkts)
2959
2960         # it's a v4 packet when its encrypted
2961         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2962
2963         self.verify_tun_46(p, count=n_pkts)
2964         c = p.tun_if.get_rx_stats()
2965         self.assertEqual(c['packets'], 4*n_pkts)
2966         c = p.tun_if.get_tx_stats()
2967         self.assertEqual(c['packets'], 3*n_pkts)
2968
2969         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2970
2971         self.vapi.cli("clear interfaces")
2972
2973         # rekey - create new SAs and update the tunnel protection
2974         np = copy.copy(p)
2975         np.crypt_key = b'X' + p.crypt_key[1:]
2976         np.scapy_tun_spi += 100
2977         np.scapy_tun_sa_id += 1
2978         np.vpp_tun_spi += 100
2979         np.vpp_tun_sa_id += 1
2980         np.tun_if.local_spi = p.vpp_tun_spi
2981         np.tun_if.remote_spi = p.scapy_tun_spi
2982         np.inner_hop_limit = 24
2983         np.outer_hop_limit = 128
2984         np.inner_flow_label = 0xabcde
2985         np.outer_flow_label = 0xabcde
2986         np.hop_limit = 128
2987         np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
2988
2989         self.config_sa_tun(np,
2990                            self.pg0.local_ip6,
2991                            self.pg0.remote_ip6)
2992         self.config_protect(np)
2993         self.unconfig_sa(p)
2994
2995         self.verify_tun_66(np, count=n_pkts)
2996         c = p.tun_if.get_rx_stats()
2997         self.assertEqual(c['packets'], n_pkts)
2998         c = p.tun_if.get_tx_stats()
2999         self.assertEqual(c['packets'], n_pkts)
3000
3001         # teardown
3002         self.unconfig_protect(np)
3003         self.unconfig_sa(np)
3004         self.unconfig_network(p)
3005
3006     def test_tun_66_police(self):
3007         """IPSEC interface IPv6 with input policer"""
3008         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3009         n_pkts = 127
3010         p = self.ipv6_params
3011         p.inner_hop_limit = 24
3012         p.outer_hop_limit = 23
3013         p.outer_flow_label = 243224
3014         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3015
3016         self.config_network(p)
3017         self.config_sa_tun(p,
3018                            self.pg0.local_ip6,
3019                            self.pg0.remote_ip6)
3020         self.config_protect(p)
3021
3022         action_tx = PolicerAction(
3023             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
3024             0)
3025         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
3026                              conform_action=action_tx,
3027                              exceed_action=action_tx,
3028                              violate_action=action_tx)
3029         policer.add_vpp_config()
3030
3031         # Start policing on tun
3032         policer.apply_vpp_config(p.tun_if.sw_if_index, True)
3033
3034         self.verify_tun_66(p, count=n_pkts)
3035         c = p.tun_if.get_rx_stats()
3036         self.assertEqual(c['packets'], n_pkts)
3037         c = p.tun_if.get_tx_stats()
3038         self.assertEqual(c['packets'], n_pkts)
3039
3040         stats = policer.get_stats()
3041
3042         # Single rate, 2 colour policer - expect conform, violate but no exceed
3043         self.assertGreater(stats['conform_packets'], 0)
3044         self.assertEqual(stats['exceed_packets'], 0)
3045         self.assertGreater(stats['violate_packets'], 0)
3046
3047         # Stop policing on tun
3048         policer.apply_vpp_config(p.tun_if.sw_if_index, False)
3049         self.verify_tun_66(p, count=n_pkts)
3050
3051         # No new policer stats
3052         statsnew = policer.get_stats()
3053         self.assertEqual(stats, statsnew)
3054
3055         # teardown
3056         policer.remove_vpp_config()
3057         self.unconfig_protect(p)
3058         self.unconfig_sa(p)
3059         self.unconfig_network(p)
3060
3061
3062 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3063     """ Ipsec P2MP ESP v4 tests """
3064     tun4_encrypt_node_name = "esp4-encrypt-tun"
3065     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3066     encryption_type = ESP
3067
3068     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
3069                          payload_size=100):
3070         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3071                 sa.encrypt(IP(src=self.pg1.local_ip4,
3072                               dst=self.pg1.remote_ip4) /
3073                            UDP(sport=1144, dport=2233) /
3074                            Raw(b'X' * payload_size))
3075                 for i in range(count)]
3076
3077     def gen_pkts(self, sw_intf, src, dst, count=1,
3078                  payload_size=100):
3079         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3080                 IP(src="1.1.1.1", dst=dst) /
3081                 UDP(sport=1144, dport=2233) /
3082                 Raw(b'X' * payload_size)
3083                 for i in range(count)]
3084
3085     def verify_decrypted(self, p, rxs):
3086         for rx in rxs:
3087             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3088             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3089
3090     def verify_encrypted(self, p, sa, rxs):
3091         for rx in rxs:
3092             try:
3093                 self.assertEqual(rx[IP].tos,
3094                                  VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
3095                 self.assertEqual(rx[IP].ttl, p.hop_limit)
3096                 pkt = sa.decrypt(rx[IP])
3097                 if not pkt.haslayer(IP):
3098                     pkt = IP(pkt[Raw].load)
3099                 self.assert_packet_checksums_valid(pkt)
3100                 e = pkt[IP]
3101                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3102             except (IndexError, AssertionError):
3103                 self.logger.debug(ppp("Unexpected packet:", rx))
3104                 try:
3105                     self.logger.debug(ppp("Decrypted packet:", pkt))
3106                 except:
3107                     pass
3108                 raise
3109
3110     def setUp(self):
3111         super(TestIpsecMIfEsp4, self).setUp()
3112
3113         N_NHS = 16
3114         self.tun_if = self.pg0
3115         p = self.ipv4_params
3116         p.tun_if = VppIpsecInterface(self,
3117                                      mode=(VppEnum.vl_api_tunnel_mode_t.
3118                                            TUNNEL_API_MODE_MP))
3119         p.tun_if.add_vpp_config()
3120         p.tun_if.admin_up()
3121         p.tun_if.config_ip4()
3122         p.tun_if.unconfig_ip4()
3123         p.tun_if.config_ip4()
3124         p.tun_if.generate_remote_hosts(N_NHS)
3125         self.pg0.generate_remote_hosts(N_NHS)
3126         self.pg0.configure_ipv4_neighbors()
3127
3128         # setup some SAs for several next-hops on the interface
3129         self.multi_params = []
3130
3131         for ii in range(N_NHS):
3132             p = copy.copy(self.ipv4_params)
3133
3134             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3135             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3136             p.scapy_tun_spi = p.scapy_tun_spi + ii
3137             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3138             p.vpp_tun_spi = p.vpp_tun_spi + ii
3139
3140             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3141             p.scapy_tra_spi = p.scapy_tra_spi + ii
3142             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3143             p.vpp_tra_spi = p.vpp_tra_spi + ii
3144             p.hop_limit = ii+10
3145             p.tun_sa_out = VppIpsecSA(
3146                 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
3147                 p.auth_algo_vpp_id, p.auth_key,
3148                 p.crypt_algo_vpp_id, p.crypt_key,
3149                 self.vpp_esp_protocol,
3150                 self.pg0.local_ip4,
3151                 self.pg0.remote_hosts[ii].ip4,
3152                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3153                 hop_limit=p.hop_limit)
3154             p.tun_sa_out.add_vpp_config()
3155
3156             p.tun_sa_in = VppIpsecSA(
3157                 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
3158                 p.auth_algo_vpp_id, p.auth_key,
3159                 p.crypt_algo_vpp_id, p.crypt_key,
3160                 self.vpp_esp_protocol,
3161                 self.pg0.remote_hosts[ii].ip4,
3162                 self.pg0.local_ip4,
3163                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3164                 hop_limit=p.hop_limit)
3165             p.tun_sa_in.add_vpp_config()
3166
3167             p.tun_protect = VppIpsecTunProtect(
3168                 self,
3169                 p.tun_if,
3170                 p.tun_sa_out,
3171                 [p.tun_sa_in],
3172                 nh=p.tun_if.remote_hosts[ii].ip4)
3173             p.tun_protect.add_vpp_config()
3174             config_tun_params(p, self.encryption_type, None,
3175                               self.pg0.local_ip4,
3176                               self.pg0.remote_hosts[ii].ip4)
3177             self.multi_params.append(p)
3178
3179             VppIpRoute(self, p.remote_tun_if_host, 32,
3180                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
3181                                      p.tun_if.sw_if_index)]).add_vpp_config()
3182
3183             p.tun_dst = self.pg0.remote_hosts[ii].ip4
3184
3185     def tearDown(self):
3186         p = self.ipv4_params
3187         p.tun_if.unconfig_ip4()
3188         super(TestIpsecMIfEsp4, self).tearDown()
3189
3190     def test_tun_44(self):
3191         """P2MP IPSEC 44"""
3192         N_PKTS = 63
3193         for p in self.multi_params:
3194             self.verify_tun_44(p, count=N_PKTS)
3195
3196
3197 class TestIpsecItf6MPLS(TemplateIpsec,
3198                         TemplateIpsecItf6,
3199                         IpsecTun6):
3200     """ IPsec Interface MPLSoIPv6 """
3201
3202     tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3203
3204     def setUp(self):
3205         super(TestIpsecItf6MPLS, self).setUp()
3206
3207         self.tun_if = self.pg0
3208
3209     def tearDown(self):
3210         super(TestIpsecItf6MPLS, self).tearDown()
3211
3212     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
3213                           payload_size=100):
3214         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3215                 sa.encrypt(MPLS(label=66, ttl=3) /
3216                            IPv6(src=src, dst=dst) /
3217                            UDP(sport=1166, dport=2233) /
3218                            Raw(b'X' * payload_size))
3219                 for i in range(count)]
3220
3221     def verify_encrypted6(self, p, sa, rxs):
3222         for rx in rxs:
3223             try:
3224                 pkt = sa.decrypt(rx[IPv6])
3225                 if not pkt.haslayer(IPv6):
3226                     pkt = IP(pkt[Raw].load)
3227                 self.assert_packet_checksums_valid(pkt)
3228                 self.assert_equal(pkt[MPLS].label, 66)
3229                 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3230             except (IndexError, AssertionError):
3231                 self.logger.debug(ppp("Unexpected packet:", rx))
3232                 try:
3233                     self.logger.debug(ppp("Decrypted packet:", pkt))
3234                 except:
3235                     pass
3236                 raise
3237
3238     def test_tun_mpls_o_ip6(self):
3239         """IPSEC interface MPLS over IPv6"""
3240
3241         n_pkts = 127
3242         p = self.ipv6_params
3243         f = FibPathProto
3244
3245         tbl = VppMplsTable(self, 0)
3246         tbl.add_vpp_config()
3247
3248         self.config_network(p)
3249         # deag MPLS routes from the tunnel
3250         r6 = VppMplsRoute(self, 66, 1,
3251                           [VppRoutePath(
3252                               self.pg1.remote_ip6,
3253                               self.pg1.sw_if_index)],
3254                           eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
3255         p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
3256                                      p.tun_if.sw_if_index,
3257                                      labels=[VppMplsLabel(66)])])
3258         p.tun_if.enable_mpls()
3259
3260         self.config_sa_tun(p,
3261                            self.pg0.local_ip6,
3262                            self.pg0.remote_ip6)
3263         self.config_protect(p)
3264
3265         self.verify_tun_66(p, count=n_pkts)
3266
3267         # cleanup
3268         p.tun_if.disable_mpls()
3269         self.unconfig_protect(p)
3270         self.unconfig_sa(p)
3271         self.unconfig_network(p)
3272
3273
3274 if __name__ == '__main__':
3275     unittest.main(testRunner=VppTestRunner)