tests: tag the tests that do not work with multi-worker configuration
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
15     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19     VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
24 from util import ppp
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
28
29
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
    """Attach the two scapy SecurityAssociation objects for a tunnel to *p*.

    This module-level definition replaces the function of the same name
    that is imported from template_ipsec at the top of the file.

    :param p: IPsec parameter object. Its SPIs, algorithms, keys, flags,
        addr_type and nat_header are read; tun_src, tun_dst, scapy_tun_sa
        and vpp_tun_sa are written.
    :param encryption_type: first positional argument handed to
        SecurityAssociation (the classes in this file pass ESP).
    :param tun_if: when truthy, supplies the tunnel endpoints through its
        local_ip / remote_ip attributes.
    :param src: tunnel source address, used only when tun_if is falsy.
    :param dst: tunnel destination address, used only when tun_if is falsy.
    """
    header_classes = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    use_esn = bool(
        p.flags &
        VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
    scapy_key = mk_scapy_crypt_key(p)

    # Tunnel endpoints come from the interface when one is given,
    # otherwise from the explicit src/dst arguments.
    if tun_if:
        p.tun_dst, p.tun_src = tun_if.remote_ip, tun_if.local_ip
    else:
        p.tun_dst, p.tun_src = dst, src

    def build_sa(spi, outer_src, outer_dst):
        # The two SAs differ only in SPI and outer header direction.
        return SecurityAssociation(
            encryption_type, spi=spi,
            crypt_algo=p.crypt_algo,
            crypt_key=scapy_key,
            auth_algo=p.auth_algo, auth_key=p.auth_key,
            tunnel_header=header_classes[p.addr_type](
                src=outer_src,
                dst=outer_dst),
            nat_t_header=p.nat_header,
            esn_en=use_esn)

    # Outer header runs from tun_dst to tun_src, keyed by VPP's SPI.
    p.scapy_tun_sa = build_sa(p.vpp_tun_spi, p.tun_dst, p.tun_src)
    # Outer header runs from tun_src to tun_dst, keyed by scapy's SPI.
    p.vpp_tun_sa = build_sa(p.scapy_tun_spi, p.tun_src, p.tun_dst)
62
63
def config_tra_params(p, encryption_type, tun_if):
    """Attach two scapy SecurityAssociation objects without outer header.

    Unlike config_tun_params, no tunnel_header is passed to the SAs.

    :param p: IPsec parameter object. Its SPIs, algorithms, keys, flags
        and nat_header are read; tun_src, tun_dst, scapy_tun_sa and
        vpp_tun_sa are written.
    :param encryption_type: first positional argument handed to
        SecurityAssociation.
    :param tun_if: interface whose remote_ip / local_ip become
        p.tun_dst / p.tun_src.
    """
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # Everything except the SPI is common to both directions.
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=crypt_key,
                  auth_algo=p.auth_algo, auth_key=p.auth_key,
                  esn_en=esn_en,
                  nat_t_header=p.nat_header)
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi, **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi, **shared)
85
86
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect

    Mixin with the configure/unconfigure steps for an IPv4 IP-IP tunnel
    interface protected by an SA pair. Each step stores what it creates
    on the parameter object *p* so the matching unconfig step can find it.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create and install the SA pair without tunnel endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments shared by both directions.
        algo_args = (p.auth_algo_vpp_id, p.auth_key,
                     p.crypt_algo_vpp_id, p.crypt_key,
                     self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  *algo_args, flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 *algo_args, flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install the SA pair with explicit tunnel endpoints.

        The outbound SA is given self.tun_if's local then remote address;
        the inbound SA gets the same two addresses in the opposite order.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        algo_args = (p.auth_algo_vpp_id, p.auth_key,
                     p.crypt_algo_vpp_id, p.crypt_key,
                     self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  *algo_args,
                                  self.tun_if.local_addr[p.addr_type],
                                  self.tun_if.remote_addr[p.addr_type],
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 *algo_args,
                                 self.tun_if.remote_addr[p.addr_type],
                                 self.tun_if.local_addr[p.addr_type],
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind the SA pair to the tunnel interface and install it."""
        inbound_sas = [p.tun_sa_in]
        p.tun_protect = VppIpsecTunProtect(self, p.tun_if,
                                           p.tun_sa_out, inbound_sas)
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel on pg0 and route the remote hosts to it.

        The tunnel destination is p.tun_dst when that attribute exists,
        otherwise pg0's remote IPv4 address. Only the IPv4 host route is
        kept on p (as p.route); the IPv6 one is not stored.
        """
        tun_dst = p.tun_dst if hasattr(p, 'tun_dst') else self.pg0.remote_ip4

        p.tun_if = tunnel = VppIpIpTunInterface(self, self.pg0,
                                                self.pg0.local_ip4,
                                                tun_dst)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # 0xffffffff is the path's second positional argument
        # (presumably "no specific interface" - confirm in VppRoutePath).
        path4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [path4])
        p.route.add_vpp_config()

        path6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                             proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, p.remote_tun_if_host6, 128,
                   [path6]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove the stored host route, then the tunnel interface."""
        for item in (p.route, p.tun_if):
            item.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
173
174
class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
                             TemplateIpsec):
    """ IPsec tunnel interface tests

    Base for the IPv4 test classes: setUp builds one protected tunnel
    from self.ipv4_params.
    """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Run the parent setUp, then build network, SAs and protection."""
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # Order matters: the SAs read p.tun_if, protection reads the SAs.
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
202
203
class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
                                TemplateIpsec):
    """ IPsec UDP tunnel interface tests

    Base for the UDP-encapsulated IPv4 test classes: setUp sets the
    UDP_ENCAP flag and a NAT header on self.ipv4_params before building
    the protected tunnel.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check UDP ports, then decrypt and check each received packet.

        :param p: parameter object; p.nat_header.sport is the expected
            UDP source port.
        :param sa: scapy SA used to decrypt each packet.
        :param rxs: iterable of received packets.
        :raises IndexError, AssertionError: re-raised after the offending
            packet (and its decrypted form, if any) has been logged.
        """
        for rx in rxs:
            # Reset per packet so the failure path never logs a packet
            # decrypted in an earlier iteration.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, p.nat_header.sport)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # Dumping the decrypted packet is best-effort; it
                    # must not hide the original failure.
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception as dump_err:
                        self.logger.debug(
                            "Could not dump decrypted packet: %s" %
                            dump_err)
                raise

    def config_sa_tra(self, p):
        """Create and install the SA pair with the NAT header's UDP ports.

        Both SAs receive udp_src=p.nat_header.sport and
        udp_dst=p.nat_header.dport.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags,
                                  udp_src=p.nat_header.sport,
                                  udp_dst=p.nat_header.dport)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=p.flags,
                                 udp_src=p.nat_header.sport,
                                 udp_dst=p.nat_header.dport)
        p.tun_sa_in.add_vpp_config()

    def setUp(self):
        """Enable UDP encapsulation on ipv4_params and build the tunnel."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=5454, dport=4500)

        self.tun_if = self.pg0

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
281
282
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        v4_params = self.params[socket.AF_INET]
        self.verify_tun_64(v4_params, count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        v4_params = self.params[socket.AF_INET]
        self.verify_tun_64(v4_params, count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        def set_tunnel_mtu(first_mtu):
            # Only the first of the four MTU values is varied.
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                           [first_mtu, 0, 0, 0])

        # One 1800-byte payload against a 1500 MTU; two packets expected.
        set_tunnel_mtu(1500)
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        set_tunnel_mtu(9000)
312
313
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        super(TestIpsec4TunIfEspUdp, self).setUp()

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
325
326
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP GCM tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Run the parent setUp, then switch ipv4_params to AES-GCM-256.

        NOTE(review): the parent setUp has already called config_sa_tra
        by the time these fields are assigned, so the SAs created there
        were built from the previous values - confirm that the GCM
        settings are actually exercised.
        """
        super(TestIpsec4TunIfEspUdpGCM, self).setUp()

        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        p = self.ipv4_params
        # VPP-side algorithm identifiers.
        p.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        # scapy-side algorithm names.
        p.crypt_algo = "AES-GCM"
        p.auth_algo = "NULL"
        # Key material.
        p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        p.salt = 0
343
344
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests

    Adds nothing of its own; it only combines the two base classes.
    """
348
349
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect

    Mixin with the configure/unconfigure steps for an IPv6 IP-IP tunnel
    interface protected by an SA pair. self.encryption_type is read but
    not defined here; the classes mixing this in provide it.
    """

    def config_sa_tra(self, p):
        """Create and install the SA pair without tunnel endpoints.

        No flags argument is passed to VppIpsecSA here, unlike the IPv4
        template in this file.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Positional arguments shared by both directions.
        algo_args = (p.auth_algo_vpp_id, p.auth_key,
                     p.crypt_algo_vpp_id, p.crypt_key,
                     self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  *algo_args)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 *algo_args)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install the SA pair with explicit tunnel endpoints.

        The outbound SA is given self.tun_if's local then remote address;
        the inbound SA gets the same two addresses in the opposite order.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        algo_args = (p.auth_algo_vpp_id, p.auth_key,
                     p.crypt_algo_vpp_id, p.crypt_key,
                     self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  *algo_args,
                                  self.tun_if.local_addr[p.addr_type],
                                  self.tun_if.remote_addr[p.addr_type])
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 *algo_args,
                                 self.tun_if.remote_addr[p.addr_type],
                                 self.tun_if.local_addr[p.addr_type])
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind the SA pair to the tunnel interface and install it."""
        inbound_sas = [p.tun_sa_in]
        p.tun_protect = VppIpsecTunProtect(self, p.tun_if,
                                           p.tun_sa_out, inbound_sas)
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-IP tunnel on pg0 and route the remote hosts to it.

        The tunnel destination is p.tun_dst when that attribute exists,
        otherwise pg0's remote IPv6 address. Only the IPv6 host route is
        kept on p (as p.route); the IPv4 one is not stored.
        """
        tun_dst = p.tun_dst if hasattr(p, 'tun_dst') else self.pg0.remote_ip6

        p.tun_if = tunnel = VppIpIpTunInterface(self, self.pg0,
                                                self.pg0.local_ip6,
                                                tun_dst)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # 0xffffffff is the path's second positional argument
        # (presumably "no specific interface" - confirm in VppRoutePath).
        path6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                             proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [path6])
        p.route.add_vpp_config()

        path4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, p.remote_tun_if_host4, 32,
                   [path4]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove the stored host route, then the tunnel interface."""
        for item in (p.route, p.tun_if):
            item.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
427
428
class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
                             TemplateIpsec):
    """ IPsec tunnel interface tests

    Base for the IPv6 test classes: setUp builds one protected tunnel
    from self.ipv6_params.
    """

    encryption_type = ESP

    def setUp(self):
        """Run the parent setUp, then build network, SAs and protection."""
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # Order matters: the SAs read p.tun_if, protection reads the SAs.
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
447
448
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
                          IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
464
465
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
                                IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # No tests are defined here; the class only combines the IPv6 tunnel
    # template with IpsecTun6HandoffTests and names the two graph nodes.
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
471
472
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
                                IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # No tests are defined here; the class only combines the IPv4 tunnel
    # template with IpsecTun4HandoffTests and names the two graph nodes.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
478
479
@tag_fixme_vpp_workers
class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
                              TemplateIpsec,
                              IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Build ten protected tunnels, one per generated pg0 remote host.

        Each tunnel gets its own shallow copy of self.ipv4_params with
        the SA ids and SPIs offset by the tunnel index.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        n_tunnels = 10
        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        for idx in range(n_tunnels):
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (idx + 1)

            # Shift every id/SPI by the tunnel index.
            p.scapy_tun_sa_id += idx
            p.scapy_tun_spi += idx
            p.vpp_tun_sa_id += idx
            p.vpp_tun_spi += idx

            p.scapy_tra_sa_id += idx
            p.scapy_tra_spi += idx
            p.vpp_tra_sa_id += idx
            p.vpp_tra_spi += idx

            p.tun_dst = self.pg0.remote_hosts[idx].ip4

            self.multi_params.append(p)
            self.config_network(p)
            self.config_sa_tra(p)
            self.config_protect(p)

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for p in self.multi_params:
            self.verify_tun_44(p, count=n_pkts)
            # The tunnel's own rx and tx counters must match the burst.
            self.assertEqual(p.tun_if.get_rx_stats()['packets'], n_pkts)
            self.assertEqual(p.tun_if.get_tx_stats()['packets'], n_pkts)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interfaces """
        # Encrypted packets in on the tunnel side, one per tunnel.
        encrypted = []
        for p in self.multi_params:
            encrypted.extend(
                self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                      src=p.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4))
        received = self.send_and_expect(self.tun_if, encrypted, self.pg1)

        for rx, p in zip(received, self.multi_params):
            self.verify_decrypted(p, [rx])

        # Plain packets in on pg1, one per tunnel's remote host.
        plain = []
        for p in self.multi_params:
            plain.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                       dst=p.remote_tun_if_host))
        received = self.send_and_expect(self.pg1, plain, self.tun_if)

        for rx, p in zip(received, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
551
552
class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
                            TemplateIpsec,
                            IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the parent setUp, then build one protected tunnel."""
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

    def tearDown(self):
        """Remove protection, network and SAs, then run the parent tearDown."""
        p = self.ipv4_params
        self.unconfig_protect(p)
        self.unconfig_network(p)
        self.unconfig_sa(p)

        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Replace the tunnel's SA pair with a new pair built from *p*.

        The first key byte is overwritten with b'X' and all four SA
        ids/SPIs are incremented. The new SAs are installed and bound to
        the tunnel before the previous pair is removed.
        """
        #
        # change the key and the SPI
        #
        # Shallow copy: np keeps references to the SA objects that p is
        # about to replace, so they can be removed at the end.
        np = copy.copy(p)
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        # Rebuild the scapy SAs from the updated key and SPIs.
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self,
                                  p.scapy_tun_sa_id,
                                  p.scapy_tun_spi,
                                  p.auth_algo_vpp_id,
                                  p.auth_key,
                                  p.crypt_algo_vpp_id,
                                  p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags,
                                  salt=p.salt)
        p.tun_sa_in = VppIpsecSA(self,
                                 p.vpp_tun_sa_id,
                                 p.vpp_tun_spi,
                                 p.auth_algo_vpp_id,
                                 p.auth_key,
                                 p.crypt_algo_vpp_id,
                                 p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=p.flags,
                                 salt=p.salt)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # Bind the new pair first, then drop the old pair held by np.
        self.config_protect(p)
        np.tun_sa_out.remove_vpp_config()
        np.tun_sa_in.remove_vpp_config()
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        # foreach crypto algorithm
        # Each entry pairs the VPP enum values with the matching scapy
        # algorithm names, plus the key and salt to use.
        algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7h",
                  'salt': 3333},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
                  'salt': 0},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_NONE),
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
                  'salt': 9999},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7h"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA_512_256),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "SHA2-512-256",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA_256_128),
                  'scapy-crypto': "AES-CBC",
                  'scapy-integ': "SHA2-256-128",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
                 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
                                 IPSEC_API_CRYPTO_ALG_NONE),
                  'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
                                IPSEC_API_INTEG_ALG_SHA1_96),
                  'scapy-crypto': "NULL",
                  'scapy-integ': "HMAC-SHA1-96",
                  'salt': 0,
                  'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for algo in algos:
                # Every algorithm runs inside this one test method;
                # per-algorithm sub-tests are not used.

                p = self.ipv4_params
                p.auth_algo_vpp_id = algo['vpp-integ']
                p.crypt_algo_vpp_id = algo['vpp-crypto']
                p.crypt_algo = algo['scapy-crypto']
                p.auth_algo = algo['scapy-integ']
                p.crypt_key = algo['key']
                p.salt = algo['salt']

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)
709
710
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
                               TemplateIpsec,
                               IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Set ipv4_params to NONE/NULL algorithms with empty keys.

        Nothing is configured in VPP here; the test method does that.
        """
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0
        p = self.ipv4_params

        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # Integrity: none.
        p.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_NONE
        p.auth_algo = 'NULL'
        p.auth_key = []

        # Encryption: none.
        p.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_NONE
        p.crypt_algo = 'NULL'
        p.crypt_key = []

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """ IPSec SA with NULL algos """
        p = self.ipv4_params

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(p)

        # Packets sent from pg1 towards the tunnel's remote host are
        # handed to send_and_assert_no_replies.
        pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                             dst=p.remote_tun_if_host)
        self.send_and_assert_no_replies(self.pg1, pkts)

        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(p)
752         self.unconfig_network(p)
753
754
@tag_fixme_vpp_workers
class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
                              TemplateIpsec,
                              IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Build ten protected tunnels, one per generated pg0 remote host.

        Each tunnel gets its own shallow copy of self.ipv6_params with
        the SA ids and SPIs offset by the tunnel index.
        """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        n_tunnels = 10
        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv6_neighbors()

        for idx in range(n_tunnels):
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1111::%d" % (idx + 1)

            # Shift every id/SPI by the tunnel index.
            p.scapy_tun_sa_id += idx
            p.scapy_tun_spi += idx
            p.vpp_tun_sa_id += idx
            p.vpp_tun_spi += idx

            p.scapy_tra_sa_id += idx
            p.scapy_tra_spi += idx
            p.vpp_tra_sa_id += idx
            p.vpp_tra_spi += idx

            p.tun_dst = self.pg0.remote_hosts[idx].ip6

            self.multi_params.append(p)
            self.config_network(p)
            self.config_sa_tra(p)
            self.config_protect(p)

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for p in self.multi_params:
            self.verify_tun_66(p, count=n_pkts)
            # The tunnel's own rx and tx counters must match the burst.
            self.assertEqual(p.tun_if.get_rx_stats()['packets'], n_pkts)
            self.assertEqual(p.tun_if.get_tx_stats()['packets'], n_pkts)
