tests: changes for scapy 2.4.3 migration
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key
13 from vpp_ipsec_tun_interface import VppIpsecTunInterface
14 from vpp_gre_interface import VppGreInterface
15 from vpp_ipip_tun_interface import VppIpIpTunInterface
16 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
17 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
18 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
19 from util import ppp
20 from vpp_papi import VppEnum
21
22
23 def config_tun_params(p, encryption_type, tun_if):
24     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
25     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
26                              IPSEC_API_SAD_FLAG_USE_ESN))
27     crypt_key = mk_scapy_crypt_key(p)
28     p.scapy_tun_sa = SecurityAssociation(
29         encryption_type, spi=p.vpp_tun_spi,
30         crypt_algo=p.crypt_algo,
31         crypt_key=crypt_key,
32         auth_algo=p.auth_algo, auth_key=p.auth_key,
33         tunnel_header=ip_class_by_addr_type[p.addr_type](
34             src=tun_if.remote_ip,
35             dst=tun_if.local_ip),
36         nat_t_header=p.nat_header,
37         esn_en=esn_en)
38     p.vpp_tun_sa = SecurityAssociation(
39         encryption_type, spi=p.scapy_tun_spi,
40         crypt_algo=p.crypt_algo,
41         crypt_key=crypt_key,
42         auth_algo=p.auth_algo, auth_key=p.auth_key,
43         tunnel_header=ip_class_by_addr_type[p.addr_type](
44             dst=tun_if.remote_ip,
45             src=tun_if.local_ip),
46         nat_t_header=p.nat_header,
47         esn_en=esn_en)
48
49
50 class TemplateIpsec4TunIfEsp(TemplateIpsec):
51     """ IPsec tunnel interface tests """
52
53     encryption_type = ESP
54
55     @classmethod
56     def setUpClass(cls):
57         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
58
59     @classmethod
60     def tearDownClass(cls):
61         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
62
63     def setUp(self):
64         super(TemplateIpsec4TunIfEsp, self).setUp()
65
66         self.tun_if = self.pg0
67
68         p = self.ipv4_params
69
70         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
71                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
72                                         p.crypt_key, p.crypt_key,
73                                         p.auth_algo_vpp_id, p.auth_key,
74                                         p.auth_key)
75         p.tun_if.add_vpp_config()
76         p.tun_if.admin_up()
77         p.tun_if.config_ip4()
78         p.tun_if.config_ip6()
79         config_tun_params(p, self.encryption_type, p.tun_if)
80
81         r = VppIpRoute(self, p.remote_tun_if_host, 32,
82                        [VppRoutePath(p.tun_if.remote_ip4,
83                                      0xffffffff)])
84         r.add_vpp_config()
85         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
86                        [VppRoutePath(p.tun_if.remote_ip6,
87                                      0xffffffff,
88                                      proto=DpoProto.DPO_PROTO_IP6)])
89         r.add_vpp_config()
90
91     def tearDown(self):
92         super(TemplateIpsec4TunIfEsp, self).tearDown()
93
94
95 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
96     """ IPsec UDP tunnel interface tests """
97
98     tun4_encrypt_node_name = "esp4-encrypt-tun"
99     tun4_decrypt_node_name = "esp4-decrypt-tun"
100     encryption_type = ESP
101
102     @classmethod
103     def setUpClass(cls):
104         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
105
106     @classmethod
107     def tearDownClass(cls):
108         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
109
110     def setUp(self):
111         super(TemplateIpsec4TunIfEspUdp, self).setUp()
112
113         self.tun_if = self.pg0
114
115         p = self.ipv4_params
116         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
117                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
118         p.nat_header = UDP(sport=5454, dport=4500)
119
120         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
121                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
122                                         p.crypt_key, p.crypt_key,
123                                         p.auth_algo_vpp_id, p.auth_key,
124                                         p.auth_key, udp_encap=True)
125         p.tun_if.add_vpp_config()
126         p.tun_if.admin_up()
127         p.tun_if.config_ip4()
128         p.tun_if.config_ip6()
129         config_tun_params(p, self.encryption_type, p.tun_if)
130
131         r = VppIpRoute(self, p.remote_tun_if_host, 32,
132                        [VppRoutePath(p.tun_if.remote_ip4,
133                                      0xffffffff)])
134         r.add_vpp_config()
135         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
136                        [VppRoutePath(p.tun_if.remote_ip6,
137                                      0xffffffff,
138                                      proto=DpoProto.DPO_PROTO_IP6)])
139         r.add_vpp_config()
140
141     def tearDown(self):
142         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
143
144
145 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
146     """ Ipsec ESP - TUN tests """
147     tun4_encrypt_node_name = "esp4-encrypt-tun"
148     tun4_decrypt_node_name = "esp4-decrypt-tun"
149
150     def test_tun_basic64(self):
151         """ ipsec 6o4 tunnel basic test """
152         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
153
154         self.verify_tun_64(self.params[socket.AF_INET], count=1)
155
156     def test_tun_burst64(self):
157         """ ipsec 6o4 tunnel basic test """
158         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
159
160         self.verify_tun_64(self.params[socket.AF_INET], count=257)
161
162     def test_tun_basic_frag44(self):
163         """ ipsec 4o4 tunnel frag basic test """
164         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
165
166         p = self.ipv4_params
167
168         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
169                                        [1500, 0, 0, 0])
170         self.verify_tun_44(self.params[socket.AF_INET],
171                            count=1, payload_size=1800, n_rx=2)
172         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
173                                        [9000, 0, 0, 0])
174
175
176 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
177     """ Ipsec ESP UDP tests """
178
179     tun4_input_node = "ipsec4-tun-input"
180
181     def test_keepalive(self):
182         """ IPSEC NAT Keepalive """
183         self.verify_keepalive(self.ipv4_params)
184
185
186 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
187     """ Ipsec ESP - TCP tests """
188     pass
189
190
191 class TemplateIpsec6TunIfEsp(TemplateIpsec):
192     """ IPsec tunnel interface tests """
193
194     encryption_type = ESP
195
196     def setUp(self):
197         super(TemplateIpsec6TunIfEsp, self).setUp()
198
199         self.tun_if = self.pg0
200
201         p = self.ipv6_params
202         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
203                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
204                                         p.crypt_key, p.crypt_key,
205                                         p.auth_algo_vpp_id, p.auth_key,
206                                         p.auth_key, is_ip6=True)
207         p.tun_if.add_vpp_config()
208         p.tun_if.admin_up()
209         p.tun_if.config_ip6()
210         p.tun_if.config_ip4()
211         config_tun_params(p, self.encryption_type, p.tun_if)
212
213         r = VppIpRoute(self, p.remote_tun_if_host, 128,
214                        [VppRoutePath(p.tun_if.remote_ip6,
215                                      0xffffffff,
216                                      proto=DpoProto.DPO_PROTO_IP6)])
217         r.add_vpp_config()
218         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
219                        [VppRoutePath(p.tun_if.remote_ip4,
220                                      0xffffffff)])
221         r.add_vpp_config()
222
223     def tearDown(self):
224         super(TemplateIpsec6TunIfEsp, self).tearDown()
225
226
227 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
228     """ Ipsec ESP - TUN tests """
229     tun6_encrypt_node_name = "esp6-encrypt-tun"
230     tun6_decrypt_node_name = "esp6-decrypt-tun"
231
232     def test_tun_basic46(self):
233         """ ipsec 4o6 tunnel basic test """
234         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
235         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
236
237     def test_tun_burst46(self):
238         """ ipsec 4o6 tunnel burst test """
239         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
240         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
241
242
243 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
244     """ IPsec IPv4 Multi Tunnel interface """
245
246     encryption_type = ESP
247     tun4_encrypt_node_name = "esp4-encrypt-tun"
248     tun4_decrypt_node_name = "esp4-decrypt-tun"
249
250     def setUp(self):
251         super(TestIpsec4MultiTunIfEsp, self).setUp()
252
253         self.tun_if = self.pg0
254
255         self.multi_params = []
256         self.pg0.generate_remote_hosts(10)
257         self.pg0.configure_ipv4_neighbors()
258
259         for ii in range(10):
260             p = copy.copy(self.ipv4_params)
261
262             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
263             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
264             p.scapy_tun_spi = p.scapy_tun_spi + ii
265             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
266             p.vpp_tun_spi = p.vpp_tun_spi + ii
267
268             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
269             p.scapy_tra_spi = p.scapy_tra_spi + ii
270             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
271             p.vpp_tra_spi = p.vpp_tra_spi + ii
272
273             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
274                                             p.scapy_tun_spi,
275                                             p.crypt_algo_vpp_id,
276                                             p.crypt_key, p.crypt_key,
277                                             p.auth_algo_vpp_id, p.auth_key,
278                                             p.auth_key,
279                                             dst=self.pg0.remote_hosts[ii].ip4)
280             p.tun_if.add_vpp_config()
281             p.tun_if.admin_up()
282             p.tun_if.config_ip4()
283             config_tun_params(p, self.encryption_type, p.tun_if)
284             self.multi_params.append(p)
285
286             VppIpRoute(self, p.remote_tun_if_host, 32,
287                        [VppRoutePath(p.tun_if.remote_ip4,
288                                      0xffffffff)]).add_vpp_config()
289
290     def tearDown(self):
291         super(TestIpsec4MultiTunIfEsp, self).tearDown()
292
293     def test_tun_44(self):
294         """Multiple IPSEC tunnel interfaces """
295         for p in self.multi_params:
296             self.verify_tun_44(p, count=127)
297             c = p.tun_if.get_rx_stats()
298             self.assertEqual(c['packets'], 127)
299             c = p.tun_if.get_tx_stats()
300             self.assertEqual(c['packets'], 127)
301
302
303 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
304     """ IPsec IPv4 Tunnel interface all Algos """
305
306     encryption_type = ESP
307     tun4_encrypt_node_name = "esp4-encrypt-tun"
308     tun4_decrypt_node_name = "esp4-decrypt-tun"
309
310     def config_network(self, p):
311
312         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
313                                         p.scapy_tun_spi,
314                                         p.crypt_algo_vpp_id,
315                                         p.crypt_key, p.crypt_key,
316                                         p.auth_algo_vpp_id, p.auth_key,
317                                         p.auth_key,
318                                         salt=p.salt)
319         p.tun_if.add_vpp_config()
320         p.tun_if.admin_up()
321         p.tun_if.config_ip4()
322         config_tun_params(p, self.encryption_type, p.tun_if)
323         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
324         self.logger.info(self.vapi.cli("sh ipsec sa 1"))
325
326         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
327                              [VppRoutePath(p.tun_if.remote_ip4,
328                                            0xffffffff)])
329         p.route.add_vpp_config()
330
331     def unconfig_network(self, p):
332         p.tun_if.unconfig_ip4()
333         p.tun_if.remove_vpp_config()
334         p.route.remove_vpp_config()
335
336     def setUp(self):
337         super(TestIpsec4TunIfEspAll, self).setUp()
338
339         self.tun_if = self.pg0
340
341     def tearDown(self):
342         super(TestIpsec4TunIfEspAll, self).tearDown()
343
344     def rekey(self, p):
345         #
346         # change the key and the SPI
347         #
348         p.crypt_key = b'X' + p.crypt_key[1:]
349         p.scapy_tun_spi += 1
350         p.scapy_tun_sa_id += 1
351         p.vpp_tun_spi += 1
352         p.vpp_tun_sa_id += 1
353         p.tun_if.local_spi = p.vpp_tun_spi
354         p.tun_if.remote_spi = p.scapy_tun_spi
355
356         config_tun_params(p, self.encryption_type, p.tun_if)
357
358         p.tun_sa_in = VppIpsecSA(self,
359                                  p.scapy_tun_sa_id,
360                                  p.scapy_tun_spi,
361                                  p.auth_algo_vpp_id,
362                                  p.auth_key,
363                                  p.crypt_algo_vpp_id,
364                                  p.crypt_key,
365                                  self.vpp_esp_protocol,
366                                  flags=p.flags,
367                                  salt=p.salt)
368         p.tun_sa_out = VppIpsecSA(self,
369                                   p.vpp_tun_sa_id,
370                                   p.vpp_tun_spi,
371                                   p.auth_algo_vpp_id,
372                                   p.auth_key,
373                                   p.crypt_algo_vpp_id,
374                                   p.crypt_key,
375                                   self.vpp_esp_protocol,
376                                   flags=p.flags,
377                                   salt=p.salt)
378         p.tun_sa_in.add_vpp_config()
379         p.tun_sa_out.add_vpp_config()
380
381         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
382                                          sa_id=p.tun_sa_in.id,
383                                          is_outbound=1)
384         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
385                                          sa_id=p.tun_sa_out.id,
386                                          is_outbound=0)
387         self.logger.info(self.vapi.cli("sh ipsec sa"))
388
389     def test_tun_44(self):
390         """IPSEC tunnel all algos """
391
392         # foreach VPP crypto engine
393         engines = ["ia32", "ipsecmb", "openssl"]
394
395         # foreach crypto algorithm
396         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
397                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
398                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
399                                 IPSEC_API_INTEG_ALG_NONE),
400                   'scapy-crypto': "AES-GCM",
401                   'scapy-integ': "NULL",
402                   'key': b"JPjyOWBeVEQiMe7h",
403                   'salt': 3333},
404                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
405                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
406                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
407                                 IPSEC_API_INTEG_ALG_NONE),
408                   'scapy-crypto': "AES-GCM",
409                   'scapy-integ': "NULL",
410                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
411                   'salt': 0},
412                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
413                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
414                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
415                                 IPSEC_API_INTEG_ALG_NONE),
416                   'scapy-crypto': "AES-GCM",
417                   'scapy-integ': "NULL",
418                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
419                   'salt': 9999},
420                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
421                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
422                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
423                                 IPSEC_API_INTEG_ALG_SHA1_96),
424                   'scapy-crypto': "AES-CBC",
425                   'scapy-integ': "HMAC-SHA1-96",
426                   'salt': 0,
427                   'key': b"JPjyOWBeVEQiMe7h"},
428                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
429                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
430                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
431                                 IPSEC_API_INTEG_ALG_SHA1_96),
432                   'scapy-crypto': "AES-CBC",
433                   'scapy-integ': "HMAC-SHA1-96",
434                   'salt': 0,
435                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
436                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
437                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
438                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
439                                 IPSEC_API_INTEG_ALG_SHA1_96),
440                   'scapy-crypto': "AES-CBC",
441                   'scapy-integ': "HMAC-SHA1-96",
442                   'salt': 0,
443                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
444                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
445                                  IPSEC_API_CRYPTO_ALG_NONE),
446                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
447                                 IPSEC_API_INTEG_ALG_SHA1_96),
448                   'scapy-crypto': "NULL",
449                   'scapy-integ': "HMAC-SHA1-96",
450                   'salt': 0,
451                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
452
453         for engine in engines:
454             self.vapi.cli("set crypto handler all %s" % engine)
455
456             #
457             # loop through each of the algorithms
458             #
459             for algo in algos:
460                 # with self.subTest(algo=algo['scapy']):
461
462                 p = copy.copy(self.ipv4_params)
463                 p.auth_algo_vpp_id = algo['vpp-integ']
464                 p.crypt_algo_vpp_id = algo['vpp-crypto']
465                 p.crypt_algo = algo['scapy-crypto']
466                 p.auth_algo = algo['scapy-integ']
467                 p.crypt_key = algo['key']
468                 p.salt = algo['salt']
469
470                 self.config_network(p)
471
472                 self.verify_tun_44(p, count=127)
473                 c = p.tun_if.get_rx_stats()
474                 self.assertEqual(c['packets'], 127)
475                 c = p.tun_if.get_tx_stats()
476                 self.assertEqual(c['packets'], 127)
477
478                 #
479                 # rekey the tunnel
480                 #
481                 self.rekey(p)
482                 self.verify_tun_44(p, count=127)
483
484                 self.unconfig_network(p)
485                 p.tun_sa_out.remove_vpp_config()
486                 p.tun_sa_in.remove_vpp_config()
487
488
489 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
490     """ IPsec IPv6 Multi Tunnel interface """
491
492     encryption_type = ESP
493     tun6_encrypt_node_name = "esp6-encrypt-tun"
494     tun6_decrypt_node_name = "esp6-decrypt-tun"
495
496     def setUp(self):
497         super(TestIpsec6MultiTunIfEsp, self).setUp()
498
499         self.tun_if = self.pg0
500
501         self.multi_params = []
502         self.pg0.generate_remote_hosts(10)
503         self.pg0.configure_ipv6_neighbors()
504
505         for ii in range(10):
506             p = copy.copy(self.ipv6_params)
507
508             p.remote_tun_if_host = "1111::%d" % (ii + 1)
509             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
510             p.scapy_tun_spi = p.scapy_tun_spi + ii
511             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
512             p.vpp_tun_spi = p.vpp_tun_spi + ii
513
514             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
515             p.scapy_tra_spi = p.scapy_tra_spi + ii
516             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
517             p.vpp_tra_spi = p.vpp_tra_spi + ii
518
519             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
520                                             p.scapy_tun_spi,
521                                             p.crypt_algo_vpp_id,
522                                             p.crypt_key, p.crypt_key,
523                                             p.auth_algo_vpp_id, p.auth_key,
524                                             p.auth_key, is_ip6=True,
525                                             dst=self.pg0.remote_hosts[ii].ip6)
526             p.tun_if.add_vpp_config()
527             p.tun_if.admin_up()
528             p.tun_if.config_ip6()
529             config_tun_params(p, self.encryption_type, p.tun_if)
530             self.multi_params.append(p)
531
532             r = VppIpRoute(self, p.remote_tun_if_host, 128,
533                            [VppRoutePath(p.tun_if.remote_ip6,
534                                          0xffffffff,
535                                          proto=DpoProto.DPO_PROTO_IP6)])
536             r.add_vpp_config()
537
538     def tearDown(self):
539         super(TestIpsec6MultiTunIfEsp, self).tearDown()
540
541     def test_tun_66(self):
542         """Multiple IPSEC tunnel interfaces """
543         for p in self.multi_params:
544             self.verify_tun_66(p, count=127)
545             c = p.tun_if.get_rx_stats()
546             self.assertEqual(c['packets'], 127)
547             c = p.tun_if.get_tx_stats()
548             self.assertEqual(c['packets'], 127)
549
550
551 class TestIpsecGreTebIfEsp(TemplateIpsec,
552                            IpsecTun4Tests):
553     """ Ipsec GRE TEB ESP - TUN tests """
554     tun4_encrypt_node_name = "esp4-encrypt-tun"
555     tun4_decrypt_node_name = "esp4-decrypt-tun"
556     encryption_type = ESP
557     omac = "00:11:22:33:44:55"
558
559     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
560                          payload_size=100):
561         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
562                 sa.encrypt(IP(src=self.pg0.remote_ip4,
563                               dst=self.pg0.local_ip4) /
564                            GRE() /
565                            Ether(dst=self.omac) /
566                            IP(src="1.1.1.1", dst="1.1.1.2") /
567                            UDP(sport=1144, dport=2233) /
568                            Raw(b'X' * payload_size))
569                 for i in range(count)]
570
571     def gen_pkts(self, sw_intf, src, dst, count=1,
572                  payload_size=100):
573         return [Ether(dst=self.omac) /
574                 IP(src="1.1.1.1", dst="1.1.1.2") /
575                 UDP(sport=1144, dport=2233) /
576                 Raw(b'X' * payload_size)
577                 for i in range(count)]
578
579     def verify_decrypted(self, p, rxs):
580         for rx in rxs:
581             self.assert_equal(rx[Ether].dst, self.omac)
582             self.assert_equal(rx[IP].dst, "1.1.1.2")
583
584     def verify_encrypted(self, p, sa, rxs):
585         for rx in rxs:
586             try:
587                 pkt = sa.decrypt(rx[IP])
588                 if not pkt.haslayer(IP):
589                     pkt = IP(pkt[Raw].load)
590                 self.assert_packet_checksums_valid(pkt)
591                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
592                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
593                 self.assertTrue(pkt.haslayer(GRE))
594                 e = pkt[Ether]
595                 self.assertEqual(e[Ether].dst, self.omac)
596                 self.assertEqual(e[IP].dst, "1.1.1.2")
597             except (IndexError, AssertionError):
598                 self.logger.debug(ppp("Unexpected packet:", rx))
599                 try:
600                     self.logger.debug(ppp("Decrypted packet:", pkt))
601                 except:
602                     pass
603                 raise
604
605     def setUp(self):
606         super(TestIpsecGreTebIfEsp, self).setUp()
607
608         self.tun_if = self.pg0
609
610         p = self.ipv4_params
611
612         bd1 = VppBridgeDomain(self, 1)
613         bd1.add_vpp_config()
614
615         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
616                                   p.auth_algo_vpp_id, p.auth_key,
617                                   p.crypt_algo_vpp_id, p.crypt_key,
618                                   self.vpp_esp_protocol,
619                                   self.pg0.local_ip4,
620                                   self.pg0.remote_ip4)
621         p.tun_sa_out.add_vpp_config()
622
623         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
624                                  p.auth_algo_vpp_id, p.auth_key,
625                                  p.crypt_algo_vpp_id, p.crypt_key,
626                                  self.vpp_esp_protocol,
627                                  self.pg0.remote_ip4,
628                                  self.pg0.local_ip4)
629         p.tun_sa_in.add_vpp_config()
630
631         p.tun_if = VppGreInterface(self,
632                                    self.pg0.local_ip4,
633                                    self.pg0.remote_ip4,
634                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
635                                          GRE_API_TUNNEL_TYPE_TEB))
636         p.tun_if.add_vpp_config()
637
638         p.tun_protect = VppIpsecTunProtect(self,
639                                            p.tun_if,
640                                            p.tun_sa_out,
641                                            [p.tun_sa_in])
642
643         p.tun_protect.add_vpp_config()
644
645         p.tun_if.admin_up()
646         p.tun_if.config_ip4()
647         config_tun_params(p, self.encryption_type, p.tun_if)
648
649         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
650         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
651
652         self.vapi.cli("clear ipsec sa")
653
654     def tearDown(self):
655         p = self.ipv4_params
656         p.tun_if.unconfig_ip4()
657         super(TestIpsecGreTebIfEsp, self).tearDown()
658
659
660 class TestIpsecGreIfEsp(TemplateIpsec,
661                         IpsecTun4Tests):
662     """ Ipsec GRE ESP - TUN tests """
663     tun4_encrypt_node_name = "esp4-encrypt-tun"
664     tun4_decrypt_node_name = "esp4-decrypt-tun"
665     encryption_type = ESP
666
667     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
668                          payload_size=100):
669         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
670                 sa.encrypt(IP(src=self.pg0.remote_ip4,
671                               dst=self.pg0.local_ip4) /
672                            GRE() /
673                            IP(src=self.pg1.local_ip4,
674                               dst=self.pg1.remote_ip4) /
675                            UDP(sport=1144, dport=2233) /
676                            Raw(b'X' * payload_size))
677                 for i in range(count)]
678
679     def gen_pkts(self, sw_intf, src, dst, count=1,
680                  payload_size=100):
681         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
682                 IP(src="1.1.1.1", dst="1.1.1.2") /
683                 UDP(sport=1144, dport=2233) /
684                 Raw(b'X' * payload_size)
685                 for i in range(count)]
686
687     def verify_decrypted(self, p, rxs):
688         for rx in rxs:
689             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
690             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
691
692     def verify_encrypted(self, p, sa, rxs):
693         for rx in rxs:
694             try:
695                 pkt = sa.decrypt(rx[IP])
696                 if not pkt.haslayer(IP):
697                     pkt = IP(pkt[Raw].load)
698                 self.assert_packet_checksums_valid(pkt)
699                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
700                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
701                 self.assertTrue(pkt.haslayer(GRE))
702                 e = pkt[GRE]
703                 self.assertEqual(e[IP].dst, "1.1.1.2")
704             except (IndexError, AssertionError):
705                 self.logger.debug(ppp("Unexpected packet:", rx))
706                 try:
707                     self.logger.debug(ppp("Decrypted packet:", pkt))
708                 except:
709                     pass
710                 raise
711
712     def setUp(self):
713         super(TestIpsecGreIfEsp, self).setUp()
714
715         self.tun_if = self.pg0
716
717         p = self.ipv4_params
718
719         bd1 = VppBridgeDomain(self, 1)
720         bd1.add_vpp_config()
721
722         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
723                                   p.auth_algo_vpp_id, p.auth_key,
724                                   p.crypt_algo_vpp_id, p.crypt_key,
725                                   self.vpp_esp_protocol,
726                                   self.pg0.local_ip4,
727                                   self.pg0.remote_ip4)
728         p.tun_sa_out.add_vpp_config()
729
730         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
731                                  p.auth_algo_vpp_id, p.auth_key,
732                                  p.crypt_algo_vpp_id, p.crypt_key,
733                                  self.vpp_esp_protocol,
734                                  self.pg0.remote_ip4,
735                                  self.pg0.local_ip4)
736         p.tun_sa_in.add_vpp_config()
737
738         p.tun_if = VppGreInterface(self,
739                                    self.pg0.local_ip4,
740                                    self.pg0.remote_ip4)
741         p.tun_if.add_vpp_config()
742
743         p.tun_protect = VppIpsecTunProtect(self,
744                                            p.tun_if,
745                                            p.tun_sa_out,
746                                            [p.tun_sa_in])
747         p.tun_protect.add_vpp_config()
748
749         p.tun_if.admin_up()
750         p.tun_if.config_ip4()
751         config_tun_params(p, self.encryption_type, p.tun_if)
752
753         VppIpRoute(self, "1.1.1.2", 32,
754                    [VppRoutePath(p.tun_if.remote_ip4,
755                                  0xffffffff)]).add_vpp_config()
756
757     def tearDown(self):
758         p = self.ipv4_params
759         p.tun_if.unconfig_ip4()
760         super(TestIpsecGreIfEsp, self).tearDown()
761
762
763 class TestIpsecGreIfEspTra(TemplateIpsec,
764                            IpsecTun4Tests):
765     """ Ipsec GRE ESP - TRA tests """
766     tun4_encrypt_node_name = "esp4-encrypt-tun"
767     tun4_decrypt_node_name = "esp4-decrypt-tun"
768     encryption_type = ESP
769
770     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
771                          payload_size=100):
772         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
773                 sa.encrypt(IP(src=self.pg0.remote_ip4,
774                               dst=self.pg0.local_ip4) /
775                            GRE() /
776                            IP(src=self.pg1.local_ip4,
777                               dst=self.pg1.remote_ip4) /
778                            UDP(sport=1144, dport=2233) /
779                            Raw(b'X' * payload_size))
780                 for i in range(count)]
781
782     def gen_pkts(self, sw_intf, src, dst, count=1,
783                  payload_size=100):
784         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
785                 IP(src="1.1.1.1", dst="1.1.1.2") /
786                 UDP(sport=1144, dport=2233) /
787                 Raw(b'X' * payload_size)
788                 for i in range(count)]
789
790     def verify_decrypted(self, p, rxs):
791         for rx in rxs:
792             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
793             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
794
795     def verify_encrypted(self, p, sa, rxs):
796         for rx in rxs:
797             try:
798                 pkt = sa.decrypt(rx[IP])
799                 if not pkt.haslayer(IP):
800                     pkt = IP(pkt[Raw].load)
801                 self.assert_packet_checksums_valid(pkt)
802                 self.assertTrue(pkt.haslayer(GRE))
803                 e = pkt[GRE]
804                 self.assertEqual(e[IP].dst, "1.1.1.2")
805             except (IndexError, AssertionError):
806                 self.logger.debug(ppp("Unexpected packet:", rx))
807                 try:
808                     self.logger.debug(ppp("Decrypted packet:", pkt))
809                 except:
810                     pass
811                 raise
812
813     def setUp(self):
814         super(TestIpsecGreIfEspTra, self).setUp()
815
816         self.tun_if = self.pg0
817
818         p = self.ipv4_params
819
820         bd1 = VppBridgeDomain(self, 1)
821         bd1.add_vpp_config()
822
823         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
824                                   p.auth_algo_vpp_id, p.auth_key,
825                                   p.crypt_algo_vpp_id, p.crypt_key,
826                                   self.vpp_esp_protocol)
827         p.tun_sa_out.add_vpp_config()
828
829         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
830                                  p.auth_algo_vpp_id, p.auth_key,
831                                  p.crypt_algo_vpp_id, p.crypt_key,
832                                  self.vpp_esp_protocol)
833         p.tun_sa_in.add_vpp_config()
834
835         p.tun_if = VppGreInterface(self,
836                                    self.pg0.local_ip4,
837                                    self.pg0.remote_ip4)
838         p.tun_if.add_vpp_config()
839
840         p.tun_protect = VppIpsecTunProtect(self,
841                                            p.tun_if,
842                                            p.tun_sa_out,
843                                            [p.tun_sa_in])
844         p.tun_protect.add_vpp_config()
845
846         p.tun_if.admin_up()
847         p.tun_if.config_ip4()
848         config_tun_params(p, self.encryption_type, p.tun_if)
849
850         VppIpRoute(self, "1.1.1.2", 32,
851                    [VppRoutePath(p.tun_if.remote_ip4,
852                                  0xffffffff)]).add_vpp_config()
853
854     def tearDown(self):
855         p = self.ipv4_params
856         p.tun_if.unconfig_ip4()
857         super(TestIpsecGreIfEspTra, self).tearDown()
858
859
860 class TemplateIpsec4TunProtect(object):
861     """ IPsec IPv4 Tunnel protect """
862
863     encryption_type = ESP
864     tun4_encrypt_node_name = "esp4-encrypt-tun"
865     tun4_decrypt_node_name = "esp4-decrypt-tun"
866     tun4_input_node = "ipsec4-tun-input"
867
868     def config_sa_tra(self, p):
869         config_tun_params(p, self.encryption_type, p.tun_if)
870
871         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
872                                   p.auth_algo_vpp_id, p.auth_key,
873                                   p.crypt_algo_vpp_id, p.crypt_key,
874                                   self.vpp_esp_protocol,
875                                   flags=p.flags)
876         p.tun_sa_out.add_vpp_config()
877
878         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
879                                  p.auth_algo_vpp_id, p.auth_key,
880                                  p.crypt_algo_vpp_id, p.crypt_key,
881                                  self.vpp_esp_protocol,
882                                  flags=p.flags)
883         p.tun_sa_in.add_vpp_config()
884
885     def config_sa_tun(self, p):
886         config_tun_params(p, self.encryption_type, p.tun_if)
887
888         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
889                                   p.auth_algo_vpp_id, p.auth_key,
890                                   p.crypt_algo_vpp_id, p.crypt_key,
891                                   self.vpp_esp_protocol,
892                                   self.tun_if.remote_addr[p.addr_type],
893                                   self.tun_if.local_addr[p.addr_type],
894                                   flags=p.flags)
895         p.tun_sa_out.add_vpp_config()
896
897         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
898                                  p.auth_algo_vpp_id, p.auth_key,
899                                  p.crypt_algo_vpp_id, p.crypt_key,
900                                  self.vpp_esp_protocol,
901                                  self.tun_if.remote_addr[p.addr_type],
902                                  self.tun_if.local_addr[p.addr_type],
903                                  flags=p.flags)
904         p.tun_sa_in.add_vpp_config()
905
906     def config_protect(self, p):
907         p.tun_protect = VppIpsecTunProtect(self,
908                                            p.tun_if,
909                                            p.tun_sa_out,
910                                            [p.tun_sa_in])
911         p.tun_protect.add_vpp_config()
912
913     def config_network(self, p):
914         p.tun_if = VppIpIpTunInterface(self, self.pg0,
915                                        self.pg0.local_ip4,
916                                        self.pg0.remote_ip4)
917         p.tun_if.add_vpp_config()
918         p.tun_if.admin_up()
919         p.tun_if.config_ip4()
920         p.tun_if.config_ip6()
921
922         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
923                              [VppRoutePath(p.tun_if.remote_ip4,
924                                            0xffffffff)])
925         p.route.add_vpp_config()
926         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
927                        [VppRoutePath(p.tun_if.remote_ip6,
928                                      0xffffffff,
929                                      proto=DpoProto.DPO_PROTO_IP6)])
930         r.add_vpp_config()
931
932     def unconfig_network(self, p):
933         p.route.remove_vpp_config()
934         p.tun_if.remove_vpp_config()
935
936     def unconfig_protect(self, p):
937         p.tun_protect.remove_vpp_config()
938
939     def unconfig_sa(self, p):
940         p.tun_sa_out.remove_vpp_config()
941         p.tun_sa_in.remove_vpp_config()
942
943
944 class TestIpsec4TunProtect(TemplateIpsec,
945                            TemplateIpsec4TunProtect,
946                            IpsecTun4):
947     """ IPsec IPv4 Tunnel protect - transport mode"""
948
949     def setUp(self):
950         super(TestIpsec4TunProtect, self).setUp()
951
952         self.tun_if = self.pg0
953
954     def tearDown(self):
955         super(TestIpsec4TunProtect, self).tearDown()
956
957     def test_tun_44(self):
958         """IPSEC tunnel protect"""
959
960         p = self.ipv4_params
961
962         self.config_network(p)
963         self.config_sa_tra(p)
964         self.config_protect(p)
965
966         self.verify_tun_44(p, count=127)
967         c = p.tun_if.get_rx_stats()
968         self.assertEqual(c['packets'], 127)
969         c = p.tun_if.get_tx_stats()
970         self.assertEqual(c['packets'], 127)
971
972         self.vapi.cli("clear ipsec sa")
973         self.verify_tun_64(p, count=127)
974         c = p.tun_if.get_rx_stats()
975         self.assertEqual(c['packets'], 254)
976         c = p.tun_if.get_tx_stats()
977         self.assertEqual(c['packets'], 254)
978
979         # rekey - create new SAs and update the tunnel protection
980         np = copy.copy(p)
981         np.crypt_key = b'X' + p.crypt_key[1:]
982         np.scapy_tun_spi += 100
983         np.scapy_tun_sa_id += 1
984         np.vpp_tun_spi += 100
985         np.vpp_tun_sa_id += 1
986         np.tun_if.local_spi = p.vpp_tun_spi
987         np.tun_if.remote_spi = p.scapy_tun_spi
988
989         self.config_sa_tra(np)
990         self.config_protect(np)
991         self.unconfig_sa(p)
992
993         self.verify_tun_44(np, count=127)
994         c = p.tun_if.get_rx_stats()
995         self.assertEqual(c['packets'], 381)
996         c = p.tun_if.get_tx_stats()
997         self.assertEqual(c['packets'], 381)
998
999         # teardown
1000         self.unconfig_protect(np)
1001         self.unconfig_sa(np)
1002         self.unconfig_network(p)
1003
1004
1005 class TestIpsec4TunProtectUdp(TemplateIpsec,
1006                               TemplateIpsec4TunProtect,
1007                               IpsecTun4):
1008     """ IPsec IPv4 Tunnel protect - transport mode"""
1009
1010     def setUp(self):
1011         super(TestIpsec4TunProtectUdp, self).setUp()
1012
1013         self.tun_if = self.pg0
1014
1015         p = self.ipv4_params
1016         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1017                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1018         p.nat_header = UDP(sport=5454, dport=4500)
1019         self.config_network(p)
1020         self.config_sa_tra(p)
1021         self.config_protect(p)
1022
1023     def tearDown(self):
1024         p = self.ipv4_params
1025         self.unconfig_protect(p)
1026         self.unconfig_sa(p)
1027         self.unconfig_network(p)
1028         super(TestIpsec4TunProtectUdp, self).tearDown()
1029
1030     def test_tun_44(self):
1031         """IPSEC UDP tunnel protect"""
1032
1033         p = self.ipv4_params
1034
1035         self.verify_tun_44(p, count=127)
1036         c = p.tun_if.get_rx_stats()
1037         self.assertEqual(c['packets'], 127)
1038         c = p.tun_if.get_tx_stats()
1039         self.assertEqual(c['packets'], 127)
1040
1041     def test_keepalive(self):
1042         """ IPSEC NAT Keepalive """
1043         self.verify_keepalive(self.ipv4_params)
1044
1045
1046 class TestIpsec4TunProtectTun(TemplateIpsec,
1047                               TemplateIpsec4TunProtect,
1048                               IpsecTun4):
1049     """ IPsec IPv4 Tunnel protect - tunnel mode"""
1050
1051     encryption_type = ESP
1052     tun4_encrypt_node_name = "esp4-encrypt-tun"
1053     tun4_decrypt_node_name = "esp4-decrypt-tun"
1054
1055     def setUp(self):
1056         super(TestIpsec4TunProtectTun, self).setUp()
1057
1058         self.tun_if = self.pg0
1059
1060     def tearDown(self):
1061         super(TestIpsec4TunProtectTun, self).tearDown()
1062
1063     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1064                          payload_size=100):
1065         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1066                 sa.encrypt(IP(src=sw_intf.remote_ip4,
1067                               dst=sw_intf.local_ip4) /
1068                            IP(src=src, dst=dst) /
1069                            UDP(sport=1144, dport=2233) /
1070                            Raw(b'X' * payload_size))
1071                 for i in range(count)]
1072
1073     def gen_pkts(self, sw_intf, src, dst, count=1,
1074                  payload_size=100):
1075         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1076                 IP(src=src, dst=dst) /
1077                 UDP(sport=1144, dport=2233) /
1078                 Raw(b'X' * payload_size)
1079                 for i in range(count)]
1080
1081     def verify_decrypted(self, p, rxs):
1082         for rx in rxs:
1083             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1084             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1085             self.assert_packet_checksums_valid(rx)
1086
1087     def verify_encrypted(self, p, sa, rxs):
1088         for rx in rxs:
1089             try:
1090                 pkt = sa.decrypt(rx[IP])
1091                 if not pkt.haslayer(IP):
1092                     pkt = IP(pkt[Raw].load)
1093                 self.assert_packet_checksums_valid(pkt)
1094                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1095                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1096                 inner = pkt[IP].payload
1097                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1098
1099             except (IndexError, AssertionError):
1100                 self.logger.debug(ppp("Unexpected packet:", rx))
1101                 try:
1102                     self.logger.debug(ppp("Decrypted packet:", pkt))
1103                 except:
1104                     pass
1105                 raise
1106
1107     def test_tun_44(self):
1108         """IPSEC tunnel protect """
1109
1110         p = self.ipv4_params
1111
1112         self.config_network(p)
1113         self.config_sa_tun(p)
1114         self.config_protect(p)
1115
1116         self.verify_tun_44(p, count=127)
1117
1118         c = p.tun_if.get_rx_stats()
1119         self.assertEqual(c['packets'], 127)
1120         c = p.tun_if.get_tx_stats()
1121         self.assertEqual(c['packets'], 127)
1122
1123         # rekey - create new SAs and update the tunnel protection
1124         np = copy.copy(p)
1125         np.crypt_key = b'X' + p.crypt_key[1:]
1126         np.scapy_tun_spi += 100
1127         np.scapy_tun_sa_id += 1
1128         np.vpp_tun_spi += 100
1129         np.vpp_tun_sa_id += 1
1130         np.tun_if.local_spi = p.vpp_tun_spi
1131         np.tun_if.remote_spi = p.scapy_tun_spi
1132
1133         self.config_sa_tun(np)
1134         self.config_protect(np)
1135         self.unconfig_sa(p)
1136
1137         self.verify_tun_44(np, count=127)
1138         c = p.tun_if.get_rx_stats()
1139         self.assertEqual(c['packets'], 254)
1140         c = p.tun_if.get_tx_stats()
1141         self.assertEqual(c['packets'], 254)
1142
1143         # teardown
1144         self.unconfig_protect(np)
1145         self.unconfig_sa(np)
1146         self.unconfig_network(p)
1147
1148
1149 class TemplateIpsec6TunProtect(object):
1150     """ IPsec IPv6 Tunnel protect """
1151
1152     def config_sa_tra(self, p):
1153         config_tun_params(p, self.encryption_type, p.tun_if)
1154
1155         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1156                                   p.auth_algo_vpp_id, p.auth_key,
1157                                   p.crypt_algo_vpp_id, p.crypt_key,
1158                                   self.vpp_esp_protocol)
1159         p.tun_sa_out.add_vpp_config()
1160
1161         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1162                                  p.auth_algo_vpp_id, p.auth_key,
1163                                  p.crypt_algo_vpp_id, p.crypt_key,
1164                                  self.vpp_esp_protocol)
1165         p.tun_sa_in.add_vpp_config()
1166
1167     def config_sa_tun(self, p):
1168         config_tun_params(p, self.encryption_type, p.tun_if)
1169
1170         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1171                                   p.auth_algo_vpp_id, p.auth_key,
1172                                   p.crypt_algo_vpp_id, p.crypt_key,
1173                                   self.vpp_esp_protocol,
1174                                   self.tun_if.remote_addr[p.addr_type],
1175                                   self.tun_if.local_addr[p.addr_type])
1176         p.tun_sa_out.add_vpp_config()
1177
1178         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1179                                  p.auth_algo_vpp_id, p.auth_key,
1180                                  p.crypt_algo_vpp_id, p.crypt_key,
1181                                  self.vpp_esp_protocol,
1182                                  self.tun_if.remote_addr[p.addr_type],
1183                                  self.tun_if.local_addr[p.addr_type])
1184         p.tun_sa_in.add_vpp_config()
1185
1186     def config_protect(self, p):
1187         p.tun_protect = VppIpsecTunProtect(self,
1188                                            p.tun_if,
1189                                            p.tun_sa_out,
1190                                            [p.tun_sa_in])
1191         p.tun_protect.add_vpp_config()
1192
1193     def config_network(self, p):
1194         p.tun_if = VppIpIpTunInterface(self, self.pg0,
1195                                        self.pg0.local_ip6,
1196                                        self.pg0.remote_ip6)
1197         p.tun_if.add_vpp_config()
1198         p.tun_if.admin_up()
1199         p.tun_if.config_ip6()
1200         p.tun_if.config_ip4()
1201
1202         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1203                              [VppRoutePath(p.tun_if.remote_ip6,
1204                                            0xffffffff,
1205                                            proto=DpoProto.DPO_PROTO_IP6)])
1206         p.route.add_vpp_config()
1207         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1208                        [VppRoutePath(p.tun_if.remote_ip4,
1209                                      0xffffffff)])
1210         r.add_vpp_config()
1211
1212     def unconfig_network(self, p):
1213         p.route.remove_vpp_config()
1214         p.tun_if.remove_vpp_config()
1215
1216     def unconfig_protect(self, p):
1217         p.tun_protect.remove_vpp_config()
1218
1219     def unconfig_sa(self, p):
1220         p.tun_sa_out.remove_vpp_config()
1221         p.tun_sa_in.remove_vpp_config()
1222
1223
1224 class TestIpsec6TunProtect(TemplateIpsec,
1225                            TemplateIpsec6TunProtect,
1226                            IpsecTun6):
1227     """ IPsec IPv6 Tunnel protect - transport mode"""
1228
1229     encryption_type = ESP
1230     tun6_encrypt_node_name = "esp6-encrypt-tun"
1231     tun6_decrypt_node_name = "esp6-decrypt-tun"
1232
1233     def setUp(self):
1234         super(TestIpsec6TunProtect, self).setUp()
1235
1236         self.tun_if = self.pg0
1237
1238     def tearDown(self):
1239         super(TestIpsec6TunProtect, self).tearDown()
1240
1241     def test_tun_66(self):
1242         """IPSEC tunnel protect"""
1243
1244         p = self.ipv6_params
1245
1246         self.config_network(p)
1247         self.config_sa_tra(p)
1248         self.config_protect(p)
1249
1250         self.verify_tun_66(p, count=127)
1251         c = p.tun_if.get_rx_stats()
1252         self.assertEqual(c['packets'], 127)
1253         c = p.tun_if.get_tx_stats()
1254         self.assertEqual(c['packets'], 127)
1255
1256         # rekey - create new SAs and update the tunnel protection
1257         np = copy.copy(p)
1258         np.crypt_key = b'X' + p.crypt_key[1:]
1259         np.scapy_tun_spi += 100
1260         np.scapy_tun_sa_id += 1
1261         np.vpp_tun_spi += 100
1262         np.vpp_tun_sa_id += 1
1263         np.tun_if.local_spi = p.vpp_tun_spi
1264         np.tun_if.remote_spi = p.scapy_tun_spi
1265
1266         self.config_sa_tra(np)
1267         self.config_protect(np)
1268         self.unconfig_sa(p)
1269
1270         self.verify_tun_66(np, count=127)
1271         c = p.tun_if.get_rx_stats()
1272         self.assertEqual(c['packets'], 254)
1273         c = p.tun_if.get_tx_stats()
1274         self.assertEqual(c['packets'], 254)
1275
1276         # 3 phase rekey
1277         #  1) add two input SAs [old, new]
1278         #  2) swap output SA to [new]
1279         #  3) use only [new] input SA
1280         np3 = copy.copy(np)
1281         np3.crypt_key = b'Z' + p.crypt_key[1:]
1282         np3.scapy_tun_spi += 100
1283         np3.scapy_tun_sa_id += 1
1284         np3.vpp_tun_spi += 100
1285         np3.vpp_tun_sa_id += 1
1286         np3.tun_if.local_spi = p.vpp_tun_spi
1287         np3.tun_if.remote_spi = p.scapy_tun_spi
1288
1289         self.config_sa_tra(np3)
1290
1291         # step 1;
1292         p.tun_protect.update_vpp_config(np.tun_sa_out,
1293                                         [np.tun_sa_in, np3.tun_sa_in])
1294         self.verify_tun_66(np, np, count=127)
1295         self.verify_tun_66(np3, np, count=127)
1296
1297         # step 2;
1298         p.tun_protect.update_vpp_config(np3.tun_sa_out,
1299                                         [np.tun_sa_in, np3.tun_sa_in])
1300         self.verify_tun_66(np, np3, count=127)
1301         self.verify_tun_66(np3, np3, count=127)
1302
1303         # step 1;
1304         p.tun_protect.update_vpp_config(np3.tun_sa_out,
1305                                         [np3.tun_sa_in])
1306         self.verify_tun_66(np3, np3, count=127)
1307         self.verify_drop_tun_66(np, count=127)
1308
1309         c = p.tun_if.get_rx_stats()
1310         self.assertEqual(c['packets'], 127*7)
1311         c = p.tun_if.get_tx_stats()
1312         self.assertEqual(c['packets'], 127*7)
1313         self.unconfig_sa(np)
1314
1315         # teardown
1316         self.unconfig_protect(np3)
1317         self.unconfig_sa(np3)
1318         self.unconfig_network(p)
1319
1320     def test_tun_46(self):
1321         """IPSEC tunnel protect"""
1322
1323         p = self.ipv6_params
1324
1325         self.config_network(p)
1326         self.config_sa_tra(p)
1327         self.config_protect(p)
1328
1329         self.verify_tun_46(p, count=127)
1330         c = p.tun_if.get_rx_stats()
1331         self.assertEqual(c['packets'], 127)
1332         c = p.tun_if.get_tx_stats()
1333         self.assertEqual(c['packets'], 127)
1334
1335         # teardown
1336         self.unconfig_protect(p)
1337         self.unconfig_sa(p)
1338         self.unconfig_network(p)
1339
1340
1341 class TestIpsec6TunProtectTun(TemplateIpsec,
1342                               TemplateIpsec6TunProtect,
1343                               IpsecTun6):
1344     """ IPsec IPv6 Tunnel protect - tunnel mode"""
1345
1346     encryption_type = ESP
1347     tun6_encrypt_node_name = "esp6-encrypt-tun"
1348     tun6_decrypt_node_name = "esp6-decrypt-tun"
1349
1350     def setUp(self):
1351         super(TestIpsec6TunProtectTun, self).setUp()
1352
1353         self.tun_if = self.pg0
1354
1355     def tearDown(self):
1356         super(TestIpsec6TunProtectTun, self).tearDown()
1357
1358     def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1359                           payload_size=100):
1360         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1361                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1362                                 dst=sw_intf.local_ip6) /
1363                            IPv6(src=src, dst=dst) /
1364                            UDP(sport=1166, dport=2233) /
1365                            Raw(b'X' * payload_size))
1366                 for i in range(count)]
1367
1368     def gen_pkts6(self, sw_intf, src, dst, count=1,
1369                   payload_size=100):
1370         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1371                 IPv6(src=src, dst=dst) /
1372                 UDP(sport=1166, dport=2233) /
1373                 Raw(b'X' * payload_size)
1374                 for i in range(count)]
1375
1376     def verify_decrypted6(self, p, rxs):
1377         for rx in rxs:
1378             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1379             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1380             self.assert_packet_checksums_valid(rx)
1381
1382     def verify_encrypted6(self, p, sa, rxs):
1383         for rx in rxs:
1384             try:
1385                 pkt = sa.decrypt(rx[IPv6])
1386                 if not pkt.haslayer(IPv6):
1387                     pkt = IPv6(pkt[Raw].load)
1388                 self.assert_packet_checksums_valid(pkt)
1389                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1390                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1391                 inner = pkt[IPv6].payload
1392                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1393
1394             except (IndexError, AssertionError):
1395                 self.logger.debug(ppp("Unexpected packet:", rx))
1396                 try:
1397                     self.logger.debug(ppp("Decrypted packet:", pkt))
1398                 except:
1399                     pass
1400                 raise
1401
1402     def test_tun_66(self):
1403         """IPSEC tunnel protect """
1404
1405         p = self.ipv6_params
1406
1407         self.config_network(p)
1408         self.config_sa_tun(p)
1409         self.config_protect(p)
1410
1411         self.verify_tun_66(p, count=127)
1412
1413         c = p.tun_if.get_rx_stats()
1414         self.assertEqual(c['packets'], 127)
1415         c = p.tun_if.get_tx_stats()
1416         self.assertEqual(c['packets'], 127)
1417
1418         # rekey - create new SAs and update the tunnel protection
1419         np = copy.copy(p)
1420         np.crypt_key = b'X' + p.crypt_key[1:]
1421         np.scapy_tun_spi += 100
1422         np.scapy_tun_sa_id += 1
1423         np.vpp_tun_spi += 100
1424         np.vpp_tun_sa_id += 1
1425         np.tun_if.local_spi = p.vpp_tun_spi
1426         np.tun_if.remote_spi = p.scapy_tun_spi
1427
1428         self.config_sa_tun(np)
1429         self.config_protect(np)
1430         self.unconfig_sa(p)
1431
1432         self.verify_tun_66(np, count=127)
1433         c = p.tun_if.get_rx_stats()
1434         self.assertEqual(c['packets'], 254)
1435         c = p.tun_if.get_tx_stats()
1436         self.assertEqual(c['packets'], 254)
1437
1438         # teardown
1439         self.unconfig_protect(np)
1440         self.unconfig_sa(np)
1441         self.unconfig_network(p)
1442
1443
1444 if __name__ == '__main__':
1445     unittest.main(testRunner=VppTestRunner)