ipsec: Targeted unit testing
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from util import ppp
21 from vpp_papi import VppEnum
22
23
def config_tun_params(p, encryption_type, tun_if):
    """Build the two scapy SecurityAssociation objects for a tunnel.

    The results are stored on ``p``:

    * ``p.scapy_tun_sa`` uses SPI ``p.vpp_tun_spi`` and an outer header
      going from ``tun_if.remote_ip`` to ``tun_if.local_ip``.
    * ``p.vpp_tun_sa`` uses SPI ``p.scapy_tun_spi`` and an outer header
      going from ``tun_if.local_ip`` to ``tun_if.remote_ip``.

    Both SAs share the algorithms, keys, NAT-T header and ESN setting
    read from ``p``.  The outer header class (IP or IPv6) is selected by
    ``p.addr_type``.
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN

    # everything that is identical for both directions
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=mk_scapy_crypt_key(p),
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=bool(p.flags & esn_flag))
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=tun_if.remote_ip, dst=tun_if.local_ip),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(src=tun_if.local_ip, dst=tun_if.remote_ip),
        **shared)
49
50
def config_tra_params(p, encryption_type, tun_if):
    """Build the two scapy SecurityAssociation objects without a tunnel
    header.

    Same as the tunnel variant except that no ``tunnel_header`` and no
    NAT-T header are passed to scapy.  The results are stored on ``p``:

    * ``p.scapy_tun_sa`` uses SPI ``p.vpp_tun_spi``.
    * ``p.vpp_tun_sa`` uses SPI ``p.scapy_tun_spi``.

    ``tun_if`` is not used here; it is accepted so that the signature
    matches ``config_tun_params``.
    """
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
68
69
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    # The class-level hooks only forward to the parent implementation.
    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create an IPsec tunnel interface over pg0 and route the remote
        IPv4 and IPv6 hosts through it."""
        super(TemplateIpsec4TunIfEsp, self).setUp()

        # pg0 is the interface underneath the tunnel
        self.tun_if = self.pg0

        params = self.ipv4_params

        # the same crypto key and the same auth key are passed twice
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # host route for the IPv4 payload destination
        via4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [via4]).add_vpp_config()

        # host route for the IPv6 payload destination
        via6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                            proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [via6]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
