ipsec: support 4o6 and 6o4 for tunnel protect
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11     IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18 from util import ppp
19 from vpp_papi import VppEnum
20
21
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests

    Fixture shared by the IPv4 tunnel-interface test classes.  setUp()
    creates one IPsec tunnel interface on pg0 from the IPv4 parameter set,
    gives it IPv4 and IPv6 addresses and routes the remote IPv4 (/32) and
    IPv6 (/128) test hosts through it.
    """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create the tunnel interface and the host routes that use it."""
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # The same crypto key and the same auth key are passed twice each.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key)
        params.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route via the tunnel's remote address.
        VppIpRoute(
            self, params.remote_tun_if_host, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)]
        ).add_vpp_config()

        # IPv6 host route via the tunnel's remote address.
        VppIpRoute(
            self, params.remote_tun_if_host6, 128,
            [VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)]
        ).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
64
65
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests

    Same fixture shape as the plain IPv4 tunnel template, but the IPv4
    parameter set is flagged for UDP encapsulation, carries a UDP header
    (sport 5454, dport 4500) as its NAT header, and the tunnel interface
    is created with udp_encap=True.
    """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        """Create the UDP-encapsulated tunnel and its host routes."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            udp_encap=True)
        params.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # IPv4 host route via the tunnel's remote address.
        VppIpRoute(
            self, params.remote_tun_if_host, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)]
        ).add_vpp_config()

        # IPv6 host route via the tunnel's remote address.
        VppIpRoute(
            self, params.remote_tun_if_host6, 128,
            [VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)]
        ).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
113
114
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        # The 6o4 tests override the encrypt node name with the esp6 one.
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # With the tunnel MTU set to 1500, one 1800 byte payload is
        # expected to be received as two packets (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        try:
            self.verify_tun_44(self.params[socket.AF_INET],
                               count=1, payload_size=1800, n_rx=2)
        finally:
            # Put the MTU back to 9000 even when the verification fails,
            # so a failure here does not leave the small MTU configured.
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                           [9000, 0, 0, 0])
144
145
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    # Input node name used by this class.
    tun4_input_node = "ipsec4-if-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
