ipsec: User can choose the UDP source port
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
22 from util import ppp
23 from vpp_papi import VppEnum
24
25
def config_tun_params(p, encryption_type, tun_if):
    """Attach the pair of scapy tunnel SecurityAssociations to *p*.

    The following attributes are written on ``p``:

    * ``tun_dst`` / ``tun_src`` - ``tun_if.remote_ip`` / ``tun_if.local_ip``
    * ``scapy_tun_sa`` - SA using ``p.vpp_tun_spi``; its outer header has
      ``src=tun_dst`` and ``dst=tun_src``
    * ``vpp_tun_sa`` - SA using ``p.scapy_tun_spi``; its outer header has
      ``src=tun_src`` and ``dst=tun_dst``

    :param p: parameter object supplying flags, addr_type, SPIs,
        algorithms, keys and the NAT header
    :param encryption_type: passed through as the first argument of
        SecurityAssociation
    :param tun_if: object exposing ``remote_ip`` and ``local_ip``
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # outer header class is selected by the address family of the params
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    # everything except the SPI and the outer header is common to both SAs
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=scapy_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=use_esn)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        **shared)
53
54
def config_tra_params(p, encryption_type, tun_if):
    """Attach a pair of scapy SecurityAssociations without outer header.

    Unlike config_tun_params() no ``tunnel_header`` is handed to the
    SecurityAssociation objects, so no address-family lookup is needed.

    The following attributes are written on ``p``:

    * ``tun_dst`` / ``tun_src`` - ``tun_if.remote_ip`` / ``tun_if.local_ip``
    * ``scapy_tun_sa`` - SA using ``p.vpp_tun_spi``
    * ``vpp_tun_sa`` - SA using ``p.scapy_tun_spi``

    :param p: parameter object supplying flags, SPIs, algorithms, keys
        and the NAT header
    :param encryption_type: passed through as the first argument of
        SecurityAssociation
    :param tun_if: object exposing ``remote_ip`` and ``local_ip``
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # only the SPI differs between the two directions
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=scapy_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  esn_en=use_esn,
                  nat_t_header=p.nat_header)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi, **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi, **shared)
76
77
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create one IPsec tunnel interface on pg0 plus host routes.

        The tunnel is built from ``self.ipv4_params``, given both IPv4
        and IPv6 addresses, and a /32 and a /128 route are pointed at it.
        """
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 host route via the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        # IPv6 host route via the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
121
122
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check each received packet is UDP-encapsulated ESP and valid.

        :param p: tunnel parameters (not read here)
        :param sa: object whose decrypt() is applied to the IP layer
        :param rxs: iterable of received packets
        :raises IndexError, AssertionError: re-raised after the offending
            packet has been written to the debug log
        """
        for rx in rxs:
            # reset for every packet so the failure path never logs a
            # decrypted packet that belongs to an earlier iteration
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, 4500)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure is re-raised below
                        pass
                raise

    def setUp(self):
        """Flag the IPv4 parameters for UDP encapsulation.

        Only the parameters are prepared here; config_network() creates
        the tunnel.
        """
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=5454, dport=4500)

    def config_network(self):
        """Create the UDP-encapsulating tunnel on pg0 plus host routes."""

        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi, p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key, udp_encap=True)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()
        config_tun_params(p, self.encryption_type, p.tun_if)

        r = VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)])
        r.add_vpp_config()
        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