805
806
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """

    # An ESP-protected GRE tunnel of type TEB is bridged with pg1 in
    # bridge domain 1. The generators/verifiers below override the ones
    # used by IpsecTun4Tests so that the inner payload is an Ethernet frame.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted IP/GRE/Ether/IP/UDP frames for sw_intf.

        `p`, `src` and `dst` are accepted for signature compatibility but
        are not used; the addresses are fixed by this class.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` clear-text Ether/IP/UDP frames addressed to omac.

        `sw_intf`, `src` and `dst` are not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame carries the expected inner
        destination MAC and IP address."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On failure the offending packet (and the decrypted packet, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs the packet that was
            # decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort logging only; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and bridge it
        with pg1."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # Outbound and inbound SAs are given the pg0 addresses in opposite
        # order (presumably tunnel source/destination - confirm against
        # VppIpsecSA).
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        # the output of these two show commands is not inspected here
        self.vapi.cli("sh adj")
        self.vapi.cli("sh ipsec tun")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent cleanup."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
916
917
class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
                               IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """

    # NOTE(review): the docstring above is identical to the one on
    # TestIpsecGreTebIfEsp; it probably should mention VLAN, but it is
    # left untouched here since it may be used as the reported suite name.
    #
    # Same topology as the non-VLAN TEB test, except that the bridge port
    # on the pg1 side is a dot1q sub-interface (VLAN 11) with a pop-1 tag
    # rewrite.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted IP/GRE/Ether/IP/UDP frames for sw_intf.

        `p`, `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` clear-text frames tagged with VLAN 11.

        `sw_intf`, `src` and `dst` are not used.
        """
        return [Ether(dst=self.omac) /
                Dot1Q(vlan=11) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check destination MAC, VLAN tag 11 and inner IP destination of
        each received frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        The inner Ethernet frame must not carry a Dot1Q header. On failure
        the offending packet (and the decrypted packet, when decryption got
        that far) is logged at debug level and the exception re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs the packet that was
            # decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort logging only; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """Create the VLAN sub-interface, the SAs and the protected GRE TEB
        tunnel, then bridge tunnel and sub-interface together."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # dot1q sub-interface on pg1 for VLAN 11, with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11)
        self.pg1_11.admin_up()

        # Outbound and inbound SAs are given the pg0 addresses in opposite
        # order (presumably tunnel source/destination - confirm against
        # VppIpsecSA).
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, run the parent cleanup, then
        take down and remove the VLAN sub-interface."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