113
114
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    encryption_type = ESP
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"

    # The class-level hooks only forward to the parent implementation.
    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        """Create a UDP-encapsulated IPsec tunnel interface over pg0 and
        route the remote IPv4 and IPv6 hosts through it."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        # pg0 is the interface underneath the tunnel
        self.tun_if = self.pg0

        params = self.ipv4_params

        # mark the SA flags as UDP encap and give scapy the matching
        # UDP header to use as the NAT-T header
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            udp_encap=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # host route for the IPv4 payload destination
        via4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [via4]).add_vpp_config()

        # host route for the IPv6 payload destination
        via6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                            proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [via6]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
163
164
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # a single packet
        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # a burst of 257 packets
        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # set the tunnel MTU to 1500 and send one 1800 byte payload;
        # two packets are expected back (n_rx=2)
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        # raise the tunnel MTU to 9000 again once the check is done
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [9000, 0, 0, 0])
194
195
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    # name of the tunnel input node for this test class
    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        v4_params = self.ipv4_params
        self.verify_keepalive(v4_params)
204
205
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No body of its own: setup and tests come from the two base classes.
209
210
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        """Create an IPv6 IPsec tunnel interface over pg0 and route the
        remote IPv6 and IPv4 hosts through it."""
        super(TemplateIpsec6TunIfEsp, self).setUp()

        # pg0 is the interface underneath the tunnel
        self.tun_if = self.pg0

        params = self.ipv6_params

        # the same crypto key and the same auth key are passed twice
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # host route for the IPv6 payload destination
        via6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                            proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [via6]).add_vpp_config()

        # host route for the IPv4 payload destination
        via4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [via4]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
245
246
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # a single packet
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # a burst of 257 packets
        self.verify_tun_46(v6_params, count=257)
262
263
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # names of the IPv6 tunnel decrypt/encrypt nodes for this class
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
269
270
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # names of the IPv4 tunnel decrypt/encrypt nodes for this class
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
276
277
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Create ten IPsec tunnel interfaces over pg0, one per generated
        remote host, each with its own SPIs, SA ids and host route.

        The per-tunnel parameter objects are kept in ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(10):
            # shallow copy of the template parameters, then make the
            # addresses, SPIs and SA ids unique by offsetting with ii
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                            p.scapy_tun_spi,
                                            p.crypt_algo_vpp_id,
                                            p.crypt_key, p.crypt_key,
                                            p.auth_algo_vpp_id, p.auth_key,
                                            p.auth_key,
                                            dst=p.tun_dst)
            p.tun_if.add_vpp_config()
            p.tun_if.admin_up()
            p.tun_if.config_ip4()
            config_tun_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)
            # each tunnel must have counted exactly the packets sent
            c = p.tun_if.get_rx_stats()
            self.assertEqual(c['packets'], 127)
            c = p.tun_if.get_tx_stats()
            self.assertEqual(c['packets'], 127)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interfaces """
        # decrypt direction: encrypted packets for every tunnel, sent
        # together on the underlying interface
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                            src=p.remote_tun_if_host,
                                            dst=self.pg1.remote_ip4))
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        # received packets are paired with the tunnels in sending order
        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # encrypt direction: plain packets towards each tunnel's host
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                    dst=p.remote_tun_if_host))
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
358
359
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"

    def config_network(self, p):
        """Create the tunnel interface and host route described by ``p``.

        The created objects are stored as ``p.tun_if`` and ``p.route``.
        """
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)

        # record the state of SAs 0 and 1 in the test log
        for sa_index in (0, 1):
            self.logger.info(self.vapi.cli("sh ipsec sa %d" % sa_index))

        via = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what ``config_network`` created for ``p``."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Give the tunnel in ``p`` a new key, new SPIs and new SA ids.

        Two fresh SAs are created (stored as ``p.tun_sa_in`` and
        ``p.tun_sa_out``) and installed on the existing tunnel interface.
        """
        # change the key (first byte replaced) and bump SPIs and SA ids
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        # rebuild the scapy SAs so they match the new key and SPIs
        config_tun_params(p, self.encryption_type, p.tun_if)

        # arguments common to both new SAs
        sa_args = (p.auth_algo_vpp_id, p.auth_key,
                   p.crypt_algo_vpp_id, p.crypt_key,
                   self.vpp_esp_protocol)
        sa_kwargs = dict(flags=p.flags, salt=p.salt)

        p.tun_sa_in = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                 *sa_args, **sa_kwargs)
        p.tun_sa_out = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                  *sa_args, **sa_kwargs)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # the SA carrying the scapy SPI is installed with is_outbound=1,
        # the one carrying the VPP SPI with is_outbound=0
        tun_index = p.tun_if.sw_if_index
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_index,
                                         sa_id=p.tun_sa_in.id,
                                         is_outbound=1)
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_index,
                                         sa_id=p.tun_sa_out.id,
                                         is_outbound=0)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t
        n_pkts = 127

        # every algorithm is run once per VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        # one entry per crypto/integrity combination, giving the VPP ids,
        # the matching scapy names, the key and the salt
        algos = [
            {'vpp-crypto': crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
             'vpp-integ': integ.IPSEC_API_INTEG_ALG_NONE,
             'scapy-crypto': "AES-GCM",
             'scapy-integ': "NULL",
             'key': b"JPjyOWBeVEQiMe7h",
             'salt': 3333},
            {'vpp-crypto': crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
             'vpp-integ': integ.IPSEC_API_INTEG_ALG_NONE,
             'scapy-crypto': "AES-GCM",
             'scapy-integ': "NULL",
             'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
             'salt': 0},
            {'vpp-crypto': crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
             'vpp-integ': integ.IPSEC_API_INTEG_ALG_NONE,
             'scapy-crypto': "AES-GCM",
             'scapy-integ': "NULL",
             'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
             'salt': 9999},
            {'vpp-crypto': crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
             'vpp-integ': integ.IPSEC_API_INTEG_ALG_SHA1_96,
             'scapy-crypto': "AES-CBC",
             'scapy-integ': "HMAC-SHA1-96",
             'salt': 0,
             'key': b"JPjyOWBeVEQiMe7h"},
            {'vpp-crypto': crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
             'vpp-integ': integ.IPSEC_API_INTEG_ALG_SHA1_96,
             'scapy-crypto': "AES-CBC",
             'scapy-integ': "HMAC-SHA1-96",
             'salt': 0,
             'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
            {'vpp-crypto': crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
             'vpp-integ': integ.IPSEC_API_INTEG_ALG_SHA1_96,
             'scapy-crypto': "AES-CBC",
             'scapy-integ': "HMAC-SHA1-96",
             'salt': 0,
             'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
            {'vpp-crypto': crypto.IPSEC_API_CRYPTO_ALG_NONE,
             'vpp-integ': integ.IPSEC_API_INTEG_ALG_SHA1_96,
             'scapy-crypto': "NULL",
             'scapy-integ': "HMAC-SHA1-96",
             'salt': 0,
             'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
        ]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            for algo in algos:
                # private copy of the template parameters for this run
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = algo['vpp-integ']
                p.crypt_algo_vpp_id = algo['vpp-crypto']
                p.crypt_algo = algo['scapy-crypto']
                p.auth_algo = algo['scapy-integ']
                p.crypt_key = algo['key']
                p.salt = algo['salt']

                self.config_network(p)

                self.verify_tun_44(p, count=n_pkts)
                self.assertEqual(p.tun_if.get_rx_stats()['packets'], n_pkts)
                self.assertEqual(p.tun_if.get_tx_stats()['packets'], n_pkts)

                # rekey the tunnel and check traffic still passes
                self.rekey(p)
                self.verify_tun_44(p, count=n_pkts)

                # tear everything down before the next combination
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
544
545
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create a tunnel interface and host route from ``p`` after
        forcing both the integrity and the crypto algorithm to NONE/NULL
        with empty keys.

        The created objects are stored as ``p.tun_if`` and ``p.route``.
        """
        # no integrity algorithm, empty auth key
        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.auth_algo = 'NULL'
        p.auth_key = []

        # no crypto algorithm, empty crypto key
        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = 'NULL'
        p.crypt_key = []

        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi,
                                        p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key,
                                        salt=p.salt)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))
        self.logger.info(self.vapi.cli("sh ipsec sa 1"))

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what ``config_network`` created for ``p``."""
        p.tun_if.unconfig_ip4()
        p.tun_if.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """ IPSec SA with NULL algos """
        p = self.ipv4_params

        self.config_network(p)

        # packets routed into the tunnel must not produce any reply
        tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                           dst=p.remote_tun_if_host)
        self.send_and_assert_no_replies(self.pg1, tx)

        self.unconfig_network(p)
607
608
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"

    def setUp(self):
        """Create ten IPv6 IPsec tunnel interfaces over pg0, one per
        generated remote host, each with its own SPIs, SA ids and host
        route.  The parameter objects are kept in ``self.multi_params``.
        """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        # attributes that are offset by the tunnel index to stay unique
        offset_attrs = ("scapy_tun_sa_id", "scapy_tun_spi",
                        "vpp_tun_sa_id", "vpp_tun_spi",
                        "scapy_tra_sa_id", "scapy_tra_spi",
                        "vpp_tra_sa_id", "vpp_tra_spi")

        for ii in range(10):
            # shallow copy of the template parameters
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (ii + 1)
            for attr in offset_attrs:
                setattr(params, attr, getattr(params, attr) + ii)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[ii].ip6)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            via6 = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                proto=DpoProto.DPO_PROTO_IP6)
            VppIpRoute(self, params.remote_tun_if_host, 128,
                       [via6]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=n_pkts)
            # each tunnel must have counted exactly the packets sent
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
669
670
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner (bridged) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted packets: Ether / sa.encrypt(IP / GRE /
        inner Ether / IP / UDP / payload).

        ``src`` and ``dst`` are accepted but not used; the addresses are
        fixed (pg0 for the outer IP header, 1.1.1.1 -> 1.1.1.2 inside).
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ethernet frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that each received frame is the expected inner frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check the outer IP
        addresses, the GRE layer and the inner Ethernet/IP destination.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged and the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so the failure log never shows a packet
            # decrypted in an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # best-effort logging only: a formatting problem must
                    # not replace the real failure
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug(
                            "Could not format the decrypted packet")
                raise

    def setUp(self):
        """Create bridge domain 1, two SAs, a GRE TEB tunnel between the
        pg0 addresses protected by those SAs, and bridge the tunnel with
        pg1."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # SA from the local to the remote pg0 address
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # SA from the remote to the local pg0 address
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        # attach the SAs to the GRE tunnel
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # put the tunnel and pg1 into the same bridge domain
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        # remove the tunnel's IPv4 config before the parent teardown runs
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
778
779
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC used for the inner (bridged) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP packets carrying IP/GRE/Ether/IP/UDP.

        The GRE header addresses are pg0's remote -> local IPv4 addresses.
        `src` and `dst` are accepted for interface compatibility but are not
        used; the inner addresses are fixed to 1.1.1.1 -> 1.1.1.2.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain Ether/IP/UDP frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used by this override.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame still targets `omac` and 1.1.1.2."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption got that far) is logged before re-raising.
        """
        for rx in rxs:
            # reset per packet so the failure path never logs a packet left
            # over from a previous iteration (or hits an unbound name)
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; keep the original failure
                        self.logger.debug("Cannot dump decrypted packet")
                raise

    def setUp(self):
        """Create a TEB GRE tunnel protected by two SAs and bridge it to pg1.

        Unlike the tunnel-mode variant, the SAs are created without any
        tunnel endpoint addresses.
        """
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and pg1 are the two ports of the bridge domain
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
883
884
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP packets carrying IP/GRE/IP/UDP.

        The GRE header goes pg0 remote -> local; the inner packet goes
        pg1 local -> remote. `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain IP/UDP packets from 1.1.1.1 to 1.1.1.2.

        `src` and `dst` are not used; 1.1.1.2 is the host routed via the
        tunnel in setUp.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption got that far) is logged before re-raising.
        """
        for rx in rxs:
            # reset per packet so the failure path never logs a packet left
            # over from a previous iteration (or hits an unbound name)
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; keep the original failure
                        self.logger.debug("Cannot dump decrypted packet")
                raise

    def setUp(self):
        """Create a GRE tunnel over pg0 protected by two SAs.

        Both SAs are given pg0's addresses as trailing arguments (local,
        remote for the outbound SA; remote, local for the inbound SA) and
        1.1.1.2/32 is routed through the tunnel.
        """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): this bridge domain is created but nothing in this
        # class adds a port to it - looks like a leftover from the TEB
        # variant; confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