197
198
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # 257 packets, as opposed to the single packet of the basic test
        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # an 1800 byte payload against a 1500 MTU: two packets expected
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [9000, 0, 0, 0])
228
229
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Prepare the UDP-encap parameters, then build the tunnel."""
        # Anchor super() on this class: anchoring it on the template
        # starts the MRO search *after* the template and therefore skips
        # TemplateIpsec4TunIfEspUdp.setUp(), which is where the UDP encap
        # flag and the NAT header are configured.
        super(TestIpsec4TunIfEspUdp, self).setUp()
        self.config_network()

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
242
243
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP GCM tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        """Prepare UDP-encap parameters, switch to AES-GCM-256, build."""
        # Anchor super() on this class: anchoring it on the template
        # starts the MRO search *after* the template and therefore skips
        # TemplateIpsec4TunIfEspUdp.setUp(), which is where the UDP encap
        # flag and the NAT header are configured.
        super(TestIpsec4TunIfEspUdpGCM, self).setUp()
        p = self.ipv4_params
        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_AES_GCM_256)
        p.crypt_algo = "AES-GCM"
        p.auth_algo = "NULL"
        p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        p.salt = 0
        # the algorithms must be in place before the tunnel is created
        self.config_network()
261
262
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # no members of its own: everything is inherited from the two bases
266
267
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        """Create one IPv6 IPsec tunnel interface on pg0 plus host routes.

        The tunnel is built from ``self.ipv6_params``, given both IPv6
        and IPv4 addresses, and a /128 and a /32 route are pointed at it.
        """
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv6 host route via the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [v6_path]).add_vpp_config()

        # IPv4 host route via the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
302
303
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # a single packet
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # 257 packets
        self.verify_tun_46(v6_params, count=257)
319
320
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # graph node names read by the inherited tests
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
326
327
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # graph node names read by the inherited tests
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
333
334
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Create ten tunnel interfaces on pg0, one per remote host.

        Each tunnel gets its own shallow copy of ``self.ipv4_params``
        with SA ids and SPIs offset by the tunnel index; the copies are
        collected in ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(10):
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)

            # keep the SA ids and SPIs of the tunnels distinct
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                            p.scapy_tun_spi,
                                            p.crypt_algo_vpp_id,
                                            p.crypt_key, p.crypt_key,
                                            p.auth_algo_vpp_id, p.auth_key,
                                            p.auth_key,
                                            dst=p.tun_dst)
            p.tun_if.add_vpp_config()
            p.tun_if.admin_up()
            p.tun_if.config_ip4()
            config_tun_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)
            c = p.tun_if.get_rx_stats()
            self.assertEqual(c['packets'], 127)
            c = p.tun_if.get_tx_stats()
            self.assertEqual(c['packets'], 127)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interfaces """
        # one encrypted packet per tunnel, all sent in a single burst
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                            src=p.remote_tun_if_host,
                                            dst=self.pg1.remote_ip4))
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # and the reverse direction: one plain packet towards each tunnel
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                    dst=p.remote_tun_if_host))
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
415
416
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create a tunnel interface and host route described by *p*.

        The created objects are stored on ``p.tun_if`` and ``p.route``
        so that unconfig_network() can remove them.
        """
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for sa_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(sa_cmd))

        via_tunnel = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_tunnel])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what config_network() created for *p*."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Give the tunnel of *p* a new key and new SPIs.

        Two new SAs are created, stored on ``p.tun_sa_in`` and
        ``p.tun_sa_out``, and bound to the existing tunnel interface.
        """
        #
        # change the key and the SPI
        #
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, p.tun_if)

        # arguments common to both new SAs
        sa_args = (p.auth_algo_vpp_id, p.auth_key,
                   p.crypt_algo_vpp_id, p.crypt_key,
                   self.vpp_esp_protocol)
        sa_kwargs = dict(flags=p.flags, salt=p.salt)

        p.tun_sa_in = VppIpsecSA(self,
                                 p.scapy_tun_sa_id,
                                 p.scapy_tun_spi,
                                 *sa_args, **sa_kwargs)
        p.tun_sa_out = VppIpsecSA(self,
                                  p.vpp_tun_sa_id,
                                  p.vpp_tun_spi,
                                  *sa_args, **sa_kwargs)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # note: the SA stored as tun_sa_in (scapy_tun_spi) is the one
        # installed with is_outbound=1, tun_sa_out with is_outbound=0
        for sa, outbound in ((p.tun_sa_in, 1), (p.tun_sa_out, 0)):
            self.vapi.ipsec_tunnel_if_set_sa(
                sw_if_index=p.tun_if.sw_if_index,
                sa_id=sa.id,
                is_outbound=outbound)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        key_128 = b"JPjyOWBeVEQiMe7h"
        key_192 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_256 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        # foreach crypto algorithm:
        # (vpp crypto, vpp integ, scapy crypto, scapy integ, key, salt)
        algos = [
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_128, 3333),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_192, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_256, 9999),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_128, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_192, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_256, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_NONE,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "NULL", "HMAC-SHA1-96", key_256, 0),
        ]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for (vpp_crypto, vpp_integ,
                 scapy_crypto, scapy_integ, key, salt) in algos:
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = vpp_integ
                p.crypt_algo_vpp_id = vpp_crypto
                p.crypt_algo = scapy_crypto
                p.auth_algo = scapy_integ
                p.crypt_key = key
                p.salt = salt

                self.config_network(p)

                self.verify_tun_44(p, count=127)
                self.assertEqual(p.tun_if.get_rx_stats()['packets'], 127)
                self.assertEqual(p.tun_if.get_tx_stats()['packets'], 127)

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
601
602
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Force *p* to NONE/NULL algorithms, then create tunnel and route.

        The created objects are stored on ``p.tun_if`` and ``p.route``
        so that unconfig_network() can remove them.
        """
        # no integrity algorithm and an empty key
        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.auth_algo = 'NULL'
        p.auth_key = []

        # no cipher and an empty key
        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = 'NULL'
        p.crypt_key = []

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for sa_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(sa_cmd))

        via_tunnel = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_tunnel])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what config_network() created for *p*."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """ IPSec SA with NULL algos """
        params = self.ipv4_params

        self.config_network(params)

        # traffic routed at the tunnel must not produce any reply
        pkts = self.gen_pkts(self.pg1,
                             src=self.pg1.remote_ip4,
                             dst=params.remote_tun_if_host)
        self.send_and_assert_no_replies(self.pg1, pkts)

        self.unconfig_network(params)
665
666
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Create ten IPv6 tunnel interfaces on pg0, one per remote host.

        Each tunnel gets its own shallow copy of ``self.ipv6_params``
        with SA ids and SPIs offset by the tunnel index; the copies are
        collected in ``self.multi_params``.
        """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        for idx in range(10):
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (idx + 1)

            # keep the SA ids and SPIs of the tunnels distinct
            params.scapy_tun_sa_id += idx
            params.scapy_tun_spi += idx
            params.vpp_tun_sa_id += idx
            params.vpp_tun_spi += idx

            params.scapy_tra_sa_id += idx
            params.scapy_tra_spi += idx
            params.vpp_tra_sa_id += idx
            params.vpp_tra_spi += idx

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            via_tunnel = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                      proto=DpoProto.DPO_PROTO_IP6)
            VppIpRoute(self, params.remote_tun_if_host, 128,
                       [via_tunnel]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_66(params, count=127)
            self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
            self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)