1036
1037
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """

    # Like the GRE TEB tunnel-mode test, but the SAs are created without
    # tunnel endpoint addresses and the scapy side is set up with
    # config_tra_params().

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted IP/GRE/Ether/IP/UDP frames for sw_intf.

        `p`, `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` clear-text Ether/IP/UDP frames addressed to omac.

        `sw_intf`, `src` and `dst` are not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame carries the expected inner
        destination MAC and IP address."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On failure the offending packet (and the decrypted packet, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs the packet that was
            # decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort logging only; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and bridge it
        with pg1."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # no tunnel endpoint addresses are passed to these SAs
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent cleanup."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1141
1142
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
                                 IpsecTun4Tests):
    """ Ipsec GRE TEB UDP ESP - Tra tests """

    # GRE TEB tunnel protected by SAs that carry the UDP-encap flag, with
    # UDP source port 5454 and destination port 4545.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted IP/GRE/Ether/IP/UDP frames for sw_intf.

        `p`, `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` clear-text Ether/IP/UDP frames addressed to omac.

        `sw_intf`, `src` and `dst` are not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame carries the expected inner
        destination MAC and IP address."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the outer UDP header, then decrypt and check each packet.

        On a decrypt/content failure the offending packet (and the
        decrypted packet, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # the outer packet must be UDP with the configured ports
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # Reset per packet so a failure never logs the packet that was
            # decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort logging only; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """Create UDP-encap SAs, the protected GRE TEB tunnel and bridge
        it with pg1."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags,
                                  udp_src=5454,
                                  udp_dst=4545)
        p.tun_sa_out.add_vpp_config()

        # the inbound SA additionally carries the IS_INBOUND flag
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=(p.flags |
                                        VppEnum.vl_api_ipsec_sad_flags_t.
                                        IPSEC_API_SAD_FLAG_IS_INBOUND),
                                 udp_src=5454,
                                 udp_dst=4545)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent cleanup."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1262
