ipsec: Test and fix IPSec worker hand-off
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from util import ppp
21 from vpp_papi import VppEnum
22
23
def config_tun_params(p, encryption_type, tun_if):
    """Attach a pair of scapy tunnel-mode SAs to the parameter object *p*.

    ``p.scapy_tun_sa`` is created with ``p.vpp_tun_spi`` and an outer header
    going from ``tun_if.remote_ip`` to ``tun_if.local_ip``.
    ``p.vpp_tun_sa`` is created with ``p.scapy_tun_spi`` and the outer
    header in the opposite direction.  Both SAs share the algorithms, keys,
    NAT-T header and ESN setting read from *p*.

    :param p: parameter object; read for SPIs, algorithms, keys, flags,
        ``addr_type`` and ``nat_header``, and updated in place.
    :param encryption_type: first positional argument handed to
        ``SecurityAssociation`` (callers in this file pass ``ESP``).
    :param tun_if: object exposing ``local_ip`` and ``remote_ip``.
    :raises KeyError: if ``p.addr_type`` is neither ``AF_INET`` nor
        ``AF_INET6``.
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t

    # everything that is identical for the two directions
    sa_kwargs = {
        'esn_en': bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN),
        'crypt_key': mk_scapy_crypt_key(p),
        'crypt_algo': p.crypt_algo,
        'auth_algo': p.auth_algo,
        'auth_key': p.auth_key,
        'nat_t_header': p.nat_header,
    }

    # outer header class follows the address family of the parameters
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=tun_if.remote_ip, dst=tun_if.local_ip),
        **sa_kwargs)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=tun_if.remote_ip, src=tun_if.local_ip),
        **sa_kwargs)
49
50
def config_tra_params(p, encryption_type, tun_if):
    """Attach a pair of scapy SAs without a tunnel header to *p*.

    ``p.scapy_tun_sa`` is created with ``p.vpp_tun_spi`` and
    ``p.vpp_tun_sa`` with ``p.scapy_tun_spi``; neither is given a
    ``tunnel_header`` or NAT-T header.  Algorithms, keys and the ESN
    setting are read from *p*.

    :param p: parameter object; read for SPIs, algorithms, keys and flags,
        and updated in place.
    :param encryption_type: first positional argument handed to
        ``SecurityAssociation`` (callers in this file pass ``ESP``).
    :param tun_if: unused; kept so the signature matches
        ``config_tun_params``.
    """
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        esn_en=esn_en)
68
69
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params

        # the crypto key and the auth key are each passed twice
        # (presumably one per direction - see VppIpsecTunInterface)
        params.tun_if = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key)
        tunnel = params.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # host routes for the v4 and v6 remote hosts via the tunnel's
        # remote addresses
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
113
114
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params

        # request UDP encapsulation and give scapy the matching UDP header
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        params.tun_if = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            udp_encap=True)
        tunnel = params.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # host routes for the v4 and v6 remote hosts via the tunnel's
        # remote addresses
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
163
164
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # single packet through verify_tun_64 with the IPv4 parameters
        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # same as the basic test but with a burst of 257 packets
        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # an 1800 byte payload with the tunnel MTU set to 1500:
        # two packets are expected back for the one sent (n_rx=2)
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        self.verify_tun_44(self.params[socket.AF_INET],
                           count=1, payload_size=1800, n_rx=2)
        # set the tunnel MTU to 9000 once the check is done
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [9000, 0, 0, 0])
194
195
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    # node name overridden for the inherited tests
    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
204
205
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # no body needed: every test is inherited from IpsecTcpTests
209
210
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv6_params

        # the crypto key and the auth key are each passed twice
        # (presumably one per direction - see VppIpsecTunInterface)
        params.tun_if = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            is_ip6=True)
        tunnel = params.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # host routes for the v6 and v4 remote hosts via the tunnel's
        # remote addresses
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [v6_path]).add_vpp_config()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
245
246
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # single packet through verify_tun_46 with the IPv6 parameters
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # same as the basic test but with a burst of 257 packets
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
262
263
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
                                IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # node names for the inherited tests
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
269
270
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
                                IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # node names for the inherited tests
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
276
277
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        n_tunnels = 10
        # SA ids and SPIs that are offset per tunnel so they do not collide
        offset_attrs = ('scapy_tun_sa_id', 'scapy_tun_spi',
                        'vpp_tun_sa_id', 'vpp_tun_spi',
                        'scapy_tra_sa_id', 'scapy_tra_spi',
                        'vpp_tra_sa_id', 'vpp_tra_spi')

        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        for idx in range(n_tunnels):
            # every tunnel works on its own shallow copy of the defaults
            params = copy.copy(self.ipv4_params)

            params.remote_tun_if_host = "1.1.1.%d" % (idx + 1)
            for attr in offset_attrs:
                setattr(params, attr, getattr(params, attr) + idx)

            # one tunnel per generated remote host on pg0
            params.tun_if = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                dst=self.pg0.remote_hosts[idx].ip4)
            tunnel = params.tun_if
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            host_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
            host_route = VppIpRoute(self, params.remote_tun_if_host, 32,
                                    [host_path])
            host_route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_44(params, count=127)
            # every tunnel must have counted exactly the packets sent on it
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)
336
337
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        # tunnel interface built from the parameters, including the salt
        p.tun_if = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        tunnel = p.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)

        show_sa_0 = self.vapi.cli("sh ipsec sa 0")
        self.logger.info(show_sa_0)
        show_sa_1 = self.vapi.cli("sh ipsec sa 1")
        self.logger.info(show_sa_1)

        # host route for the remote host; kept on p so that
        # unconfig_network can remove it
        host_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [host_path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        # new key (first byte replaced by 'X'), SPIs and SA ids bumped by 1
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        # rebuild the scapy side with the new key and SPIs
        config_tun_params(p, self.encryption_type, p.tun_if)

        # positional arguments common to both replacement SAs
        shared_args = (p.auth_algo_vpp_id, p.auth_key,
                       p.crypt_algo_vpp_id, p.crypt_key,
                       self.vpp_esp_protocol)
        p.tun_sa_in = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                 *shared_args,
                                 flags=p.flags, salt=p.salt)
        p.tun_sa_out = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                  *shared_args,
                                  flags=p.flags, salt=p.salt)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # swap the new SAs onto the existing tunnel: tun_sa_in is set with
        # is_outbound=1 and tun_sa_out with is_outbound=0
        tun_index = p.tun_if.sw_if_index
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_index,
                                         sa_id=p.tun_sa_in.id,
                                         is_outbound=1)
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_index,
                                         sa_id=p.tun_sa_out.id,
                                         is_outbound=0)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        crypto_alg = VppEnum.vl_api_ipsec_crypto_alg_t
        integ_alg = VppEnum.vl_api_ipsec_integ_alg_t

        # 16, 24 and 32 byte keys
        key_128 = b"JPjyOWBeVEQiMe7h"
        key_192 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_256 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # VPP crypto engines to run every algorithm on
        engines = ["ia32", "ipsecmb", "openssl"]

        # one row per algorithm combination:
        # (vpp crypto id, vpp integ id, scapy crypto, scapy integ, key, salt)
        algos = [
            (crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
             integ_alg.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_128, 3333),
            (crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
             integ_alg.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_192, 0),
            (crypto_alg.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
             integ_alg.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_256, 9999),
            (crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
             integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_128, 0),
            (crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
             integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_192, 0),
            (crypto_alg.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
             integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_256, 0),
            (crypto_alg.IPSEC_API_CRYPTO_ALG_NONE,
             integ_alg.IPSEC_API_INTEG_ALG_SHA1_96,
             "NULL", "HMAC-SHA1-96", key_256, 0),
        ]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            for (vpp_crypto, vpp_integ,
                 scapy_crypto, scapy_integ, key, salt) in algos:
                # fresh shallow copy of the defaults for every combination
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = vpp_integ
                p.crypt_algo_vpp_id = vpp_crypto
                p.crypt_algo = scapy_crypto
                p.auth_algo = scapy_integ
                p.crypt_key = key
                p.salt = salt

                self.config_network(p)

                self.verify_tun_44(p, count=127)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], 127)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], 127)

                # rekey the tunnel and check that traffic still passes
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                # tear everything down before the next combination
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
522
523
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        n_tunnels = 10
        # SA ids and SPIs that are offset per tunnel so they do not collide
        offset_attrs = ('scapy_tun_sa_id', 'scapy_tun_spi',
                        'vpp_tun_sa_id', 'vpp_tun_spi',
                        'scapy_tra_sa_id', 'scapy_tra_spi',
                        'vpp_tra_sa_id', 'vpp_tra_spi')

        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv6_neighbors()

        for idx in range(n_tunnels):
            # every tunnel works on its own shallow copy of the defaults
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (idx + 1)
            for attr in offset_attrs:
                setattr(params, attr, getattr(params, attr) + idx)

            # one tunnel per generated remote host on pg0
            params.tun_if = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            tunnel = params.tun_if
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            host_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)
            VppIpRoute(self, params.remote_tun_if_host, 128,
                       [host_path]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_66(params, count=127)
            # every tunnel must have counted exactly the packets sent on it
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)
584
585
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner (bridged) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` frames whose IP/GRE/Ether/IP/UDP payload is
        encrypted with `sa`.

        The outer IP header goes from pg0's remote to its local address.
        `src` and `dst` are not used; the inner addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain Ether/IP/UDP frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check the destination MAC and IP of every received frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet with `sa` and check the outer IP
        addresses, the GRE layer and the inner Ethernet/IP destination.

        On a failed check both the received and (if available) the
        decrypted packet are logged before the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # best-effort logging only: never let a formatting
                    # problem hide the original failure
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        pass
                raise

    def setUp(self):
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # SAs created with pg0's local/remote addresses as endpoints
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        # GRE tunnel of type TEB between pg0's local and remote addresses
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        # protect the GRE tunnel with the two SAs
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel and pg1 together in bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
693
694
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner (bridged) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` frames whose IP/GRE/Ether/IP/UDP payload is
        encrypted with `sa`.

        The outer IP header goes from pg0's remote to its local address.
        `src` and `dst` are not used; the inner addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain Ether/IP/UDP frames addressed to `omac`.

        `sw_intf`, `src` and `dst` are not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check the destination MAC and IP of every received frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every received packet with `sa` and check the outer IP
        addresses, the GRE layer and the inner Ethernet/IP destination.

        On a failed check both the received and (if available) the
        decrypted packet are logged before the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    # best-effort logging only: never let a formatting
                    # problem hide the original failure
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        pass
                raise

    def setUp(self):
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # SAs created without tunnel endpoint addresses
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        # GRE tunnel of type TEB between pg0's local and remote addresses
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        # protect the GRE tunnel with the two SAs
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        # scapy SAs are built without a tunnel header for this variant
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel and pg1 together in bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
798
799
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    # Node names / ESP class; presumably read by the IpsecTun4Tests mixin
    # -- confirm against template_ipsec.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` ESP-encrypted GRE packets sent into ``sw_intf``.

        The cleartext handed to ``sa.encrypt`` is
        IP(pg0.remote -> pg0.local) / GRE / IP(pg1.local -> pg1.remote) /
        UDP / ``payload_size`` bytes of 'X'.  ``src`` and ``dst`` are part
        of the signature but are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` cleartext 1.1.1.1 -> 1.1.1.2 UDP packets.

        ``src`` and ``dst`` are not used; the addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Assert each received packet is addressed to pg1's remote host.

        ``p`` is not used.
        """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every captured packet and validate what was inside.

        The decrypted packet must have valid checksums, an IP header from
        pg0.local_ip4 to pg0.remote_ip4, a GRE layer, and an inner IP
        destination of 1.1.1.2.  ``p`` is not used.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so the handler never logs a stale packet
            # from a previous iteration or touches an unbound name when
            # the decrypt call itself is what failed.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Dumping is best effort only; it must not hide
                        # the verification failure re-raised below.
                        pass
                raise

    def setUp(self):
        """Configure a GRE tunnel over pg0 protected by a pair of ESP SAs.

        Both SAs are created with explicit tunnel endpoint addresses, and
        a /32 route for 1.1.1.2 (the destination used by ``gen_pkts``) is
        added via the tunnel.
        """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is ever attached to this bridge domain in
        # this class; it looks like a leftover -- confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA: endpoints passed as (pg0 local, pg0 remote)
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # inbound SA: endpoints passed as (pg0 remote, pg0 local)
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
901
902
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    # Node names / ESP class; presumably read by the IpsecTun4Tests mixin
    # -- confirm against template_ipsec.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` ESP-encrypted GRE packets sent into ``sw_intf``.

        The cleartext handed to ``sa.encrypt`` is
        IP(pg0.remote -> pg0.local) / GRE / IP(pg1.local -> pg1.remote) /
        UDP / ``payload_size`` bytes of 'X'.  ``src`` and ``dst`` are part
        of the signature but are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` cleartext 1.1.1.1 -> 1.1.1.2 UDP packets.

        ``src`` and ``dst`` are not used; the addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Assert each received packet is addressed to pg1's remote host.

        ``p`` is not used.
        """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every captured packet and validate what was inside.

        The decrypted packet must have valid checksums, contain a GRE
        layer, and carry an inner IP destination of 1.1.1.2.  ``p`` is
        not used.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so the handler never logs a stale packet
            # from a previous iteration or touches an unbound name when
            # the decrypt call itself is what failed.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Dumping is best effort only; it must not hide
                        # the verification failure re-raised below.
                        pass
                raise

    def setUp(self):
        """Configure a GRE tunnel over pg0 protected by a pair of ESP SAs.

        The SAs are created without tunnel endpoint addresses, and a /32
        route for 1.1.1.2 (the destination used by ``gen_pkts``) is added
        via the tunnel.
        """
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is ever attached to this bridge domain in
        # this class; it looks like a leftover -- confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()
998
999
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Install the out/in SA pair without tunnel endpoint addresses.

        The scapy-side SAs on ``p`` are (re)built first through
        ``config_tun_params``; the created SAs are stored on ``p`` as
        ``tun_sa_out`` and ``tun_sa_in``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        outbound = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_sa_tun(self, p):
        """Install the out/in SA pair with tunnel endpoint addresses.

        Same as ``config_sa_tra`` except that each SA is also given the
        addresses of ``self.tun_if`` for ``p.addr_type``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # NOTE(review): both SAs receive the addresses in the same
        # (remote, local) order -- confirm that is intended.
        outbound = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            self.tun_if.remote_addr[p.addr_type],
            self.tun_if.local_addr[p.addr_type],
            flags=p.flags)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            self.tun_if.remote_addr[p.addr_type],
            self.tun_if.local_addr[p.addr_type],
            flags=p.flags)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p.tun_sa_out`` / ``[p.tun_sa_in]``."""
        protection = VppIpsecTunProtect(
            self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route both hosts via it.

        The tunnel gets IPv4 and IPv6 configuration.  Only the IPv4 host
        route is kept on ``p`` (as ``p.route``); the IPv6 one is not
        stored.
        """
        tunnel = VppIpIpTunInterface(self, self.pg0,
                                     self.pg0.local_ip4,
                                     self.pg0.remote_ip4)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        host4 = p.remote_tun_if_host
        path4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, host4, 32, [path4])
        p.route.add_vpp_config()

        host6 = p.remote_tun_if_host6
        path6 = VppRoutePath(tunnel.remote_ip6,
                             0xffffffff,
                             proto=DpoProto.DPO_PROTO_IP6)
        route6 = VppIpRoute(self, host6, 128, [path6])
        route6.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA, stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1082
1083
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; defer to the base classes."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        orig = self.ipv4_params

        def expect_tun_counters(expected):
            # rx is fetched and checked before tx is fetched
            rx_stats = orig.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], expected)
            tx_stats = orig.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], expected)

        self.config_network(orig)
        self.config_sa_tra(orig)
        self.config_protect(orig)

        # v4-in-v4 traffic with the initial SAs
        self.verify_tun_44(orig, count=127)
        expect_tun_counters(127)

        # v6-in-v4 traffic after resetting the SA state; the interface
        # counters keep accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(orig, count=127)
        expect_tun_counters(254)

        # rekey - create new SAs and update the tunnel protection.
        # copy.copy is shallow, so ``rekeyed.tun_if`` is the same
        # interface object as ``orig.tun_if``.
        rekeyed = copy.copy(orig)
        rekeyed.crypt_key = b'X' + orig.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = orig.vpp_tun_spi
        rekeyed.tun_if.remote_spi = orig.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(orig)

        self.verify_tun_44(rekeyed, count=127)
        expect_tun_counters(381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(orig)
1143
1144
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Bring up the protected tunnel with the UDP-encap SA flag set.

        The scapy-side NAT header (UDP 5454 -> 4500) is stored on the
        IPv4 parameters before the network, SAs and protection are
        configured.
        """
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo setUp: protection, SAs, network, then the base tearDown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        params = self.ipv4_params

        self.verify_tun_44(params, count=127)

        rx_stats = params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], 127)
        tx_stats = params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
1184
1185
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; defer to the base classes."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` ESP-encrypted IP-in-IP packets for ``sw_intf``.

        The cleartext handed to ``sa.encrypt`` is
        IP(sw_intf remote -> local) / IP(src -> dst) / UDP /
        ``payload_size`` bytes of 'X'.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` cleartext ``src`` -> ``dst`` UDP packets."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Assert each packet goes from the tunnel host to pg1's remote.

        Checks the IP destination (pg1.remote_ip4), the IP source
        (``p.remote_tun_if_host``) and the packet checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every captured packet and validate what was inside.

        The decrypted packet must have valid checksums, an IP header from
        pg0.local_ip4 to pg0.remote_ip4, and a nested IP header whose
        destination is ``p.remote_tun_if_host``.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so the handler never logs a stale packet
            # from a previous iteration or touches an unbound name when
            # the decrypt call itself is what failed.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Dumping is best effort only; it must not hide
                        # the verification failure re-raised below.
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection.
        # copy.copy is shallow, so np.tun_if is the same interface object
        # as p.tun_if.
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # interface counters keep accumulating across the rekey
        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1287
1288
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    def config_sa_tra(self, p):
        """Install the out/in SA pair without tunnel endpoint addresses.

        The scapy-side SAs on ``p`` are (re)built first through
        ``config_tun_params``; the created SAs are stored on ``p`` as
        ``tun_sa_out`` and ``tun_sa_in``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        outbound = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_sa_tun(self, p):
        """Install the out/in SA pair with tunnel endpoint addresses.

        Same as ``config_sa_tra`` except that each SA is also given the
        addresses of ``self.tun_if`` for ``p.addr_type``.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        # NOTE(review): both SAs receive the addresses in the same
        # (remote, local) order -- confirm that is intended.
        outbound = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            self.tun_if.remote_addr[p.addr_type],
            self.tun_if.local_addr[p.addr_type])
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            self.tun_if.remote_addr[p.addr_type],
            self.tun_if.local_addr[p.addr_type])
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p.tun_sa_out`` / ``[p.tun_sa_in]``."""
        protection = VppIpsecTunProtect(
            self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and route both hosts via it.

        The tunnel uses pg0's IPv6 addresses and gets IPv6 and IPv4
        configuration.  Only the IPv6 host route is kept on ``p`` (as
        ``p.route``); the IPv4 one is not stored.
        """
        tunnel = VppIpIpTunInterface(self, self.pg0,
                                     self.pg0.local_ip6,
                                     self.pg0.remote_ip6)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        host6 = p.remote_tun_if_host
        path6 = VppRoutePath(tunnel.remote_ip6,
                             0xffffffff,
                             proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, host6, 128, [path6])
        p.route.add_vpp_config()

        host4 = p.remote_tun_if_host4
        path4 = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        route4 = VppIpRoute(self, host4, 32, [path4])
        route4.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA, stored on ``p``."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1362
1363
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec6TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; defer to the base classes."""
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect"""

        orig = self.ipv6_params

        def expect_tun_counters(expected):
            # rx is fetched and checked before tx is fetched
            rx_stats = orig.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], expected)
            tx_stats = orig.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], expected)

        self.config_network(orig)
        self.config_sa_tra(orig)
        self.config_protect(orig)

        self.verify_tun_66(orig, count=127)
        expect_tun_counters(127)

        # rekey - create new SAs and update the tunnel protection.
        # copy.copy is shallow, so ``gen2.tun_if`` is the same interface
        # object as ``orig.tun_if``.
        gen2 = copy.copy(orig)
        gen2.crypt_key = b'X' + orig.crypt_key[1:]
        gen2.scapy_tun_spi += 100
        gen2.scapy_tun_sa_id += 1
        gen2.vpp_tun_spi += 100
        gen2.vpp_tun_sa_id += 1
        gen2.tun_if.local_spi = orig.vpp_tun_spi
        gen2.tun_if.remote_spi = orig.scapy_tun_spi

        self.config_sa_tra(gen2)
        self.config_protect(gen2)
        self.unconfig_sa(orig)

        self.verify_tun_66(gen2, count=127)
        expect_tun_counters(254)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        gen3 = copy.copy(gen2)
        gen3.crypt_key = b'Z' + orig.crypt_key[1:]
        gen3.scapy_tun_spi += 100
        gen3.scapy_tun_sa_id += 1
        gen3.vpp_tun_spi += 100
        gen3.vpp_tun_sa_id += 1
        gen3.tun_if.local_spi = orig.vpp_tun_spi
        gen3.tun_if.remote_spi = orig.scapy_tun_spi

        self.config_sa_tra(gen3)

        # step 1: old output SA, both input SAs accepted
        orig.tun_protect.update_vpp_config(
            gen2.tun_sa_out, [gen2.tun_sa_in, gen3.tun_sa_in])
        self.verify_tun_66(gen2, gen2, count=127)
        self.verify_tun_66(gen3, gen2, count=127)

        # step 2: new output SA, both input SAs still accepted
        orig.tun_protect.update_vpp_config(
            gen3.tun_sa_out, [gen2.tun_sa_in, gen3.tun_sa_in])
        self.verify_tun_66(gen2, gen3, count=127)
        self.verify_tun_66(gen3, gen3, count=127)

        # step 3: only the new input SA remains; the old one must drop
        orig.tun_protect.update_vpp_config(
            gen3.tun_sa_out, [gen3.tun_sa_in])
        self.verify_tun_66(gen3, gen3, count=127)
        self.verify_drop_tun_66(gen2, count=127)

        # seven successful verify_tun_66 runs of 127 packets so far
        expect_tun_counters(127*7)
        self.unconfig_sa(gen2)

        # teardown
        self.unconfig_protect(gen3)
        self.unconfig_sa(gen3)
        self.unconfig_network(orig)

    def test_tun_46(self):
        """IPSEC tunnel protect"""

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_46(params, count=127)

        rx_stats = params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], 127)
        tx_stats = params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], 127)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1479
1480
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing class-specific to undo; defer to the base classes."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` ESP-encrypted IPv6-in-IPv6 packets.

        The cleartext handed to ``sa.encrypt`` is
        IPv6(sw_intf remote -> local) / IPv6(src -> dst) / UDP /
        ``payload_size`` bytes of 'X'.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` cleartext ``src`` -> ``dst`` IPv6 UDP packets."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Assert each packet goes from the tunnel host to pg1's remote.

        Checks the IPv6 destination (pg1.remote_ip6), the IPv6 source
        (``p.remote_tun_if_host``) and the packet checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every captured packet and validate what was inside.

        The decrypted packet must have valid checksums, an IPv6 header
        from pg0.local_ip6 to pg0.remote_ip6, and a nested IPv6 header
        whose destination is ``p.remote_tun_if_host``.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level and the original exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet so the handler never logs a stale packet
            # from a previous iteration or touches an unbound name when
            # the decrypt call itself is what failed.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # Dumping is best effort only; it must not hide
                        # the verification failure re-raised below.
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection.
        # copy.copy is shallow, so np.tun_if is the same interface object
        # as p.tun_if.
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # interface counters keep accumulating across the rekey
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1582
1583
# When executed directly (rather than imported), run this module's tests
# through unittest using the project's VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)