727
728
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the inner (bridged) Ethernet frames.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted GRE/TEB frames to inject on ``sw_intf``.

        Each frame is IP(pg0 remote -> pg0 local) / GRE / Ether / IP / UDP
        / Raw, encrypted with ``sa``.  ``p``, ``src`` and ``dst`` are part
        of the template signature but are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted for signature
        compatibility only; the addresses are fixed.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each decapsulated frame carries the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: if decrypt() itself fails we must not log a
            # stale packet left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create a bridged GRE-TEB tunnel protected by a tunnel-mode SA pair."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # builds the scapy-side SAs (p.scapy_tun_sa / p.vpp_tun_sa)
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        # keep the state dumps in the test log instead of discarding them
        self.logger.info(self.vapi.cli("sh adj"))
        self.logger.info(self.vapi.cli("sh ipsec tun"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the template tear-down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
838
839
class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
                               IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests (VLAN sub-interface) """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the inner (bridged) Ethernet frames.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted GRE/TEB frames to inject on ``sw_intf``.

        The inner Ethernet frame is untagged.  ``p``, ``src`` and ``dst``
        are part of the template signature but are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text frames tagged with VLAN 11.

        ``sw_intf``, ``src`` and ``dst`` are accepted for signature
        compatibility only; the addresses are fixed.
        """
        return [Ether(dst=self.omac) /
                Dot1Q(vlan=11) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each decapsulated frame is tagged with VLAN 11 again."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check the inner frame is untagged.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: if decrypt() itself fails we must not log a
            # stale packet left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                # the tag must have been popped before encapsulation
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Bridge a protected GRE-TEB tunnel with a VLAN 11 sub-interface."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # dot1q sub-interface on pg1 with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11)
        self.pg1_11.admin_up()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # builds the scapy-side SAs (p.scapy_tun_sa / p.vpp_tun_sa)
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the VLAN sub-interface
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Undo the tunnel IPv4 config, then remove the sub-interface."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
958
959
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the inner (bridged) Ethernet frames.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted GRE/TEB frames to inject on ``sw_intf``.

        ``p``, ``src`` and ``dst`` are part of the template signature but
        are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted for signature
        compatibility only; the addresses are fixed.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each decapsulated frame carries the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: if decrypt() itself fails we must not log a
            # stale packet left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create a bridged GRE-TEB tunnel protected by an SA pair.

        Unlike the TUN variant, the SAs are created without tunnel
        endpoint addresses.
        """
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # presumably builds the scapy-side transport-mode SAs on ``p``
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the template tear-down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1063
1064
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
                                 IpsecTun4Tests):
    """ Ipsec GRE TEB UDP ESP - Tra tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the inner (bridged) Ethernet frames.
    omac = "00:11:22:33:44:55"
    # UDP encapsulation ports; shared by the SA configuration, the scapy
    # NAT header and the verification so they cannot drift apart.
    udp_encap_sport = 5454
    udp_encap_dport = 4545

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted GRE/TEB frames to inject on ``sw_intf``.

        ``p``, ``src`` and ``dst`` are part of the template signature but
        are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted for signature
        compatibility only; the addresses are fixed.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each decapsulated frame carries the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP encapsulation ports, then decrypt and verify.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: if decrypt() itself fails we must not log a
            # stale packet left over from the previous iteration.
            pkt = None
            try:
                # inside the try so that a port mismatch also dumps the packet
                self.assertTrue(rx.haslayer(UDP))
                self.assertEqual(rx[UDP].dport, self.udp_encap_dport)
                self.assertEqual(rx[UDP].sport, self.udp_encap_sport)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create a bridged GRE-TEB tunnel protected by UDP-encap SAs."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=self.udp_encap_sport,
                           dport=self.udp_encap_dport)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags,
                                  udp_src=self.udp_encap_sport,
                                  udp_dst=self.udp_encap_dport)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=(p.flags |
                                        VppEnum.vl_api_ipsec_sad_flags_t.
                                        IPSEC_API_SAD_FLAG_IS_INBOUND),
                                 udp_src=self.udp_encap_sport,
                                 udp_dst=self.udp_encap_dport)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # presumably builds the scapy-side transport-mode SAs on ``p``
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the template tear-down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1184
1185
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted GRE frames to inject on ``sw_intf``.

        Each frame is IP(pg0 remote -> pg0 local) / GRE / IP(pg1 local ->
        pg1 remote) / UDP / Raw, encrypted with ``sa``.  ``p``, ``src`` and
        ``dst`` are part of the template signature but are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text packets towards 1.1.1.2.

        ``src`` and ``dst`` are accepted for signature compatibility only;
        the IP addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each decapsulated packet is addressed to the pg1 peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: if decrypt() itself fails we must not log a
            # stale packet left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create a routed GRE tunnel protected by a tunnel-mode SA pair."""
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no interface is added to this bridge domain in this
        # class; it looks like a leftover from the TEB variants - confirm
        # before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: pg0 local -> pg0 remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # inbound SA: pg0 remote -> pg0 local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # builds the scapy-side SAs (p.scapy_tun_sa / p.vpp_tun_sa)
        config_tun_params(p, self.encryption_type, p.tun_if)

        # route the clear-text test destination via the tunnel
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the template tear-down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1287
1288
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted GRE frames to inject on ``sw_intf``.

        ``p``, ``src`` and ``dst`` are part of the template signature but
        are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Build encrypted GRE frames whose GRE payload is not IP.

        The UDP header follows GRE directly, with no inner IP header.
        ``src`` and ``dst`` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text packets towards 1.1.1.2.

        ``src`` and ``dst`` are accepted for signature compatibility only;
        the IP addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each decapsulated packet is addressed to the pg1 peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: if decrypt() itself fails we must not log a
            # stale packet left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create a routed GRE tunnel protected by an SA pair.

        The SAs are created without tunnel endpoint addresses.
        """
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # presumably builds the scapy-side transport-mode SAs on ``p``
        config_tra_params(p, self.encryption_type, p.tun_if)

        # route the clear-text test destination via the tunnel
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the template tear-down."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        """A non-IP GRE payload is dropped and counted by the decrypt node."""
        p = self.ipv4_params
        # src/dst are ignored by the generator; pass v4 values for clarity
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip4)
        self.send_and_assert_no_replies(self.tun_if, tx)
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1401
1402
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE ESP v6 - TRA tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted GRE-over-IPv6 frames for ``sw_intf``.

        ``p``, ``src`` and ``dst`` are part of the template signature but
        are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear-text IPv6 packets towards 1::2.

        ``src`` and ``dst`` are accepted for signature compatibility only;
        the IPv6 addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each decapsulated packet is addressed to the pg1 peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its contents.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: if decrypt() itself fails we must not log a
            # stale packet left over from the previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create a routed GRE-over-IPv6 tunnel protected by an SA pair.

        The SAs are created without tunnel endpoint addresses.
        """
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): no interface is added to this bridge domain in this
        # class; it looks like a leftover from the TEB variants - confirm
        # before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        # presumably builds the scapy-side transport-mode SAs on ``p``
        config_tra_params(p, self.encryption_type, p.tun_if)

        # route the clear-text test destination via the tunnel
        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 config before the template tear-down."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1500