1263
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """

    # ESP-protected GRE tunnel (default tunnel type) with a /32 route for
    # 1.1.1.2 pointing into the tunnel.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted IP/GRE/IP/UDP frames for sw_intf.

        The inner IP header uses the pg1 local/remote addresses; `p`,
        `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` clear-text IP/UDP frames destined to 1.1.1.2.

        `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame is addressed to the pg1 remote
        MAC and IPv4 address."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On failure the offending packet (and the decrypted packet, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs the packet that was
            # decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort logging only; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE tunnel and the route into it."""
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): this bridge domain is created but no port is ever
        # added to it in this class - possibly a leftover from the TEB
        # variants; kept to preserve the existing configuration.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # Outbound and inbound SAs are given the pg0 addresses in opposite
        # order (presumably tunnel source/destination - confirm against
        # VppIpsecSA).
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # host route for the clear-text test destination, via the tunnel's
        # remote address
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent cleanup."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1365
1366
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """

    # ESP-protected GRE tunnel whose SAs are created without tunnel
    # endpoint addresses; adds a test for a GRE payload that is not IP.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted IP/GRE/IP/UDP frames for sw_intf.

        The inner IP header uses the pg1 local/remote addresses; `p`,
        `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Build `count` encrypted frames whose GRE header is followed
        directly by UDP, with no inner IP header.

        `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` clear-text IP/UDP frames destined to 1.1.1.2.

        `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame is addressed to the pg1 remote
        MAC and IPv4 address."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On failure the offending packet (and the decrypted packet, when
        decryption got that far) is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure never logs the packet that was
            # decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort logging only; the original failure
                        # re-raised below is what matters
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE tunnel and the route into it."""
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # no tunnel endpoint addresses are passed to these SAs
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # host route for the clear-text test destination, via the tunnel's
        # remote address
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent cleanup."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        """ GRE non-IP payload counted as unsupported """
        p = self.ipv4_params
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip6)
        # nothing may come back for the non-IP payload ...
        self.send_and_assert_no_replies(self.tun_if, tx)
        # ... and the decrypt node must have counted exactly one
        # 'unsupported payload' error
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1479
1480
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE ESP - TRA tests

    Protects an IPv6 GRE tunnel between pg0's local and remote addresses
    with a pair of ESP SAs and provides the packet builders and verifiers
    for that setup.  The class defines no test methods of its own; they
    are expected to come from the IpsecTun6Tests base.
    """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` ESP-encrypted GRE packets to send on sw_intf.

        The outer IPv6 header runs from pg0's remote to pg0's local
        address and the inner one from pg1's local to pg1's remote
        address; the p, src and dst arguments are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build `count` plain IPv6/UDP packets from 1::1 to 1::2.

        The destination matches the /128 route that setUp() installs via
        the tunnel; the p, src and dst arguments are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet and check its GRE payload.

        The decrypted packet must have valid checksums, carry a GRE layer
        and an inner IPv6 destination of 1::2.  On failure the offending
        packet (and its decryption, when there is one) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # reset for every packet so that a failure before/inside
            # decrypt() never logs the previous packet's decryption
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure
                        # must not hide the original error raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the GRE tunnel, its protection and a route."""
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): this bridge domain is not referenced again in this
        # class - confirm whether the TRA variant really needs it
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: built from the scapy-side SA id and SPI
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        # inbound SA: built from the VPP-side SA id and SPI
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1::2 (see gen_pkts6) into the tunnel
        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 config before the base teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1578
1579
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """ Ipsec mGRE ESP v4 TRA tests

    Creates one multipoint IPv4 GRE interface and, for each of N_NHS
    next-hops on it, a dedicated pair of ESP SAs, a tunnel protection, a
    host route and a TEIB entry.  The per-next-hop parameter sets are
    kept in self.multi_params.
    """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` ESP-encrypted GRE packets to send on sw_intf.

        The outer IPv4 header runs from p.tun_dst to pg0's local address
        and the inner one from pg1's local to pg1's remote address; the
        src and dst arguments are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=p.tun_dst,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain IPv4/UDP packets from 1.1.1.1 to dst.

        The src argument is not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check its GRE payload.

        The decrypted packet must have valid checksums, carry a GRE layer
        and an inner IPv4 destination of p.remote_tun_if_host.  On failure
        the offending packet (and its decryption, when there is one) is
        logged at debug level and the original exception is re-raised.
        """
        for rx in rxs:
            # reset for every packet so that a failure before/inside
            # decrypt() never logs the previous packet's decryption
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure
                        # must not hide the original error raised below
                        pass
                raise

    def setUp(self):
        """Create the mGRE interface and per-next-hop SAs/protections."""
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        # multipoint tunnel: the destination is left as the any-address
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   "0.0.0.0",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the one tunnel interface
            # but gets its own SA ids, SPIs and remote host
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            p.teib = VppTeib(self, p.tun_if,
                             p.tun_if.remote_hosts[ii].ip4,
                             self.pg0.remote_hosts[ii].ip4).add_vpp_config()
            # set last so it is the value gen_encrypt_pkts() reads as the
            # outer source address
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # without its TEIB entry the next-hop's traffic must be dropped
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            # and flow again once the entry is restored
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
1710
1711
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """ Ipsec mGRE ESP v6 TRA tests

    Creates one multipoint IPv6 GRE interface and, for each of N_NHS
    next-hops on it, a dedicated pair of ESP SAs, a TEIB entry, a tunnel
    protection and a host route.  The per-next-hop parameter sets are
    kept in self.multi_params.
    """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` ESP-encrypted GRE packets to send on sw_intf.

        The outer IPv6 header runs from p.tun_dst to pg0's local address
        and the inner one from pg1's local to pg1's remote address; the
        src and dst arguments are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=p.tun_dst,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build `count` plain IPv6/UDP packets from 1::1 to dst.

        The p and src arguments are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet and check its GRE payload.

        The decrypted packet must have valid checksums, carry a GRE layer
        and an inner IPv6 destination of p.remote_tun_if_host.  On failure
        the offending packet (and its decryption, when there is one) is
        logged at debug level and the original exception is re-raised.
        """
        for rx in rxs:
            # reset for every packet so that a failure before/inside
            # decrypt() never logs the previous packet's decryption
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure
                        # must not hide the original error raised below
                        pass
                raise

    def setUp(self):
        """Create the mGRE interface and per-next-hop SAs/protections."""
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        # multipoint tunnel: the destination is left as the any-address
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   "::",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the one tunnel interface
            # but gets its own SA ids, SPIs and remote host
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(self, p.tun_if,
                    p.tun_if.remote_hosts[ii].ip6,
                    p.tun_dst).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 128,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
                                     p.tun_if.sw_if_index)]).add_vpp_config()
            # assigned again after config_tra_params()
            # NOTE(review): presumably that helper overwrites tun_dst -
            # confirm before removing this line
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        # NOTE(review): hard-coded adjacency index, looks like leftover
        # debugging output - confirm it is still wanted
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 config before the base teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
1844
@tag_fixme_vpp_workers
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode

    Exercises a protected IPv4 tunnel with transport-mode SAs, including
    traffic after "clear ipsec sa" and after a rekey.
    """

    def setUp(self):
        """Run the base set-up and use pg0 as the tunnel's underlay."""
        super(TestIpsec4TunProtect, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo; the test cleans up after itself."""
        super(TestIpsec4TunProtect, self).tearDown()

    def _assert_tun_if_packets(self, p, rx, tx):
        """Check the tunnel interface's rx, then tx, packet counters."""
        tun_if = p.tun_if
        self.assertEqual(tun_if.get_rx_stats()['packets'], rx)
        self.assertEqual(tun_if.get_tx_stats()['packets'], tx)

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        N_PKTS = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        # first batch over the freshly configured tunnel
        self.verify_tun_44(p, count=N_PKTS)
        self._assert_tun_if_packets(p, rx=127, tx=127)

        # second batch after clearing the SAs; counters keep accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(p, count=N_PKTS)
        self._assert_tun_if_packets(p, rx=254, tx=254)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: rekeyed.tun_if is the same object as p.tun_if)
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        # third batch, now using the new SAs
        self.verify_tun_44(rekeyed, count=N_PKTS)
        self._assert_tun_if_packets(p, rx=381, tx=381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)
