57c90f93ca8095544145b71c64073887706b6b2a
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from util import ppp
22 from vpp_papi import VppEnum
23
24
def config_tun_params(p, encryption_type, tun_if):
    """Attach the scapy SecurityAssociations for tunnel mode to *p*.

    The endpoints of *tun_if* are recorded on *p* as ``tun_src`` (the
    interface's ``local_ip``) and ``tun_dst`` (its ``remote_ip``), then two
    SAs sharing the same algorithms, keys, NAT-T header and ESN setting are
    created:

    * ``p.scapy_tun_sa`` - SPI ``p.vpp_tun_spi``, outer header going from
      ``tun_dst`` to ``tun_src``.
    * ``p.vpp_tun_sa`` - SPI ``p.scapy_tun_spi``, outer header going from
      ``tun_src`` to ``tun_dst``.
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    esn_en = bool(p.flags & esn_flag)
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # outer IP header class is selected by the address family of the params
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    # everything except the SPI and the outer header is common to both SAs
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=scapy_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=esn_en)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        **shared)
52
53
def config_tra_params(p, encryption_type, tun_if):
    """Attach scapy SecurityAssociations without an outer header to *p*.

    The endpoints of *tun_if* are still recorded on *p* (``tun_src`` /
    ``tun_dst``), but neither SA is given a ``tunnel_header`` or a NAT-T
    header.  ``p.scapy_tun_sa`` uses SPI ``p.vpp_tun_spi`` and
    ``p.vpp_tun_sa`` uses SPI ``p.scapy_tun_spi``.
    """
    # kept for parity with the original body; not used by the SAs below
    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}

    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    esn_en = bool(p.flags & esn_flag)
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # both SAs differ only in their SPI
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=scapy_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  esn_en=esn_en)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi, **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi, **shared)
73
74
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        super(TemplateIpsec4TunIfEsp, self).setUp()

        # the tunnel is created over pg0 (passed to VppIpsecTunInterface)
        self.tun_if = self.pg0

        params = self.ipv4_params

        # the crypto key and the auth key are each supplied twice
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 /32 host route via the tunnel's remote IPv4 address
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        v4_route = VppIpRoute(self, params.remote_tun_if_host, 32,
                              [v4_path])
        v4_route.add_vpp_config()

        # IPv6 /128 host route via the tunnel's remote IPv6 address
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        v6_route = VppIpRoute(self, params.remote_tun_if_host6, 128,
                              [v6_path])
        v6_route.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
118
119
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        # the tunnel is created over pg0 (passed to VppIpsecTunInterface)
        self.tun_if = self.pg0

        params = self.ipv4_params

        # request UDP encapsulation and describe the UDP header that the
        # scapy SAs wrap around ESP (source port 5454, destination 4500)
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            udp_encap=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv4 /32 host route via the tunnel's remote IPv4 address
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        v4_route = VppIpRoute(self, params.remote_tun_if_host, 32,
                              [v4_path])
        v4_route.add_vpp_config()

        # IPv6 /128 host route via the tunnel's remote IPv6 address
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        v6_route = VppIpRoute(self, params.remote_tun_if_host6, 128,
                              [v6_path])
        v6_route.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
168
169
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # a single packet through the IPv4 tunnel parameters
        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # same check as the basic test, but with a burst of 257 packets
        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params
        tun_sw_if_index = p.tun_if.sw_if_index

        # with the first MTU slot set to 1500, one 1800 byte payload is
        # expected to be received as two packets (n_rx=2)
        self.vapi.sw_interface_set_mtu(tun_sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)

        # put a 9000 byte MTU back on the tunnel once the check is done
        self.vapi.sw_interface_set_mtu(tun_sw_if_index,
                                       [9000, 0, 0, 0])
199
200
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    # not referenced in this file; presumably read by the IpsecTun4Tests
    # mixin - verify against template_ipsec
    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
209
210
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # no body needed: setup comes from TemplateIpsec4TunIfEsp and the test
    # methods from IpsecTcpTests
214
215
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        super(TemplateIpsec6TunIfEsp, self).setUp()

        # the tunnel is created over pg0 (passed to VppIpsecTunInterface)
        self.tun_if = self.pg0

        params = self.ipv6_params

        # the crypto key and the auth key are each supplied twice
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # IPv6 /128 host route via the tunnel's remote IPv6 address
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        v6_route = VppIpRoute(self, params.remote_tun_if_host, 128,
                              [v6_path])
        v6_route.add_vpp_config()

        # IPv4 /32 host route via the tunnel's remote IPv4 address
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        v4_route = VppIpRoute(self, params.remote_tun_if_host4, 32,
                              [v4_path])
        v4_route.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
250
251
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # a single packet through the IPv6 tunnel parameters
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # same check with a burst of 257 packets
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
267
268
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """

    # only the encrypt/decrypt node names are set here; the setup and the
    # test methods are inherited from the two base classes
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
274
275
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """

    # only the encrypt/decrypt node names are set here; the setup and the
    # test methods are inherited from the two base classes
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
281
282
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Create ten IPv4 tunnels over pg0, one per generated remote host.

        Each tunnel gets its own shallow copy of ``self.ipv4_params`` with
        the SA ids and SPIs offset by the tunnel index; the copies are
        collected in ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        # one remote host on pg0 per tunnel
        n_tunnels = 10

        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(n_tunnels):
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)

            # offset every id and SPI so that no two tunnels share one
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii

            # tunnel destination is the ii-th remote host on pg0
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                            p.scapy_tun_spi,
                                            p.crypt_algo_vpp_id,
                                            p.crypt_key, p.crypt_key,
                                            p.auth_algo_vpp_id, p.auth_key,
                                            p.auth_key,
                                            dst=p.tun_dst)
            p.tun_if.add_vpp_config()
            p.tun_if.admin_up()
            p.tun_if.config_ip4()
            config_tun_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            # /32 host route to this tunnel's remote host via the tunnel
            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)

            # each tunnel must have counted exactly the packets just sent
            c = p.tun_if.get_rx_stats()
            self.assertEqual(c['packets'], 127)
            c = p.tun_if.get_tx_stats()
            self.assertEqual(c['packets'], 127)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interfaces """
        # decrypt direction: encrypted packets for every tunnel, sent in
        # tunnel order, must arrive on pg1 in that same order
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                            src=p.remote_tun_if_host,
                                            dst=self.pg1.remote_ip4))
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # encrypt direction: plain packets towards every tunnel's remote
        # host must each be encrypted with that tunnel's SA
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                    dst=p.remote_tun_if_host))
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
363
364
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        # build the tunnel over pg0 from the algorithms, keys and salt
        # currently held in p
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)

        # log the first two SAs for debugging
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        # /32 host route to the remote host via the tunnel; kept on p so
        # that unconfig_network() can remove it again
        via_tunnel = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_tunnel])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        # undo config_network(): address, tunnel, then route
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        # new key material: same crypto key with its first byte replaced
        p.crypt_key = b'X' + p.crypt_key[1:]

        # move every SPI and SA id on by one
        for attr in ("scapy_tun_spi", "scapy_tun_sa_id",
                     "vpp_tun_spi", "vpp_tun_sa_id"):
            setattr(p, attr, getattr(p, attr) + 1)
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        # rebuild the scapy side with the new key and SPIs
        config_tun_params(p, self.encryption_type, p.tun_if)

        # the two new VPP SAs differ only in id and SPI
        algo_args = (p.auth_algo_vpp_id, p.auth_key,
                     p.crypt_algo_vpp_id, p.crypt_key,
                     self.vpp_esp_protocol)
        sa_opts = dict(flags=p.flags, salt=p.salt)
        p.tun_sa_in = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                 *algo_args, **sa_opts)
        p.tun_sa_out = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                  *algo_args, **sa_opts)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # swap the tunnel over to the new SAs: tun_sa_in is installed with
        # is_outbound=1 and tun_sa_out with is_outbound=0
        tun_sw_if_index = p.tun_if.sw_if_index
        for new_sa, is_outbound in ((p.tun_sa_in, 1), (p.tun_sa_out, 0)):
            self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_sw_if_index,
                                             sa_id=new_sa.id,
                                             is_outbound=is_outbound)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # the VppEnum members are looked up here, inside the test, exactly
        # as the original table did
        crypto_alg = VppEnum.vl_api_ipsec_crypto_alg_t
        integ_alg = VppEnum.vl_api_ipsec_integ_alg_t

        key_16 = b"JPjyOWBeVEQiMe7h"
        key_24 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_32 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        def algo_entry(vpp_crypto, vpp_integ, scapy_crypto, scapy_integ,
                       key, salt):
            # one row of the algorithm table
            return {'vpp-crypto': vpp_crypto,
                    'vpp-integ': vpp_integ,
                    'scapy-crypto': scapy_crypto,
                    'scapy-integ': scapy_integ,
                    'key': key,
                    'salt': salt}

        # every VPP crypto engine to run the table against
        engines = ["ia32", "ipsecmb", "openssl"]

        # AES-GCM (no separate integrity algorithm), AES-CBC with
        # HMAC-SHA1-96, and finally integrity only
        algos = [
            algo_entry(crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                       integ_alg.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL", key_16, 3333),
            algo_entry(crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                       integ_alg.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL", key_24, 0),
            algo_entry(crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
                       integ_alg.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL", key_32, 9999),
            algo_entry(crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                       integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96", key_16, 0),
            algo_entry(crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                       integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96", key_24, 0),
            algo_entry(crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                       integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96", key_32, 0),
            algo_entry(crypto_alg.IPSEC_API_CRYPTO_ALG_NONE,
                       integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
                       "NULL", "HMAC-SHA1-96", key_32, 0),
        ]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            for algo in algos:
                # fresh copy of the default params, specialised for algo
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = algo['vpp-integ']
                p.crypt_algo_vpp_id = algo['vpp-crypto']
                p.crypt_algo = algo['scapy-crypto']
                p.auth_algo = algo['scapy-integ']
                p.crypt_key = algo['key']
                p.salt = algo['salt']

                self.config_network(p)

                # traffic with the initial keys, then check the counters
                self.verify_tun_44(p, count=127)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], 127)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], 127)

                # traffic again after rekeying the tunnel
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                # tear everything down before the next algorithm
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
549
550
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create a tunnel over pg0 with no integrity and no crypto algo.

        Overwrites the algorithm ids, scapy algorithm names and keys on *p*
        (keys become empty lists), then creates the tunnel and a /32 route
        to ``p.remote_tun_if_host`` through it.  The route is kept in
        ``p.route`` for unconfig_network().
        """
        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.auth_algo = 'NULL'
        p.auth_key = []

        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = 'NULL'
        p.crypt_key = []

        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi,
                                        p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key,
                                        salt=p.salt)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # log the first two SAs for debugging
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))
        self.logger.info(self.vapi.cli("sh ipsec sa 1"))

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Undo config_network(): address, tunnel, then route."""
        p.tun_if.unconfig_ip4()
        p.tun_if.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """ IPSEC tunnel with no algos """
        p = self.ipv4_params

        self.config_network(p)

        # packets routed into the tunnel must not produce any reply
        tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                           dst=p.remote_tun_if_host)
        self.send_and_assert_no_replies(self.pg1, tx)

        self.unconfig_network(p)
612
613
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        # attributes that are offset by the tunnel index, in the order the
        # original code updated them
        per_tunnel_attrs = ("scapy_tun_sa_id", "scapy_tun_spi",
                            "vpp_tun_sa_id", "vpp_tun_spi",
                            "scapy_tra_sa_id", "scapy_tra_spi",
                            "vpp_tra_sa_id", "vpp_tra_spi")

        for index in range(10):
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (index + 1)
            for attr in per_tunnel_attrs:
                setattr(params, attr, getattr(params, attr) + index)

            # tunnel destination is the index-th remote host on pg0
            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[index].ip6)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            # /128 host route to this tunnel's remote host via the tunnel
            via_tunnel = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                      proto=DpoProto.DPO_PROTO_IP6)
            route = VppIpRoute(self, params.remote_tun_if_host, 128,
                               [via_tunnel])
            route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_66(params, count=127)

            # each tunnel must have counted exactly the packets just sent
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)
674
675
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return *count* encrypted GRE packets carrying an Ethernet frame.

        *src* and *dst* are accepted but not used: the outer addresses are
        pg0's remote/local IPv4 addresses and the inner ones are fixed.
        ``sa.encrypt`` is called once per packet.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return *count* plain Ethernet frames addressed to ``omac``.

        *sw_intf*, *src* and *dst* are accepted but not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check the inner frame's destination MAC and IP of every packet."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet with *sa* and check the GRE/Ethernet payload.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, if decryption got that far) is logged at debug level
        and the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure never logs a packet
            # decrypted in an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # best effort only: a failure to dump the packet must
                    # not replace the error that is being re-raised
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        self.logger.debug("Decrypted packet could not "
                                          "be dumped")
                raise

    def setUp(self):
        """Create a GRE TEB tunnel protected by two SAs, bridged to pg1."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # SA from pg0's local to pg0's remote address
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # SA from pg0's remote to pg0's local address
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        # protect the GRE tunnel with the two SAs created above
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel and pg1 together in bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config before the inherited teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
783
784
class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
                               IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests

    A GRE TEB tunnel over pg0, protected by ESP SAs created with the pg0
    tunnel endpoints, is bridged with dot1q sub-interface 11 of pg1.
    """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC used in every bridged (inner) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / sa.encrypt(IP / GRE / Ether / IP / UDP / Raw)
        with the outer IP going from pg0's remote to pg0's local address.
        ``src`` and ``dst`` are accepted for signature compatibility but
        are not used; the inner addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` clear-text frames tagged with VLAN 11.

        ``sw_intf``, ``src`` and ``dst`` are not used.
        """
        return [Ether(dst=self.omac) /
                Dot1Q(vlan=11) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check MAC, VLAN 11 tag and inner IP dst of decrypted frames."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and validate it.

        The decrypted packet must be IP(pg0 local -> pg0 remote) / GRE
        carrying an Ethernet frame addressed to ``omac`` without a Dot1Q
        tag. On failure both packets are logged at debug level and the
        original exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure never logs stale data
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # logging is best effort; the real failure is
                        # re-raised below
                        pass
                raise

    def setUp(self):
        """Configure bridge domain, VLAN sub-interface, SAs and tunnel."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # dot1q sub-interface 11 on pg1 with an L2_POP_1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11)
        self.pg1_11.admin_up()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the VLAN sub-interface
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Unconfigure the tunnel, run base tearDown, drop the sub-if."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
903
904
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests

    A GRE TEB tunnel over pg0 is protected by ESP SAs that are created
    without tunnel endpoints and is bridged with pg1.
    """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC used in every bridged (inner) Ethernet frame
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / sa.encrypt(IP / GRE / Ether / IP / UDP / Raw)
        with the outer IP going from pg0's remote to pg0's local address.
        ``src`` and ``dst`` are not used; the inner addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` clear-text frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check destination MAC and inner IP dst of decrypted frames."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and validate it.

        The decrypted packet must be IP(pg0 local -> pg0 remote) / GRE
        carrying an Ethernet frame addressed to ``omac``. On failure both
        packets are logged at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure never logs stale data
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # logging is best effort; the real failure is
                        # re-raised below
                        pass
                raise

    def setUp(self):
        """Configure bridge domain, SAs, GRE TEB tunnel and protection."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # no tunnel endpoints are passed to the SAs in this variant
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the plain L2 port
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Strip the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1008
1009
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests

    A GRE tunnel over pg0 is protected by ESP SAs created with the pg0
    tunnel endpoints; 1.1.1.2/32 is routed through the tunnel.
    """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / sa.encrypt(IP / GRE / IP / UDP / Raw); the
        outer IP goes from pg0's remote to pg0's local address, the inner
        one from pg1's local to pg1's remote address. ``src`` and ``dst``
        are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` clear-text frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that decrypted frames are addressed to pg1's peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and validate it.

        The decrypted packet must be IP(pg0 local -> pg0 remote) / GRE
        with an inner IP destination of 1.1.1.2. On failure both packets
        are logged at debug level and the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure never logs stale data
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # logging is best effort; the real failure is
                        # re-raised below
                        pass
                raise

    def setUp(self):
        """Configure SAs, GRE tunnel, protection and the tunnel route."""
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is added to this bridge domain in this
        # class - confirm whether it is still required.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # send traffic for the test destination through the tunnel
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Strip the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1111
1112
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests

    A GRE tunnel over pg0 is protected by ESP SAs that are created
    without tunnel endpoints; 1.1.1.2/32 is routed through the tunnel.
    """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / sa.encrypt(IP / GRE / IP / UDP / Raw); the
        outer IP goes from pg0's remote to pg0's local address, the inner
        one from pg1's local to pg1's remote address. ``src`` and ``dst``
        are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Return ``count`` encrypted frames whose GRE payload is not IP.

        Same as ``gen_encrypt_pkts`` except that GRE is followed directly
        by UDP / Raw, with no inner IP header. ``src`` and ``dst`` are not
        used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` clear-text frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that decrypted frames are addressed to pg1's peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and validate it.

        The decrypted packet must contain GRE with an inner IP
        destination of 1.1.1.2. On failure both packets are logged at
        debug level and the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure never logs stale data
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # logging is best effort; the real failure is
                        # re-raised below
                        pass
                raise

    def setUp(self):
        """Configure SAs, GRE tunnel, protection and the tunnel route."""
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is added to this bridge domain in this
        # class - confirm whether it is still required.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # no tunnel endpoints are passed to the SAs in this variant
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for the test destination through the tunnel
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Strip the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        # A comment rather than a docstring, so the reported test name
        # is unchanged: send one GRE packet without an inner IP header,
        # expect no reply and exactly one "unsupported payload" error on
        # the decrypt node.
        p = self.ipv4_params
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip6)
        self.send_and_assert_no_replies(self.tun_if, tx)
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1228
1229
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE ESP - TRA tests

    IPv6 flavour: a GRE tunnel between pg0's IPv6 addresses is protected
    by ESP SAs that are created without tunnel endpoints; 1::2/128 is
    routed through the tunnel.
    """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / sa.encrypt(IPv6 / GRE / IPv6 / UDP / Raw);
        the outer header goes from pg0's remote to pg0's local address,
        the inner one from pg1's local to pg1's remote address. ``src``
        and ``dst`` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return ``count`` clear-text frames from 1::1 to 1::2.

        ``src`` and ``dst`` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check that decrypted frames are addressed to pg1's peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and validate it.

        The decrypted packet must contain GRE with an inner IPv6
        destination of 1::2. On failure both packets are logged at debug
        level and the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure never logs stale data
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # logging is best effort; the real failure is
                        # re-raised below
                        pass
                raise

    def setUp(self):
        """Configure SAs, IPv6 GRE tunnel, protection and tunnel route."""
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): no port is added to this bridge domain in this
        # class - confirm whether it is still required.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # no tunnel endpoints are passed to the SAs in this variant
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for the test destination through the tunnel
        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Strip the tunnel's IPv6 config, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1327
1328
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect

    Mixin with the config/unconfig helpers shared by the tests that
    protect an IP-in-IP tunnel over pg0.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create and install both SAs without tunnel endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # identical for both directions
        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id,
                                  p.scapy_tun_spi, *crypto,
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id,
                                 p.vpp_tun_spi, *crypto,
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install both SAs with self.tun_if's endpoints."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # identical for both directions
        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id,
                                  p.scapy_tun_spi, *crypto,
                                  tun_src=self.tun_if.local_addr[p.addr_type],
                                  tun_dst=self.tun_if.remote_addr[p.addr_type],
                                  flags=p.flags) \
            if False else VppIpsecSA(
                self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                self.tun_if.local_addr[p.addr_type],
                self.tun_if.remote_addr[p.addr_type],
                flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.tun_if.remote_addr[p.addr_type],
                                 self.tun_if.local_addr[p.addr_type],
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with the outbound SA and the inbound SA."""
        protect = VppIpsecTunProtect(self,
                                     p.tun_if,
                                     p.tun_sa_out,
                                     [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route hosts via it."""
        tunnel = VppIpIpTunInterface(self, self.pg0,
                                     self.pg0.local_ip4,
                                     self.pg0.remote_ip4)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route; kept on ``p`` for unconfig_network()
        path4 = VppRoutePath(p.tun_if.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [path4])
        p.route.add_vpp_config()

        # IPv6 host route; not stored anywhere
        path6 = VppRoutePath(p.tun_if.remote_ip6,
                             0xffffffff,
                             proto=DpoProto.DPO_PROTO_IP6)
        route6 = VppIpRoute(self, p.remote_tun_if_host6, 128, [path6])
        route6.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the IPv4 host route and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1411
1412
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel's underlay."""
        super(TestIpsec4TunProtect, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base class; the test cleans up after itself."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        p = self.ipv4_params

        def check_counters(expected):
            # rx first, then tx: both interface counters must match
            self.assertEqual(p.tun_if.get_rx_stats()['packets'], expected)
            self.assertEqual(p.tun_if.get_tx_stats()['packets'], expected)

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)
        check_counters(127)

        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(p, count=127)
        check_counters(254)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        # the copy is shallow, so this is the same tunnel object as p's
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_44(rekeyed, count=127)
        check_counters(381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)
1472
1473
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Configure the protected tunnel with the UDP-encap SA flag."""
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo setUp in reverse order, then run the base tearDown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        params = self.ipv4_params
        tunnel = params.tun_if

        self.verify_tun_44(params, count=127)
        # rx first, then tx: both interface counters must match
        rx_stats = tunnel.get_rx_stats()
        self.assertEqual(rx_stats['packets'], 127)
        tx_stats = tunnel.get_tx_stats()
        self.assertEqual(tx_stats['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
1513
1514
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        The plaintext passed to ``sa.encrypt()`` is an IP header from
        ``sw_intf.remote_ip4`` to ``sw_intf.local_ip4`` followed by the
        inner IP(``src`` -> ``dst``) / UDP / ``payload_size`` bytes of
        ``X``.  Each frame is encrypted separately.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear-text IP/UDP frames to inject on
        ``sw_intf``, each carrying ``payload_size`` bytes of ``X``."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received clear-text packet goes from
        ``p.remote_tun_if_host`` to ``self.pg1.remote_ip4`` and has
        valid checksums."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet with ``sa`` and check its headers.

        The decrypted packet must go from ``self.pg0.local_ip4`` to
        ``self.pg0.remote_ip4`` and the IP header nested in its payload
        must be addressed to ``p.remote_tun_if_host``.  On IndexError or
        AssertionError the offending packet is logged and the exception
        is re-raised.
        """
        for rx in rxs:
            # Reset for each packet: if decryption itself fails, the
            # handler below must not log the previous packet's result.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt returned undissected bytes; parse as IP
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Logging is best-effort only; the original
                        # failure re-raised below is what matters.
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (copy.copy is shallow, so np.tun_if is the same object as
        # p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # interface counters are cumulative across both bursts
        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1616
1617
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        # The IP header placed directly inside the encrypted payload is
        # addressed to 5.5.5.5 instead of sw_intf.local_ip4.
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear = (IP(src=sw_intf.remote_ip4, dst="5.5.5.5") /
                     IP(src=src, dst=dst) /
                     UDP(sport=1144, dport=2233) /
                     Raw(b'X' * payload_size))
            frames.append(l2 / sa.encrypt(clear))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """

        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # nothing may come back for packets carrying the bogus header
        bogus = self.gen_encrypt_pkts(params.scapy_tun_sa, self.tun_if,
                                      src=params.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1664
1665
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    # Mixin providing config/unconfig helpers.  Every helper takes a
    # params object ``p`` and stores what it creates on it (p.tun_if,
    # p.route, p.route4, p.tun_sa_out, p.tun_sa_in, p.tun_protect) so
    # the matching unconfig_* helper can remove it again.

    def config_sa_tra(self, p):
        """Create and install the outbound/inbound SA pair, built without
        tunnel endpoint addresses, as p.tun_sa_out / p.tun_sa_in."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install the outbound/inbound SA pair with the
        addresses of self.tun_if as tunnel endpoints (local -> remote
        for the outbound SA, remote -> local for the inbound SA)."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.tun_if.local_addr[p.addr_type],
                                  self.tun_if.remote_addr[p.addr_type])
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.tun_if.remote_addr[p.addr_type],
                                 self.tun_if.local_addr[p.addr_type])
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with p.tun_sa_out and [p.tun_sa_in]; the
        protection object is kept as p.tun_protect."""
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel between pg0's IPv6 addresses and
        route both remote hosts through it.

        Stores the tunnel as p.tun_if, the IPv6 /128 route as p.route
        and the IPv4 /32 route as p.route4.
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip6,
                                       self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.config_ip4()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
                             [VppRoutePath(p.tun_if.remote_ip6,
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        p.route.add_vpp_config()

        # Keep a handle on the IPv4 route as well so unconfig_network()
        # can remove it instead of leaving it behind.
        p.route4 = VppIpRoute(self, p.remote_tun_if_host4, 32,
                              [VppRoutePath(p.tun_if.remote_ip4,
                                            0xffffffff)])
        p.route4.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the routes and the tunnel created by config_network()."""
        p.route.remove_vpp_config()
        # p.route4 is absent on params that were not set up by this
        # class's config_network(); tolerate that.
        route4 = getattr(p, 'route4', None)
        if route4 is not None:
            route4.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored in p.tun_protect."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the SA pair stored in p.tun_sa_out / p.tun_sa_in."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1739
1740
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtect, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtect, self).tearDown()

    def _assert_tun_if_packets(self, params, rx, tx):
        # rx is read and checked first; tx is only read once rx matched
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], rx)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], tx)

    def test_tun_66(self):
        """IPSEC tunnel protect 6o6"""

        burst = 127
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_66(params, count=burst)
        self._assert_tun_if_packets(params, rx=burst, tx=burst)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.scapy_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_66(rekeyed, count=burst)
        self._assert_tun_if_packets(params, rx=2 * burst, tx=2 * burst)

        # bounce the interface state: while the tunnel is down every
        # packet must be dropped and counted by the error counter
        params.tun_if.admin_down()
        self.verify_drop_tun_66(rekeyed, count=burst)
        disabled_counter = ('/err/ipsec6-tun-input/%s' %
                            'ipsec packets received on disabled interface')
        self.assertEqual(
            burst, self.statistics.get_err_counter(disabled_counter))
        params.tun_if.admin_up()
        self.verify_tun_66(rekeyed, count=burst)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        rekeyed3 = copy.copy(rekeyed)
        rekeyed3.crypt_key = b'Z' + params.crypt_key[1:]
        rekeyed3.scapy_tun_sa_id += 1
        rekeyed3.scapy_tun_spi += 100
        rekeyed3.vpp_tun_sa_id += 1
        rekeyed3.vpp_tun_spi += 100
        rekeyed3.tun_if.local_spi = params.vpp_tun_spi
        rekeyed3.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed3)

        # step 1: both inbound SAs accepted, outbound still the old one
        params.tun_protect.update_vpp_config(
            rekeyed.tun_sa_out,
            [rekeyed.tun_sa_in, rekeyed3.tun_sa_in])
        for inbound in (rekeyed, rekeyed3):
            self.verify_tun_66(inbound, rekeyed, count=burst)

        # step 2: both inbound SAs accepted, outbound moved to the new one
        params.tun_protect.update_vpp_config(
            rekeyed3.tun_sa_out,
            [rekeyed.tun_sa_in, rekeyed3.tun_sa_in])
        for inbound in (rekeyed, rekeyed3):
            self.verify_tun_66(inbound, rekeyed3, count=burst)

        # step 3: only the new inbound SA remains, the old one must drop
        params.tun_protect.update_vpp_config(
            rekeyed3.tun_sa_out,
            [rekeyed3.tun_sa_in])
        self.verify_tun_66(rekeyed3, rekeyed3, count=burst)
        self.verify_drop_tun_66(rekeyed, count=burst)

        self._assert_tun_if_packets(params, rx=burst * 9, tx=burst * 8)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(rekeyed3)
        self.unconfig_sa(rekeyed3)
        self.unconfig_network(params)

    def test_tun_46(self):
        """IPSEC tunnel protect 4o6"""

        burst = 127
        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_46(params, count=burst)
        self._assert_tun_if_packets(params, rx=burst, tx=burst)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1865