1501
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """ Ipsec mGRE ESP v4 TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` frames carrying an encrypted GRE/IPv4 packet.

        The packet handed to ``sa.encrypt`` goes from ``p.tun_dst`` to pg0's
        local IPv4 address and wraps, inside GRE, a UDP datagram from pg1's
        local to pg1's remote IPv4 address.  ``src`` and ``dst`` are part of
        the signature but are not used by this variant.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=p.tun_dst,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain UDP/IPv4 frames addressed to ``dst``.

        The IP source is fixed at ``1.1.1.1``; the ``src`` argument is not
        used by this variant.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check the GRE payload.

        After decryption the packet must have valid checksums, contain a GRE
        layer, and the IP header under GRE must be destined to
        ``p.remote_tun_if_host``.  On failure the offending packet (and the
        decrypted form, when one was produced) is logged at debug level and
        the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before/inside decrypt() can never
            # log the decrypted form of a *previous* packet.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt() returned undissected bytes; parse them as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception as err:
                        # Dumping is best-effort diagnostics only; it must
                        # not mask the verification failure raised below.
                        self.logger.debug(
                            "Could not dump decrypted packet: %r" % (err,))
                raise

    def setUp(self):
        """Create a multipoint GRE tunnel on pg0 with per-next-hop SAs.

        For every configured next-hop a copy of ``self.ipv4_params`` is
        given its own SA ids/SPIs, an SA pair, a tunnel protection keyed on
        that next-hop, a /32 route and a TEIB entry; the copies are collected
        in ``self.multi_params``.
        """
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   "0.0.0.0",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        # NOTE(review): only one next-hop is configured although N_NHS hosts
        # are generated (TestIpsecMGreIfEspTra6 loops over all N_NHS);
        # confirm this is intentional.
        for ii in range(1):
            # shallow copy: the copy shares p.tun_if with self.ipv4_params
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            # (relies on add_vpp_config() returning the VppTeib object, since
            # test_tun_44 later calls remove/add on p.teib)
            p.teib = VppTeib(self, p.tun_if,
                             p.tun_if.remote_hosts[ii].ip4,
                             self.pg0.remote_hosts[ii].ip4).add_vpp_config()
            # set after config_tra_params(); NOTE(review): presumably because
            # that helper assigns p.tun_dst itself - confirm.
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the inherited tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # without the TEIB entry traffic is expected to be dropped ...
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            # ... and to flow again once the entry is restored
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
1632
1633
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """ Ipsec mGRE ESP v6 TRA tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` frames carrying an encrypted GRE/IPv6 packet.

        The packet handed to ``sa.encrypt`` goes from ``p.tun_dst`` to pg0's
        local IPv6 address and wraps, inside GRE, a UDP datagram from pg1's
        local to pg1's remote IPv6 address.  ``src`` and ``dst`` are part of
        the signature but are not used by this variant.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=p.tun_dst,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` plain UDP/IPv6 frames addressed to ``dst``.

        The IPv6 source is fixed at ``1::1``; the ``src`` argument is not
        used by this variant.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet and check the GRE payload.

        After decryption the packet must have valid checksums, contain a GRE
        layer, and the IPv6 header under GRE must be destined to
        ``p.remote_tun_if_host``.  On failure the offending packet (and the
        decrypted form, when one was produced) is logged at debug level and
        the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before/inside decrypt() can never
            # log the decrypted form of a *previous* packet.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # decrypt() returned undissected bytes; parse as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception as err:
                        # Dumping is best-effort diagnostics only; it must
                        # not mask the verification failure raised below.
                        self.logger.debug(
                            "Could not dump decrypted packet: %r" % (err,))
                raise

    def setUp(self):
        """Create a multipoint GRE tunnel on pg0 with per-next-hop SAs.

        For each of the ``N_NHS`` next-hops a copy of ``self.ipv6_params``
        is given its own SA ids/SPIs, an SA pair, a TEIB entry, a tunnel
        protection keyed on that next-hop and a /128 route; the copies are
        collected in ``self.multi_params``.
        """
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   "::",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: the copy shares p.tun_if with self.ipv6_params
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(self, p.tun_if,
                    p.tun_if.remote_hosts[ii].ip6,
                    p.tun_dst).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 128,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
                                     p.tun_if.sw_if_index)]).add_vpp_config()
            # assigned again after config_tra_params(); NOTE(review):
            # presumably because that helper assigns p.tun_dst itself -
            # confirm before removing.
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        # Diagnostic dumps only; the output is written to the test log.
        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        # NOTE(review): hard-coded adjacency index, looks like a debugging
        # leftover - confirm it is still wanted.
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the inherited tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
1765
1766
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_input_node = "ipsec4-tun-input"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_sa_tra(self, p):
        """Create and register the SA pair without tunnel endpoints.

        Stores the SAs on ``p`` as ``tun_sa_out`` (scapy-side id/SPI) and
        ``tun_sa_in`` (VPP-side id/SPI); both receive ``p.flags``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Algorithm/key/protocol arguments are identical for both SAs.
        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self,
                                  p.scapy_tun_sa_id,
                                  p.scapy_tun_spi,
                                  *crypto,
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self,
                                 p.vpp_tun_sa_id,
                                 p.vpp_tun_spi,
                                 *crypto,
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and register the SA pair with explicit tunnel endpoints.

        Same as ``config_sa_tra`` except that each SA is also given the
        local/remote addresses of ``self.tun_if`` for ``p.addr_type``; the
        inbound SA gets them in the opposite order to the outbound one.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)
        local = self.tun_if.local_addr[p.addr_type]
        remote = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(self,
                                  p.scapy_tun_sa_id,
                                  p.scapy_tun_spi,
                                  *(crypto + (local, remote)),
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self,
                                 p.vpp_tun_sa_id,
                                 p.vpp_tun_spi,
                                 *(crypto + (remote, local)),
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind ``p``'s SA pair to ``p.tun_if`` via a tunnel protection."""
        protection = VppIpsecTunProtect(self,
                                        p.tun_if,
                                        p.tun_sa_out,
                                        [p.tun_sa_in])
        p.tun_protect = protection
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route test hosts to it.

        The tunnel gets both IPv4 and IPv6 configuration.  The /32 route to
        ``p.remote_tun_if_host`` is kept on ``p.route``; the /128 route to
        ``p.remote_tun_if_host6`` is registered but not stored on ``p``.
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip4,
                                       self.pg0.remote_ip4)
        tunnel = p.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6,
                               0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, p.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove the route stored on ``p`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA, stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1849
1850
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        # pg0 is the interface the tunnelled traffic is exchanged on
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        # the test method removes its own configuration before returning
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        params = self.ipv4_params

        # tunnel, SA pair (no tunnel endpoints) and protection
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # first burst: the tunnel counters must show 127 each way
        self.verify_tun_44(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

        # second burst through verify_tun_64 after clearing the SAs from
        # the CLI; the interface counters keep accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 254)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 254)

        # rekey - new key, SPIs and SA ids on a shallow copy, which still
        # shares the tunnel interface object with ``params``
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        # install the new SAs and protection before dropping the old SAs
        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_44(rekeyed, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 381)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
1910
1911
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Configure the tunnel, SAs and protection with UDP encap enabled."""
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo setUp's configuration in reverse order, then call the base."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of every packet, then run the base checks."""
        # ensure encrypted packets are received with the default UDP ports
        for rx in rxs:
            udp = rx[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        params = self.ipv4_params

        self.verify_tun_44(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
1958
1959
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the inherited setUp and use pg0 as the tunnel-side interface."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Only the inherited tearDown; the test method cleans up itself."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` frames carrying an encrypted IPv4-in-IPv4 packet.

        The outer header goes from ``sw_intf``'s remote to its local IPv4
        address; the inner header goes from ``src`` to ``dst`` and carries a
        UDP payload of ``payload_size`` bytes.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain UDP/IPv4 frames from ``src`` to ``dst``."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of each decapsulated packet."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check outer and inner headers.

        The decrypted packet must have valid checksums, an outer header from
        pg0's local to pg0's remote IPv4 address, and an inner IPv4 header
        destined to ``p.remote_tun_if_host``.  On failure the offending
        packet (and the decrypted form, when one was produced) is logged at
        debug level and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so a failure before/inside decrypt() can never
            # log the decrypted form of a *previous* packet.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt() returned undissected bytes; parse them as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception as err:
                        # Dumping is best-effort diagnostics only; it must
                        # not mask the verification failure raised below.
                        self.logger.debug(
                            "Could not dump decrypted packet: %r" % (err,))
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        # new SAs and protection go in before the old SAs are removed
        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # interface counters accumulate across the rekey
        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2061