154
155
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No body of its own: the fixture and the tests come from the bases.
159
160
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests

    Fixture shared by the IPv6 tunnel-interface test classes.  setUp()
    creates one IPsec tunnel interface on pg0 from the IPv6 parameter set
    (is_ip6=True), gives it IPv6 and IPv4 addresses and routes the remote
    IPv6 (/128) and IPv4 (/32) test hosts through it.
    """

    encryption_type = ESP

    def setUp(self):
        """Create the IPv6 tunnel interface and its host routes."""
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # The tunnel object is only held locally here; it is not stored on
        # the parameter set.
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            is_ip6=True)

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # IPv6 host route via the tunnel's remote address.
        VppIpRoute(
            self, params.remote_tun_if_host, 128,
            [VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)]
        ).add_vpp_config()

        # IPv4 host route via the tunnel's remote address.
        VppIpRoute(
            self, params.remote_tun_if_host4, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)]
        ).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
194
195
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        # The 4o6 tests override the encrypt node name with the esp4 one.
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_46(params6, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_46(params6, count=257)
210
211
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface

    setUp() builds ten tunnel interfaces on pg0, each from its own shallow
    copy of the IPv4 parameter set with a distinct remote host and with
    all SA ids and SPIs shifted by the tunnel's index.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def setUp(self):
        """Create the ten tunnels and one /32 host route per tunnel."""
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        # Every one of these attributes is offset by the tunnel index.
        shifted_attrs = ("scapy_tun_sa_id", "scapy_tun_spi",
                         "vpp_tun_sa_id", "vpp_tun_spi",
                         "scapy_tra_sa_id", "scapy_tra_spi",
                         "vpp_tra_sa_id", "vpp_tra_spi")

        for index in range(10):
            params = copy.copy(self.ipv4_params)

            params.remote_tun_if_host = "1.1.1.%d" % (index + 1)
            for attr in shifted_attrs:
                setattr(params, attr, getattr(params, attr) + index)

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()

            host_route = VppIpRoute(
                self, params.remote_tun_if_host, 32,
                [VppRoutePath(tunnel.remote_ip4, 0xffffffff)])
            host_route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_44(params, count=127)
            # Each tunnel must have counted exactly the 127 packets in
            # both directions.
            self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
            self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)
268
269
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos

    For every crypto engine and every crypto/integrity algorithm pair in
    the table below: build a tunnel, pass traffic, rekey the tunnel, pass
    traffic again and remove the configuration.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def config_network(self, p):
        """Create the tunnel interface and host route described by p.

        The created objects are stored on p as p.tun_if and p.route.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id, p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id, p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        host_route = VppIpRoute(
            self, p.remote_tun_if_host, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)])
        p.route = host_route
        host_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what config_network() created for p."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Give the tunnel of p a new key and new SPIs.

        The first character of the crypto key is replaced by 'X', all
        tunnel SA ids and SPIs are incremented, a fresh pair of SAs is
        created (stored as p.tun_sa_in / p.tun_sa_out) and both are set
        on the tunnel interface.
        """
        #
        # change the key and the SPI
        #
        p.crypt_key = 'X' + p.crypt_key[1:]
        for attr in ("scapy_tun_spi", "scapy_tun_sa_id",
                     "vpp_tun_spi", "vpp_tun_sa_id"):
            setattr(p, attr, getattr(p, attr) + 1)
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, self.tun_if)

        # Arguments common to both SAs.
        algo_args = (p.auth_algo_vpp_id, p.auth_key,
                     p.crypt_algo_vpp_id, p.crypt_key,
                     self.vpp_esp_protocol)
        sa_opts = dict(flags=p.flags, salt=p.salt)
        local_addr = self.tun_if.local_addr[p.addr_type]
        remote_addr = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_in = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            *(algo_args + (local_addr, remote_addr)), **sa_opts)
        p.tun_sa_out = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            *(algo_args + (remote_addr, local_addr)), **sa_opts)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # tun_sa_in is set with is_outbound=1, tun_sa_out with
        # is_outbound=0.
        for sa, outbound in ((p.tun_sa_in, 1), (p.tun_sa_out, 0)):
            self.vapi.ipsec_tunnel_if_set_sa(
                sw_if_index=p.tun_if.sw_if_index,
                sa_id=sa.id,
                is_outbound=outbound)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        # foreach crypto algorithm: one row per algorithm pair, columns
        # in the order given by 'fields'.
        fields = ('vpp-crypto', 'vpp-integ',
                  'scapy-crypto', 'scapy-integ',
                  'key', 'salt')
        rows = [
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL",
             "JPjyOWBeVEQiMe7h", 3333),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL",
             "JPjyOWBeVEQiMe7hJPjyOWBe", 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL",
             "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 9999),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96",
             "JPjyOWBeVEQiMe7h", 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96",
             "JPjyOWBeVEQiMe7hJPjyOWBe", 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96",
             "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 0),
            (crypto.IPSEC_API_CRYPTO_ALG_NONE,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "NULL", "HMAC-SHA1-96",
             "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 0),
        ]
        algos = [dict(zip(fields, row)) for row in rows]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for algo in algos:
                params = copy.copy(self.ipv4_params)
                params.auth_algo_vpp_id = algo['vpp-integ']
                params.crypt_algo_vpp_id = algo['vpp-crypto']
                params.crypt_algo = algo['scapy-crypto']
                params.auth_algo = algo['scapy-integ']
                params.crypt_key = algo['key']
                params.salt = algo['salt']

                self.config_network(params)

                self.verify_tun_44(params, count=127)
                self.assertEqual(
                    params.tun_if.get_rx_stats()['packets'], 127)
                self.assertEqual(
                    params.tun_if.get_tx_stats()['packets'], 127)

                #
                # rekey the tunnel
                #
                self.rekey(params)
                self.verify_tun_44(params, count=127)

                # Remove the tunnel and route, then the SAs that rekey()
                # created.
                self.unconfig_network(params)
                params.tun_sa_out.remove_vpp_config()
                params.tun_sa_in.remove_vpp_config()
458
459
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface

    setUp() builds ten IPv6 tunnel interfaces on pg0, each from its own
    shallow copy of the IPv6 parameter set with a distinct remote host
    and with all SA ids and SPIs shifted by the tunnel's index.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def setUp(self):
        """Create the ten tunnels and one /128 host route per tunnel."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        # Every one of these attributes is offset by the tunnel index.
        shifted_attrs = ("scapy_tun_sa_id", "scapy_tun_spi",
                         "vpp_tun_sa_id", "vpp_tun_spi",
                         "scapy_tra_sa_id", "scapy_tra_spi",
                         "vpp_tra_sa_id", "vpp_tra_spi")

        for index in range(10):
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (index + 1)
            for attr in shifted_attrs:
                setattr(params, attr, getattr(params, attr) + index)

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()

            VppIpRoute(
                self, params.remote_tun_if_host, 128,
                [VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                              proto=DpoProto.DPO_PROTO_IP6)]
            ).add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_66(params, count=127)
            # Each tunnel must have counted exactly the 127 packets in
            # both directions.
            self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
            self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)