986
987
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP packets carrying IP/GRE/IP/UDP.

        The GRE header goes pg0 remote -> local; the inner packet goes
        pg1 local -> remote. `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Return `count` ESP packets whose GRE payload is not an IP packet.

        UDP is placed directly after the GRE header. `src` and `dst` are
        not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain IP/UDP packets from 1.1.1.1 to 1.1.1.2.

        `src` and `dst` are not used; 1.1.1.2 is the host routed via the
        tunnel in setUp.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption got that far) is logged before re-raising.
        """
        for rx in rxs:
            # reset per packet so the failure path never logs a packet left
            # over from a previous iteration (or hits an unbound name)
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; keep the original failure
                        self.logger.debug("Cannot dump decrypted packet")
                raise

    def setUp(self):
        """Create a GRE tunnel over pg0 protected by two SAs.

        The SAs are created without tunnel endpoint addresses and
        1.1.1.2/32 is routed through the tunnel.
        """
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): this bridge domain is created but nothing in this
        # class adds a port to it - looks like a leftover from the TEB
        # variant; confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        # A GRE packet with a non-IP payload must produce no reply and bump
        # the decrypt node's 'unsupported payload' error counter once.
        p = self.ipv4_params
        # src/dst are ignored by gen_encrypt_non_ip_pkts
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip6)
        self.send_and_assert_no_replies(self.tun_if, tx)
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1103
1104
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE ESP - TRA tests """
    # IPv6 counterpart: the GRE tunnel runs between pg0's IPv6 addresses
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return `count` ESP packets carrying IPv6/GRE/IPv6/UDP.

        The GRE header goes pg0 remote -> local; the inner packet goes
        pg1 local -> remote. `src` and `dst` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return `count` plain IPv6/UDP packets from 1::1 to 1::2.

        `src` and `dst` are not used; 1::2 is the host routed via the
        tunnel in setUp.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption got that far) is logged before re-raising.
        """
        for rx in rxs:
            # reset per packet so the failure path never logs a packet left
            # over from a previous iteration (or hits an unbound name)
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; keep the original failure
                        self.logger.debug("Cannot dump decrypted packet")
                raise

    def setUp(self):
        """Create an IPv6 GRE tunnel over pg0 protected by two SAs.

        The SAs are created without tunnel endpoint addresses and
        1::2/128 is routed through the tunnel.
        """
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): this bridge domain is created but nothing in this
        # class adds a port to it - looks like a leftover from the TEB
        # variant; confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1202
1203
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    # Mixin of config/unconfig helpers. It relies on attributes supplied by
    # the concrete test class: self.tun_if (the test classes in this file
    # set it to pg0), self.pg0 and self.vpp_esp_protocol.

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create the out/in SAs without tunnel endpoint addresses.

        Also builds the scapy-side SAs via config_tun_params. The created
        VPP SAs are stored on `p` as tun_sa_out / tun_sa_in.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the out/in SAs with tunnel endpoint addresses.

        The endpoints are self.tun_if's addresses for p.addr_type. The
        outbound SA gets them as local -> remote and the inbound SA as
        remote -> local, matching the tunnel-mode GRE tests in this file.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # outbound: local address first, then the remote peer
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.tun_if.local_addr[p.addr_type],
                                  self.tun_if.remote_addr[p.addr_type],
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        # inbound: remote peer first, then the local address
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.tun_if.remote_addr[p.addr_type],
                                 self.tun_if.local_addr[p.addr_type],
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with p.tun_sa_out and [p.tun_sa_in]."""
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route the hosts via it.

        Only the IPv4 host route is kept on `p` (as p.route); the IPv6 host
        route is added but not stored.
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip4,
                                       self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()
        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the IPv4 host route and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on `p`."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound and then the inbound SA stored on `p`."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1286
1287
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtect, self).setUp()

        # packets to and from the peer are exchanged on pg0
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtect, self).tearDown()

    def _expect_tun_pkt_counts(self, tun_if, expected):
        # rx is checked before tx, each against the same running total
        rx_stats = tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], expected)
        tx_stats = tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], expected)

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # first burst
        self.verify_tun_44(params, count=127)
        self._expect_tun_pkt_counts(params.tun_if, 127)

        # second burst via verify_tun_64 after clearing the SA state;
        # the interface counters keep accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=127)
        self._expect_tun_pkt_counts(params.tun_if, 254)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: rekeyed.tun_if is the same object as params.tun_if)
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        # the old SAs are removed only after the protection uses the new ones
        self.unconfig_sa(params)

        self.verify_tun_44(rekeyed, count=127)
        self._expect_tun_pkt_counts(params.tun_if, 381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
1347
1348
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        # request UDP encapsulation on the SAs and give the scapy side the
        # matching UDP header (ports 5454 -> 4500)
        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        # network first, then SAs, then the protection binding them
        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

    def tearDown(self):
        # undo setUp in reverse order before the base class cleans up
        params = self.ipv4_params
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        tun_if = self.ipv4_params.tun_if

        self.verify_tun_44(self.ipv4_params, count=127)
        rx_stats = tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], 127)
        tx_stats = tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
1388
1389
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP packets carrying IP/IP/UDP.

        The first IP header goes from sw_intf's remote to its local IPv4
        address; the second one is `src` -> `dst`.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain IP/UDP packets from `src` to `dst`."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of each decapsulated packet."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        On IndexError/AssertionError the received packet (and the decrypted
        one, when decryption got that far) is logged before re-raising.
        """
        for rx in rxs:
            # reset per packet so the failure path never logs a packet left
            # over from a previous iteration (or hits an unbound name)
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best effort; keep the original failure
                        self.logger.debug("Cannot dump decrypted packet")
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1491
1492
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` ESP packets whose first inner IP header is bogus.

        That header is addressed to 5.5.5.5 instead of sw_intf's local
        address; the second inner header is `src` -> `dst`.
        """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear = (IP(src=sw_intf.remote_ip4,
                        dst="5.5.5.5") /
                     IP(src=src, dst=dst) /
                     UDP(sport=1144, dport=2233) /
                     Raw(b'X' * payload_size))
            # encrypt once per packet, exactly as many times as `count`
            pkts.append(l2 / sa.encrypt(clear))
        return pkts

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """

        params = self.ipv4_params

        for configure in (self.config_network,
                          self.config_sa_tun,
                          self.config_protect):
            configure(params)

        # none of the 63 bogus packets may produce a reply
        bogus = self.gen_encrypt_pkts(params.scapy_tun_sa, self.tun_if,
                                      src=params.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
1539
1540
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect

    Mixin holding the configuration and cleanup steps shared by the IPv6
    tunnel-protect test classes. Every method stores what it creates on
    the parameter object ``p`` so the matching ``unconfig_*`` method can
    remove it again.
    """

    def config_sa_tra(self, p):
        """Create the outbound and inbound SAs without tunnel addresses.

        Used by the transport mode test class. Sets ``p.tun_sa_out`` and
        ``p.tun_sa_in``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the outbound and inbound SAs with tunnel addresses.

        Used by the tunnel mode test classes. Sets ``p.tun_sa_out`` and
        ``p.tun_sa_in``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # NOTE(review): both SAs are given the addresses in the same
        # (remote, local) order - confirm this is intended for both
        # directions.
        remote_addr = self.tun_if.remote_addr[p.addr_type]
        local_addr = self.tun_if.local_addr[p.addr_type]

        sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            remote_addr,
            local_addr)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            remote_addr,
            local_addr)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SAs stored on ``p``.

        Sets ``p.tun_protect``.
        """
        protection = VppIpsecTunProtect(self,
                                        p.tun_if,
                                        p.tun_sa_out,
                                        [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route the hosts via it.

        Sets ``p.tun_if`` and ``p.route`` (the IPv6 host route).
        """
        tunnel = VppIpIpTunInterface(self, self.pg0,
                                     self.pg0.local_ip6,
                                     self.pg0.remote_ip6)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # IPv6 host route through the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6,
                               0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        # IPv4 host route through the same tunnel.
        # NOTE(review): this route is not stored on ``p`` and
        # unconfig_network() does not remove it.
        v4_path = VppRoutePath(tunnel.remote_ip4,
                               0xffffffff)
        v4_route = VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path])
        v4_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the IPv6 host route and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound and then the inbound SA stored on ``p``."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1614
