ipsec: handle UDP keepalives
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11     IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18 from util import ppp
19 from vpp_papi import VppEnum
20
21
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        """Defer class-level setup to the parent template."""
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Defer class-level teardown to the parent template."""
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create an ESP tunnel interface over pg0 with v4 and v6 routes.

        The tunnel object is stored on ``self.ipv4_params.tun_if``.
        """
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # the crypto key and the auth key are each passed twice
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route to the remote host via the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        # IPv6 host route to the remote host via the same tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        """Defer per-test teardown to the parent template."""
        super(TemplateIpsec4TunIfEsp, self).tearDown()
64
65
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        """Defer class-level setup to the parent template."""
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Defer class-level teardown to the parent template."""
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        """Create a UDP-encapsulated ESP tunnel over pg0 with host routes.

        Sets the UDP-encap SA flag and a scapy UDP header (sport 5454,
        dport 4500) on ``self.ipv4_params`` before building the tunnel.
        """
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            udp_encap=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route to the remote host via the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        # IPv6 host route to the remote host via the same tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        """Defer per-test teardown to the parent template."""
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
113
114
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        # the inner packets are IPv6, so point the expected encrypt node
        # at the esp6 variant for this test instance only
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params
        tun_index = p.tun_if.sw_if_index

        # a 1800 byte payload against a 1500 MTU; two packets are
        # expected back (n_rx=2)
        self.vapi.sw_interface_set_mtu(tun_index, [1500, 0, 0, 0])
        try:
            self.verify_tun_44(self.params[socket.AF_INET],
                               count=1, payload_size=1800, n_rx=2)
        finally:
            # put the MTU back even when the verification fails
            self.vapi.sw_interface_set_mtu(tun_index, [9000, 0, 0, 0])
144
145
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-if-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # the actual check lives in the inherited verify_keepalive helper
        params = self.ipv4_params
        self.verify_keepalive(params)
154
155
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # no body of its own: setup and tests both come from the base classes
159
160
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        """Create an IPv6 ESP tunnel interface over pg0 with host routes.

        Unlike the IPv4 template in this file, the tunnel object is kept
        only in a local variable and is not stored on the params object.
        """
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            is_ip6=True)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # IPv6 host route to the remote host via the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [v6_path]).add_vpp_config()

        # IPv4 host route to the remote host via the same tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def tearDown(self):
        """Defer per-test teardown to the parent template."""
        super(TemplateIpsec6TunIfEsp, self).tearDown()
194
195
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        # the inner packets are IPv4, so the esp4 encrypt node is expected
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
210
211
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def setUp(self):
        """Create ten tunnel interfaces over pg0, one params copy each.

        Each copy of ``self.ipv4_params`` gets its own remote host
        address and its SA ids / SPIs offset by the tunnel's index. The
        copies are collected in ``self.multi_params``.
        """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        # attributes that are shifted by the tunnel index
        shifted = ("scapy_tun_sa_id", "scapy_tun_spi",
                   "vpp_tun_sa_id", "vpp_tun_spi",
                   "scapy_tra_sa_id", "scapy_tra_spi",
                   "vpp_tra_sa_id", "vpp_tra_spi")

        for idx in range(10):
            params = copy.copy(self.ipv4_params)
            params.remote_tun_if_host = "1.1.1.%d" % (idx + 1)
            for name in shifted:
                setattr(params, name, getattr(params, name) + idx)

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()

            # host route to this tunnel's remote host
            path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
            route = VppIpRoute(self, params.remote_tun_if_host, 32, [path])
            route.add_vpp_config()

    def tearDown(self):
        """Defer per-test teardown to the parent template."""
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_44(params, count=n_pkts)
            # each tunnel's own counters must show exactly this burst
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
268
269
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def config_network(self, p):
        """Create the tunnel interface and host route described by *p*.

        The created objects are stored on ``p.tun_if`` and ``p.route``.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id, p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id, p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # dump both SAs into the test log
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        via_tunnel = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_tunnel])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what config_network() created for *p*."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        """Run the parent setup and use pg0 as the tunnel endpoint."""
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer per-test teardown to the parent template."""
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Move the tunnel in *p* onto a new key, new SPIs and new SAs.

        The new SA objects are stored on ``p.tun_sa_in`` and
        ``p.tun_sa_out`` and then bound to the existing tunnel interface.
        """
        # new crypto key: same length, first character replaced
        p.crypt_key = 'X' + p.crypt_key[1:]

        # bump every tunnel SPI and SA id by one
        for field in ("scapy_tun_spi", "scapy_tun_sa_id",
                      "vpp_tun_spi", "vpp_tun_sa_id"):
            setattr(p, field, getattr(p, field) + 1)
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, self.tun_if)

        local_addr = self.tun_if.local_addr[p.addr_type]
        remote_addr = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            local_addr, remote_addr,
            flags=p.flags, salt=p.salt)
        p.tun_sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            remote_addr, local_addr,
            flags=p.flags, salt=p.salt)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # bind the new SAs to the tunnel: tun_sa_in as the outbound SA,
        # tun_sa_out as the inbound one
        tun_index = p.tun_if.sw_if_index
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_index,
                                         sa_id=p.tun_sa_in.id,
                                         is_outbound=1)
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=tun_index,
                                         sa_id=p.tun_sa_out.id,
                                         is_outbound=0)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t
        key_128 = "JPjyOWBeVEQiMe7h"
        key_192 = "JPjyOWBeVEQiMe7hJPjyOWBe"
        key_256 = "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # AES-GCM variants: (VPP algorithm, key, salt), no integ algorithm
        gcm_variants = [
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128, key_128, 3333),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192, key_192, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256, key_256, 9999),
        ]
        # AES-CBC variants: (VPP algorithm, key), all with SHA1-96 integ
        cbc_variants = [
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128, key_128),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192, key_192),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256, key_256),
        ]

        # foreach crypto algorithm
        algos = [{'vpp-crypto': vpp_alg,
                  'vpp-integ': integ.IPSEC_API_INTEG_ALG_NONE,
                  'scapy-crypto': "AES-GCM",
                  'scapy-integ': "NULL",
                  'key': key,
                  'salt': salt}
                 for vpp_alg, key, salt in gcm_variants]
        algos += [{'vpp-crypto': vpp_alg,
                   'vpp-integ': integ.IPSEC_API_INTEG_ALG_SHA1_96,
                   'scapy-crypto': "AES-CBC",
                   'scapy-integ': "HMAC-SHA1-96",
                   'salt': 0,
                   'key': key}
                  for vpp_alg, key in cbc_variants]

        n_pkts = 127
        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for algo in algos:
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = algo['vpp-integ']
                p.crypt_algo_vpp_id = algo['vpp-crypto']
                p.crypt_algo = algo['scapy-crypto']
                p.auth_algo = algo['scapy-integ']
                p.crypt_key = algo['key']
                p.salt = algo['salt']

                self.config_network(p)

                self.verify_tun_44(p, count=n_pkts)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], n_pkts)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], n_pkts)

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=n_pkts)

                # remove the tunnel and route, then the SAs from rekey()
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
450
451
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def setUp(self):
        """Create ten IPv6 tunnel interfaces over pg0, one params copy each.

        Each copy of ``self.ipv6_params`` gets its own remote host
        address and its SA ids / SPIs offset by the tunnel's index. The
        copies are collected in ``self.multi_params``.
        """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        # attributes that are shifted by the tunnel index
        shifted = ("scapy_tun_sa_id", "scapy_tun_spi",
                   "vpp_tun_sa_id", "vpp_tun_spi",
                   "scapy_tra_sa_id", "scapy_tra_spi",
                   "vpp_tra_sa_id", "vpp_tra_spi")

        for idx in range(10):
            params = copy.copy(self.ipv6_params)
            params.remote_tun_if_host = "1111::%d" % (idx + 1)
            for name in shifted:
                setattr(params, name, getattr(params, name) + idx)

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()

            # host route to this tunnel's remote host
            path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                proto=DpoProto.DPO_PROTO_IP6)
            VppIpRoute(self, params.remote_tun_if_host, 128,
                       [path]).add_vpp_config()

    def tearDown(self):
        """Defer per-test teardown to the parent template."""
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=n_pkts)
            # each tunnel's own counters must show exactly this burst
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
510
511
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC of the inner (bridged) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return *count* ESP packets carrying GRE-encapsulated L2 frames.

        *src* and *dst* are accepted for interface compatibility but are
        not used; the inner addresses are fixed. Each packet is encrypted
        separately through *sa*.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return *count* plain L2 frames addressed to ``self.omac``.

        *sw_intf*, *src* and *dst* are accepted for interface
        compatibility but are not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame has the expected inner MAC and IP."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check outer and inner headers.

        On failure the received packet, and the decrypted packet when
        decryption got that far, are logged before the original error is
        re-raised.
        """
        for rx in rxs:
            # stays None when decryption itself is what fails
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Build a protected GRE TEB tunnel bridged with pg1.

        Creates bridge domain 1, the two SAs, the GRE TEB tunnel between
        pg0's local and remote addresses, and the tunnel protection, then
        adds the tunnel and pg1 to the bridge domain.
        """
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        self.tun = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        self.tun.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           self.tun,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        self.tun.admin_up()
        self.tun.config_ip4()

        VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Unconfigure the tunnel's IPv4 address, then run parent teardown."""
        self.tun.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