518
519
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the inner Ethernet frame used by these tests.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return count encrypted packets to send into VPP.

        Each packet is IP (pg0 remote -> pg0 local) / GRE / Ethernet /
        IP / UDP / payload, encrypted with sa and wrapped in an Ethernet
        header from sw_intf's remote MAC to its local MAC.  src and dst
        are accepted but not used: the inner addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return count plain Ethernet / IP / UDP packets.

        sw_intf, src and dst are accepted but not used.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check the Ethernet and IP destination of each received frame."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with sa and check its layers.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level and the error is re-raised.
        """
        for rx in rxs:
            # Stays None when sa.decrypt() itself is what failed.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # The dump is only a debugging aid; a problem
                        # while producing it is deliberately ignored.
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        self.tun = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        self.tun.add_vpp_config()

        # Protect the GRE tunnel with the outbound SA and the single
        # inbound SA created above.
        p.tun_protect = VppIpsecTunProtect(self,
                                           self.tun,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        self.tun.admin_up()
        self.tun.config_ip4()

        # The GRE tunnel and pg1 are the two ports of bridge domain 1.
        VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        # Remove the tunnel's IPv4 config before the base class teardown.
        self.tun.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
625
626
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return count encrypted packets to send into VPP.

        Each packet is IP (pg0 remote -> pg0 local) / GRE /
        IP (pg1 local -> pg1 remote) / UDP / payload, encrypted with sa
        and wrapped in an Ethernet header from sw_intf's remote MAC to
        its local MAC.  src and dst are accepted but not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return count plain Ethernet / IP / UDP packets for 1.1.1.2.

        src and dst are accepted but not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that each received packet is addressed to pg1's peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with sa and check its layers.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level and the error is re-raised.
        """
        for rx in rxs:
            # Stays None when sa.decrypt() itself is what failed.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # The dump is only a debugging aid; a problem
                        # while producing it is deliberately ignored.
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE tunnel and the host route."""
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is added to this bridge domain anywhere in
        # this class - confirm whether it is still needed.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        self.tun = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        self.tun.add_vpp_config()

        # Protect the GRE tunnel with the outbound SA and the single
        # inbound SA created above.
        p.tun_protect = VppIpsecTunProtect(self,
                                           self.tun,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        self.tun.admin_up()
        self.tun.config_ip4()

        # Route the inner destination used by gen_pkts() via the tunnel.
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(self.tun.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        # Remove the tunnel's IPv4 config before the base class teardown.
        self.tun.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
726
727
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    # Mixin holding the config/unconfig steps used by the tests that
    # protect an IPIP tunnel built between pg0's IPv4 addresses.  The
    # class it is mixed into has to supply pg0, tun_if and
    # vpp_esp_protocol.

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def _config_sa_pair(self, p, tunnel_mode):
        """Create and install the two SAs described by p.

        The SAs are stored on p as tun_sa_out and tun_sa_in.

        :param p: IPsec parameter object (SPIs, SA ids, keys, flags).
        :param tunnel_mode: when True the addresses of self.tun_if are
                            handed to both SAs as tunnel endpoints; when
                            False no endpoints are passed at all.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        if tunnel_mode:
            # NOTE(review): tun_sa_out and tun_sa_in receive the same
            # (remote, local) endpoint order - confirm this is intended.
            endpoints = (self.tun_if.remote_addr[p.addr_type],
                         self.tun_if.local_addr[p.addr_type])
        else:
            endpoints = ()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  *endpoints,
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 *endpoints,
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tra(self, p):
        """Install the SA pair for p without tunnel endpoints."""
        self._config_sa_pair(p, tunnel_mode=False)

    def config_sa_tun(self, p):
        """Install the SA pair for p with self.tun_if's addresses."""
        self._config_sa_pair(p, tunnel_mode=True)

    def config_protect(self, p):
        """Protect p.tun_if with p.tun_sa_out and [p.tun_sa_in]."""
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPIP tunnel over pg0 and the routes through it.

        The tunnel is stored as p.tun_if, the IPv4 host route as p.route
        and the IPv6 host route as p.route6 so that unconfig_network()
        can remove everything that was added here.
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip4,
                                       self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

        # keep a handle on the v6 route too; it used to be a local
        # variable and was therefore never removed explicitly
        p.route6 = VppIpRoute(self, p.remote_tun_if_host6, 128,
                              [VppRoutePath(p.tun_if.remote_ip6,
                                            0xffffffff,
                                            proto=DpoProto.DPO_PROTO_IP6)])
        p.route6.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the routes and the tunnel added by config_network()."""
        p.route.remove_vpp_config()
        # tolerate parameter objects that never had a v6 route recorded
        route6 = getattr(p, 'route6', None)
        if route6 is not None:
            route6.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on p."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs stored on p."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
810
811
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtect, self).setUp()

        # pg0 is the interface recorded as self.tun_if for this test
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""

        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # first batch: verify_tun_44, counters must show exactly 127
        self.verify_tun_44(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

        # second batch: verify_tun_64 after clearing the SA state, the
        # interface counters keep accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 254)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 254)

        # rekey: a shallow copy of the parameters gets a new key, new
        # SPIs and new SA ids, then replaces the SAs on the tunnel
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = 'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        # third batch runs with the new SAs
        self.verify_tun_44(rekeyed, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 381)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
871
872
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    # The SAs of this test case carry the UDP-encap flag.  Network, SAs
    # and protection are configured in setUp and removed in tearDown, so
    # both test methods run against the same configuration steps.

    def setUp(self):
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        # network first, then the SAs, then the protection using them
        for configure in (self.config_network,
                          self.config_sa_tra,
                          self.config_protect):
            configure(params)

    def tearDown(self):
        params = self.ipv4_params

        # undo the setUp steps in reverse order
        for unconfigure in (self.unconfig_protect,
                            self.unconfig_sa,
                            self.unconfig_network):
            unconfigure(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""

        params = self.ipv4_params

        self.verify_tun_44(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
912
913
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build count frames whose IP/IP/UDP payload went through sa.

        The outer IP header runs from sw_intf's remote to its local
        address, the inner one from src to dst.  sa.encrypt() is called
        once per generated packet.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build count plain Ether/IP/UDP frames from src to dst."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of every packet in rxs."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in rxs with sa and check its headers.

        The decrypted packet must run from pg0's local to pg0's remote
        address and carry an inner packet destined to
        p.remote_tun_if_host.  On failure the offending packets are
        logged and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure in decrypt() can never
            # log the decrypted packet of an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping the packet is best effort only
                        pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = 'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # the interface counters accumulate across the rekey
        self.verify_tun_44(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 254)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1015
1016
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    # Mixin holding the config/unconfig steps used by the tests that
    # protect an IPIP tunnel built between pg0's IPv6 addresses.  The
    # class it is mixed into has to supply pg0, tun_if, encryption_type
    # and vpp_esp_protocol.

    def _config_sa_pair(self, p, tunnel_mode):
        """Create and install the two SAs described by p.

        The SAs are stored on p as tun_sa_out and tun_sa_in.

        :param p: IPsec parameter object (SPIs, SA ids, keys).
        :param tunnel_mode: when True the addresses of self.tun_if are
                            handed to both SAs as tunnel endpoints; when
                            False no endpoints are passed at all.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        if tunnel_mode:
            # NOTE(review): tun_sa_out and tun_sa_in receive the same
            # (remote, local) endpoint order - confirm this is intended.
            endpoints = (self.tun_if.remote_addr[p.addr_type],
                         self.tun_if.local_addr[p.addr_type])
        else:
            endpoints = ()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  *endpoints)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 *endpoints)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tra(self, p):
        """Install the SA pair for p without tunnel endpoints."""
        self._config_sa_pair(p, tunnel_mode=False)

    def config_sa_tun(self, p):
        """Install the SA pair for p with self.tun_if's addresses."""
        self._config_sa_pair(p, tunnel_mode=True)

    def config_protect(self, p):
        """Protect p.tun_if with p.tun_sa_out and [p.tun_sa_in]."""
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPIP tunnel over pg0 and the routes through it.

        The tunnel is stored as p.tun_if, the IPv6 host route as p.route
        and the IPv4 host route as p.route4 so that unconfig_network()
        can remove everything that was added here.
        """
        p.tun_if = VppIpIpTunInterface(self, self.pg0,
                                       self.pg0.local_ip6,
                                       self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.config_ip4()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
                             [VppRoutePath(p.tun_if.remote_ip6,
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        p.route.add_vpp_config()

        # keep a handle on the v4 route too; it used to be a local
        # variable and was therefore never removed explicitly
        p.route4 = VppIpRoute(self, p.remote_tun_if_host4, 32,
                              [VppRoutePath(p.tun_if.remote_ip4,
                                            0xffffffff)])
        p.route4.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the routes and the tunnel added by config_network()."""
        p.route.remove_vpp_config()
        # tolerate parameter objects that never had a v4 route recorded
        route4 = getattr(p, 'route4', None)
        if route4 is not None:
            route4.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on p."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs stored on p."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
