ipsec: Deprecated the old IPsec Tunnel interface
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_gre_interface import VppGreInterface
15 from vpp_ipip_tun_interface import VppIpIpTunInterface
16 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
17 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
18 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
19 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
20 from vpp_teib import VppTeib
21 from util import ppp
22 from vpp_papi import VppEnum
23 from vpp_papi_provider import CliFailedCommandError
24 from vpp_acl import AclRule, VppAcl, VppAclInterface
25
26
27 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
28     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
29     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
30                              IPSEC_API_SAD_FLAG_USE_ESN))
31     crypt_key = mk_scapy_crypt_key(p)
32     if tun_if:
33         p.tun_dst = tun_if.remote_ip
34         p.tun_src = tun_if.local_ip
35     else:
36         p.tun_dst = dst
37         p.tun_src = src
38
39     p.scapy_tun_sa = SecurityAssociation(
40         encryption_type, spi=p.vpp_tun_spi,
41         crypt_algo=p.crypt_algo,
42         crypt_key=crypt_key,
43         auth_algo=p.auth_algo, auth_key=p.auth_key,
44         tunnel_header=ip_class_by_addr_type[p.addr_type](
45             src=p.tun_dst,
46             dst=p.tun_src),
47         nat_t_header=p.nat_header,
48         esn_en=esn_en)
49     p.vpp_tun_sa = SecurityAssociation(
50         encryption_type, spi=p.scapy_tun_spi,
51         crypt_algo=p.crypt_algo,
52         crypt_key=crypt_key,
53         auth_algo=p.auth_algo, auth_key=p.auth_key,
54         tunnel_header=ip_class_by_addr_type[p.addr_type](
55             dst=p.tun_dst,
56             src=p.tun_src),
57         nat_t_header=p.nat_header,
58         esn_en=esn_en)
59
60
61 def config_tra_params(p, encryption_type, tun_if):
62     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
63     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
64                              IPSEC_API_SAD_FLAG_USE_ESN))
65     crypt_key = mk_scapy_crypt_key(p)
66     p.tun_dst = tun_if.remote_ip
67     p.tun_src = tun_if.local_ip
68     p.scapy_tun_sa = SecurityAssociation(
69         encryption_type, spi=p.vpp_tun_spi,
70         crypt_algo=p.crypt_algo,
71         crypt_key=crypt_key,
72         auth_algo=p.auth_algo, auth_key=p.auth_key,
73         esn_en=esn_en,
74         nat_t_header=p.nat_header)
75     p.vpp_tun_sa = SecurityAssociation(
76         encryption_type, spi=p.scapy_tun_spi,
77         crypt_algo=p.crypt_algo,
78         crypt_key=crypt_key,
79         auth_algo=p.auth_algo, auth_key=p.auth_key,
80         esn_en=esn_en,
81         nat_t_header=p.nat_header)
82
83
84 class TemplateIpsec4TunProtect(object):
85     """ IPsec IPv4 Tunnel protect """
86
87     encryption_type = ESP
88     tun4_encrypt_node_name = "esp4-encrypt-tun"
89     tun4_decrypt_node_name = "esp4-decrypt-tun"
90     tun4_input_node = "ipsec4-tun-input"
91
92     def config_sa_tra(self, p):
93         config_tun_params(p, self.encryption_type, p.tun_if)
94
95         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
96                                   p.auth_algo_vpp_id, p.auth_key,
97                                   p.crypt_algo_vpp_id, p.crypt_key,
98                                   self.vpp_esp_protocol,
99                                   flags=p.flags)
100         p.tun_sa_out.add_vpp_config()
101
102         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
103                                  p.auth_algo_vpp_id, p.auth_key,
104                                  p.crypt_algo_vpp_id, p.crypt_key,
105                                  self.vpp_esp_protocol,
106                                  flags=p.flags)
107         p.tun_sa_in.add_vpp_config()
108
109     def config_sa_tun(self, p):
110         config_tun_params(p, self.encryption_type, p.tun_if)
111
112         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
113                                   p.auth_algo_vpp_id, p.auth_key,
114                                   p.crypt_algo_vpp_id, p.crypt_key,
115                                   self.vpp_esp_protocol,
116                                   self.tun_if.local_addr[p.addr_type],
117                                   self.tun_if.remote_addr[p.addr_type],
118                                   flags=p.flags)
119         p.tun_sa_out.add_vpp_config()
120
121         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
122                                  p.auth_algo_vpp_id, p.auth_key,
123                                  p.crypt_algo_vpp_id, p.crypt_key,
124                                  self.vpp_esp_protocol,
125                                  self.tun_if.remote_addr[p.addr_type],
126                                  self.tun_if.local_addr[p.addr_type],
127                                  flags=p.flags)
128         p.tun_sa_in.add_vpp_config()
129
130     def config_protect(self, p):
131         p.tun_protect = VppIpsecTunProtect(self,
132                                            p.tun_if,
133                                            p.tun_sa_out,
134                                            [p.tun_sa_in])
135         p.tun_protect.add_vpp_config()
136
137     def config_network(self, p):
138         if hasattr(p, 'tun_dst'):
139             tun_dst = p.tun_dst
140         else:
141             tun_dst = self.pg0.remote_ip4
142         p.tun_if = VppIpIpTunInterface(self, self.pg0,
143                                        self.pg0.local_ip4,
144                                        tun_dst)
145         p.tun_if.add_vpp_config()
146         p.tun_if.admin_up()
147         p.tun_if.config_ip4()
148         p.tun_if.config_ip6()
149
150         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
151                              [VppRoutePath(p.tun_if.remote_ip4,
152                                            0xffffffff)])
153         p.route.add_vpp_config()
154         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
155                        [VppRoutePath(p.tun_if.remote_ip6,
156                                      0xffffffff,
157                                      proto=DpoProto.DPO_PROTO_IP6)])
158         r.add_vpp_config()
159
160     def unconfig_network(self, p):
161         p.route.remove_vpp_config()
162         p.tun_if.remove_vpp_config()
163
164     def unconfig_protect(self, p):
165         p.tun_protect.remove_vpp_config()
166
167     def unconfig_sa(self, p):
168         p.tun_sa_out.remove_vpp_config()
169         p.tun_sa_in.remove_vpp_config()
170
171
172 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
173                              TemplateIpsec):
174     """ IPsec tunnel interface tests """
175
176     encryption_type = ESP
177
178     @classmethod
179     def setUpClass(cls):
180         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
181
182     @classmethod
183     def tearDownClass(cls):
184         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
185
186     def setUp(self):
187         super(TemplateIpsec4TunIfEsp, self).setUp()
188
189         self.tun_if = self.pg0
190
191         p = self.ipv4_params
192
193         self.config_network(p)
194         self.config_sa_tra(p)
195         self.config_protect(p)
196
197     def tearDown(self):
198         super(TemplateIpsec4TunIfEsp, self).tearDown()
199
200
201 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
202                                 TemplateIpsec):
203     """ IPsec UDP tunnel interface tests """
204
205     tun4_encrypt_node_name = "esp4-encrypt-tun"
206     tun4_decrypt_node_name = "esp4-decrypt-tun"
207     encryption_type = ESP
208
209     @classmethod
210     def setUpClass(cls):
211         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
212
213     @classmethod
214     def tearDownClass(cls):
215         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
216
217     def verify_encrypted(self, p, sa, rxs):
218         for rx in rxs:
219             try:
220                 # ensure the UDP ports are correct before we decrypt
221                 # which strips them
222                 self.assertTrue(rx.haslayer(UDP))
223                 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
224                 self.assert_equal(rx[UDP].dport, 4500)
225
226                 pkt = sa.decrypt(rx[IP])
227                 if not pkt.haslayer(IP):
228                     pkt = IP(pkt[Raw].load)
229
230                 self.assert_packet_checksums_valid(pkt)
231                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
232                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
233             except (IndexError, AssertionError):
234                 self.logger.debug(ppp("Unexpected packet:", rx))
235                 try:
236                     self.logger.debug(ppp("Decrypted packet:", pkt))
237                 except:
238                     pass
239                 raise
240
241     def config_sa_tra(self, p):
242         config_tun_params(p, self.encryption_type, p.tun_if)
243
244         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
245                                   p.auth_algo_vpp_id, p.auth_key,
246                                   p.crypt_algo_vpp_id, p.crypt_key,
247                                   self.vpp_esp_protocol,
248                                   flags=p.flags,
249                                   udp_src=p.nat_header.sport,
250                                   udp_dst=p.nat_header.dport)
251         p.tun_sa_out.add_vpp_config()
252
253         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
254                                  p.auth_algo_vpp_id, p.auth_key,
255                                  p.crypt_algo_vpp_id, p.crypt_key,
256                                  self.vpp_esp_protocol,
257                                  flags=p.flags,
258                                  udp_src=p.nat_header.sport,
259                                  udp_dst=p.nat_header.dport)
260         p.tun_sa_in.add_vpp_config()
261
262     def setUp(self):
263         super(TemplateIpsec4TunIfEspUdp, self).setUp()
264
265         p = self.ipv4_params
266         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
267                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
268         p.nat_header = UDP(sport=5454, dport=4500)
269
270         self.tun_if = self.pg0
271
272         self.config_network(p)
273         self.config_sa_tra(p)
274         self.config_protect(p)
275
276     def tearDown(self):
277         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
278
279
280 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
281     """ Ipsec ESP - TUN tests """
282     tun4_encrypt_node_name = "esp4-encrypt-tun"
283     tun4_decrypt_node_name = "esp4-decrypt-tun"
284
285     def test_tun_basic64(self):
286         """ ipsec 6o4 tunnel basic test """
287         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
288
289         self.verify_tun_64(self.params[socket.AF_INET], count=1)
290
291     def test_tun_burst64(self):
292         """ ipsec 6o4 tunnel basic test """
293         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
294
295         self.verify_tun_64(self.params[socket.AF_INET], count=257)
296
297     def test_tun_basic_frag44(self):
298         """ ipsec 4o4 tunnel frag basic test """
299         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
300
301         p = self.ipv4_params
302
303         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
304                                        [1500, 0, 0, 0])
305         self.verify_tun_44(self.params[socket.AF_INET],
306                            count=1, payload_size=1800, n_rx=2)
307         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
308                                        [9000, 0, 0, 0])
309
310
311 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
312     """ Ipsec ESP UDP tests """
313
314     tun4_input_node = "ipsec4-tun-input"
315
316     def setUp(self):
317         super(TestIpsec4TunIfEspUdp, self).setUp()
318
319     def test_keepalive(self):
320         """ IPSEC NAT Keepalive """
321         self.verify_keepalive(self.ipv4_params)
322
323
324 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
325     """ Ipsec ESP UDP GCM tests """
326
327     tun4_input_node = "ipsec4-tun-input"
328
329     def setUp(self):
330         super(TestIpsec4TunIfEspUdpGCM, self).setUp()
331         p = self.ipv4_params
332         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
333                               IPSEC_API_INTEG_ALG_NONE)
334         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
335                                IPSEC_API_CRYPTO_ALG_AES_GCM_256)
336         p.crypt_algo = "AES-GCM"
337         p.auth_algo = "NULL"
338         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
339         p.salt = 0
340
341
342 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
343     """ Ipsec ESP - TCP tests """
344     pass
345
346
347 class TemplateIpsec6TunProtect(object):
348     """ IPsec IPv6 Tunnel protect """
349
350     def config_sa_tra(self, p):
351         config_tun_params(p, self.encryption_type, p.tun_if)
352
353         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
354                                   p.auth_algo_vpp_id, p.auth_key,
355                                   p.crypt_algo_vpp_id, p.crypt_key,
356                                   self.vpp_esp_protocol)
357         p.tun_sa_out.add_vpp_config()
358
359         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
360                                  p.auth_algo_vpp_id, p.auth_key,
361                                  p.crypt_algo_vpp_id, p.crypt_key,
362                                  self.vpp_esp_protocol)
363         p.tun_sa_in.add_vpp_config()
364
365     def config_sa_tun(self, p):
366         config_tun_params(p, self.encryption_type, p.tun_if)
367
368         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
369                                   p.auth_algo_vpp_id, p.auth_key,
370                                   p.crypt_algo_vpp_id, p.crypt_key,
371                                   self.vpp_esp_protocol,
372                                   self.tun_if.local_addr[p.addr_type],
373                                   self.tun_if.remote_addr[p.addr_type])
374         p.tun_sa_out.add_vpp_config()
375
376         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
377                                  p.auth_algo_vpp_id, p.auth_key,
378                                  p.crypt_algo_vpp_id, p.crypt_key,
379                                  self.vpp_esp_protocol,
380                                  self.tun_if.remote_addr[p.addr_type],
381                                  self.tun_if.local_addr[p.addr_type])
382         p.tun_sa_in.add_vpp_config()
383
384     def config_protect(self, p):
385         p.tun_protect = VppIpsecTunProtect(self,
386                                            p.tun_if,
387                                            p.tun_sa_out,
388                                            [p.tun_sa_in])
389         p.tun_protect.add_vpp_config()
390
391     def config_network(self, p):
392         if hasattr(p, 'tun_dst'):
393             tun_dst = p.tun_dst
394         else:
395             tun_dst = self.pg0.remote_ip6
396         p.tun_if = VppIpIpTunInterface(self, self.pg0,
397                                        self.pg0.local_ip6,
398                                        tun_dst)
399         p.tun_if.add_vpp_config()
400         p.tun_if.admin_up()
401         p.tun_if.config_ip6()
402         p.tun_if.config_ip4()
403
404         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
405                              [VppRoutePath(p.tun_if.remote_ip6,
406                                            0xffffffff,
407                                            proto=DpoProto.DPO_PROTO_IP6)])
408         p.route.add_vpp_config()
409         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
410                        [VppRoutePath(p.tun_if.remote_ip4,
411                                      0xffffffff)])
412         r.add_vpp_config()
413
414     def unconfig_network(self, p):
415         p.route.remove_vpp_config()
416         p.tun_if.remove_vpp_config()
417
418     def unconfig_protect(self, p):
419         p.tun_protect.remove_vpp_config()
420
421     def unconfig_sa(self, p):
422         p.tun_sa_out.remove_vpp_config()
423         p.tun_sa_in.remove_vpp_config()
424
425
426 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
427                              TemplateIpsec):
428     """ IPsec tunnel interface tests """
429
430     encryption_type = ESP
431
432     def setUp(self):
433         super(TemplateIpsec6TunIfEsp, self).setUp()
434
435         self.tun_if = self.pg0
436
437         p = self.ipv6_params
438         self.config_network(p)
439         self.config_sa_tra(p)
440         self.config_protect(p)
441
442     def tearDown(self):
443         super(TemplateIpsec6TunIfEsp, self).tearDown()
444
445
446 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
447                           IpsecTun6Tests):
448     """ Ipsec ESP - TUN tests """
449     tun6_encrypt_node_name = "esp6-encrypt-tun"
450     tun6_decrypt_node_name = "esp6-decrypt-tun"
451
452     def test_tun_basic46(self):
453         """ ipsec 4o6 tunnel basic test """
454         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
455         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
456
457     def test_tun_burst46(self):
458         """ ipsec 4o6 tunnel burst test """
459         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
460         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
461
462
463 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
464                                 IpsecTun6HandoffTests):
465     """ Ipsec ESP 6 Handoff tests """
466     tun6_encrypt_node_name = "esp6-encrypt-tun"
467     tun6_decrypt_node_name = "esp6-decrypt-tun"
468
469
470 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
471                                 IpsecTun4HandoffTests):
472     """ Ipsec ESP 4 Handoff tests """
473     tun4_encrypt_node_name = "esp4-encrypt-tun"
474     tun4_decrypt_node_name = "esp4-decrypt-tun"
475
476
477 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
478                               TemplateIpsec,
479                               IpsecTun4):
480     """ IPsec IPv4 Multi Tunnel interface """
481
482     encryption_type = ESP
483     tun4_encrypt_node_name = "esp4-encrypt-tun"
484     tun4_decrypt_node_name = "esp4-decrypt-tun"
485
486     def setUp(self):
487         super(TestIpsec4MultiTunIfEsp, self).setUp()
488
489         self.tun_if = self.pg0
490
491         self.multi_params = []
492         self.pg0.generate_remote_hosts(10)
493         self.pg0.configure_ipv4_neighbors()
494
495         for ii in range(10):
496             p = copy.copy(self.ipv4_params)
497
498             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
499             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
500             p.scapy_tun_spi = p.scapy_tun_spi + ii
501             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
502             p.vpp_tun_spi = p.vpp_tun_spi + ii
503
504             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
505             p.scapy_tra_spi = p.scapy_tra_spi + ii
506             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
507             p.vpp_tra_spi = p.vpp_tra_spi + ii
508             p.tun_dst = self.pg0.remote_hosts[ii].ip4
509
510             self.multi_params.append(p)
511             self.config_network(p)
512             self.config_sa_tra(p)
513             self.config_protect(p)
514
515     def tearDown(self):
516         super(TestIpsec4MultiTunIfEsp, self).tearDown()
517
518     def test_tun_44(self):
519         """Multiple IPSEC tunnel interfaces """
520         for p in self.multi_params:
521             self.verify_tun_44(p, count=127)
522             c = p.tun_if.get_rx_stats()
523             self.assertEqual(c['packets'], 127)
524             c = p.tun_if.get_tx_stats()
525             self.assertEqual(c['packets'], 127)
526
527     def test_tun_rr_44(self):
528         """ Round-robin packets acrros multiple interface """
529         tx = []
530         for p in self.multi_params:
531             tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
532                                             src=p.remote_tun_if_host,
533                                             dst=self.pg1.remote_ip4)
534         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
535
536         for rx, p in zip(rxs, self.multi_params):
537             self.verify_decrypted(p, [rx])
538
539         tx = []
540         for p in self.multi_params:
541             tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
542                                     dst=p.remote_tun_if_host)
543         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
544
545         for rx, p in zip(rxs, self.multi_params):
546             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
547
548
549 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
550                             TemplateIpsec,
551                             IpsecTun4):
552     """ IPsec IPv4 Tunnel interface all Algos """
553
554     encryption_type = ESP
555     tun4_encrypt_node_name = "esp4-encrypt-tun"
556     tun4_decrypt_node_name = "esp4-decrypt-tun"
557
558     def setUp(self):
559         super(TestIpsec4TunIfEspAll, self).setUp()
560
561         self.tun_if = self.pg0
562         p = self.ipv4_params
563
564         self.config_network(p)
565         self.config_sa_tra(p)
566         self.config_protect(p)
567
568     def tearDown(self):
569         p = self.ipv4_params
570         self.unconfig_protect(p)
571         self.unconfig_network(p)
572         self.unconfig_sa(p)
573
574         super(TestIpsec4TunIfEspAll, self).tearDown()
575
576     def rekey(self, p):
577         #
578         # change the key and the SPI
579         #
580         np = copy.copy(p)
581         p.crypt_key = b'X' + p.crypt_key[1:]
582         p.scapy_tun_spi += 1
583         p.scapy_tun_sa_id += 1
584         p.vpp_tun_spi += 1
585         p.vpp_tun_sa_id += 1
586         p.tun_if.local_spi = p.vpp_tun_spi
587         p.tun_if.remote_spi = p.scapy_tun_spi
588
589         config_tun_params(p, self.encryption_type, p.tun_if)
590
591         p.tun_sa_out = VppIpsecSA(self,
592                                   p.scapy_tun_sa_id,
593                                   p.scapy_tun_spi,
594                                   p.auth_algo_vpp_id,
595                                   p.auth_key,
596                                   p.crypt_algo_vpp_id,
597                                   p.crypt_key,
598                                   self.vpp_esp_protocol,
599                                   flags=p.flags,
600                                   salt=p.salt)
601         p.tun_sa_in = VppIpsecSA(self,
602                                  p.vpp_tun_sa_id,
603                                  p.vpp_tun_spi,
604                                  p.auth_algo_vpp_id,
605                                  p.auth_key,
606                                  p.crypt_algo_vpp_id,
607                                  p.crypt_key,
608                                  self.vpp_esp_protocol,
609                                  flags=p.flags,
610                                  salt=p.salt)
611         p.tun_sa_in.add_vpp_config()
612         p.tun_sa_out.add_vpp_config()
613
614         self.config_protect(p)
615         np.tun_sa_out.remove_vpp_config()
616         np.tun_sa_in.remove_vpp_config()
617         self.logger.info(self.vapi.cli("sh ipsec sa"))
618
619     def test_tun_44(self):
620         """IPSEC tunnel all algos """
621
622         # foreach VPP crypto engine
623         engines = ["ia32", "ipsecmb", "openssl"]
624
625         # foreach crypto algorithm
626         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
627                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
628                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
629                                 IPSEC_API_INTEG_ALG_NONE),
630                   'scapy-crypto': "AES-GCM",
631                   'scapy-integ': "NULL",
632                   'key': b"JPjyOWBeVEQiMe7h",
633                   'salt': 3333},
634                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
635                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
636                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
637                                 IPSEC_API_INTEG_ALG_NONE),
638                   'scapy-crypto': "AES-GCM",
639                   'scapy-integ': "NULL",
640                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
641                   'salt': 0},
642                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
643                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
644                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
645                                 IPSEC_API_INTEG_ALG_NONE),
646                   'scapy-crypto': "AES-GCM",
647                   'scapy-integ': "NULL",
648                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
649                   'salt': 9999},
650                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
651                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
652                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
653                                 IPSEC_API_INTEG_ALG_SHA1_96),
654                   'scapy-crypto': "AES-CBC",
655                   'scapy-integ': "HMAC-SHA1-96",
656                   'salt': 0,
657                   'key': b"JPjyOWBeVEQiMe7h"},
658                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
659                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
660                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
661                                 IPSEC_API_INTEG_ALG_SHA_512_256),
662                   'scapy-crypto': "AES-CBC",
663                   'scapy-integ': "SHA2-512-256",
664                   'salt': 0,
665                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
666                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
667                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
668                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
669                                 IPSEC_API_INTEG_ALG_SHA_256_128),
670                   'scapy-crypto': "AES-CBC",
671                   'scapy-integ': "SHA2-256-128",
672                   'salt': 0,
673                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
674                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
675                                  IPSEC_API_CRYPTO_ALG_NONE),
676                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
677                                 IPSEC_API_INTEG_ALG_SHA1_96),
678                   'scapy-crypto': "NULL",
679                   'scapy-integ': "HMAC-SHA1-96",
680                   'salt': 0,
681                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
682
683         for engine in engines:
684             self.vapi.cli("set crypto handler all %s" % engine)
685
686             #
687             # loop through each of the algorithms
688             #
689             for algo in algos:
690                 # with self.subTest(algo=algo['scapy']):
691
692                 p = self.ipv4_params
693                 p.auth_algo_vpp_id = algo['vpp-integ']
694                 p.crypt_algo_vpp_id = algo['vpp-crypto']
695                 p.crypt_algo = algo['scapy-crypto']
696                 p.auth_algo = algo['scapy-integ']
697                 p.crypt_key = algo['key']
698                 p.salt = algo['salt']
699
700                 #
701                 # rekey the tunnel
702                 #
703                 self.rekey(p)
704                 self.verify_tun_44(p, count=127)
705
706
707 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
708                                TemplateIpsec,
709                                IpsecTun4):
710     """ IPsec IPv4 Tunnel interface no Algos """
711
712     encryption_type = ESP
713     tun4_encrypt_node_name = "esp4-encrypt-tun"
714     tun4_decrypt_node_name = "esp4-decrypt-tun"
715
716     def setUp(self):
717         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
718
719         self.tun_if = self.pg0
720         p = self.ipv4_params
721         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
722                               IPSEC_API_INTEG_ALG_NONE)
723         p.auth_algo = 'NULL'
724         p.auth_key = []
725
726         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
727                                IPSEC_API_CRYPTO_ALG_NONE)
728         p.crypt_algo = 'NULL'
729         p.crypt_key = []
730
731     def tearDown(self):
732         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
733
734     def test_tun_44(self):
735         """ IPSec SA with NULL algos """
736         p = self.ipv4_params
737
738         self.config_network(p)
739         self.config_sa_tra(p)
740         self.config_protect(p)
741
742         tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
743                            dst=p.remote_tun_if_host)
744         self.send_and_assert_no_replies(self.pg1, tx)
745
746         self.unconfig_protect(p)
747         self.unconfig_sa(p)
748         self.unconfig_network(p)
749
750
751 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
752                               TemplateIpsec,
753                               IpsecTun6):
754     """ IPsec IPv6 Multi Tunnel interface """
755
756     encryption_type = ESP
757     tun6_encrypt_node_name = "esp6-encrypt-tun"
758     tun6_decrypt_node_name = "esp6-decrypt-tun"
759
760     def setUp(self):
761         super(TestIpsec6MultiTunIfEsp, self).setUp()
762
763         self.tun_if = self.pg0
764
765         self.multi_params = []
766         self.pg0.generate_remote_hosts(10)
767         self.pg0.configure_ipv6_neighbors()
768
769         for ii in range(10):
770             p = copy.copy(self.ipv6_params)
771
772             p.remote_tun_if_host = "1111::%d" % (ii + 1)
773             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
774             p.scapy_tun_spi = p.scapy_tun_spi + ii
775             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
776             p.vpp_tun_spi = p.vpp_tun_spi + ii
777
778             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
779             p.scapy_tra_spi = p.scapy_tra_spi + ii
780             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
781             p.vpp_tra_spi = p.vpp_tra_spi + ii
782             p.tun_dst = self.pg0.remote_hosts[ii].ip6
783
784             self.multi_params.append(p)
785             self.config_network(p)
786             self.config_sa_tra(p)
787             self.config_protect(p)
788
789     def tearDown(self):
790         super(TestIpsec6MultiTunIfEsp, self).tearDown()
791
792     def test_tun_66(self):
793         """Multiple IPSEC tunnel interfaces """
794         for p in self.multi_params:
795             self.verify_tun_66(p, count=127)
796             c = p.tun_if.get_rx_stats()
797             self.assertEqual(c['packets'], 127)
798             c = p.tun_if.get_tx_stats()
799             self.assertEqual(c['packets'], 127)
800
801
802 class TestIpsecGreTebIfEsp(TemplateIpsec,
803                            IpsecTun4Tests):
804     """ Ipsec GRE TEB ESP - TUN tests """
805     tun4_encrypt_node_name = "esp4-encrypt-tun"
806     tun4_decrypt_node_name = "esp4-decrypt-tun"
807     encryption_type = ESP
808     omac = "00:11:22:33:44:55"
809
810     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
811                          payload_size=100):
812         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
813                 sa.encrypt(IP(src=self.pg0.remote_ip4,
814                               dst=self.pg0.local_ip4) /
815                            GRE() /
816                            Ether(dst=self.omac) /
817                            IP(src="1.1.1.1", dst="1.1.1.2") /
818                            UDP(sport=1144, dport=2233) /
819                            Raw(b'X' * payload_size))
820                 for i in range(count)]
821
822     def gen_pkts(self, sw_intf, src, dst, count=1,
823                  payload_size=100):
824         return [Ether(dst=self.omac) /
825                 IP(src="1.1.1.1", dst="1.1.1.2") /
826                 UDP(sport=1144, dport=2233) /
827                 Raw(b'X' * payload_size)
828                 for i in range(count)]
829
830     def verify_decrypted(self, p, rxs):
831         for rx in rxs:
832             self.assert_equal(rx[Ether].dst, self.omac)
833             self.assert_equal(rx[IP].dst, "1.1.1.2")
834
835     def verify_encrypted(self, p, sa, rxs):
836         for rx in rxs:
837             try:
838                 pkt = sa.decrypt(rx[IP])
839                 if not pkt.haslayer(IP):
840                     pkt = IP(pkt[Raw].load)
841                 self.assert_packet_checksums_valid(pkt)
842                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
843                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
844                 self.assertTrue(pkt.haslayer(GRE))
845                 e = pkt[Ether]
846                 self.assertEqual(e[Ether].dst, self.omac)
847                 self.assertEqual(e[IP].dst, "1.1.1.2")
848             except (IndexError, AssertionError):
849                 self.logger.debug(ppp("Unexpected packet:", rx))
850                 try:
851                     self.logger.debug(ppp("Decrypted packet:", pkt))
852                 except:
853                     pass
854                 raise
855
856     def setUp(self):
857         super(TestIpsecGreTebIfEsp, self).setUp()
858
859         self.tun_if = self.pg0
860
861         p = self.ipv4_params
862
863         bd1 = VppBridgeDomain(self, 1)
864         bd1.add_vpp_config()
865
866         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
867                                   p.auth_algo_vpp_id, p.auth_key,
868                                   p.crypt_algo_vpp_id, p.crypt_key,
869                                   self.vpp_esp_protocol,
870                                   self.pg0.local_ip4,
871                                   self.pg0.remote_ip4)
872         p.tun_sa_out.add_vpp_config()
873
874         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
875                                  p.auth_algo_vpp_id, p.auth_key,
876                                  p.crypt_algo_vpp_id, p.crypt_key,
877                                  self.vpp_esp_protocol,
878                                  self.pg0.remote_ip4,
879                                  self.pg0.local_ip4)
880         p.tun_sa_in.add_vpp_config()
881
882         p.tun_if = VppGreInterface(self,
883                                    self.pg0.local_ip4,
884                                    self.pg0.remote_ip4,
885                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
886                                          GRE_API_TUNNEL_TYPE_TEB))
887         p.tun_if.add_vpp_config()
888
889         p.tun_protect = VppIpsecTunProtect(self,
890                                            p.tun_if,
891                                            p.tun_sa_out,
892                                            [p.tun_sa_in])
893
894         p.tun_protect.add_vpp_config()
895
896         p.tun_if.admin_up()
897         p.tun_if.config_ip4()
898         config_tun_params(p, self.encryption_type, p.tun_if)
899
900         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
901         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
902
903         self.vapi.cli("clear ipsec sa")
904         self.vapi.cli("sh adj")
905         self.vapi.cli("sh ipsec tun")
906
907     def tearDown(self):
908         p = self.ipv4_params
909         p.tun_if.unconfig_ip4()
910         super(TestIpsecGreTebIfEsp, self).tearDown()
911
912
913 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
914                                IpsecTun4Tests):
915     """ Ipsec GRE TEB ESP - TUN tests """
916     tun4_encrypt_node_name = "esp4-encrypt-tun"
917     tun4_decrypt_node_name = "esp4-decrypt-tun"
918     encryption_type = ESP
919     omac = "00:11:22:33:44:55"
920
921     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
922                          payload_size=100):
923         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
924                 sa.encrypt(IP(src=self.pg0.remote_ip4,
925                               dst=self.pg0.local_ip4) /
926                            GRE() /
927                            Ether(dst=self.omac) /
928                            IP(src="1.1.1.1", dst="1.1.1.2") /
929                            UDP(sport=1144, dport=2233) /
930                            Raw(b'X' * payload_size))
931                 for i in range(count)]
932
933     def gen_pkts(self, sw_intf, src, dst, count=1,
934                  payload_size=100):
935         return [Ether(dst=self.omac) /
936                 Dot1Q(vlan=11) /
937                 IP(src="1.1.1.1", dst="1.1.1.2") /
938                 UDP(sport=1144, dport=2233) /
939                 Raw(b'X' * payload_size)
940                 for i in range(count)]
941
942     def verify_decrypted(self, p, rxs):
943         for rx in rxs:
944             self.assert_equal(rx[Ether].dst, self.omac)
945             self.assert_equal(rx[Dot1Q].vlan, 11)
946             self.assert_equal(rx[IP].dst, "1.1.1.2")
947
948     def verify_encrypted(self, p, sa, rxs):
949         for rx in rxs:
950             try:
951                 pkt = sa.decrypt(rx[IP])
952                 if not pkt.haslayer(IP):
953                     pkt = IP(pkt[Raw].load)
954                 self.assert_packet_checksums_valid(pkt)
955                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
956                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
957                 self.assertTrue(pkt.haslayer(GRE))
958                 e = pkt[Ether]
959                 self.assertEqual(e[Ether].dst, self.omac)
960                 self.assertFalse(e.haslayer(Dot1Q))
961                 self.assertEqual(e[IP].dst, "1.1.1.2")
962             except (IndexError, AssertionError):
963                 self.logger.debug(ppp("Unexpected packet:", rx))
964                 try:
965                     self.logger.debug(ppp("Decrypted packet:", pkt))
966                 except:
967                     pass
968                 raise
969
970     def setUp(self):
971         super(TestIpsecGreTebVlanIfEsp, self).setUp()
972
973         self.tun_if = self.pg0
974
975         p = self.ipv4_params
976
977         bd1 = VppBridgeDomain(self, 1)
978         bd1.add_vpp_config()
979
980         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
981         self.vapi.l2_interface_vlan_tag_rewrite(
982             sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
983             push_dot1q=11)
984         self.pg1_11.admin_up()
985
986         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
987                                   p.auth_algo_vpp_id, p.auth_key,
988                                   p.crypt_algo_vpp_id, p.crypt_key,
989                                   self.vpp_esp_protocol,
990                                   self.pg0.local_ip4,
991                                   self.pg0.remote_ip4)
992         p.tun_sa_out.add_vpp_config()
993
994         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
995                                  p.auth_algo_vpp_id, p.auth_key,
996                                  p.crypt_algo_vpp_id, p.crypt_key,
997                                  self.vpp_esp_protocol,
998                                  self.pg0.remote_ip4,
999                                  self.pg0.local_ip4)
1000         p.tun_sa_in.add_vpp_config()
1001
1002         p.tun_if = VppGreInterface(self,
1003                                    self.pg0.local_ip4,
1004                                    self.pg0.remote_ip4,
1005                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1006                                          GRE_API_TUNNEL_TYPE_TEB))
1007         p.tun_if.add_vpp_config()
1008
1009         p.tun_protect = VppIpsecTunProtect(self,
1010                                            p.tun_if,
1011                                            p.tun_sa_out,
1012                                            [p.tun_sa_in])
1013
1014         p.tun_protect.add_vpp_config()
1015
1016         p.tun_if.admin_up()
1017         p.tun_if.config_ip4()
1018         config_tun_params(p, self.encryption_type, p.tun_if)
1019
1020         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1021         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1022
1023         self.vapi.cli("clear ipsec sa")
1024
1025     def tearDown(self):
1026         p = self.ipv4_params
1027         p.tun_if.unconfig_ip4()
1028         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1029         self.pg1_11.admin_down()
1030         self.pg1_11.remove_vpp_config()
1031
1032
1033 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1034                               IpsecTun4Tests):
1035     """ Ipsec GRE TEB ESP - Tra tests """
1036     tun4_encrypt_node_name = "esp4-encrypt-tun"
1037     tun4_decrypt_node_name = "esp4-decrypt-tun"
1038     encryption_type = ESP
1039     omac = "00:11:22:33:44:55"
1040
1041     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1042                          payload_size=100):
1043         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1044                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1045                               dst=self.pg0.local_ip4) /
1046                            GRE() /
1047                            Ether(dst=self.omac) /
1048                            IP(src="1.1.1.1", dst="1.1.1.2") /
1049                            UDP(sport=1144, dport=2233) /
1050                            Raw(b'X' * payload_size))
1051                 for i in range(count)]
1052
1053     def gen_pkts(self, sw_intf, src, dst, count=1,
1054                  payload_size=100):
1055         return [Ether(dst=self.omac) /
1056                 IP(src="1.1.1.1", dst="1.1.1.2") /
1057                 UDP(sport=1144, dport=2233) /
1058                 Raw(b'X' * payload_size)
1059                 for i in range(count)]
1060
1061     def verify_decrypted(self, p, rxs):
1062         for rx in rxs:
1063             self.assert_equal(rx[Ether].dst, self.omac)
1064             self.assert_equal(rx[IP].dst, "1.1.1.2")
1065
1066     def verify_encrypted(self, p, sa, rxs):
1067         for rx in rxs:
1068             try:
1069                 pkt = sa.decrypt(rx[IP])
1070                 if not pkt.haslayer(IP):
1071                     pkt = IP(pkt[Raw].load)
1072                 self.assert_packet_checksums_valid(pkt)
1073                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1074                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1075                 self.assertTrue(pkt.haslayer(GRE))
1076                 e = pkt[Ether]
1077                 self.assertEqual(e[Ether].dst, self.omac)
1078                 self.assertEqual(e[IP].dst, "1.1.1.2")
1079             except (IndexError, AssertionError):
1080                 self.logger.debug(ppp("Unexpected packet:", rx))
1081                 try:
1082                     self.logger.debug(ppp("Decrypted packet:", pkt))
1083                 except:
1084                     pass
1085                 raise
1086
1087     def setUp(self):
1088         super(TestIpsecGreTebIfEspTra, self).setUp()
1089
1090         self.tun_if = self.pg0
1091
1092         p = self.ipv4_params
1093
1094         bd1 = VppBridgeDomain(self, 1)
1095         bd1.add_vpp_config()
1096
1097         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1098                                   p.auth_algo_vpp_id, p.auth_key,
1099                                   p.crypt_algo_vpp_id, p.crypt_key,
1100                                   self.vpp_esp_protocol)
1101         p.tun_sa_out.add_vpp_config()
1102
1103         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1104                                  p.auth_algo_vpp_id, p.auth_key,
1105                                  p.crypt_algo_vpp_id, p.crypt_key,
1106                                  self.vpp_esp_protocol)
1107         p.tun_sa_in.add_vpp_config()
1108
1109         p.tun_if = VppGreInterface(self,
1110                                    self.pg0.local_ip4,
1111                                    self.pg0.remote_ip4,
1112                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1113                                          GRE_API_TUNNEL_TYPE_TEB))
1114         p.tun_if.add_vpp_config()
1115
1116         p.tun_protect = VppIpsecTunProtect(self,
1117                                            p.tun_if,
1118                                            p.tun_sa_out,
1119                                            [p.tun_sa_in])
1120
1121         p.tun_protect.add_vpp_config()
1122
1123         p.tun_if.admin_up()
1124         p.tun_if.config_ip4()
1125         config_tra_params(p, self.encryption_type, p.tun_if)
1126
1127         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1128         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1129
1130         self.vapi.cli("clear ipsec sa")
1131
1132     def tearDown(self):
1133         p = self.ipv4_params
1134         p.tun_if.unconfig_ip4()
1135         super(TestIpsecGreTebIfEspTra, self).tearDown()
1136
1137
1138 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1139                                  IpsecTun4Tests):
1140     """ Ipsec GRE TEB UDP ESP - Tra tests """
1141     tun4_encrypt_node_name = "esp4-encrypt-tun"
1142     tun4_decrypt_node_name = "esp4-decrypt-tun"
1143     encryption_type = ESP
1144     omac = "00:11:22:33:44:55"
1145
1146     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1147                          payload_size=100):
1148         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1149                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1150                               dst=self.pg0.local_ip4) /
1151                            GRE() /
1152                            Ether(dst=self.omac) /
1153                            IP(src="1.1.1.1", dst="1.1.1.2") /
1154                            UDP(sport=1144, dport=2233) /
1155                            Raw(b'X' * payload_size))
1156                 for i in range(count)]
1157
1158     def gen_pkts(self, sw_intf, src, dst, count=1,
1159                  payload_size=100):
1160         return [Ether(dst=self.omac) /
1161                 IP(src="1.1.1.1", dst="1.1.1.2") /
1162                 UDP(sport=1144, dport=2233) /
1163                 Raw(b'X' * payload_size)
1164                 for i in range(count)]
1165
1166     def verify_decrypted(self, p, rxs):
1167         for rx in rxs:
1168             self.assert_equal(rx[Ether].dst, self.omac)
1169             self.assert_equal(rx[IP].dst, "1.1.1.2")
1170
1171     def verify_encrypted(self, p, sa, rxs):
1172         for rx in rxs:
1173             self.assertTrue(rx.haslayer(UDP))
1174             self.assertEqual(rx[UDP].dport, 4545)
1175             self.assertEqual(rx[UDP].sport, 5454)
1176             try:
1177                 pkt = sa.decrypt(rx[IP])
1178                 if not pkt.haslayer(IP):
1179                     pkt = IP(pkt[Raw].load)
1180                 self.assert_packet_checksums_valid(pkt)
1181                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1182                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1183                 self.assertTrue(pkt.haslayer(GRE))
1184                 e = pkt[Ether]
1185                 self.assertEqual(e[Ether].dst, self.omac)
1186                 self.assertEqual(e[IP].dst, "1.1.1.2")
1187             except (IndexError, AssertionError):
1188                 self.logger.debug(ppp("Unexpected packet:", rx))
1189                 try:
1190                     self.logger.debug(ppp("Decrypted packet:", pkt))
1191                 except:
1192                     pass
1193                 raise
1194
1195     def setUp(self):
1196         super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1197
1198         self.tun_if = self.pg0
1199
1200         p = self.ipv4_params
1201         p = self.ipv4_params
1202         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1203                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1204         p.nat_header = UDP(sport=5454, dport=4545)
1205
1206         bd1 = VppBridgeDomain(self, 1)
1207         bd1.add_vpp_config()
1208
1209         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1210                                   p.auth_algo_vpp_id, p.auth_key,
1211                                   p.crypt_algo_vpp_id, p.crypt_key,
1212                                   self.vpp_esp_protocol,
1213                                   flags=p.flags,
1214                                   udp_src=5454,
1215                                   udp_dst=4545)
1216         p.tun_sa_out.add_vpp_config()
1217
1218         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1219                                  p.auth_algo_vpp_id, p.auth_key,
1220                                  p.crypt_algo_vpp_id, p.crypt_key,
1221                                  self.vpp_esp_protocol,
1222                                  flags=(p.flags |
1223                                         VppEnum.vl_api_ipsec_sad_flags_t.
1224                                         IPSEC_API_SAD_FLAG_IS_INBOUND),
1225                                  udp_src=5454,
1226                                  udp_dst=4545)
1227         p.tun_sa_in.add_vpp_config()
1228
1229         p.tun_if = VppGreInterface(self,
1230                                    self.pg0.local_ip4,
1231                                    self.pg0.remote_ip4,
1232                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1233                                          GRE_API_TUNNEL_TYPE_TEB))
1234         p.tun_if.add_vpp_config()
1235
1236         p.tun_protect = VppIpsecTunProtect(self,
1237                                            p.tun_if,
1238                                            p.tun_sa_out,
1239                                            [p.tun_sa_in])
1240
1241         p.tun_protect.add_vpp_config()
1242
1243         p.tun_if.admin_up()
1244         p.tun_if.config_ip4()
1245         config_tra_params(p, self.encryption_type, p.tun_if)
1246
1247         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1248         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1249
1250         self.vapi.cli("clear ipsec sa")
1251         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1252
1253     def tearDown(self):
1254         p = self.ipv4_params
1255         p.tun_if.unconfig_ip4()
1256         super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1257
1258
1259 class TestIpsecGreIfEsp(TemplateIpsec,
1260                         IpsecTun4Tests):
1261     """ Ipsec GRE ESP - TUN tests """
1262     tun4_encrypt_node_name = "esp4-encrypt-tun"
1263     tun4_decrypt_node_name = "esp4-decrypt-tun"
1264     encryption_type = ESP
1265
1266     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1267                          payload_size=100):
1268         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1269                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1270                               dst=self.pg0.local_ip4) /
1271                            GRE() /
1272                            IP(src=self.pg1.local_ip4,
1273                               dst=self.pg1.remote_ip4) /
1274                            UDP(sport=1144, dport=2233) /
1275                            Raw(b'X' * payload_size))
1276                 for i in range(count)]
1277
1278     def gen_pkts(self, sw_intf, src, dst, count=1,
1279                  payload_size=100):
1280         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1281                 IP(src="1.1.1.1", dst="1.1.1.2") /
1282                 UDP(sport=1144, dport=2233) /
1283                 Raw(b'X' * payload_size)
1284                 for i in range(count)]
1285
1286     def verify_decrypted(self, p, rxs):
1287         for rx in rxs:
1288             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1289             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1290
1291     def verify_encrypted(self, p, sa, rxs):
1292         for rx in rxs:
1293             try:
1294                 pkt = sa.decrypt(rx[IP])
1295                 if not pkt.haslayer(IP):
1296                     pkt = IP(pkt[Raw].load)
1297                 self.assert_packet_checksums_valid(pkt)
1298                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1299                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1300                 self.assertTrue(pkt.haslayer(GRE))
1301                 e = pkt[GRE]
1302                 self.assertEqual(e[IP].dst, "1.1.1.2")
1303             except (IndexError, AssertionError):
1304                 self.logger.debug(ppp("Unexpected packet:", rx))
1305                 try:
1306                     self.logger.debug(ppp("Decrypted packet:", pkt))
1307                 except:
1308                     pass
1309                 raise
1310
1311     def setUp(self):
1312         super(TestIpsecGreIfEsp, self).setUp()
1313
1314         self.tun_if = self.pg0
1315
1316         p = self.ipv4_params
1317
1318         bd1 = VppBridgeDomain(self, 1)
1319         bd1.add_vpp_config()
1320
1321         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1322                                   p.auth_algo_vpp_id, p.auth_key,
1323                                   p.crypt_algo_vpp_id, p.crypt_key,
1324                                   self.vpp_esp_protocol,
1325                                   self.pg0.local_ip4,
1326                                   self.pg0.remote_ip4)
1327         p.tun_sa_out.add_vpp_config()
1328
1329         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1330                                  p.auth_algo_vpp_id, p.auth_key,
1331                                  p.crypt_algo_vpp_id, p.crypt_key,
1332                                  self.vpp_esp_protocol,
1333                                  self.pg0.remote_ip4,
1334                                  self.pg0.local_ip4)
1335         p.tun_sa_in.add_vpp_config()
1336
1337         p.tun_if = VppGreInterface(self,
1338                                    self.pg0.local_ip4,
1339                                    self.pg0.remote_ip4)
1340         p.tun_if.add_vpp_config()
1341
1342         p.tun_protect = VppIpsecTunProtect(self,
1343                                            p.tun_if,
1344                                            p.tun_sa_out,
1345                                            [p.tun_sa_in])
1346         p.tun_protect.add_vpp_config()
1347
1348         p.tun_if.admin_up()
1349         p.tun_if.config_ip4()
1350         config_tun_params(p, self.encryption_type, p.tun_if)
1351
1352         VppIpRoute(self, "1.1.1.2", 32,
1353                    [VppRoutePath(p.tun_if.remote_ip4,
1354                                  0xffffffff)]).add_vpp_config()
1355
1356     def tearDown(self):
1357         p = self.ipv4_params
1358         p.tun_if.unconfig_ip4()
1359         super(TestIpsecGreIfEsp, self).tearDown()
1360
1361
1362 class TestIpsecGreIfEspTra(TemplateIpsec,
1363                            IpsecTun4Tests):
1364     """ Ipsec GRE ESP - TRA tests """
1365     tun4_encrypt_node_name = "esp4-encrypt-tun"
1366     tun4_decrypt_node_name = "esp4-decrypt-tun"
1367     encryption_type = ESP
1368
1369     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1370                          payload_size=100):
1371         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1372                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1373                               dst=self.pg0.local_ip4) /
1374                            GRE() /
1375                            IP(src=self.pg1.local_ip4,
1376                               dst=self.pg1.remote_ip4) /
1377                            UDP(sport=1144, dport=2233) /
1378                            Raw(b'X' * payload_size))
1379                 for i in range(count)]
1380
1381     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1382                                 payload_size=100):
1383         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1384                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1385                               dst=self.pg0.local_ip4) /
1386                            GRE() /
1387                            UDP(sport=1144, dport=2233) /
1388                            Raw(b'X' * payload_size))
1389                 for i in range(count)]
1390
1391     def gen_pkts(self, sw_intf, src, dst, count=1,
1392                  payload_size=100):
1393         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1394                 IP(src="1.1.1.1", dst="1.1.1.2") /
1395                 UDP(sport=1144, dport=2233) /
1396                 Raw(b'X' * payload_size)
1397                 for i in range(count)]
1398
1399     def verify_decrypted(self, p, rxs):
1400         for rx in rxs:
1401             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1402             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1403
1404     def verify_encrypted(self, p, sa, rxs):
1405         for rx in rxs:
1406             try:
1407                 pkt = sa.decrypt(rx[IP])
1408                 if not pkt.haslayer(IP):
1409                     pkt = IP(pkt[Raw].load)
1410                 self.assert_packet_checksums_valid(pkt)
1411                 self.assertTrue(pkt.haslayer(GRE))
1412                 e = pkt[GRE]
1413                 self.assertEqual(e[IP].dst, "1.1.1.2")
1414             except (IndexError, AssertionError):
1415                 self.logger.debug(ppp("Unexpected packet:", rx))
1416                 try:
1417                     self.logger.debug(ppp("Decrypted packet:", pkt))
1418                 except:
1419                     pass
1420                 raise
1421
1422     def setUp(self):
1423         super(TestIpsecGreIfEspTra, self).setUp()
1424
1425         self.tun_if = self.pg0
1426
1427         p = self.ipv4_params
1428
1429         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1430                                   p.auth_algo_vpp_id, p.auth_key,
1431                                   p.crypt_algo_vpp_id, p.crypt_key,
1432                                   self.vpp_esp_protocol)
1433         p.tun_sa_out.add_vpp_config()
1434
1435         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1436                                  p.auth_algo_vpp_id, p.auth_key,
1437                                  p.crypt_algo_vpp_id, p.crypt_key,
1438                                  self.vpp_esp_protocol)
1439         p.tun_sa_in.add_vpp_config()
1440
1441         p.tun_if = VppGreInterface(self,
1442                                    self.pg0.local_ip4,
1443                                    self.pg0.remote_ip4)
1444         p.tun_if.add_vpp_config()
1445
1446         p.tun_protect = VppIpsecTunProtect(self,
1447                                            p.tun_if,
1448                                            p.tun_sa_out,
1449                                            [p.tun_sa_in])
1450         p.tun_protect.add_vpp_config()
1451
1452         p.tun_if.admin_up()
1453         p.tun_if.config_ip4()
1454         config_tra_params(p, self.encryption_type, p.tun_if)
1455
1456         VppIpRoute(self, "1.1.1.2", 32,
1457                    [VppRoutePath(p.tun_if.remote_ip4,
1458                                  0xffffffff)]).add_vpp_config()
1459
1460     def tearDown(self):
1461         p = self.ipv4_params
1462         p.tun_if.unconfig_ip4()
1463         super(TestIpsecGreIfEspTra, self).tearDown()
1464
1465     def test_gre_non_ip(self):
1466         p = self.ipv4_params
1467         tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1468                                           src=p.remote_tun_if_host,
1469                                           dst=self.pg1.remote_ip6)
1470         self.send_and_assert_no_replies(self.tun_if, tx)
1471         node_name = ('/err/%s/unsupported payload' %
1472                      self.tun4_decrypt_node_name)
1473         self.assertEqual(1, self.statistics.get_err_counter(node_name))
1474
1475
1476 class TestIpsecGre6IfEspTra(TemplateIpsec,
1477                             IpsecTun6Tests):
1478     """ Ipsec GRE ESP - TRA tests """
1479     tun6_encrypt_node_name = "esp6-encrypt-tun"
1480     tun6_decrypt_node_name = "esp6-decrypt-tun"
1481     encryption_type = ESP
1482
1483     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1484                           payload_size=100):
1485         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1486                 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1487                                 dst=self.pg0.local_ip6) /
1488                            GRE() /
1489                            IPv6(src=self.pg1.local_ip6,
1490                                 dst=self.pg1.remote_ip6) /
1491                            UDP(sport=1144, dport=2233) /
1492                            Raw(b'X' * payload_size))
1493                 for i in range(count)]
1494
1495     def gen_pkts6(self, sw_intf, src, dst, count=1,
1496                   payload_size=100):
1497         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1498                 IPv6(src="1::1", dst="1::2") /
1499                 UDP(sport=1144, dport=2233) /
1500                 Raw(b'X' * payload_size)
1501                 for i in range(count)]
1502
1503     def verify_decrypted6(self, p, rxs):
1504         for rx in rxs:
1505             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1506             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1507
1508     def verify_encrypted6(self, p, sa, rxs):
1509         for rx in rxs:
1510             try:
1511                 pkt = sa.decrypt(rx[IPv6])
1512                 if not pkt.haslayer(IPv6):
1513                     pkt = IPv6(pkt[Raw].load)
1514                 self.assert_packet_checksums_valid(pkt)
1515                 self.assertTrue(pkt.haslayer(GRE))
1516                 e = pkt[GRE]
1517                 self.assertEqual(e[IPv6].dst, "1::2")
1518             except (IndexError, AssertionError):
1519                 self.logger.debug(ppp("Unexpected packet:", rx))
1520                 try:
1521                     self.logger.debug(ppp("Decrypted packet:", pkt))
1522                 except:
1523                     pass
1524                 raise
1525
1526     def setUp(self):
1527         super(TestIpsecGre6IfEspTra, self).setUp()
1528
1529         self.tun_if = self.pg0
1530
1531         p = self.ipv6_params
1532
1533         bd1 = VppBridgeDomain(self, 1)
1534         bd1.add_vpp_config()
1535
1536         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1537                                   p.auth_algo_vpp_id, p.auth_key,
1538                                   p.crypt_algo_vpp_id, p.crypt_key,
1539                                   self.vpp_esp_protocol)
1540         p.tun_sa_out.add_vpp_config()
1541
1542         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1543                                  p.auth_algo_vpp_id, p.auth_key,
1544                                  p.crypt_algo_vpp_id, p.crypt_key,
1545                                  self.vpp_esp_protocol)
1546         p.tun_sa_in.add_vpp_config()
1547
1548         p.tun_if = VppGreInterface(self,
1549                                    self.pg0.local_ip6,
1550                                    self.pg0.remote_ip6)
1551         p.tun_if.add_vpp_config()
1552
1553         p.tun_protect = VppIpsecTunProtect(self,
1554                                            p.tun_if,
1555                                            p.tun_sa_out,
1556                                            [p.tun_sa_in])
1557         p.tun_protect.add_vpp_config()
1558
1559         p.tun_if.admin_up()
1560         p.tun_if.config_ip6()
1561         config_tra_params(p, self.encryption_type, p.tun_if)
1562
1563         r = VppIpRoute(self, "1::2", 128,
1564                        [VppRoutePath(p.tun_if.remote_ip6,
1565                                      0xffffffff,
1566                                      proto=DpoProto.DPO_PROTO_IP6)])
1567         r.add_vpp_config()
1568
1569     def tearDown(self):
1570         p = self.ipv6_params
1571         p.tun_if.unconfig_ip6()
1572         super(TestIpsecGre6IfEspTra, self).tearDown()
1573
1574
1575 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1576     """ Ipsec mGRE ESP v4 TRA tests """
1577     tun4_encrypt_node_name = "esp4-encrypt-tun"
1578     tun4_decrypt_node_name = "esp4-decrypt-tun"
1579     encryption_type = ESP
1580
1581     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1582                          payload_size=100):
1583         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1584                 sa.encrypt(IP(src=p.tun_dst,
1585                               dst=self.pg0.local_ip4) /
1586                            GRE() /
1587                            IP(src=self.pg1.local_ip4,
1588                               dst=self.pg1.remote_ip4) /
1589                            UDP(sport=1144, dport=2233) /
1590                            Raw(b'X' * payload_size))
1591                 for i in range(count)]
1592
1593     def gen_pkts(self, sw_intf, src, dst, count=1,
1594                  payload_size=100):
1595         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1596                 IP(src="1.1.1.1", dst=dst) /
1597                 UDP(sport=1144, dport=2233) /
1598                 Raw(b'X' * payload_size)
1599                 for i in range(count)]
1600
1601     def verify_decrypted(self, p, rxs):
1602         for rx in rxs:
1603             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1604             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1605
1606     def verify_encrypted(self, p, sa, rxs):
1607         for rx in rxs:
1608             try:
1609                 pkt = sa.decrypt(rx[IP])
1610                 if not pkt.haslayer(IP):
1611                     pkt = IP(pkt[Raw].load)
1612                 self.assert_packet_checksums_valid(pkt)
1613                 self.assertTrue(pkt.haslayer(GRE))
1614                 e = pkt[GRE]
1615                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1616             except (IndexError, AssertionError):
1617                 self.logger.debug(ppp("Unexpected packet:", rx))
1618                 try:
1619                     self.logger.debug(ppp("Decrypted packet:", pkt))
1620                 except:
1621                     pass
1622                 raise
1623
1624     def setUp(self):
1625         super(TestIpsecMGreIfEspTra4, self).setUp()
1626
1627         N_NHS = 16
1628         self.tun_if = self.pg0
1629         p = self.ipv4_params
1630         p.tun_if = VppGreInterface(self,
1631                                    self.pg0.local_ip4,
1632                                    "0.0.0.0",
1633                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1634                                          TUNNEL_API_MODE_MP))
1635         p.tun_if.add_vpp_config()
1636         p.tun_if.admin_up()
1637         p.tun_if.config_ip4()
1638         p.tun_if.generate_remote_hosts(N_NHS)
1639         self.pg0.generate_remote_hosts(N_NHS)
1640         self.pg0.configure_ipv4_neighbors()
1641
1642         # setup some SAs for several next-hops on the interface
1643         self.multi_params = []
1644
1645         for ii in range(N_NHS):
1646             p = copy.copy(self.ipv4_params)
1647
1648             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1649             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1650             p.scapy_tun_spi = p.scapy_tun_spi + ii
1651             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1652             p.vpp_tun_spi = p.vpp_tun_spi + ii
1653
1654             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1655             p.scapy_tra_spi = p.scapy_tra_spi + ii
1656             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1657             p.vpp_tra_spi = p.vpp_tra_spi + ii
1658             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1659                                       p.auth_algo_vpp_id, p.auth_key,
1660                                       p.crypt_algo_vpp_id, p.crypt_key,
1661                                       self.vpp_esp_protocol)
1662             p.tun_sa_out.add_vpp_config()
1663
1664             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1665                                      p.auth_algo_vpp_id, p.auth_key,
1666                                      p.crypt_algo_vpp_id, p.crypt_key,
1667                                      self.vpp_esp_protocol)
1668             p.tun_sa_in.add_vpp_config()
1669
1670             p.tun_protect = VppIpsecTunProtect(
1671                 self,
1672                 p.tun_if,
1673                 p.tun_sa_out,
1674                 [p.tun_sa_in],
1675                 nh=p.tun_if.remote_hosts[ii].ip4)
1676             p.tun_protect.add_vpp_config()
1677             config_tra_params(p, self.encryption_type, p.tun_if)
1678             self.multi_params.append(p)
1679
1680             VppIpRoute(self, p.remote_tun_if_host, 32,
1681                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1682                                      p.tun_if.sw_if_index)]).add_vpp_config()
1683
1684             # in this v4 variant add the teibs after the protect
1685             p.teib = VppTeib(self, p.tun_if,
1686                              p.tun_if.remote_hosts[ii].ip4,
1687                              self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1688             p.tun_dst = self.pg0.remote_hosts[ii].ip4
1689         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1690
1691     def tearDown(self):
1692         p = self.ipv4_params
1693         p.tun_if.unconfig_ip4()
1694         super(TestIpsecMGreIfEspTra4, self).tearDown()
1695
1696     def test_tun_44(self):
1697         """mGRE IPSEC 44"""
1698         N_PKTS = 63
1699         for p in self.multi_params:
1700             self.verify_tun_44(p, count=N_PKTS)
1701             p.teib.remove_vpp_config()
1702             self.verify_tun_dropped_44(p, count=N_PKTS)
1703             p.teib.add_vpp_config()
1704             self.verify_tun_44(p, count=N_PKTS)
1705
1706
1707 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1708     """ Ipsec mGRE ESP v6 TRA tests """
1709     tun6_encrypt_node_name = "esp6-encrypt-tun"
1710     tun6_decrypt_node_name = "esp6-decrypt-tun"
1711     encryption_type = ESP
1712
1713     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1714                           payload_size=100):
1715         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1716                 sa.encrypt(IPv6(src=p.tun_dst,
1717                                 dst=self.pg0.local_ip6) /
1718                            GRE() /
1719                            IPv6(src=self.pg1.local_ip6,
1720                                 dst=self.pg1.remote_ip6) /
1721                            UDP(sport=1144, dport=2233) /
1722                            Raw(b'X' * payload_size))
1723                 for i in range(count)]
1724
1725     def gen_pkts6(self, sw_intf, src, dst, count=1,
1726                   payload_size=100):
1727         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1728                 IPv6(src="1::1", dst=dst) /
1729                 UDP(sport=1144, dport=2233) /
1730                 Raw(b'X' * payload_size)
1731                 for i in range(count)]
1732
1733     def verify_decrypted6(self, p, rxs):
1734         for rx in rxs:
1735             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1736             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1737
1738     def verify_encrypted6(self, p, sa, rxs):
1739         for rx in rxs:
1740             try:
1741                 pkt = sa.decrypt(rx[IPv6])
1742                 if not pkt.haslayer(IPv6):
1743                     pkt = IPv6(pkt[Raw].load)
1744                 self.assert_packet_checksums_valid(pkt)
1745                 self.assertTrue(pkt.haslayer(GRE))
1746                 e = pkt[GRE]
1747                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1748             except (IndexError, AssertionError):
1749                 self.logger.debug(ppp("Unexpected packet:", rx))
1750                 try:
1751                     self.logger.debug(ppp("Decrypted packet:", pkt))
1752                 except:
1753                     pass
1754                 raise
1755
1756     def setUp(self):
1757         super(TestIpsecMGreIfEspTra6, self).setUp()
1758
1759         self.vapi.cli("set logging class ipsec level debug")
1760
1761         N_NHS = 16
1762         self.tun_if = self.pg0
1763         p = self.ipv6_params
1764         p.tun_if = VppGreInterface(self,
1765                                    self.pg0.local_ip6,
1766                                    "::",
1767                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1768                                          TUNNEL_API_MODE_MP))
1769         p.tun_if.add_vpp_config()
1770         p.tun_if.admin_up()
1771         p.tun_if.config_ip6()
1772         p.tun_if.generate_remote_hosts(N_NHS)
1773         self.pg0.generate_remote_hosts(N_NHS)
1774         self.pg0.configure_ipv6_neighbors()
1775
1776         # setup some SAs for several next-hops on the interface
1777         self.multi_params = []
1778
1779         for ii in range(N_NHS):
1780             p = copy.copy(self.ipv6_params)
1781
1782             p.remote_tun_if_host = "1::%d" % (ii + 1)
1783             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1784             p.scapy_tun_spi = p.scapy_tun_spi + ii
1785             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1786             p.vpp_tun_spi = p.vpp_tun_spi + ii
1787
1788             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1789             p.scapy_tra_spi = p.scapy_tra_spi + ii
1790             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1791             p.vpp_tra_spi = p.vpp_tra_spi + ii
1792             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1793                                       p.auth_algo_vpp_id, p.auth_key,
1794                                       p.crypt_algo_vpp_id, p.crypt_key,
1795                                       self.vpp_esp_protocol)
1796             p.tun_sa_out.add_vpp_config()
1797
1798             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1799                                      p.auth_algo_vpp_id, p.auth_key,
1800                                      p.crypt_algo_vpp_id, p.crypt_key,
1801                                      self.vpp_esp_protocol)
1802             p.tun_sa_in.add_vpp_config()
1803
1804             # in this v6 variant add the teibs first then the protection
1805             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1806             VppTeib(self, p.tun_if,
1807                     p.tun_if.remote_hosts[ii].ip6,
1808                     p.tun_dst).add_vpp_config()
1809
1810             p.tun_protect = VppIpsecTunProtect(
1811                 self,
1812                 p.tun_if,
1813                 p.tun_sa_out,
1814                 [p.tun_sa_in],
1815                 nh=p.tun_if.remote_hosts[ii].ip6)
1816             p.tun_protect.add_vpp_config()
1817             config_tra_params(p, self.encryption_type, p.tun_if)
1818             self.multi_params.append(p)
1819
1820             VppIpRoute(self, p.remote_tun_if_host, 128,
1821                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1822                                      p.tun_if.sw_if_index)]).add_vpp_config()
1823             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1824
1825         self.logger.info(self.vapi.cli("sh log"))
1826         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1827         self.logger.info(self.vapi.cli("sh adj 41"))
1828
1829     def tearDown(self):
1830         p = self.ipv6_params
1831         p.tun_if.unconfig_ip6()
1832         super(TestIpsecMGreIfEspTra6, self).tearDown()
1833
1834     def test_tun_66(self):
1835         """mGRE IPSec 66"""
1836         for p in self.multi_params:
1837             self.verify_tun_66(p, count=63)
1838
1839
1840 class TestIpsec4TunProtect(TemplateIpsec,
1841                            TemplateIpsec4TunProtect,
1842                            IpsecTun4):
1843     """ IPsec IPv4 Tunnel protect - transport mode"""
1844
1845     def setUp(self):
1846         super(TestIpsec4TunProtect, self).setUp()
1847
1848         self.tun_if = self.pg0
1849
1850     def tearDown(self):
1851         super(TestIpsec4TunProtect, self).tearDown()
1852
1853     def test_tun_44(self):
1854         """IPSEC tunnel protect"""
1855
1856         p = self.ipv4_params
1857
1858         self.config_network(p)
1859         self.config_sa_tra(p)
1860         self.config_protect(p)
1861
1862         self.verify_tun_44(p, count=127)
1863         c = p.tun_if.get_rx_stats()
1864         self.assertEqual(c['packets'], 127)
1865         c = p.tun_if.get_tx_stats()
1866         self.assertEqual(c['packets'], 127)
1867
1868         self.vapi.cli("clear ipsec sa")
1869         self.verify_tun_64(p, count=127)
1870         c = p.tun_if.get_rx_stats()
1871         self.assertEqual(c['packets'], 254)
1872         c = p.tun_if.get_tx_stats()
1873         self.assertEqual(c['packets'], 254)
1874
1875         # rekey - create new SAs and update the tunnel protection
1876         np = copy.copy(p)
1877         np.crypt_key = b'X' + p.crypt_key[1:]
1878         np.scapy_tun_spi += 100
1879         np.scapy_tun_sa_id += 1
1880         np.vpp_tun_spi += 100
1881         np.vpp_tun_sa_id += 1
1882         np.tun_if.local_spi = p.vpp_tun_spi
1883         np.tun_if.remote_spi = p.scapy_tun_spi
1884
1885         self.config_sa_tra(np)
1886         self.config_protect(np)
1887         self.unconfig_sa(p)
1888
1889         self.verify_tun_44(np, count=127)
1890         c = p.tun_if.get_rx_stats()
1891         self.assertEqual(c['packets'], 381)
1892         c = p.tun_if.get_tx_stats()
1893         self.assertEqual(c['packets'], 381)
1894
1895         # teardown
1896         self.unconfig_protect(np)
1897         self.unconfig_sa(np)
1898         self.unconfig_network(p)
1899
1900
1901 class TestIpsec4TunProtectUdp(TemplateIpsec,
1902                               TemplateIpsec4TunProtect,
1903                               IpsecTun4):
1904     """ IPsec IPv4 Tunnel protect - transport mode"""
1905
1906     def setUp(self):
1907         super(TestIpsec4TunProtectUdp, self).setUp()
1908
1909         self.tun_if = self.pg0
1910
1911         p = self.ipv4_params
1912         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1913                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1914         p.nat_header = UDP(sport=4500, dport=4500)
1915         self.config_network(p)
1916         self.config_sa_tra(p)
1917         self.config_protect(p)
1918
1919     def tearDown(self):
1920         p = self.ipv4_params
1921         self.unconfig_protect(p)
1922         self.unconfig_sa(p)
1923         self.unconfig_network(p)
1924         super(TestIpsec4TunProtectUdp, self).tearDown()
1925
1926     def verify_encrypted(self, p, sa, rxs):
1927         # ensure encrypted packets are recieved with the default UDP ports
1928         for rx in rxs:
1929             self.assertEqual(rx[UDP].sport, 4500)
1930             self.assertEqual(rx[UDP].dport, 4500)
1931         super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
1932
1933     def test_tun_44(self):
1934         """IPSEC UDP tunnel protect"""
1935
1936         p = self.ipv4_params
1937
1938         self.verify_tun_44(p, count=127)
1939         c = p.tun_if.get_rx_stats()
1940         self.assertEqual(c['packets'], 127)
1941         c = p.tun_if.get_tx_stats()
1942         self.assertEqual(c['packets'], 127)
1943
1944     def test_keepalive(self):
1945         """ IPSEC NAT Keepalive """
1946         self.verify_keepalive(self.ipv4_params)
1947
1948
1949 class TestIpsec4TunProtectTun(TemplateIpsec,
1950                               TemplateIpsec4TunProtect,
1951                               IpsecTun4):
1952     """ IPsec IPv4 Tunnel protect - tunnel mode"""
1953
1954     encryption_type = ESP
1955     tun4_encrypt_node_name = "esp4-encrypt-tun"
1956     tun4_decrypt_node_name = "esp4-decrypt-tun"
1957
1958     def setUp(self):
1959         super(TestIpsec4TunProtectTun, self).setUp()
1960
1961         self.tun_if = self.pg0
1962
1963     def tearDown(self):
1964         super(TestIpsec4TunProtectTun, self).tearDown()
1965
1966     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1967                          payload_size=100):
1968         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1969                 sa.encrypt(IP(src=sw_intf.remote_ip4,
1970                               dst=sw_intf.local_ip4) /
1971                            IP(src=src, dst=dst) /
1972                            UDP(sport=1144, dport=2233) /
1973                            Raw(b'X' * payload_size))
1974                 for i in range(count)]
1975
1976     def gen_pkts(self, sw_intf, src, dst, count=1,
1977                  payload_size=100):
1978         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1979                 IP(src=src, dst=dst) /
1980                 UDP(sport=1144, dport=2233) /
1981                 Raw(b'X' * payload_size)
1982                 for i in range(count)]
1983
1984     def verify_decrypted(self, p, rxs):
1985         for rx in rxs:
1986             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1987             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1988             self.assert_packet_checksums_valid(rx)
1989
1990     def verify_encrypted(self, p, sa, rxs):
1991         for rx in rxs:
1992             try:
1993                 pkt = sa.decrypt(rx[IP])
1994                 if not pkt.haslayer(IP):
1995                     pkt = IP(pkt[Raw].load)
1996                 self.assert_packet_checksums_valid(pkt)
1997                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1998                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1999                 inner = pkt[IP].payload
2000                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2001
2002             except (IndexError, AssertionError):
2003                 self.logger.debug(ppp("Unexpected packet:", rx))
2004                 try:
2005                     self.logger.debug(ppp("Decrypted packet:", pkt))
2006                 except:
2007                     pass
2008                 raise
2009
2010     def test_tun_44(self):
2011         """IPSEC tunnel protect """
2012
2013         p = self.ipv4_params
2014
2015         self.config_network(p)
2016         self.config_sa_tun(p)
2017         self.config_protect(p)
2018
2019         # also add an output features on the tunnel and physical interface
2020         # so we test they still work
2021         r_all = AclRule(True,
2022                         src_prefix="0.0.0.0/0",
2023                         dst_prefix="0.0.0.0/0",
2024                         proto=0)
2025         a = VppAcl(self, [r_all]).add_vpp_config()
2026
2027         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2028         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2029
2030         self.verify_tun_44(p, count=127)
2031
2032         c = p.tun_if.get_rx_stats()
2033         self.assertEqual(c['packets'], 127)
2034         c = p.tun_if.get_tx_stats()
2035         self.assertEqual(c['packets'], 127)
2036
2037         # rekey - create new SAs and update the tunnel protection
2038         np = copy.copy(p)
2039         np.crypt_key = b'X' + p.crypt_key[1:]
2040         np.scapy_tun_spi += 100
2041         np.scapy_tun_sa_id += 1
2042         np.vpp_tun_spi += 100
2043         np.vpp_tun_sa_id += 1
2044         np.tun_if.local_spi = p.vpp_tun_spi
2045         np.tun_if.remote_spi = p.scapy_tun_spi
2046
2047         self.config_sa_tun(np)
2048         self.config_protect(np)
2049         self.unconfig_sa(p)
2050
2051         self.verify_tun_44(np, count=127)
2052         c = p.tun_if.get_rx_stats()
2053         self.assertEqual(c['packets'], 254)
2054         c = p.tun_if.get_tx_stats()
2055         self.assertEqual(c['packets'], 254)
2056
2057         # teardown
2058         self.unconfig_protect(np)
2059         self.unconfig_sa(np)
2060         self.unconfig_network(p)
2061
2062
2063 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2064                                   TemplateIpsec4TunProtect,
2065                                   IpsecTun4):
2066     """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2067
2068     encryption_type = ESP
2069     tun4_encrypt_node_name = "esp4-encrypt-tun"
2070     tun4_decrypt_node_name = "esp4-decrypt-tun"
2071
2072     def setUp(self):
2073         super(TestIpsec4TunProtectTunDrop, self).setUp()
2074
2075         self.tun_if = self.pg0
2076
2077     def tearDown(self):
2078         super(TestIpsec4TunProtectTunDrop, self).tearDown()
2079
2080     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2081                          payload_size=100):
2082         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2083                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2084                               dst="5.5.5.5") /
2085                            IP(src=src, dst=dst) /
2086                            UDP(sport=1144, dport=2233) /
2087                            Raw(b'X' * payload_size))
2088                 for i in range(count)]
2089
2090     def test_tun_drop_44(self):
2091         """IPSEC tunnel protect bogus tunnel header """
2092
2093         p = self.ipv4_params
2094
2095         self.config_network(p)
2096         self.config_sa_tun(p)
2097         self.config_protect(p)
2098
2099         tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2100                                    src=p.remote_tun_if_host,
2101                                    dst=self.pg1.remote_ip4,
2102                                    count=63)
2103         self.send_and_assert_no_replies(self.tun_if, tx)
2104
2105         # teardown
2106         self.unconfig_protect(p)
2107         self.unconfig_sa(p)
2108         self.unconfig_network(p)
2109
2110
2111 class TestIpsec6TunProtect(TemplateIpsec,
2112                            TemplateIpsec6TunProtect,
2113                            IpsecTun6):
2114     """ IPsec IPv6 Tunnel protect - transport mode"""
2115
2116     encryption_type = ESP
2117     tun6_encrypt_node_name = "esp6-encrypt-tun"
2118     tun6_decrypt_node_name = "esp6-decrypt-tun"
2119
2120     def setUp(self):
2121         super(TestIpsec6TunProtect, self).setUp()
2122
2123         self.tun_if = self.pg0
2124
2125     def tearDown(self):
2126         super(TestIpsec6TunProtect, self).tearDown()
2127
2128     def test_tun_66(self):
2129         """IPSEC tunnel protect 6o6"""
2130
2131         p = self.ipv6_params
2132
2133         self.config_network(p)
2134         self.config_sa_tra(p)
2135         self.config_protect(p)
2136
2137         self.verify_tun_66(p, count=127)
2138         c = p.tun_if.get_rx_stats()
2139         self.assertEqual(c['packets'], 127)
2140         c = p.tun_if.get_tx_stats()
2141         self.assertEqual(c['packets'], 127)
2142
2143         # rekey - create new SAs and update the tunnel protection
2144         np = copy.copy(p)
2145         np.crypt_key = b'X' + p.crypt_key[1:]
2146         np.scapy_tun_spi += 100
2147         np.scapy_tun_sa_id += 1
2148         np.vpp_tun_spi += 100
2149         np.vpp_tun_sa_id += 1
2150         np.tun_if.local_spi = p.vpp_tun_spi
2151         np.tun_if.remote_spi = p.scapy_tun_spi
2152
2153         self.config_sa_tra(np)
2154         self.config_protect(np)
2155         self.unconfig_sa(p)
2156
2157         self.verify_tun_66(np, count=127)
2158         c = p.tun_if.get_rx_stats()
2159         self.assertEqual(c['packets'], 254)
2160         c = p.tun_if.get_tx_stats()
2161         self.assertEqual(c['packets'], 254)
2162
2163         # bounce the interface state
2164         p.tun_if.admin_down()
2165         self.verify_drop_tun_66(np, count=127)
2166         node = ('/err/ipsec6-tun-input/%s' %
2167                 'ipsec packets received on disabled interface')
2168         self.assertEqual(127, self.statistics.get_err_counter(node))
2169         p.tun_if.admin_up()
2170         self.verify_tun_66(np, count=127)
2171
2172         # 3 phase rekey
2173         #  1) add two input SAs [old, new]
2174         #  2) swap output SA to [new]
2175         #  3) use only [new] input SA
2176         np3 = copy.copy(np)
2177         np3.crypt_key = b'Z' + p.crypt_key[1:]
2178         np3.scapy_tun_spi += 100
2179         np3.scapy_tun_sa_id += 1
2180         np3.vpp_tun_spi += 100
2181         np3.vpp_tun_sa_id += 1
2182         np3.tun_if.local_spi = p.vpp_tun_spi
2183         np3.tun_if.remote_spi = p.scapy_tun_spi
2184
2185         self.config_sa_tra(np3)
2186
2187         # step 1;
2188         p.tun_protect.update_vpp_config(np.tun_sa_out,
2189                                         [np.tun_sa_in, np3.tun_sa_in])
2190         self.verify_tun_66(np, np, count=127)
2191         self.verify_tun_66(np3, np, count=127)
2192
2193         # step 2;
2194         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2195                                         [np.tun_sa_in, np3.tun_sa_in])
2196         self.verify_tun_66(np, np3, count=127)
2197         self.verify_tun_66(np3, np3, count=127)
2198
2199         # step 1;
2200         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2201                                         [np3.tun_sa_in])
2202         self.verify_tun_66(np3, np3, count=127)
2203         self.verify_drop_tun_66(np, count=127)
2204
2205         c = p.tun_if.get_rx_stats()
2206         self.assertEqual(c['packets'], 127*9)
2207         c = p.tun_if.get_tx_stats()
2208         self.assertEqual(c['packets'], 127*8)
2209         self.unconfig_sa(np)
2210
2211         # teardown
2212         self.unconfig_protect(np3)
2213         self.unconfig_sa(np3)
2214         self.unconfig_network(p)
2215
2216     def test_tun_46(self):
2217         """IPSEC tunnel protect 4o6"""
2218
2219         p = self.ipv6_params
2220
2221         self.config_network(p)
2222         self.config_sa_tra(p)
2223         self.config_protect(p)
2224
2225         self.verify_tun_46(p, count=127)
2226         c = p.tun_if.get_rx_stats()
2227         self.assertEqual(c['packets'], 127)
2228         c = p.tun_if.get_tx_stats()
2229         self.assertEqual(c['packets'], 127)
2230
2231         # teardown
2232         self.unconfig_protect(p)
2233         self.unconfig_sa(p)
2234         self.unconfig_network(p)
2235
2236
2237 class TestIpsec6TunProtectTun(TemplateIpsec,
2238                               TemplateIpsec6TunProtect,
2239                               IpsecTun6):
2240     """ IPsec IPv6 Tunnel protect - tunnel mode"""
2241
2242     encryption_type = ESP
2243     tun6_encrypt_node_name = "esp6-encrypt-tun"
2244     tun6_decrypt_node_name = "esp6-decrypt-tun"
2245
2246     def setUp(self):
2247         super(TestIpsec6TunProtectTun, self).setUp()
2248
2249         self.tun_if = self.pg0
2250
2251     def tearDown(self):
2252         super(TestIpsec6TunProtectTun, self).tearDown()
2253
2254     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2255                           payload_size=100):
2256         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2257                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2258                                 dst=sw_intf.local_ip6) /
2259                            IPv6(src=src, dst=dst) /
2260                            UDP(sport=1166, dport=2233) /
2261                            Raw(b'X' * payload_size))
2262                 for i in range(count)]
2263
2264     def gen_pkts6(self, sw_intf, src, dst, count=1,
2265                   payload_size=100):
2266         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2267                 IPv6(src=src, dst=dst) /
2268                 UDP(sport=1166, dport=2233) /
2269                 Raw(b'X' * payload_size)
2270                 for i in range(count)]
2271
2272     def verify_decrypted6(self, p, rxs):
2273         for rx in rxs:
2274             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2275             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2276             self.assert_packet_checksums_valid(rx)
2277
2278     def verify_encrypted6(self, p, sa, rxs):
2279         for rx in rxs:
2280             try:
2281                 pkt = sa.decrypt(rx[IPv6])
2282                 if not pkt.haslayer(IPv6):
2283                     pkt = IPv6(pkt[Raw].load)
2284                 self.assert_packet_checksums_valid(pkt)
2285                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2286                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2287                 inner = pkt[IPv6].payload
2288                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2289
2290             except (IndexError, AssertionError):
2291                 self.logger.debug(ppp("Unexpected packet:", rx))
2292                 try:
2293                     self.logger.debug(ppp("Decrypted packet:", pkt))
2294                 except:
2295                     pass
2296                 raise
2297
2298     def test_tun_66(self):
2299         """IPSEC tunnel protect """
2300
2301         p = self.ipv6_params
2302
2303         self.config_network(p)
2304         self.config_sa_tun(p)
2305         self.config_protect(p)
2306
2307         self.verify_tun_66(p, count=127)
2308
2309         c = p.tun_if.get_rx_stats()
2310         self.assertEqual(c['packets'], 127)
2311         c = p.tun_if.get_tx_stats()
2312         self.assertEqual(c['packets'], 127)
2313
2314         # rekey - create new SAs and update the tunnel protection
2315         np = copy.copy(p)
2316         np.crypt_key = b'X' + p.crypt_key[1:]
2317         np.scapy_tun_spi += 100
2318         np.scapy_tun_sa_id += 1
2319         np.vpp_tun_spi += 100
2320         np.vpp_tun_sa_id += 1
2321         np.tun_if.local_spi = p.vpp_tun_spi
2322         np.tun_if.remote_spi = p.scapy_tun_spi
2323
2324         self.config_sa_tun(np)
2325         self.config_protect(np)
2326         self.unconfig_sa(p)
2327
2328         self.verify_tun_66(np, count=127)
2329         c = p.tun_if.get_rx_stats()
2330         self.assertEqual(c['packets'], 254)
2331         c = p.tun_if.get_tx_stats()
2332         self.assertEqual(c['packets'], 254)
2333
2334         # teardown
2335         self.unconfig_protect(np)
2336         self.unconfig_sa(np)
2337         self.unconfig_network(p)
2338
2339
2340 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2341                                   TemplateIpsec6TunProtect,
2342                                   IpsecTun6):
2343     """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2344
2345     encryption_type = ESP
2346     tun6_encrypt_node_name = "esp6-encrypt-tun"
2347     tun6_decrypt_node_name = "esp6-decrypt-tun"
2348
2349     def setUp(self):
2350         super(TestIpsec6TunProtectTunDrop, self).setUp()
2351
2352         self.tun_if = self.pg0
2353
2354     def tearDown(self):
2355         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2356
2357     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2358                           payload_size=100):
2359         # the IP destination of the revelaed packet does not match
2360         # that assigned to the tunnel
2361         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2362                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2363                                 dst="5::5") /
2364                            IPv6(src=src, dst=dst) /
2365                            UDP(sport=1144, dport=2233) /
2366                            Raw(b'X' * payload_size))
2367                 for i in range(count)]
2368
2369     def test_tun_drop_66(self):
2370         """IPSEC 6 tunnel protect bogus tunnel header """
2371
2372         p = self.ipv6_params
2373
2374         self.config_network(p)
2375         self.config_sa_tun(p)
2376         self.config_protect(p)
2377
2378         tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2379                                     src=p.remote_tun_if_host,
2380                                     dst=self.pg1.remote_ip6,
2381                                     count=63)
2382         self.send_and_assert_no_replies(self.tun_if, tx)
2383
2384         self.unconfig_protect(p)
2385         self.unconfig_sa(p)
2386         self.unconfig_network(p)
2387
2388
2389 class TemplateIpsecItf4(object):
2390     """ IPsec Interface IPv4 """
2391
2392     encryption_type = ESP
2393     tun4_encrypt_node_name = "esp4-encrypt-tun"
2394     tun4_decrypt_node_name = "esp4-decrypt-tun"
2395     tun4_input_node = "ipsec4-tun-input"
2396
2397     def config_sa_tun(self, p, src, dst):
2398         config_tun_params(p, self.encryption_type, None, src, dst)
2399
2400         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2401                                   p.auth_algo_vpp_id, p.auth_key,
2402                                   p.crypt_algo_vpp_id, p.crypt_key,
2403                                   self.vpp_esp_protocol,
2404                                   src, dst,
2405                                   flags=p.flags)
2406         p.tun_sa_out.add_vpp_config()
2407
2408         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2409                                  p.auth_algo_vpp_id, p.auth_key,
2410                                  p.crypt_algo_vpp_id, p.crypt_key,
2411                                  self.vpp_esp_protocol,
2412                                  dst, src,
2413                                  flags=p.flags)
2414         p.tun_sa_in.add_vpp_config()
2415
2416     def config_protect(self, p):
2417         p.tun_protect = VppIpsecTunProtect(self,
2418                                            p.tun_if,
2419                                            p.tun_sa_out,
2420                                            [p.tun_sa_in])
2421         p.tun_protect.add_vpp_config()
2422
2423     def config_network(self, p, instance=0xffffffff):
2424         p.tun_if = VppIpsecInterface(self, instance=instance)
2425
2426         p.tun_if.add_vpp_config()
2427         p.tun_if.admin_up()
2428         p.tun_if.config_ip4()
2429         p.tun_if.config_ip6()
2430
2431         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2432                              [VppRoutePath(p.tun_if.remote_ip4,
2433                                            0xffffffff)])
2434         p.route.add_vpp_config()
2435         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2436                        [VppRoutePath(p.tun_if.remote_ip6,
2437                                      0xffffffff,
2438                                      proto=DpoProto.DPO_PROTO_IP6)])
2439         r.add_vpp_config()
2440
2441     def unconfig_network(self, p):
2442         p.route.remove_vpp_config()
2443         p.tun_if.remove_vpp_config()
2444
2445     def unconfig_protect(self, p):
2446         p.tun_protect.remove_vpp_config()
2447
2448     def unconfig_sa(self, p):
2449         p.tun_sa_out.remove_vpp_config()
2450         p.tun_sa_in.remove_vpp_config()
2451
2452
2453 class TestIpsecItf4(TemplateIpsec,
2454                     TemplateIpsecItf4,
2455                     IpsecTun4):
2456     """ IPsec Interface IPv4 """
2457
2458     def setUp(self):
2459         super(TestIpsecItf4, self).setUp()
2460
2461         self.tun_if = self.pg0
2462
2463     def tearDown(self):
2464         super(TestIpsecItf4, self).tearDown()
2465
2466     def test_tun_instance_44(self):
2467         p = self.ipv4_params
2468         self.config_network(p, instance=3)
2469
2470         with self.assertRaises(CliFailedCommandError):
2471             self.vapi.cli("show interface ipsec0")
2472
2473         output = self.vapi.cli("show interface ipsec3")
2474         self.assertTrue("unknown" not in output)
2475
2476         self.unconfig_network(p)
2477
2478     def test_tun_44(self):
2479         """IPSEC interface IPv4"""
2480
2481         n_pkts = 127
2482         p = self.ipv4_params
2483
2484         self.config_network(p)
2485         self.config_sa_tun(p,
2486                            self.pg0.local_ip4,
2487                            self.pg0.remote_ip4)
2488         self.config_protect(p)
2489
2490         self.verify_tun_44(p, count=n_pkts)
2491         c = p.tun_if.get_rx_stats()
2492         self.assertEqual(c['packets'], n_pkts)
2493         c = p.tun_if.get_tx_stats()
2494         self.assertEqual(c['packets'], n_pkts)
2495
2496         p.tun_if.admin_down()
2497         self.verify_tun_dropped_44(p, count=n_pkts)
2498         p.tun_if.admin_up()
2499         self.verify_tun_44(p, count=n_pkts)
2500
2501         c = p.tun_if.get_rx_stats()
2502         self.assertEqual(c['packets'], 3*n_pkts)
2503         c = p.tun_if.get_tx_stats()
2504         self.assertEqual(c['packets'], 2*n_pkts)
2505
2506         # it's a v6 packet when its encrypted
2507         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2508
2509         self.verify_tun_64(p, count=n_pkts)
2510         c = p.tun_if.get_rx_stats()
2511         self.assertEqual(c['packets'], 4*n_pkts)
2512         c = p.tun_if.get_tx_stats()
2513         self.assertEqual(c['packets'], 3*n_pkts)
2514
2515         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2516
2517         self.vapi.cli("clear interfaces")
2518
2519         # rekey - create new SAs and update the tunnel protection
2520         np = copy.copy(p)
2521         np.crypt_key = b'X' + p.crypt_key[1:]
2522         np.scapy_tun_spi += 100
2523         np.scapy_tun_sa_id += 1
2524         np.vpp_tun_spi += 100
2525         np.vpp_tun_sa_id += 1
2526         np.tun_if.local_spi = p.vpp_tun_spi
2527         np.tun_if.remote_spi = p.scapy_tun_spi
2528
2529         self.config_sa_tun(np,
2530                            self.pg0.local_ip4,
2531                            self.pg0.remote_ip4)
2532         self.config_protect(np)
2533         self.unconfig_sa(p)
2534
2535         self.verify_tun_44(np, count=n_pkts)
2536         c = p.tun_if.get_rx_stats()
2537         self.assertEqual(c['packets'], n_pkts)
2538         c = p.tun_if.get_tx_stats()
2539         self.assertEqual(c['packets'], n_pkts)
2540
2541         # teardown
2542         self.unconfig_protect(np)
2543         self.unconfig_sa(np)
2544         self.unconfig_network(p)
2545
2546     def test_tun_44_null(self):
2547         """IPSEC interface IPv4 NULL auth/crypto"""
2548
2549         n_pkts = 127
2550         p = copy.copy(self.ipv4_params)
2551
2552         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2553                               IPSEC_API_INTEG_ALG_NONE)
2554         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2555                                IPSEC_API_CRYPTO_ALG_NONE)
2556         p.crypt_algo = "NULL"
2557         p.auth_algo = "NULL"
2558
2559         self.config_network(p)
2560         self.config_sa_tun(p,
2561                            self.pg0.local_ip4,
2562                            self.pg0.remote_ip4)
2563         self.config_protect(p)
2564
2565         self.verify_tun_44(p, count=n_pkts)
2566
2567         # teardown
2568         self.unconfig_protect(p)
2569         self.unconfig_sa(p)
2570         self.unconfig_network(p)
2571
2572
2573 class TemplateIpsecItf6(object):
2574     """ IPsec Interface IPv6 """
2575
2576     encryption_type = ESP
2577     tun6_encrypt_node_name = "esp6-encrypt-tun"
2578     tun6_decrypt_node_name = "esp6-decrypt-tun"
2579     tun6_input_node = "ipsec6-tun-input"
2580
2581     def config_sa_tun(self, p, src, dst):
2582         config_tun_params(p, self.encryption_type, None, src, dst)
2583
2584         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2585                                   p.auth_algo_vpp_id, p.auth_key,
2586                                   p.crypt_algo_vpp_id, p.crypt_key,
2587                                   self.vpp_esp_protocol,
2588                                   src, dst,
2589                                   flags=p.flags)
2590         p.tun_sa_out.add_vpp_config()
2591
2592         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2593                                  p.auth_algo_vpp_id, p.auth_key,
2594                                  p.crypt_algo_vpp_id, p.crypt_key,
2595                                  self.vpp_esp_protocol,
2596                                  dst, src,
2597                                  flags=p.flags)
2598         p.tun_sa_in.add_vpp_config()
2599
2600     def config_protect(self, p):
2601         p.tun_protect = VppIpsecTunProtect(self,
2602                                            p.tun_if,
2603                                            p.tun_sa_out,
2604                                            [p.tun_sa_in])
2605         p.tun_protect.add_vpp_config()
2606
2607     def config_network(self, p):
2608         p.tun_if = VppIpsecInterface(self)
2609
2610         p.tun_if.add_vpp_config()
2611         p.tun_if.admin_up()
2612         p.tun_if.config_ip4()
2613         p.tun_if.config_ip6()
2614
2615         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2616                        [VppRoutePath(p.tun_if.remote_ip4,
2617                                      0xffffffff)])
2618         r.add_vpp_config()
2619
2620         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2621                              [VppRoutePath(p.tun_if.remote_ip6,
2622                                            0xffffffff,
2623                                            proto=DpoProto.DPO_PROTO_IP6)])
2624         p.route.add_vpp_config()
2625
2626     def unconfig_network(self, p):
2627         p.route.remove_vpp_config()
2628         p.tun_if.remove_vpp_config()
2629
2630     def unconfig_protect(self, p):
2631         p.tun_protect.remove_vpp_config()
2632
2633     def unconfig_sa(self, p):
2634         p.tun_sa_out.remove_vpp_config()
2635         p.tun_sa_in.remove_vpp_config()
2636
2637
2638 class TestIpsecItf6(TemplateIpsec,
2639                     TemplateIpsecItf6,
2640                     IpsecTun6):
2641     """ IPsec Interface IPv6 """
2642
2643     def setUp(self):
2644         super(TestIpsecItf6, self).setUp()
2645
2646         self.tun_if = self.pg0
2647
2648     def tearDown(self):
2649         super(TestIpsecItf6, self).tearDown()
2650
2651     def test_tun_44(self):
2652         """IPSEC interface IPv6"""
2653
2654         n_pkts = 127
2655         p = self.ipv6_params
2656
2657         self.config_network(p)
2658         self.config_sa_tun(p,
2659                            self.pg0.local_ip6,
2660                            self.pg0.remote_ip6)
2661         self.config_protect(p)
2662
2663         self.verify_tun_66(p, count=n_pkts)
2664         c = p.tun_if.get_rx_stats()
2665         self.assertEqual(c['packets'], n_pkts)
2666         c = p.tun_if.get_tx_stats()
2667         self.assertEqual(c['packets'], n_pkts)
2668
2669         p.tun_if.admin_down()
2670         self.verify_drop_tun_66(p, count=n_pkts)
2671         p.tun_if.admin_up()
2672         self.verify_tun_66(p, count=n_pkts)
2673
2674         c = p.tun_if.get_rx_stats()
2675         self.assertEqual(c['packets'], 3*n_pkts)
2676         c = p.tun_if.get_tx_stats()
2677         self.assertEqual(c['packets'], 2*n_pkts)
2678
2679         # it's a v4 packet when its encrypted
2680         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2681
2682         self.verify_tun_46(p, count=n_pkts)
2683         c = p.tun_if.get_rx_stats()
2684         self.assertEqual(c['packets'], 4*n_pkts)
2685         c = p.tun_if.get_tx_stats()
2686         self.assertEqual(c['packets'], 3*n_pkts)
2687
2688         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2689
2690         self.vapi.cli("clear interfaces")
2691
2692         # rekey - create new SAs and update the tunnel protection
2693         np = copy.copy(p)
2694         np.crypt_key = b'X' + p.crypt_key[1:]
2695         np.scapy_tun_spi += 100
2696         np.scapy_tun_sa_id += 1
2697         np.vpp_tun_spi += 100
2698         np.vpp_tun_sa_id += 1
2699         np.tun_if.local_spi = p.vpp_tun_spi
2700         np.tun_if.remote_spi = p.scapy_tun_spi
2701
2702         self.config_sa_tun(np,
2703                            self.pg0.local_ip6,
2704                            self.pg0.remote_ip6)
2705         self.config_protect(np)
2706         self.unconfig_sa(p)
2707
2708         self.verify_tun_66(np, count=n_pkts)
2709         c = p.tun_if.get_rx_stats()
2710         self.assertEqual(c['packets'], n_pkts)
2711         c = p.tun_if.get_tx_stats()
2712         self.assertEqual(c['packets'], n_pkts)
2713
2714         # teardown
2715         self.unconfig_protect(np)
2716         self.unconfig_sa(np)
2717         self.unconfig_network(p)
2718
2719
2720 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
2721     """ Ipsec P2MP ESP v4 tests """
2722     tun4_encrypt_node_name = "esp4-encrypt-tun"
2723     tun4_decrypt_node_name = "esp4-decrypt-tun"
2724     encryption_type = ESP
2725
2726     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2727                          payload_size=100):
2728         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2729                 sa.encrypt(IP(src=self.pg1.local_ip4,
2730                               dst=self.pg1.remote_ip4) /
2731                            UDP(sport=1144, dport=2233) /
2732                            Raw(b'X' * payload_size))
2733                 for i in range(count)]
2734
2735     def gen_pkts(self, sw_intf, src, dst, count=1,
2736                  payload_size=100):
2737         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2738                 IP(src="1.1.1.1", dst=dst) /
2739                 UDP(sport=1144, dport=2233) /
2740                 Raw(b'X' * payload_size)
2741                 for i in range(count)]
2742
2743     def verify_decrypted(self, p, rxs):
2744         for rx in rxs:
2745             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2746             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2747
2748     def verify_encrypted(self, p, sa, rxs):
2749         for rx in rxs:
2750             try:
2751                 self.assertEqual(rx[IP].tos,
2752                                  VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
2753                 pkt = sa.decrypt(rx[IP])
2754                 if not pkt.haslayer(IP):
2755                     pkt = IP(pkt[Raw].load)
2756                 self.assert_packet_checksums_valid(pkt)
2757                 e = pkt[IP]
2758                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2759             except (IndexError, AssertionError):
2760                 self.logger.debug(ppp("Unexpected packet:", rx))
2761                 try:
2762                     self.logger.debug(ppp("Decrypted packet:", pkt))
2763                 except:
2764                     pass
2765                 raise
2766
2767     def setUp(self):
2768         super(TestIpsecMIfEsp4, self).setUp()
2769
2770         N_NHS = 16
2771         self.tun_if = self.pg0
2772         p = self.ipv4_params
2773         p.tun_if = VppIpsecInterface(self,
2774                                      mode=(VppEnum.vl_api_tunnel_mode_t.
2775                                            TUNNEL_API_MODE_MP))
2776         p.tun_if.add_vpp_config()
2777         p.tun_if.admin_up()
2778         p.tun_if.config_ip4()
2779         p.tun_if.unconfig_ip4()
2780         p.tun_if.config_ip4()
2781         p.tun_if.generate_remote_hosts(N_NHS)
2782         self.pg0.generate_remote_hosts(N_NHS)
2783         self.pg0.configure_ipv4_neighbors()
2784
2785         # setup some SAs for several next-hops on the interface
2786         self.multi_params = []
2787
2788         for ii in range(N_NHS):
2789             p = copy.copy(self.ipv4_params)
2790
2791             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2792             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2793             p.scapy_tun_spi = p.scapy_tun_spi + ii
2794             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2795             p.vpp_tun_spi = p.vpp_tun_spi + ii
2796
2797             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2798             p.scapy_tra_spi = p.scapy_tra_spi + ii
2799             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2800             p.vpp_tra_spi = p.vpp_tra_spi + ii
2801             p.tun_sa_out = VppIpsecSA(
2802                 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2803                 p.auth_algo_vpp_id, p.auth_key,
2804                 p.crypt_algo_vpp_id, p.crypt_key,
2805                 self.vpp_esp_protocol,
2806                 self.pg0.local_ip4,
2807                 self.pg0.remote_hosts[ii].ip4,
2808                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
2809             p.tun_sa_out.add_vpp_config()
2810
2811             p.tun_sa_in = VppIpsecSA(
2812                 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2813                 p.auth_algo_vpp_id, p.auth_key,
2814                 p.crypt_algo_vpp_id, p.crypt_key,
2815                 self.vpp_esp_protocol,
2816                 self.pg0.remote_hosts[ii].ip4,
2817                 self.pg0.local_ip4,
2818                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
2819             p.tun_sa_in.add_vpp_config()
2820
2821             p.tun_protect = VppIpsecTunProtect(
2822                 self,
2823                 p.tun_if,
2824                 p.tun_sa_out,
2825                 [p.tun_sa_in],
2826                 nh=p.tun_if.remote_hosts[ii].ip4)
2827             p.tun_protect.add_vpp_config()
2828             config_tun_params(p, self.encryption_type, None,
2829                               self.pg0.local_ip4,
2830                               self.pg0.remote_hosts[ii].ip4)
2831             self.multi_params.append(p)
2832
2833             VppIpRoute(self, p.remote_tun_if_host, 32,
2834                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
2835                                      p.tun_if.sw_if_index)]).add_vpp_config()
2836
2837             p.tun_dst = self.pg0.remote_hosts[ii].ip4
2838
2839     def tearDown(self):
2840         p = self.ipv4_params
2841         p.tun_if.unconfig_ip4()
2842         super(TestIpsecMIfEsp4, self).tearDown()
2843
2844     def test_tun_44(self):
2845         """P2MP IPSEC 44"""
2846         N_PKTS = 63
2847         for p in self.multi_params:
2848             self.verify_tun_44(p, count=N_PKTS)
2849
2850
2851 if __name__ == '__main__':
2852     unittest.main(testRunner=VppTestRunner)