1615
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode

    Exercises traffic through the protected tunnel, a simple rekey, an
    interface state bounce and a three phase rekey, checking the tunnel
    interface packet counters along the way.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtect, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        base = self.ipv6_params

        self.config_network(base)
        self.config_sa_tra(base)
        self.config_protect(base)

        self.verify_tun_66(base, count=127)
        self.assertEqual(base.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(base.tun_if.get_tx_stats()['packets'], 127)

        # rekey - create new SAs and update the tunnel protection.
        # copy.copy() is shallow, so rekeyed.tun_if is the same object as
        # base.tun_if.
        rekeyed = copy.copy(base)
        rekeyed.crypt_key = b'X' + base.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = base.vpp_tun_spi
        rekeyed.tun_if.remote_spi = base.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(base)

        self.verify_tun_66(rekeyed, count=127)
        self.assertEqual(base.tun_if.get_rx_stats()['packets'], 254)
        self.assertEqual(base.tun_if.get_tx_stats()['packets'], 254)

        # bounce the interface state: while the tunnel is down every
        # packet must be dropped and accounted for in the error counter
        base.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=127)
        disabled_counter = ('/err/ipsec6-tun-input/%s' %
                            'ipsec packets received on disabled interface')
        self.assertEqual(127,
                         self.statistics.get_err_counter(disabled_counter))
        base.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=127)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        rekeyed3 = copy.copy(rekeyed)
        rekeyed3.crypt_key = b'Z' + base.crypt_key[1:]
        rekeyed3.scapy_tun_spi += 100
        rekeyed3.scapy_tun_sa_id += 1
        rekeyed3.vpp_tun_spi += 100
        rekeyed3.vpp_tun_sa_id += 1
        rekeyed3.tun_if.local_spi = base.vpp_tun_spi
        rekeyed3.tun_if.remote_spi = base.scapy_tun_spi

        self.config_sa_tra(rekeyed3)

        # step 1: old output SA, both input SAs
        both_inputs = [rekeyed.tun_sa_in, rekeyed3.tun_sa_in]
        base.tun_protect.update_vpp_config(rekeyed.tun_sa_out, both_inputs)
        self.verify_tun_66(rekeyed, rekeyed, count=127)
        self.verify_tun_66(rekeyed3, rekeyed, count=127)

        # step 2: new output SA, both input SAs
        both_inputs = [rekeyed.tun_sa_in, rekeyed3.tun_sa_in]
        base.tun_protect.update_vpp_config(rekeyed3.tun_sa_out, both_inputs)
        self.verify_tun_66(rekeyed, rekeyed3, count=127)
        self.verify_tun_66(rekeyed3, rekeyed3, count=127)

        # step 3: new output SA, only the new input SA; traffic using
        # the old SA must now be dropped
        base.tun_protect.update_vpp_config(rekeyed3.tun_sa_out,
                                           [rekeyed3.tun_sa_in])
        self.verify_tun_66(rekeyed3, rekeyed3, count=127)
        self.verify_drop_tun_66(rekeyed, count=127)

        self.assertEqual(base.tun_if.get_rx_stats()['packets'], 127*9)
        self.assertEqual(base.tun_if.get_tx_stats()['packets'], 127*8)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(rekeyed3)
        self.unconfig_sa(rekeyed3)
        self.unconfig_network(base)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_46(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1740
1741
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode

    Uses SAs created with tunnel addresses (``config_sa_tun``) and
    provides packet generators and verifiers for the resulting
    IPv6-in-IPv6 packet layout.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted IPv6-in-IPv6 frames.

        The data passed to ``sa.encrypt`` is an IPv6 header between the
        addresses of ``sw_intf`` wrapping a second IPv6 header
        (``src`` -> ``dst``), UDP 1166 -> 2233 and ``payload_size`` bytes
        of ``X``.

        :returns: list of ``count`` frames
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear-text IPv6/UDP frames from ``src`` to ``dst``.

        :returns: list of ``count`` frames
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of each packet in ``rxs``.

        Each packet must be addressed from ``p.remote_tun_if_host`` to
        ``self.pg1.remote_ip6`` and carry valid checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet in ``rxs`` with ``sa`` and check its headers.

        The decrypted packet must go from ``self.pg0.local_ip6`` to
        ``self.pg0.remote_ip6`` and the IPv6 header found in its payload
        must be addressed to ``p.remote_tun_if_host``.

        :raises IndexError: re-raised after logging when an expected layer
            is missing
        :raises AssertionError: re-raised after logging when a check fails
        """
        for rx in rxs:
            # Reset for every packet: if decryption itself fails there is
            # nothing decrypted to log, and a packet left over from the
            # previous iteration must not be reported as this one's.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # re-parse the raw bytes as IPv6 when the decrypted
                    # result carries no IPv6 layer
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Logging is best effort only; it must not mask
                        # the failure re-raised below. Unlike a bare
                        # except, this lets KeyboardInterrupt and
                        # SystemExit through.
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection.
        # copy.copy() is shallow, so np.tun_if is the same object as
        # p.tun_if.
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # the counters are cumulative over both batches of traffic
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1843
1844
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop

    Sends ESP packets whose encrypted payload starts with an IPv6 header
    addressed to 5::5 and asserts that nothing is received in reply.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts5(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted frames carrying a bogus inner header.

        The data passed to ``sa.encrypt`` is an IPv6 header from
        ``sw_intf.remote_ip6`` to the fixed address 5::5, wrapping a
        second IPv6 header (``src`` -> ``dst``), UDP 1144 -> 2233 and
        ``payload_size`` bytes of ``X``.

        :returns: list of ``count`` frames
        """
        # the IP destination of the revealed packet does not match
        # that assigned to the tunnel
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst="5::5") /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # Use the bogus-header generator defined on this class. The test
        # used to call gen_encrypt_pkts6, which this class does not
        # define, so the 5::5 header was never actually sent.
        tx = self.gen_encrypt_pkts5(p.scapy_tun_sa, self.tun_if,
                                    src=p.remote_tun_if_host,
                                    dst=self.pg1.remote_ip6,
                                    count=63)
        self.send_and_assert_no_replies(self.tun_if, tx)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
1892
1893
# When this file is executed directly, run its tests with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)