1905
1906
@tag_fixme_vpp_workers
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode

    Variant whose SAs carry the UDP-encap flag: encrypted packets are
    expected to leave with UDP source and destination port 4500.
    """

    def setUp(self):
        """Configure network, UDP-encap transport SAs and protection."""
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params = self.ipv4_params
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo setUp's configuration in reverse order."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports, then run the inherited verification."""
        # ensure encrypted packets are received with the default UDP ports
        for rx in rxs:
            udp = rx[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        params = self.ipv4_params
        tun_if = params.tun_if

        self.verify_tun_44(params, count=127)
        self.assertEqual(tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(tun_if.get_tx_stats()['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
1954
1955
@tag_fixme_vpp_workers
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode

    Exercises a protected IPv4 tunnel with tunnel-mode SAs (IP-in-IP
    inside ESP), with ACL output features on both the physical and the
    tunnel interface, and across a rekey.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the base set-up and use pg0 as the tunnel's underlay."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo; the test cleans up after itself."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` ESP-encrypted IP-in-IP packets for sw_intf.

        The outer header runs from sw_intf's remote to its local address
        and the inner header from src to dst.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain IPv4/UDP packets from src to dst."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of the decapsulated packets."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check both IP headers.

        The outer header must run from pg0's local to pg0's remote
        address and the inner destination must be p.remote_tun_if_host.
        On failure the offending packet (and its decryption, when there
        is one) is logged at debug level and the original exception is
        re-raised.
        """
        for rx in rxs:
            # reset for every packet so that a failure before/inside
            # decrypt() never logs the previous packet's decryption
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure
                        # must not hide the original error raised below
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # also add an output features on the tunnel and physical interface
        # so we test they still work
        r_all = AclRule(True,
                        src_prefix="0.0.0.0/0",
                        dst_prefix="0.0.0.0/0",
                        proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # the interface counters keep accumulating across the rekey
        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2069
2070
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop

    Sends tunnel-mode ESP packets whose decrypted outer header carries a
    destination of 5.5.5.5 and expects none of them to be forwarded.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the base set-up and use pg0 as the tunnel's underlay."""
        super(TestIpsec4TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo; the test cleans up after itself."""
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` ESP packets with a bogus outer destination.

        The outer header runs from sw_intf's remote address to 5.5.5.5;
        the inner header runs from src to dst.
        """
        pkts = []
        for _ in range(count):
            clear_text = (IP(src=sw_intf.remote_ip4,
                             dst="5.5.5.5") /
                          IP(src=src, dst=dst) /
                          UDP(sport=1144, dport=2233) /
                          Raw(b'X' * payload_size))
            pkts.append(Ether(src=sw_intf.remote_mac,
                              dst=sw_intf.local_mac) /
                        sa.encrypt(clear_text))
        return pkts

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """

        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        bogus = self.gen_encrypt_pkts(params, params.scapy_tun_sa,
                                      self.tun_if,
                                      src=params.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2117
2118
@tag_fixme_vpp_workers
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode

    Exercises a protected IPv6 tunnel with transport-mode SAs: plain
    traffic, a rekey, an admin down/up bounce and a three-phase rekey.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the base set-up and use pg0 as the tunnel's underlay."""
        super(TestIpsec6TunProtect, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo; each test cleans up after itself."""
        super(TestIpsec6TunProtect, self).tearDown()

    def _assert_tun_if_packets(self, p, rx, tx):
        """Check the tunnel interface's rx, then tx, packet counters."""
        tun_if = p.tun_if
        self.assertEqual(tun_if.get_rx_stats()['packets'], rx)
        self.assertEqual(tun_if.get_tx_stats()['packets'], tx)

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        N_PKTS = 127
        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=N_PKTS)
        self._assert_tun_if_packets(p, rx=127, tx=127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: rekeyed.tun_if is the same object as p.tun_if)
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=N_PKTS)
        self._assert_tun_if_packets(p, rx=254, tx=254)

        # bounce the interface state
        p.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=N_PKTS)
        disabled_if_counter = ('/err/ipsec6-tun-input/%s' %
                               'ipsec packets received on disabled interface')
        self.assertEqual(
            127, self.statistics.get_err_counter(disabled_if_counter))
        p.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=N_PKTS)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        third = copy.copy(rekeyed)
        third.crypt_key = b'Z' + p.crypt_key[1:]
        third.scapy_tun_spi += 100
        third.scapy_tun_sa_id += 1
        third.vpp_tun_spi += 100
        third.vpp_tun_sa_id += 1
        third.tun_if.local_spi = p.vpp_tun_spi
        third.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(third)

        protect = p.tun_protect

        # step 1;
        protect.update_vpp_config(rekeyed.tun_sa_out,
                                  [rekeyed.tun_sa_in, third.tun_sa_in])
        self.verify_tun_66(rekeyed, rekeyed, count=N_PKTS)
        self.verify_tun_66(third, rekeyed, count=N_PKTS)

        # step 2;
        protect.update_vpp_config(third.tun_sa_out,
                                  [rekeyed.tun_sa_in, third.tun_sa_in])
        self.verify_tun_66(rekeyed, third, count=N_PKTS)
        self.verify_tun_66(third, third, count=N_PKTS)

        # step 3;
        protect.update_vpp_config(third.tun_sa_out,
                                  [third.tun_sa_in])
        self.verify_tun_66(third, third, count=N_PKTS)
        self.verify_drop_tun_66(rekeyed, count=N_PKTS)

        self._assert_tun_if_packets(p, rx=127*9, tx=127*8)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(third)
        self.unconfig_sa(third)
        self.unconfig_network(p)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_46(p, count=127)
        self._assert_tun_if_packets(p, rx=127, tx=127)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2244
2245
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode

    Exercises a protected IPv6 tunnel with tunnel-mode SAs (IPv6-in-IPv6
    inside ESP), before and after a rekey.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the base set-up and use pg0 as the tunnel's underlay."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing extra to undo; the test cleans up after itself."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` ESP-encrypted IPv6-in-IPv6 packets for sw_intf.

        The outer header runs from sw_intf's remote to its local address
        and the inner header from src to dst; p is not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build `count` plain IPv6/UDP packets from src to dst."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of the decapsulated packets."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet and check both IPv6 headers.

        The outer header must run from pg0's local to pg0's remote
        address and the inner destination must be p.remote_tun_if_host.
        On failure the offending packet (and its decryption, when there
        is one) is logged at debug level and the original exception is
        re-raised.
        """
        for rx in rxs:
            # reset for every packet so that a failure before/inside
            # decrypt() never logs the previous packet's decryption
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a dump failure
                        # must not hide the original error raised below
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # the interface counters keep accumulating across the rekey
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2348
2349
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the inherited set-up, then point ``self.tun_if`` at pg0."""
        parent = super(TestIpsec6TunProtectTunDrop, self)
        parent.setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Tear down by delegating to the parent classes; nothing extra."""
        parent = super(TestIpsec6TunProtectTunDrop, self)
        parent.tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted frames with a bogus tunnel header.

        Same layering as an ordinary encrypted IPv6-in-IPv6 frame, except
        that the header handed to ``sa.encrypt`` is addressed to
        ``5::5``.  ``p`` is accepted but not used.

        :returns: list of ``count`` frames.
        """
        frames = []
        for _ in range(count):
            # the IP destination of the revealed packet does not match
            # that assigned to the tunnel
            clear = (IPv6(src=sw_intf.remote_ip6, dst="5::5") /
                     IPv6(src=src, dst=dst) /
                     UDP(sport=1144, dport=2233) /
                     Raw(b'X' * payload_size))
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / sa.encrypt(clear))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header

        Send encrypted packets carrying the bogus tunnel header and
        assert that no replies are received.
        """
        n_pkts = 63
        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        bogus = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                       src=p.remote_tun_if_host,
                                       dst=self.pg1.remote_ip6,
                                       count=n_pkts)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2397
2398
class TemplateIpsecItf4(object):
    """ IPsec Interface IPv4 """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the outbound and inbound tunnel SAs described by ``p``.

        The scapy-side parameters are set up first via
        ``config_tun_params``.  The outbound SA runs ``src`` -> ``dst``
        with the scapy SA id/SPI; the inbound SA runs ``dst`` -> ``src``
        with the VPP SA id/SPI.  Both are stored on ``p`` (``tun_sa_out``
        and ``tun_sa_in``) and added to VPP.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        def make_sa(sa_id, spi, tun_src, tun_dst):
            # both directions share algorithms, keys, protocol and flags
            return VppIpsecSA(self, sa_id, spi,
                              p.auth_algo_vpp_id, p.auth_key,
                              p.crypt_algo_vpp_id, p.crypt_key,
                              self.vpp_esp_protocol,
                              tun_src, tun_dst,
                              flags=p.flags)

        p.tun_sa_out = make_sa(p.scapy_tun_sa_id, p.scapy_tun_spi, src, dst)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = make_sa(p.vpp_tun_sa_id, p.vpp_tun_spi, dst, src)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs stored on ``p``."""
        protect = VppIpsecTunProtect(self,
                                     p.tun_if,
                                     p.tun_sa_out,
                                     [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p, instance=0xffffffff):
        """Create the ipsec interface and the host routes through it.

        The interface is stored as ``p.tun_if``, brought up and given
        IPv4 and IPv6 configuration.  The IPv4 /32 route to
        ``p.remote_tun_if_host`` is stored as ``p.route``; the IPv6 /128
        route to ``p.remote_tun_if_host6`` is added but not stored.
        """
        tun_if = VppIpsecInterface(self, instance=instance)
        p.tun_if = tun_if

        tun_if.add_vpp_config()
        tun_if.admin_up()
        tun_if.config_ip4()
        tun_if.config_ip6()

        via4 = VppRoutePath(tun_if.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via4])
        p.route.add_vpp_config()

        via6 = VppRoutePath(tun_if.remote_ip6,
                            0xffffffff,
                            proto=DpoProto.DPO_PROTO_IP6)
        route6 = VppIpRoute(self, p.remote_tun_if_host6, 128, [via6])
        route6.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the ipsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound, then the inbound SA stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
2461
2462
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec,
                    TemplateIpsecItf4,
                    IpsecTun4):
    """ IPsec Interface IPv4 """

    def setUp(self):
        """Run the inherited set-up, then point ``self.tun_if`` at pg0."""
        parent = super(TestIpsecItf4, self)
        parent.setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Tear down by delegating to the parent classes; nothing extra."""
        parent = super(TestIpsecItf4, self)
        parent.tearDown()

    def test_tun_instance_44(self):
        # Create the ipsec interface with an explicit instance number (3)
        # and check through the CLI that "ipsec0" does not exist while
        # "ipsec3" is known.
        params = self.ipv4_params
        self.config_network(params, instance=3)

        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        shown = self.vapi.cli("show interface ipsec3")
        self.assertTrue("unknown" not in shown)

        self.unconfig_network(params)

    def test_tun_44(self):
        """IPSEC interface IPv4

        Covers plain forwarding, admin down/up, IPv6 payload over the
        IPv4 tunnel and a rekey, checking the interface packet counters
        after each step.
        """
        n_pkts = 127
        p = self.ipv4_params

        def check_counters(rx_pkts, tx_pkts):
            # rx is read and checked before tx is read
            self.assertEqual(p.tun_if.get_rx_stats()['packets'], rx_pkts)
            self.assertEqual(p.tun_if.get_tx_stats()['packets'], tx_pkts)

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        check_counters(n_pkts, n_pkts)

        # traffic is dropped while the interface is admin down
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        check_counters(3 * n_pkts, 2 * n_pkts)

        # it's a v6 packet when it's encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        check_counters(4 * n_pkts, 3 * n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection.
        # The copy is shallow, so ``rekeyed.tun_if`` is the same interface
        # object as ``p.tun_if``.
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(rekeyed,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_44(rekeyed, count=n_pkts)
        # counters were cleared before the rekey
        check_counters(n_pkts, n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto

        Works on a copy of the IPv4 parameters so the shared ones keep
        their algorithms.
        """
        n_pkts = 127
        integ_alg = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_alg = VppEnum.vl_api_ipsec_crypto_alg_t

        p = copy.copy(self.ipv4_params)
        p.auth_algo_vpp_id = integ_alg.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = crypto_alg.IPSEC_API_CRYPTO_ALG_NONE
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2582
2583
class TestIpsecItf4MPLS(TemplateIpsec,
                        TemplateIpsecItf4,
                        IpsecTun4):
    """ IPsec Interface MPLSoIPv4 """

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the inherited set-up, then point ``self.tun_if`` at pg0."""
        super(TestIpsecItf4MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Tear down by delegating to the parent classes; nothing extra."""
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted MPLS-labelled IPv4/UDP frames.

        The clear text handed to ``sa.encrypt`` is an MPLS header
        (label 44, TTL 3) followed by an IPv4/UDP packet from ``src`` to
        ``dst`` with ``payload_size`` bytes of ``X``.  ``p`` is accepted
        but not used.

        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(MPLS(label=44, ttl=3) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet in ``rxs`` and check label and destination.

        The decrypted packet must have valid checksums, carry MPLS
        label 44 and an IPv4 destination of ``p.remote_tun_if_host``.
        On a failed check the received (and, when available, the
        decrypted) packet is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # The payload came back undissected.  The revealed
                    # packet is expected to start with the MPLS label
                    # (asserted below), so re-parse the raw bytes as MPLS;
                    # re-parsing them as a bare IP header could never
                    # satisfy the label check.
                    pkt = MPLS(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: ``pkt`` is unbound when the
                    # failure happened before decryption completed.  Catch
                    # Exception (not a bare except) so KeyboardInterrupt
                    # and SystemExit are not swallowed here.
                    pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(self, 44, 1,
                     [VppRoutePath(
                         self.pg1.remote_ip4,
                         self.pg1.sw_if_index)]).add_vpp_config()
        # send traffic for the remote host through the tunnel with label 44
        p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
                                     p.tun_if.sw_if_index,
                                     labels=[VppMplsLabel(44)])])
        p.tun_if.enable_mpls()

        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2658