1866
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        The plaintext passed to ``sa.encrypt()`` is an IPv6 header from
        ``sw_intf.remote_ip6`` to ``sw_intf.local_ip6`` followed by the
        inner IPv6(``src`` -> ``dst``) / UDP / ``payload_size`` bytes of
        ``X``.  Each frame is encrypted separately.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear-text IPv6/UDP frames to inject on
        ``sw_intf``, each carrying ``payload_size`` bytes of ``X``."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for i in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check every received clear-text packet goes from
        ``p.remote_tun_if_host`` to ``self.pg1.remote_ip6`` and has
        valid checksums."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every received packet with ``sa`` and check its headers.

        The decrypted packet must go from ``self.pg0.local_ip6`` to
        ``self.pg0.remote_ip6`` and the IPv6 header nested in its payload
        must be addressed to ``p.remote_tun_if_host``.  On IndexError or
        AssertionError the offending packet is logged and the exception
        is re-raised.
        """
        for rx in rxs:
            # Reset for each packet: if decryption itself fails, the
            # handler below must not log the previous packet's result.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # decrypt returned undissected bytes; parse as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Logging is best-effort only; the original
                        # failure re-raised below is what matters.
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (copy.copy is shallow, so np.tun_if is the same object as
        # p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # interface counters are cumulative across both bursts
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1968
1969
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted frames whose revealed header is bogus.

        The plaintext passed to ``sa.encrypt()`` starts with an IPv6
        header from ``sw_intf.remote_ip6`` to ``5::5``, followed by the
        inner IPv6(``src`` -> ``dst``) / UDP / ``payload_size`` bytes of
        ``X``.  Each frame is encrypted separately.
        """
        # the IP destination of the revealed packet does not match
        # that assigned to the tunnel
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst="5::5") /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for i in range(count)]

    # This generator used to be named gen_encrypt_pkts5, which
    # test_tun_drop_66 never calls (it calls gen_encrypt_pkts6).  The
    # old name is kept as an alias for any external user.
    gen_encrypt_pkts5 = gen_encrypt_pkts6

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # nothing may come back for packets carrying the bogus header
        tx = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                    src=p.remote_tun_if_host,
                                    dst=self.pg1.remote_ip6,
                                    count=63)
        self.send_and_assert_no_replies(self.tun_if, tx)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2017
2018
# When executed directly, run this module's tests with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)