ipsec: Fix NULL encryption algorithm
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11     IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18 from util import ppp
19 from vpp_papi import VppEnum
20
21
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        # nothing class-specific to prepare; defer to the template
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """ Create an IPsec tunnel interface over pg0 and add host routes
        for the remote IPv4 and IPv6 hosts via that tunnel """
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # the crypto key and the auth key are each passed twice, i.e. the
        # same key is supplied for both key arguments of the interface
        params.tun_if = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key)
        tunnel = params.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # /32 route to the IPv4 host behind the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        # /128 route to the IPv6 host behind the same tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
64
65
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        # nothing class-specific to prepare; defer to the template
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def setUp(self):
        """ Create a UDP-encapsulated IPsec tunnel interface over pg0 and
        add host routes for the remote IPv4 and IPv6 hosts via it """
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # mark the SAs as UDP encapsulated and record the UDP header that
        # goes with it on the params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        params.tun_if = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            udp_encap=True)
        tunnel = params.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # /32 route to the IPv4 host behind the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [v4_path]).add_vpp_config()

        # /128 route to the IPv6 host behind the same tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
113
114
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        # per-instance override of the class-level encrypt node name for
        # the IPv6-over-IPv4 case
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # lower the tunnel MTU so that the 1800 byte payload below does not
        # fit in one packet; two packets are expected back (n_rx=2)
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        try:
            self.verify_tun_44(self.params[socket.AF_INET],
                               count=1, payload_size=1800, n_rx=2)
        finally:
            # put the MTU back even when the verification fails, so the
            # lowered MTU never outlives this test
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                           [9000, 0, 0, 0])
144
145
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-if-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # run the keepalive check against the IPv4 parameter set
        params = self.ipv4_params
        self.verify_keepalive(params)
154
155
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # no members of its own: everything comes from the two base classes
159
160
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        """ Create an IPv6 IPsec tunnel interface over pg0 and add host
        routes for the remote IPv6 and IPv4 hosts via that tunnel """
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # the tunnel is kept in a local only; it is not stored on params
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id, params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id, params.auth_key, params.auth_key,
            is_ip6=True)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        # /128 route to the IPv6 host behind the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [v6_path]).add_vpp_config()

        # /32 route to the IPv4 host behind the same tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