1090
1091
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtect, self).setUp()

        # pg0 is the interface recorded as self.tun_if for this test
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect"""

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        # batch 1 of 7 that is expected to be counted on the tunnel
        self.verify_tun_66(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

        # simple rekey: new key, SPIs and SA ids on a shallow copy, then
        # swap the protection over and drop the old SAs
        rekey1 = copy.copy(params)
        rekey1.crypt_key = 'X' + params.crypt_key[1:]
        rekey1.scapy_tun_spi += 100
        rekey1.scapy_tun_sa_id += 1
        rekey1.vpp_tun_spi += 100
        rekey1.vpp_tun_sa_id += 1
        rekey1.tun_if.local_spi = params.vpp_tun_spi
        rekey1.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekey1)
        self.config_protect(rekey1)
        self.unconfig_sa(params)

        # batch 2
        self.verify_tun_66(rekey1, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 254)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 254)

        # three phase rekey:
        #  phase 1 - two input SAs [old, new], output SA still old
        #  phase 2 - output SA swapped to new, both input SAs kept
        #  phase 3 - only the new input SA remains
        rekey2 = copy.copy(rekey1)
        rekey2.crypt_key = 'Z' + params.crypt_key[1:]
        rekey2.scapy_tun_spi += 100
        rekey2.scapy_tun_sa_id += 1
        rekey2.vpp_tun_spi += 100
        rekey2.vpp_tun_sa_id += 1
        rekey2.tun_if.local_spi = params.vpp_tun_spi
        rekey2.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekey2)

        both_inputs = [rekey1.tun_sa_in, rekey2.tun_sa_in]

        # phase 1 (batches 3 and 4)
        params.tun_protect.update_vpp_config(rekey1.tun_sa_out,
                                             both_inputs)
        self.verify_tun_66(rekey1, rekey1, count=127)
        self.verify_tun_66(rekey2, rekey1, count=127)

        # phase 2 (batches 5 and 6)
        params.tun_protect.update_vpp_config(rekey2.tun_sa_out,
                                             [rekey1.tun_sa_in,
                                              rekey2.tun_sa_in])
        self.verify_tun_66(rekey1, rekey2, count=127)
        self.verify_tun_66(rekey2, rekey2, count=127)

        # phase 3 (batch 7); the drop check is not expected to add to
        # the counters asserted below
        params.tun_protect.update_vpp_config(rekey2.tun_sa_out,
                                             [rekey2.tun_sa_in])
        self.verify_tun_66(rekey2, rekey2, count=127)
        self.verify_drop_tun_66(rekey1, count=127)

        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127*7)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127*7)
        self.unconfig_sa(rekey1)

        # teardown
        self.unconfig_protect(rekey2)
        self.unconfig_sa(rekey2)
        self.unconfig_network(params)

    def test_tun_46(self):
        """IPSEC tunnel protect"""

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_46(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
1207
1208
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build count frames whose IPv6/IPv6/UDP payload went through sa.

        The outer IPv6 header runs from sw_intf's remote to its local
        address, the inner one from src to dst.  sa.encrypt() is called
        once per generated packet.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build count plain Ether/IPv6/UDP frames from src to dst."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of every packet in rxs."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in rxs with sa and check its headers.

        The decrypted packet must run from pg0's local to pg0's remote
        address and carry an inner packet destined to
        p.remote_tun_if_host.  On failure the offending packets are
        logged and the original exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure in decrypt() can never
            # log the decrypted packet of an earlier iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping the packet is best effort only
                        pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = 'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # the interface counters accumulate across the rekey
        self.verify_tun_66(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 254)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1310
1311
# When executed as a script, run the tests of this module through the
# VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)