2659
class TemplateIpsecItf6(object):
    """ IPsec Interface IPv6 """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the outbound and inbound tunnel SAs described by ``p``.

        The scapy-side parameters are set up first via
        ``config_tun_params``.  ``p.tun_flags`` and ``p.hop_limit`` are
        defaulted to ``None`` and ``255`` when ``p`` does not have them;
        only the outbound SA receives them.  The outbound SA runs
        ``src`` -> ``dst`` with the scapy SA id/SPI, the inbound SA runs
        ``dst`` -> ``src`` with the VPP SA id/SPI.  Both are stored on
        ``p`` and added to VPP.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        for attr, fallback in (('tun_flags', None), ('hop_limit', 255)):
            if not hasattr(p, attr):
                setattr(p, attr, fallback)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            p.auth_algo_vpp_id, p.auth_key,
                            p.crypt_algo_vpp_id, p.crypt_key,
                            self.vpp_esp_protocol,
                            src, dst,
                            flags=p.flags,
                            tun_flags=p.tun_flags,
                            hop_limit=p.hop_limit)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           p.auth_algo_vpp_id, p.auth_key,
                           p.crypt_algo_vpp_id, p.crypt_key,
                           self.vpp_esp_protocol,
                           dst, src,
                           flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs stored on ``p``."""
        protect = VppIpsecTunProtect(self,
                                     p.tun_if,
                                     p.tun_sa_out,
                                     [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the ipsec interface and the host routes through it.

        The interface is stored as ``p.tun_if``, brought up and given
        IPv4 and IPv6 configuration.  The IPv4 /32 route to
        ``p.remote_tun_if_host4`` is added but not stored; the IPv6 /128
        route to ``p.remote_tun_if_host`` is stored as ``p.route``.
        """
        tun_if = VppIpsecInterface(self)
        p.tun_if = tun_if

        tun_if.add_vpp_config()
        tun_if.admin_up()
        tun_if.config_ip4()
        tun_if.config_ip6()

        via4 = VppRoutePath(tun_if.remote_ip4, 0xffffffff)
        route4 = VppIpRoute(self, p.remote_tun_if_host4, 32, [via4])
        route4.add_vpp_config()

        via6 = VppRoutePath(tun_if.remote_ip6,
                            0xffffffff,
                            proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [via6])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the ipsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound, then the inbound SA stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