194
195
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        # per-instance override of the class-level encrypt node name for
        # the IPv4-over-IPv6 case
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        params = self.params[socket.AF_INET6]
        self.verify_tun_46(params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        params = self.params[socket.AF_INET6]
        self.verify_tun_46(params, count=257)
210
211
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def setUp(self):
        """ Create ten IPsec tunnel interfaces over pg0, each with its own
        SPIs, SA ids and remote host, and keep their parameter sets in
        self.multi_params """
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        # identifiers that are shifted by the tunnel number so that every
        # tunnel gets distinct values
        shifted_fields = ('scapy_tun_sa_id', 'scapy_tun_spi',
                          'vpp_tun_sa_id', 'vpp_tun_spi',
                          'scapy_tra_sa_id', 'scapy_tra_spi',
                          'vpp_tra_sa_id', 'vpp_tra_spi')

        for offset in range(10):
            # shallow copy: start from the default IPv4 parameters
            params = copy.copy(self.ipv4_params)

            params.remote_tun_if_host = "1.1.1.%d" % (offset + 1)
            for field in shifted_fields:
                setattr(params, field, getattr(params, field) + offset)

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            params.tun_if = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key)
            tunnel = params.tun_if
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()

            # /32 route to this tunnel's remote host
            path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
            route = VppIpRoute(self, params.remote_tun_if_host, 32, [path])
            route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_44(params, count=n_pkts)
            # the tunnel's own counters must account for every packet
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
268
269
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt"

    def config_network(self, p):
        """ Create the IPsec tunnel interface described by `p` over pg0
        and a /32 route to the remote host through it.

        Stores the interface on p.tun_if and the route on p.route. """
        config_tun_params(p, self.encryption_type, self.tun_if)

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id, p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id, p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # dump SAs 0 and 1 into the test log
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """ Undo config_network: drop the tunnel's IPv4 config, then the
        tunnel itself, then the route """
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """ Give the tunnel in `p` a new key and new SPIs.

        Bumps the SPIs and SA ids on `p`, creates a fresh pair of SAs
        (stored on p.tun_sa_in / p.tun_sa_out) and binds them to the
        existing tunnel interface. """
        # new key: same length, first character replaced
        p.crypt_key = 'X' + p.crypt_key[1:]

        # new SPI and SA id for each direction
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1

        # keep the interface object's view of the SPIs in step
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, self.tun_if)

        local_addr = self.tun_if.local_addr[p.addr_type]
        remote_addr = self.tun_if.remote_addr[p.addr_type]
        sa_options = dict(flags=p.flags, salt=p.salt)

        # SA keyed by the scapy-side id/SPI, local -> remote
        p.tun_sa_in = VppIpsecSA(self,
                                 p.scapy_tun_sa_id, p.scapy_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 local_addr, remote_addr,
                                 **sa_options)
        # SA keyed by the VPP-side id/SPI, remote -> local
        p.tun_sa_out = VppIpsecSA(self,
                                  p.vpp_tun_sa_id, p.vpp_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  remote_addr, local_addr,
                                  **sa_options)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # tun_sa_in is installed with is_outbound=1, tun_sa_out with
        # is_outbound=0
        for sa, outbound in ((p.tun_sa_in, 1), (p.tun_sa_out, 0)):
            self.vapi.ipsec_tunnel_if_set_sa(
                sw_if_index=p.tun_if.sw_if_index,
                sa_id=sa.id,
                is_outbound=outbound)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # every VPP crypto engine is exercised in turn
        engines = ["ia32", "ipsecmb", "openssl"]

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        def algo_entry(vpp_crypto, vpp_integ, scapy_crypto, scapy_integ,
                       key, salt):
            # one row of the algorithm table
            return {'vpp-crypto': vpp_crypto,
                    'vpp-integ': vpp_integ,
                    'scapy-crypto': scapy_crypto,
                    'scapy-integ': scapy_integ,
                    'key': key,
                    'salt': salt}

        # every crypto/integrity combination under test, with the matching
        # scapy algorithm names, key and salt
        algos = [
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                       integ.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL",
                       "JPjyOWBeVEQiMe7h", 3333),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                       integ.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL",
                       "JPjyOWBeVEQiMe7hJPjyOWBe", 0),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
                       integ.IPSEC_API_INTEG_ALG_NONE,
                       "AES-GCM", "NULL",
                       "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 9999),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                       integ.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96",
                       "JPjyOWBeVEQiMe7h", 0),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                       integ.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96",
                       "JPjyOWBeVEQiMe7hJPjyOWBe", 0),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                       integ.IPSEC_API_INTEG_ALG_SHA1_96,
                       "AES-CBC", "HMAC-SHA1-96",
                       "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 0),
            algo_entry(crypto.IPSEC_API_CRYPTO_ALG_NONE,
                       integ.IPSEC_API_INTEG_ALG_SHA1_96,
                       "NULL", "HMAC-SHA1-96",
                       "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 0),
        ]

        # parameter attribute <- algorithm table key
        param_fields = (('auth_algo_vpp_id', 'vpp-integ'),
                        ('crypt_algo_vpp_id', 'vpp-crypto'),
                        ('crypt_algo', 'scapy-crypto'),
                        ('auth_algo', 'scapy-integ'),
                        ('crypt_key', 'key'),
                        ('salt', 'salt'))
        n_pkts = 127

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            for algo in algos:
                # shallow copy of the defaults, specialised for this algo
                p = copy.copy(self.ipv4_params)
                for attr, key in param_fields:
                    setattr(p, attr, algo[key])

                self.config_network(p)

                self.verify_tun_44(p, count=n_pkts)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], n_pkts)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], n_pkts)

                # traffic must still pass after the tunnel is rekeyed
                self.rekey(p)
                self.verify_tun_44(p, count=n_pkts)

                # tear down, including the SAs that rekey() created
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
458
459
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def setUp(self):
        """ Create ten IPv6 IPsec tunnel interfaces over pg0, each with its
        own SPIs, SA ids and remote host, and keep their parameter sets in
        self.multi_params """
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0
        self.multi_params = []

        # identifiers that are shifted by the tunnel number so that every
        # tunnel gets distinct values
        shifted_fields = ('scapy_tun_sa_id', 'scapy_tun_spi',
                          'vpp_tun_sa_id', 'vpp_tun_spi',
                          'scapy_tra_sa_id', 'scapy_tra_spi',
                          'vpp_tra_sa_id', 'vpp_tra_spi')

        for offset in range(10):
            # shallow copy: start from the default IPv6 parameters
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (offset + 1)
            for field in shifted_fields:
                setattr(params, field, getattr(params, field) + offset)

            config_tun_params(params, self.encryption_type, self.tun_if)
            self.multi_params.append(params)

            params.tun_if = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True)
            tunnel = params.tun_if
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()

            # /128 route to this tunnel's remote host
            path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                proto=DpoProto.DPO_PROTO_IP6)
            VppIpRoute(self, params.remote_tun_if_host, 128,
                       [path]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        n_pkts = 127
        for params in self.multi_params:
            self.verify_tun_66(params, count=n_pkts)
            # the tunnel's own counters must account for every packet
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], n_pkts)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], n_pkts)
518
519
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC used for the inner Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """ Build `count` packets for injection on `sw_intf`: an Ethernet
        frame inside GRE inside IP, encrypted with `sa`.

        `src` and `dst` are part of the signature but are not used; the
        addresses are fixed by this test. """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """ Build `count` plain Ethernet frames addressed to self.omac.

        `sw_intf`, `src` and `dst` are part of the signature but are not
        used. """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """ Check that every received frame still carries the inner
        destination MAC and destination IP """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """ Decrypt every received packet with `sa` and check the outer
        IP addresses, the GRE header and the inner Ethernet/IP
        destinations.

        On IndexError or AssertionError the offending packet (and its
        decrypted form, when there is one) is logged and the error is
        re-raised. """
        for rx in rxs:
            # reset per packet: when decrypt itself fails there is no
            # decrypted packet to log, and the one left over from the
            # previous iteration must not be logged in its place
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # logging is best-effort diagnostics only
                        pass
                raise

    def setUp(self):
        """ Create the two SAs, a GRE TEB tunnel between pg0's addresses
        protected by them, and bridge the tunnel with pg1 """
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # SA with the scapy-side id/SPI, tunnel endpoints local -> remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # SA with the VPP-side id/SPI, tunnel endpoints remote -> local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        self.tun = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        self.tun.add_vpp_config()

        # protect the GRE tunnel with the SA pair
        p.tun_protect = VppIpsecTunProtect(self,
                                           self.tun,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        self.tun.admin_up()
        self.tun.config_ip4()

        # put the tunnel and pg1 in the same bridge domain
        VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        # drop the tunnel's IPv4 config before the generic teardown
        self.tun.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
625
626
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """ Build `count` packets for injection on `sw_intf`: an IP packet
        for pg1's remote address inside GRE inside IP, encrypted with
        `sa`.

        `src` and `dst` are part of the signature but are not used; the
        addresses are fixed by this test. """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """ Build `count` plain IP packets destined to 1.1.1.2 for
        injection on `sw_intf`.

        `src` and `dst` are part of the signature but are not used. """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """ Check that every received packet is addressed to pg1's remote
        MAC and IPv4 address """
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """ Decrypt every received packet with `sa` and check the outer
        IP addresses, the GRE header and the inner IP destination.

        On IndexError or AssertionError the offending packet (and its
        decrypted form, when there is one) is logged and the error is
        re-raised. """
        for rx in rxs:
            # reset per packet: when decrypt itself fails there is no
            # decrypted packet to log, and the one left over from the
            # previous iteration must not be logged in its place
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # logging is best-effort diagnostics only
                        pass
                raise

    def setUp(self):
        """ Create the two SAs, a GRE tunnel between pg0's addresses
        protected by them, and a /32 route to 1.1.1.2 via the tunnel """
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): this bridge domain is created but nothing in this
        # class adds a port to it - possibly left over from the TEB
        # variant; confirm before removing
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # SA with the scapy-side id/SPI, tunnel endpoints local -> remote
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        # SA with the VPP-side id/SPI, tunnel endpoints remote -> local
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        self.tun = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        self.tun.add_vpp_config()

        # protect the GRE tunnel with the SA pair
        p.tun_protect = VppIpsecTunProtect(self,
                                           self.tun,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        self.tun.admin_up()
        self.tun.config_ip4()

        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(self.tun.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        # drop the tunnel's IPv4 config before the generic teardown
        self.tun.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
726
727
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect

    Mixin holding the configuration steps shared by the IPv4
    tunnel-protect tests: build an IP-in-IP tunnel over pg0, create an
    outbound/inbound SA pair and attach the pair to the tunnel with a
    VppIpsecTunProtect object.  The class it is mixed into must provide
    ``pg0``, ``tun_if`` and ``vpp_esp_protocol``.
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def _new_sa4(self, p, sa_id, spi, *tun_endpoints):
        """Construct (without installing) an SA from p's keys and algorithms.

        ``tun_endpoints`` is forwarded positionally right after the
        protocol argument: config_sa_tun() passes two addresses,
        config_sa_tra() passes nothing.
        """
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol,
                          *tun_endpoints,
                          flags=p.flags)

    def config_sa_tra(self, p):
        """Create and install p.tun_sa_out / p.tun_sa_in with no tunnel
        endpoint addresses (the variant used by the transport-mode tests).
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        p.tun_sa_out = self._new_sa4(p, p.scapy_tun_sa_id, p.scapy_tun_spi)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = self._new_sa4(p, p.vpp_tun_sa_id, p.vpp_tun_spi)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install p.tun_sa_out / p.tun_sa_in with explicit
        tunnel endpoint addresses taken from self.tun_if.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        # NOTE(review): both directions pass (remote, local) in that
        # order - confirm this is intended for the outbound SA as well.
        p.tun_sa_out = self._new_sa4(p, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                     self.tun_if.remote_addr[p.addr_type],
                                     self.tun_if.local_addr[p.addr_type])
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = self._new_sa4(p, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                    self.tun_if.remote_addr[p.addr_type],
                                    self.tun_if.local_addr[p.addr_type])
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Attach p's SA pair (one outbound, a list of one inbound) to
        p.tun_if and remember the protection object as p.tun_protect.
        """
        inbound_sas = [p.tun_sa_in]
        p.tun_protect = VppIpsecTunProtect(self, p.tun_if,
                                           p.tun_sa_out, inbound_sas)
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel between pg0's local and remote IPv4
        addresses, bring it up, and add a /32 route to
        p.remote_tun_if_host via the tunnel's remote address.
        """
        underlay = self.pg0
        p.tun_if = VppIpIpTunInterface(self, underlay,
                                       underlay.local_ip4,
                                       underlay.remote_ip4)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()

        host_prefix_len = 32
        p.route = VppIpRoute(self, p.remote_tun_if_host, host_prefix_len,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the host route first, then the tunnel interface."""
        route = p.route
        route.remove_vpp_config()
        tunnel = p.tun_if
        tunnel.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored in p.tun_protect."""
        protection = p.tun_protect
        protection.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        outbound = p.tun_sa_out
        outbound.remove_vpp_config()
        inbound = p.tun_sa_in
        inbound.remove_vpp_config()
804
805
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode

    Protects an IP-in-IP tunnel with the SAs built by config_sa_tra()
    and checks traffic and interface counters before and after a rekey.
    """

    def setUp(self):
        """Run the base setUp, then use pg0 as ``tun_if``."""
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing test-specific to undo; defer to the base tearDown."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect

        Configure tunnel + SAs + protection, pass one batch of traffic,
        rekey to a fresh SA pair, pass a second batch, then remove all
        configuration in reverse order.
        """
        batch = 127
        p = self.ipv4_params

        # initial configuration
        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=batch)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], batch)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], batch)

        # Rekey: derive a second parameter set with a different key and
        # new SPIs / SA ids.  copy.copy() is shallow, so rekeyed.tun_if is
        # still the interface object referenced by p.
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = 'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        # install the new SAs, point the protection at them, drop the old
        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        # the tunnel counters accumulate across both batches
        self.verify_tun_44(rekeyed, count=batch)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 2 * batch)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 2 * batch)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)
858
859
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode

    Same SA layout as the plain transport-mode test, but the SAs are
    created with the UDP-encap flag and the test parameters carry a UDP
    header (sport 5454, dport 4500) as ``nat_header``.
    """

    def setUp(self):
        """Set the UDP-encap parameters, then configure tunnel, SAs and
        protection so every test method starts from a working setup.
        """
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=5454, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Undo setUp's configuration in reverse order, then run the
        base tearDown.
        """
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def test_tun_44(self):
        """IPSEC UDP tunnel protect

        Pass one batch of traffic and check the tunnel's rx/tx packet
        counters match the batch size.
        """
        batch = 127
        params = self.ipv4_params

        self.verify_tun_44(params, count=batch)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], batch)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], batch)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
899
900
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode

    Protects an IP-in-IP tunnel with the SAs built by config_sa_tun().
    The packet generators and verifiers are overridden here because the
    protected payload is a complete inner IP packet.
    """

    # These repeat the values on TemplateIpsec4TunProtect.
    # NOTE(review): presumably required because TemplateIpsec precedes
    # the mixin in the MRO and may define its own tun4_* names - confirm
    # before removing.
    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the base setUp, then use pg0 as ``tun_if``."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing test-specific to undo; defer to the base tearDown."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return ``count`` Ethernet frames whose payload is
        sa.encrypt(outer IP / inner IP(src, dst) / UDP / Raw).

        The outer IP header runs from sw_intf's remote to its local
        address.  sa.encrypt() is called once per frame.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return ``count`` plain Ether / IP(src, dst) / UDP / Raw frames
        addressed from sw_intf's remote MAC to its local MAC.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received packet is addressed from
        p.remote_tun_if_host to pg1's remote IPv4 address and has valid
        checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check the outer
        and inner IP addressing.

        On IndexError/AssertionError the offending packet (and its
        decrypted form, when decryption got that far) is logged at debug
        level and the exception is re-raised.
        """
        for rx in rxs:
            # Reset for every packet: the failure log must never show a
            # decryption left over from an earlier iteration, and must
            # not trip over an unbound name when decrypt() itself fails.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # decrypt() returned an undissected payload; parse it
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect

        Configure tunnel + tunnel-mode SAs + protection, pass one batch
        of traffic, rekey to a fresh SA pair, pass a second batch, then
        remove all configuration.
        """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection.
        # copy.copy() is shallow, so np.tun_if is the same interface
        # object that p refers to.
        np = copy.copy(p)
        np.crypt_key = 'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # counters accumulate over both batches: 2 * 127
        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1002
1003
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect

    Mixin holding the configuration steps shared by the IPv6
    tunnel-protect tests: build an IP-in-IP tunnel over pg0, create an
    outbound/inbound SA pair and attach the pair to the tunnel with a
    VppIpsecTunProtect object.  The class it is mixed into must provide
    ``pg0``, ``tun_if``, ``encryption_type`` and ``vpp_esp_protocol``.
    """

    def _new_sa6(self, p, sa_id, spi, *tun_endpoints):
        """Construct (without installing) an SA from p's keys and algorithms.

        ``tun_endpoints`` is forwarded positionally right after the
        protocol argument: config_sa_tun() passes two addresses,
        config_sa_tra() passes nothing.
        """
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol,
                          *tun_endpoints)

    def config_sa_tra(self, p):
        """Create and install p.tun_sa_out / p.tun_sa_in with no tunnel
        endpoint addresses (the variant used by the transport-mode tests).
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        p.tun_sa_out = self._new_sa6(p, p.scapy_tun_sa_id, p.scapy_tun_spi)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = self._new_sa6(p, p.vpp_tun_sa_id, p.vpp_tun_spi)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and install p.tun_sa_out / p.tun_sa_in with explicit
        tunnel endpoint addresses taken from self.tun_if.
        """
        config_tun_params(p, self.encryption_type, self.tun_if)

        # NOTE(review): both directions pass (remote, local) in that
        # order - confirm this is intended for the outbound SA as well.
        p.tun_sa_out = self._new_sa6(p, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                     self.tun_if.remote_addr[p.addr_type],
                                     self.tun_if.local_addr[p.addr_type])
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = self._new_sa6(p, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                    self.tun_if.remote_addr[p.addr_type],
                                    self.tun_if.local_addr[p.addr_type])
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Attach p's SA pair (one outbound, a list of one inbound) to
        p.tun_if and remember the protection object as p.tun_protect.
        """
        inbound_sas = [p.tun_sa_in]
        p.tun_protect = VppIpsecTunProtect(self, p.tun_if,
                                           p.tun_sa_out, inbound_sas)
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel between pg0's local and remote IPv6
        addresses, bring it up, and add a /128 route to
        p.remote_tun_if_host via the tunnel's remote address.
        """
        underlay = self.pg0
        p.tun_if = VppIpIpTunInterface(self, underlay,
                                       underlay.local_ip6,
                                       underlay.remote_ip6)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()

        host_prefix_len = 128
        p.route = VppIpRoute(self, p.remote_tun_if_host, host_prefix_len,
                             [VppRoutePath(p.tun_if.remote_ip6,
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the host route first, then the tunnel interface."""
        route = p.route
        route.remove_vpp_config()
        tunnel = p.tun_if
        tunnel.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored in p.tun_protect."""
        protection = p.tun_protect
        protection.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        outbound = p.tun_sa_out
        outbound.remove_vpp_config()
        inbound = p.tun_sa_in
        inbound.remove_vpp_config()
1072
1073
class TestIpsec6TunProtect(TemplateIpsec,
                           TemplateIpsec6TunProtect,
                           IpsecTun6):
    """ IPsec IPv6 Tunnel protect - transport mode

    Protects an IP-in-IP tunnel with the SAs built by config_sa_tra()
    and exercises both a simple rekey and a three-phase rekey.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the base setUp, then use pg0 as ``tun_if``."""
        super(TestIpsec6TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing test-specific to undo; defer to the base tearDown."""
        super(TestIpsec6TunProtect, self).tearDown()

    def test_tun_66(self):
        """IPSEC tunnel protect

        Configure tunnel + SAs + protection, pass traffic, perform a
        simple rekey, then a three-phase rekey, checking the tunnel
        counters along the way, and finally remove all configuration.
        """
        batch = 127
        p = self.ipv6_params

        # initial configuration
        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=batch)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], batch)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], batch)

        # Simple rekey: derive a second parameter set with a different
        # key and new SPIs / SA ids.  copy.copy() is shallow, so
        # rekeyed.tun_if is still the interface object referenced by p.
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = 'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_66(rekeyed, count=batch)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 2 * batch)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 2 * batch)

        # Three-phase rekey:
        #  1) accept two input SAs [old, new]
        #  2) switch the output SA to [new]
        #  3) accept only the [new] input SA
        newest = copy.copy(rekeyed)
        newest.crypt_key = 'Z' + p.crypt_key[1:]
        newest.scapy_tun_spi += 100
        newest.scapy_tun_sa_id += 1
        newest.vpp_tun_spi += 100
        newest.vpp_tun_sa_id += 1
        newest.tun_if.local_spi = p.vpp_tun_spi
        newest.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(newest)

        # In the two-parameter verify calls below the first argument
        # appears to select the SA for traffic sent to VPP and the second
        # the SA expected on traffic from VPP (inferred from the phases).

        # step 1: output stays on the old SA, both input SAs accepted
        p.tun_protect.update_vpp_config(rekeyed.tun_sa_out,
                                        [rekeyed.tun_sa_in,
                                         newest.tun_sa_in])
        self.verify_tun_66(rekeyed, rekeyed, count=batch)
        self.verify_tun_66(newest, rekeyed, count=batch)

        # step 2: output moves to the new SA, both input SAs accepted
        p.tun_protect.update_vpp_config(newest.tun_sa_out,
                                        [rekeyed.tun_sa_in,
                                         newest.tun_sa_in])
        self.verify_tun_66(rekeyed, newest, count=batch)
        self.verify_tun_66(newest, newest, count=batch)

        # step 3: only the new input SA remains; the old one must drop
        p.tun_protect.update_vpp_config(newest.tun_sa_out,
                                        [newest.tun_sa_in])
        self.verify_tun_66(newest, newest, count=batch)
        self.verify_drop_tun_66(rekeyed, count=batch)

        # seven batches were verified as passing through the tunnel
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], batch * 7)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], batch * 7)
        self.unconfig_sa(rekeyed)

        # teardown
        self.unconfig_protect(newest)
        self.unconfig_sa(newest)
        self.unconfig_network(p)