617
618
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return *count* ESP packets carrying GRE-encapsulated IP packets.

        *src* and *dst* are accepted for interface compatibility but are
        not used; the inner packet goes from pg1's local address to pg1's
        remote address. Each packet is encrypted separately through *sa*.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return *count* plain packets destined to 1.1.1.2.

        *src* and *dst* are accepted for interface compatibility but are
        not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check outer and inner headers.

        On failure the received packet, and the decrypted packet when
        decryption got that far, are logged before the original error is
        re-raised.
        """
        for rx in rxs:
            # stays None when decryption itself is what fails
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Build a protected GRE tunnel and a host route for 1.1.1.2.

        Creates the two SAs, the GRE tunnel between pg0's local and
        remote addresses, the tunnel protection, and a /32 route via the
        tunnel.
        """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): nothing in this class adds a port to this bridge
        # domain - it looks like a leftover from the TEB variant; confirm
        # before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        self.tun = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        self.tun.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           self.tun,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        self.tun.admin_up()
        self.tun.config_ip4()

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(self.tun.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Unconfigure the tunnel's IPv4 address, then run parent teardown."""
        self.tun.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
718
719
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create the SA pair for *p* without tunnel endpoint addresses.

        The SAs are stored on ``p.tun_sa_out`` and ``p.tun_sa_in``.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            p.auth_algo_vpp_id, p.auth_key,
                            p.crypt_algo_vpp_id, p.crypt_key,
                            self.vpp_esp_protocol,
                            flags=p.flags)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           p.auth_algo_vpp_id, p.auth_key,
                           p.crypt_algo_vpp_id, p.crypt_key,
                           self.vpp_esp_protocol,
                           flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create the SA pair for *p* with tunnel endpoint addresses.

        The SAs are stored on ``p.tun_sa_out`` and ``p.tun_sa_in``.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        # NOTE(review): both SAs receive the addresses in the same
        # (remote, local) order, whereas the GRE tests in this file swap
        # them between the two SAs - confirm this is intended.
        sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                            p.auth_algo_vpp_id, p.auth_key,
                            p.crypt_algo_vpp_id, p.crypt_key,
                            self.vpp_esp_protocol,
                            self.tun_if.remote_addr[p.addr_type],
                            self.tun_if.local_addr[p.addr_type],
                            flags=p.flags)
        p.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                           p.auth_algo_vpp_id, p.auth_key,
                           p.crypt_algo_vpp_id, p.crypt_key,
                           self.vpp_esp_protocol,
                           self.tun_if.remote_addr[p.addr_type],
                           self.tun_if.local_addr[p.addr_type],
                           flags=p.flags)
        p.tun_sa_in = sa_in
        sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SA pair already stored on *p*."""
        protection = VppIpsecTunProtect(self, p.tun_if,
                                        p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create an IP-in-IP tunnel over pg0 and a host route through it.

        The created objects are stored on ``p.tun_if`` and ``p.route``.
        """
        tunnel = VppIpIpTunInterface(self, self.pg0,
                                     self.pg0.local_ip4,
                                     self.pg0.remote_ip4)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        via_tunnel = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [via_tunnel])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the route, then the tunnel, made by config_network()."""
        for vpp_obj in (p.route, p.tun_if):
            vpp_obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection made by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the SA pair, outbound first."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
796
797
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the inherited setUp, then use pg0 as ``self.tun_if``."""
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        p = self.ipv4_params

        # bring up the tunnel, the SAs (no tunnel endpoints) and protection
        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(p)

        self.verify_tun_44(p, count=127)
        # the tunnel interface's rx and tx counters must both show the burst
        for read_stats in (p.tun_if.get_rx_stats, p.tun_if.get_tx_stats):
            self.assertEqual(read_stats()['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # copy.copy() makes a shallow copy, so rekeyed.tun_if should be the
        # very object p.tun_if refers to
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = 'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.scapy_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        # new SAs and protection go in before the old SAs are removed
        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_44(rekeyed, count=127)
        # counters are expected to cover both bursts
        for read_stats in (p.tun_if.get_rx_stats, p.tun_if.get_tx_stats):
            self.assertEqual(read_stats()['packets'], 254)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)
850
851
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Build the whole tunnel configuration with UDP encap requested.

        Sets the UDP_ENCAP SAD flag and a UDP 5454 -> 4500 header on the
        IPv4 parameters, then configures network, SAs and protection.
        Everything is built here and removed in tearDown rather than
        inside each test method.
        """
        super(TestIpsec4TunProtectUdp, self).setUp()
        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

    def tearDown(self):
        """Remove protection, SAs and network, then run the base tearDown."""
        params = self.ipv4_params
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        params = self.ipv4_params

        self.verify_tun_44(params, count=127)
        # the tunnel interface's rx and tx counters must both show the burst
        tunnel = params.tun_if
        for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
            self.assertEqual(read_stats()['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
891
892
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    # Same values as in TemplateIpsec4TunProtect.
    # NOTE(review): TemplateIpsec comes before that mixin in the bases, so
    # these repeats may be what makes the values win over same-named
    # attributes there - confirm before removing them.
    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the inherited setUp, then use pg0 as ``self.tun_if``."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` frames carrying ``sa``-encrypted IP-in-IP/UDP.

        The outer IP header runs from ``sw_intf.remote_ip4`` to
        ``sw_intf.local_ip4``, the inner one from ``src`` to ``dst``; the
        UDP payload is ``payload_size`` 'X' characters.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain IP/UDP frames from ``src`` to ``dst``."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of packets VPP has decrypted.

        Each packet must be addressed to pg1's remote IPv4 address and
        come from ``p.remote_tun_if_host``.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its headers.

        The decrypted outer IP header must run from pg0's local to pg0's
        remote IPv4 address and the inner destination must be
        ``p.remote_tun_if_host``.  On IndexError or AssertionError the
        received packet is logged, followed by the decrypted packet if
        decryption got that far, and the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: otherwise a decrypt failure would either
            # hit an unbound name (first packet) or log the previous
            # iteration's packet as if it belonged to this one.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # The dump is best-effort diagnostics only; the
                        # original failure below is what matters.
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # copy.copy() makes a shallow copy, so np.tun_if should be the very
        # object p.tun_if refers to
        np = copy.copy(p)
        np.crypt_key = 'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        # new SAs and protection go in before the old SAs are removed
        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        # counters are expected to cover both bursts
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
994
995
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    # Mixin: the methods below read self.encryption_type, self.tun_if,
    # self.pg0 and self.vpp_esp_protocol, none of which are defined in
    # this class, so the class it is combined with has to provide them.

    def config_sa_tra(self, p):
        """Create both SAs for ``p`` without tunnel endpoint addresses.

        config_tun_params() is run first.  ``p.tun_sa_out`` is then built
        from the scapy SA id/SPI and ``p.tun_sa_in`` from the VPP SA
        id/SPI; each SA is stored on ``p`` and added to VPP before the
        next one is constructed.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        sa_specs = (("tun_sa_out", p.scapy_tun_sa_id, p.scapy_tun_spi),
                    ("tun_sa_in", p.vpp_tun_sa_id, p.vpp_tun_spi))
        for attr_name, sa_id, spi in sa_specs:
            sa = VppIpsecSA(self, sa_id, spi,
                            p.auth_algo_vpp_id, p.auth_key,
                            p.crypt_algo_vpp_id, p.crypt_key,
                            self.vpp_esp_protocol)
            setattr(p, attr_name, sa)
            sa.add_vpp_config()

    def config_sa_tun(self, p):
        """Create both SAs for ``p`` with tunnel endpoint addresses.

        Same sequence as config_sa_tra(), except that each SA additionally
        receives ``self.tun_if``'s remote and local address (in that
        order) for ``p.addr_type``.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        sa_specs = (("tun_sa_out", p.scapy_tun_sa_id, p.scapy_tun_spi),
                    ("tun_sa_in", p.vpp_tun_sa_id, p.vpp_tun_spi))
        for attr_name, sa_id, spi in sa_specs:
            sa = VppIpsecSA(self, sa_id, spi,
                            p.auth_algo_vpp_id, p.auth_key,
                            p.crypt_algo_vpp_id, p.crypt_key,
                            self.vpp_esp_protocol,
                            self.tun_if.remote_addr[p.addr_type],
                            self.tun_if.local_addr[p.addr_type])
            setattr(p, attr_name, sa)
            sa.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with ``p.tun_sa_out`` and ``[p.tun_sa_in]``.

        The protection object is kept on ``p.tun_protect`` and added to
        VPP.
        """
        protection = VppIpsecTunProtect(self,
                                        p.tun_if,
                                        p.tun_sa_out,
                                        [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0 and a /128 route through it.

        The tunnel runs between pg0's local and remote IPv6 addresses; it
        is stored on ``p.tun_if``, added, brought up and given an IPv6
        address.  The route to ``p.remote_tun_if_host`` is stored on
        ``p.route``.
        """
        tunnel = VppIpIpTunInterface(self, self.pg0,
                                     self.pg0.local_ip6,
                                     self.pg0.remote_ip6)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()

        via_tunnel = VppRoutePath(tunnel.remote_ip6,
                                  0xffffffff,
                                  proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [via_tunnel])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the route first, then the tunnel interface it uses."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p.tun_protect``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove ``p.tun_sa_out`` and then ``p.tun_sa_in`` from VPP."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1064
1065
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the inherited setUp, then use pg0 as ``self.tun_if``."""
        super(TestIpsec6TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect"""
        p = self.ipv6_params

        # bring up the tunnel, the SAs (no tunnel endpoints) and protection
        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(p)

        self.verify_tun_66(p, count=127)
        # the tunnel interface's rx and tx counters must both show the burst
        for read_stats in (p.tun_if.get_rx_stats, p.tun_if.get_tx_stats):
            self.assertEqual(read_stats()['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # copy.copy() makes a shallow copy, so gen2.tun_if should be the
        # very object p.tun_if refers to
        gen2 = copy.copy(p)
        gen2.crypt_key = 'X' + p.crypt_key[1:]
        gen2.scapy_tun_sa_id += 1
        gen2.scapy_tun_spi += 100
        gen2.vpp_tun_sa_id += 1
        gen2.vpp_tun_spi += 100
        gen2.tun_if.local_spi = p.vpp_tun_spi
        gen2.tun_if.remote_spi = p.scapy_tun_spi

        # new SAs and protection go in before the old SAs are removed
        self.config_sa_tra(gen2)
        self.config_protect(gen2)
        self.unconfig_sa(p)

        self.verify_tun_66(gen2, count=127)
        # counters are expected to cover both bursts
        for read_stats in (p.tun_if.get_rx_stats, p.tun_if.get_tx_stats):
            self.assertEqual(read_stats()['packets'], 254)

        # 3 phase rekey
        #  1) add two input SAs [old, new]
        #  2) swap output SA to [new]
        #  3) use only [new] input SA
        # gen3 starts from gen2, so its ids/SPIs are bumped once more
        gen3 = copy.copy(gen2)
        gen3.crypt_key = 'Z' + p.crypt_key[1:]
        gen3.scapy_tun_sa_id += 1
        gen3.scapy_tun_spi += 100
        gen3.vpp_tun_sa_id += 1
        gen3.vpp_tun_spi += 100
        gen3.tun_if.local_spi = p.vpp_tun_spi
        gen3.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(gen3)

        # all three updates go through the protection object held by p
        protection = p.tun_protect

        # step 1;
        protection.update_vpp_config(gen2.tun_sa_out,
                                     [gen2.tun_sa_in, gen3.tun_sa_in])
        self.verify_tun_66(gen2, gen2, count=127)
        self.verify_tun_66(gen3, gen2, count=127)

        # step 2;
        protection.update_vpp_config(gen3.tun_sa_out,
                                     [gen2.tun_sa_in, gen3.tun_sa_in])
        self.verify_tun_66(gen2, gen3, count=127)
        self.verify_tun_66(gen3, gen3, count=127)

        # step 3;
        protection.update_vpp_config(gen3.tun_sa_out,
                                     [gen3.tun_sa_in])
        self.verify_tun_66(gen3, gen3, count=127)
        self.verify_drop_tun_66(gen2, count=127)

        # seven verify_tun_66 bursts so far; the expected totals leave out
        # the burst sent through verify_drop_tun_66
        for read_stats in (p.tun_if.get_rx_stats, p.tun_if.get_tx_stats):
            self.assertEqual(read_stats()['packets'], 127*7)
        self.unconfig_sa(gen2)

        # teardown
        self.unconfig_protect(gen3)
        self.unconfig_sa(gen3)
        self.unconfig_network(p)
1161
1162
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the inherited setUp, then use pg0 as ``self.tun_if``."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return ``count`` frames carrying ``sa``-encrypted IPv6-in-IPv6.

        The outer IPv6 header runs from ``sw_intf.remote_ip6`` to
        ``sw_intf.local_ip6``, the inner one from ``src`` to ``dst``; the
        UDP payload is ``payload_size`` 'X' characters.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return ``count`` plain IPv6/UDP frames from ``src`` to ``dst``."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of packets VPP has decrypted.

        Each packet must be addressed to pg1's remote IPv6 address and
        come from ``p.remote_tun_if_host``.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its headers.

        The decrypted outer IPv6 header must run from pg0's local to pg0's
        remote IPv6 address and the inner destination must be
        ``p.remote_tun_if_host``.  On IndexError or AssertionError the
        received packet is logged, followed by the decrypted packet if
        decryption got that far, and the error is re-raised.
        """
        for rx in rxs:
            # Reset per packet: otherwise a decrypt failure would either
            # hit an unbound name (first packet) or log the previous
            # iteration's packet as if it belonged to this one.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # The dump is best-effort diagnostics only; the
                        # original failure below is what matters.
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # copy.copy() makes a shallow copy, so np.tun_if should be the very
        # object p.tun_if refers to
        np = copy.copy(p)
        np.crypt_key = 'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        # new SAs and protection go in before the old SAs are removed
        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=127)
        # counters are expected to cover both bursts
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1264
1265
# Only when this file is executed directly (not imported): run its tests
# through unittest using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)