2062
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def setUp(self):
        # pg0 is the interface the encrypted frames are injected on
        super(TestIpsec4TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        # the test method removes its own configuration before returning
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted frames whose outer dst is ``5.5.5.5``.

        The outer source is ``sw_intf``'s remote IPv4 address; the inner
        header goes from ``src`` to ``dst`` with a UDP payload of
        ``payload_size`` bytes.  ``sa.encrypt`` is called once per frame.
        """
        frames = []
        for _ in range(count):
            clear = (IP(src=sw_intf.remote_ip4, dst="5.5.5.5") /
                     IP(src=src, dst=dst) /
                     UDP(sport=1144, dport=2233) /
                     Raw(b'X' * payload_size))
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / sa.encrypt(clear))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """

        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # frames with the unexpected outer destination must yield no replies
        bogus = self.gen_encrypt_pkts(params, params.scapy_tun_sa,
                                      self.tun_if,
                                      src=params.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2109
2110
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    def config_sa_tra(self, p):
        """Create and register the SA pair without tunnel endpoints.

        Stores the SAs on ``p`` as ``tun_sa_out`` (scapy-side id/SPI) and
        ``tun_sa_in`` (VPP-side id/SPI).
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # Algorithm/key/protocol arguments are identical for both SAs.
        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self,
                                  p.scapy_tun_sa_id,
                                  p.scapy_tun_spi,
                                  *crypto)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self,
                                 p.vpp_tun_sa_id,
                                 p.vpp_tun_spi,
                                 *crypto)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and register the SA pair with explicit tunnel endpoints.

        Same as ``config_sa_tra`` except that each SA is also given the
        local/remote addresses of ``self.tun_if`` for ``p.addr_type``; the
        inbound SA gets them in the opposite order to the outbound one.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)
        local = self.tun_if.local_addr[p.addr_type]
        remote = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(self,
                                  p.scapy_tun_sa_id,
                                  p.scapy_tun_spi,
                                  *(crypto + (local, remote)))
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self,
                                 p.vpp_tun_sa_id,
                                 p.vpp_tun_spi,
                                 *(crypto + (remote, local)))
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind ``p``'s SA pair to ``p.tun_if`` via a tunnel protection."""
        protection = VppIpsecTunProtect(self,
                                        p.tun_if,
                                        p.tun_sa_out,
                                        [p.tun_sa_in])
        p.tun_protect = protection
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route test hosts to it.

        The tunnel gets both IPv6 and IPv4 configuration.  The /128 route to
        ``p.remote_tun_if_host`` is kept on ``p.route``; the /32 route to
        ``p.remote_tun_if_host4`` is registered but not stored on ``p``.
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip6,
                                       self.pg0.remote_ip6)
        tunnel = p.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        v6_path = VppRoutePath(tunnel.remote_ip6,
                               0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, p.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove the route stored on ``p`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA, stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
2184
2185
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def setUp(self):
        # pg0 is the interface the tunnelled traffic is exchanged on
        super(TestIpsec6TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        # each test method removes its own configuration before returning
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        params = self.ipv6_params

        # tunnel, SA pair (no tunnel endpoints) and protection
        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_66(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

        # rekey - new key, SPIs and SA ids on a shallow copy, which still
        # shares the tunnel interface object with ``params``
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        # install the new SAs and protection before dropping the old SAs
        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_66(rekeyed, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 254)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 254)

        # bounce the interface state: while admin-down every packet must be
        # dropped and accounted for by the ipsec6-tun-input error counter
        params.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=127)
        node = ('/err/ipsec6-tun-input/%s' %
                'ipsec packets received on disabled interface')
        self.assertEqual(127, self.statistics.get_err_counter(node))
        params.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=127)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        newest = copy.copy(rekeyed)
        newest.crypt_key = b'Z' + params.crypt_key[1:]
        newest.scapy_tun_spi += 100
        newest.scapy_tun_sa_id += 1
        newest.vpp_tun_spi += 100
        newest.vpp_tun_sa_id += 1
        newest.tun_if.local_spi = params.vpp_tun_spi
        newest.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(newest)

        # step 1: old output SA, both input SAs accepted
        both_inputs = [rekeyed.tun_sa_in, newest.tun_sa_in]
        params.tun_protect.update_vpp_config(rekeyed.tun_sa_out,
                                             both_inputs)
        self.verify_tun_66(rekeyed, rekeyed, count=127)
        self.verify_tun_66(newest, rekeyed, count=127)

        # step 2: new output SA, both input SAs still accepted
        both_inputs = [rekeyed.tun_sa_in, newest.tun_sa_in]
        params.tun_protect.update_vpp_config(newest.tun_sa_out,
                                             both_inputs)
        self.verify_tun_66(rekeyed, newest, count=127)
        self.verify_tun_66(newest, newest, count=127)

        # step 3: only the new input SA remains; the old one must now drop
        params.tun_protect.update_vpp_config(newest.tun_sa_out,
                                             [newest.tun_sa_in])
        self.verify_tun_66(newest, newest, count=127)
        self.verify_drop_tun_66(rekeyed, count=127)

        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127*9)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127*8)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(newest)
        self.unconfig_sa(newest)
        self.unconfig_network(params)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_46(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2310
2311
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    # Class-level configuration. None of these three attributes is read
    # inside this class; presumably the inherited templates consume
    # them - verify against template_ipsec.
    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        # encrypted traffic is exchanged over pg0
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` encrypted frames to inject on `sw_intf`.

        Each frame is an Ethernet header (sw_intf remote MAC -> local
        MAC) followed by the result of ``sa.encrypt()`` applied to:
        IPv6(sw_intf remote -> local) / IPv6(src -> dst) /
        UDP(1166 -> 2233) / `payload_size` bytes of b'X'.

        `p` is accepted for signature compatibility and is not used.
        `sa.encrypt()` is invoked once per generated packet.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build `count` clear-text frames to inject on `sw_intf`.

        Each frame is Ether(sw_intf remote MAC -> local MAC) /
        IPv6(src -> dst) / UDP(1166 -> 2233) / `payload_size` bytes
        of b'X'.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check every packet in `rxs` after decryption.

        Each packet must be addressed to pg1's remote IPv6 address,
        be sourced from ``p.remote_tun_if_host`` and carry valid
        checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in `rxs` with `sa` and check its headers.

        The decrypted packet must have valid checksums, an outer IPv6
        header going from pg0's local address to pg0's remote address,
        and a nested IPv6 destination equal to
        ``p.remote_tun_if_host``.

        On IndexError/AssertionError the offending packet (and the
        decrypted form, when there is one) is logged at debug level
        and the exception is re-raised.
        """
        for rx in rxs:
            # reset for each packet so that a failure inside decrypt()
            # can never log the packet decrypted on an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # decrypt() did not dissect an IPv6 layer;
                    # re-parse the raw bytes as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only: a failure to
                        # render the packet must not mask the original
                        # error re-raised below
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        # setup: network, SAs, then the tunnel protection
        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        # every packet must show up in the tunnel's rx and tx counters
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (copy.copy() is a shallow copy of the parameter object)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        # install the new SAs and protection before removing the old SAs
        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # counters are cumulative across the rekey: 127 + 127
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

2413
2414
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    # Class-level configuration; not read inside this class
    # (presumably consumed by the inherited templates - TODO confirm).
    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        # encrypted traffic is injected on pg0
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` encrypted frames with a bogus tunnel header.

        `p` is accepted for signature compatibility and is not used.
        `sa.encrypt()` is invoked once per generated packet.
        """
        frames = []
        for _ in range(count):
            # the IP destination of the revealed packet ("5::5") does
            # not match that assigned to the tunnel
            clear_text = (IPv6(src=sw_intf.remote_ip6,
                               dst="5::5") /
                          IPv6(src=src, dst=dst) /
                          UDP(sport=1144, dport=2233) /
                          Raw(b'X' * payload_size))
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2_hdr / sa.encrypt(clear_text))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        params = self.ipv6_params

        # setup: network, SAs, then the tunnel protection
        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # inject 63 packets carrying the bogus tunnel header;
        # none of them may produce a reply
        bogus_pkts = self.gen_encrypt_pkts6(params, params.scapy_tun_sa,
                                            self.tun_if,
                                            src=params.remote_tun_if_host,
                                            dst=self.pg1.remote_ip6,
                                            count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus_pkts)

        # teardown, in the reverse order of the setup
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)

2462
2463
# Script entry point: when this module is executed directly, run its
# tests through the VppTestRunner imported from framework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)