1169
1170
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode

    Protects an IP-in-IP tunnel with the SAs built by config_sa_tun().
    The packet generators and verifiers are overridden here because the
    protected payload is a complete inner IPv6 packet.
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the base setUp, then use pg0 as ``tun_if``."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Nothing test-specific to undo; defer to the base tearDown."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return ``count`` Ethernet frames whose payload is
        sa.encrypt(outer IPv6 / inner IPv6(src, dst) / UDP / Raw).

        The outer IPv6 header runs from sw_intf's remote to its local
        address.  sa.encrypt() is called once per frame.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw('X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return ``count`` plain Ether / IPv6(src, dst) / UDP / Raw
        frames addressed from sw_intf's remote MAC to its local MAC.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw('X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received packet is addressed from
        p.remote_tun_if_host to pg1's remote IPv6 address and has valid
        checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check the outer
        and inner IPv6 addressing.

        On IndexError/AssertionError the offending packet (and its
        decrypted form, when decryption got that far) is logged at debug
        level and the exception is re-raised.
        """
        for rx in rxs:
            # Reset for every packet: the failure log must never show a
            # decryption left over from an earlier iteration, and must
            # not trip over an unbound name when decrypt() itself fails.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # decrypt() returned an undissected payload; parse it
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect

        Configure tunnel + tunnel-mode SAs + protection, pass one batch
        of traffic, rekey to a fresh SA pair, pass a second batch, then
        remove all configuration.
        """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection.
        # copy.copy() is shallow, so np.tun_if is the same interface
        # object that p refers to.
        np = copy.copy(p)
        np.crypt_key = 'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # counters accumulate over both batches: 2 * 127
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
1272
1273
if __name__ == '__main__':
    # When this file is executed as a script, run its test cases through
    # the VppTestRunner imported from framework.
    unittest.main(testRunner=VppTestRunner)