2730
2731
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec,
                    TemplateIpsecItf6,
                    IpsecTun6):
    """ IPsec Interface IPv6 """

    def setUp(self):
        """Run the inherited set-up, then point ``self.tun_if`` at pg0."""
        parent = super(TestIpsecItf6, self)
        parent.setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Tear down by delegating to the parent classes; nothing extra."""
        parent = super(TestIpsecItf6, self)
        parent.tearDown()

    def test_tun_44(self):
        """IPSEC interface IPv6

        Despite the method name this exercises the IPv6 interface: plain
        forwarding, admin down/up, IPv4 payload over the IPv6 tunnel and
        a rekey, checking the interface packet counters after each step.
        """
        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = (
            encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT)

        def check_counters(rx_pkts, tx_pkts):
            # rx is read and checked before tx is read
            self.assertEqual(p.tun_if.get_rx_stats()['packets'], rx_pkts)
            self.assertEqual(p.tun_if.get_tx_stats()['packets'], tx_pkts)

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        check_counters(n_pkts, n_pkts)

        # traffic is dropped while the interface is admin down
        p.tun_if.admin_down()
        self.verify_drop_tun_66(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_66(p, count=n_pkts)

        check_counters(3 * n_pkts, 2 * n_pkts)

        # it's a v4 packet when it's encrypted
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(p, count=n_pkts)
        check_counters(4 * n_pkts, 3 * n_pkts)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection.
        # The copy is shallow, so ``rekeyed.tun_if`` is the same interface
        # object as ``p.tun_if``.
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi
        rekeyed.inner_hop_limit = 24
        rekeyed.outer_hop_limit = 128
        rekeyed.inner_flow_label = 0xabcde
        rekeyed.outer_flow_label = 0xabcde
        rekeyed.hop_limit = 128
        rekeyed.tun_flags = (
            encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL)

        self.config_sa_tun(rekeyed,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=n_pkts)
        # counters were cleared before the rekey
        check_counters(n_pkts, n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)
2824
2825
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """ Ipsec P2MP ESP v4 tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted IPv4/UDP frames.

        The clear text handed to ``sa.encrypt`` always runs from pg1's
        local to pg1's remote IPv4 address; ``p``, ``src`` and ``dst``
        are accepted but not used.

        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text IPv4/UDP frames towards ``dst``.

        The IPv4 source is fixed to ``1.1.1.1``; ``src`` is accepted but
        not used.

        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each packet in ``rxs`` is addressed to pg1's remote host.

        Both the Ethernet and the IPv4 destination are checked; ``p`` is
        accepted but not used.
        """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check the outer header of each packet, then its decrypted form.

        The received IPv4 header must carry the EF DSCP (shifted into
        the TOS field) and a TTL of ``p.hop_limit``.  The decrypted
        packet must have valid checksums and an IPv4 destination of
        ``p.remote_tun_if_host``.  On a failed check the received (and,
        when available, the decrypted) packet is logged at debug level
        and the original exception is re-raised.
        """
        for rx in rxs:
            try:
                self.assertEqual(rx[IP].tos,
                                 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # payload came back undissected: parse the raw bytes
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertEqual(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: ``pkt`` is unbound when the
                    # failure happened before decryption completed.  Catch
                    # Exception (not a bare except) so KeyboardInterrupt
                    # and SystemExit are not swallowed here.
                    pass
                raise

    def setUp(self):
        """Create one multipoint ipsec interface with N_NHS next-hops.

        For every next-hop a private copy of the IPv4 parameters is
        made, with its own SA ids/SPIs, hop limit, SA pair, tunnel
        protection and a /32 route to ``1.1.1.<n>``.  The copies are
        kept in ``self.multi_params``.
        """
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(self,
                                     mode=(VppEnum.vl_api_tunnel_mode_t.
                                           TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        # the address is configured, removed and configured again
        p.tun_if.config_ip4()
        p.tun_if.unconfig_ip4()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every copy shares the tun_if created above
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.hop_limit = ii+10
            p.tun_sa_out = VppIpsecSA(
                self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit)
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tun_params(p, self.encryption_type, None,
                              self.pg0.local_ip4,
                              self.pg0.remote_hosts[ii].ip4)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        """Unconfigure the tunnel's IPv4 address, then tear down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
2959
2960
class TestIpsecItf6MPLS(TemplateIpsec,
                        TemplateIpsecItf6,
                        IpsecTun6):
    """ IPsec Interface MPLSoIPv6 """

    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the inherited set-up, then point ``self.tun_if`` at pg0."""
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Tear down by delegating to the parent classes; nothing extra."""
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted MPLS-labelled IPv6/UDP frames.

        The clear text handed to ``sa.encrypt`` is an MPLS header
        (label 66, TTL 3) followed by an IPv6/UDP packet from ``src`` to
        ``dst`` with ``payload_size`` bytes of ``X``.  ``p`` is accepted
        but not used.

        :returns: list of ``count`` frames.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(MPLS(label=66, ttl=3) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet in ``rxs`` and check label and destination.

        The decrypted packet must have valid checksums, carry MPLS
        label 66 and an IPv6 destination of ``p.remote_tun_if_host``.
        On a failed check the received (and, when available, the
        decrypted) packet is logged at debug level and the original
        exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # The payload came back undissected.  The revealed
                    # packet is expected to start with the MPLS label
                    # (asserted below), so re-parse the raw bytes as MPLS;
                    # re-parsing them as an IPv4 header (as before) could
                    # never satisfy the label or IPv6 checks.
                    pkt = MPLS(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: ``pkt`` is unbound when the
                    # failure happened before decryption completed.  Catch
                    # Exception (not a bare except) so KeyboardInterrupt
                    # and SystemExit are not swallowed here.
                    pass
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params
        f = FibPathProto

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(self, 66, 1,
                     [VppRoutePath(
                         self.pg1.remote_ip6,
                         self.pg1.sw_if_index)],
                     eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
        # send traffic for the remote host through the tunnel with label 66
        p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
                                     p.tun_if.sw_if_index,
                                     labels=[VppMplsLabel(66)])])
        p.tun_if.enable_mpls()

        self.config_sa_tun(p,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
3036
3037
# When this file is executed directly, run its test cases with unittest
# using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)