feature: Config end nodes are user specific
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
22 from util import ppp
23 from vpp_papi import VppEnum
24 from vpp_acl import AclRule, VppAcl, VppAclInterface
25
26
def config_tun_params(p, encryption_type, tun_if):
    """Attach scapy tunnel-mode security associations to *p*.

    Note: this module-level definition shadows the ``config_tun_params``
    name imported from ``template_ipsec`` at the top of the file.

    The tunnel endpoints of ``tun_if`` are recorded on ``p`` as
    ``tun_src`` (local) and ``tun_dst`` (remote), then two
    ``SecurityAssociation`` objects are stored on ``p``:

    * ``p.scapy_tun_sa`` uses ``p.vpp_tun_spi`` and an outer IP header
      going from ``tun_dst`` to ``tun_src``.
    * ``p.vpp_tun_sa`` uses ``p.scapy_tun_spi`` and an outer IP header
      going from ``tun_src`` to ``tun_dst``.

    :param p: IPsec parameter object; modified in place
    :param encryption_type: passed through as the first positional
        argument of ``SecurityAssociation``
    :param tun_if: interface object providing ``local_ip``/``remote_ip``
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    use_esn = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # outer header class is selected by the address family stored on p
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    # everything both directions have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=scapy_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=use_esn)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        **shared)
54
55
def config_tra_params(p, encryption_type, tun_if):
    """Attach scapy security associations without a tunnel header to *p*.

    Mirrors ``config_tun_params`` except that no ``tunnel_header`` is
    handed to ``SecurityAssociation``. The endpoints of ``tun_if`` are
    still recorded on ``p`` as ``tun_src`` / ``tun_dst``.

    * ``p.scapy_tun_sa`` is created with ``p.vpp_tun_spi``.
    * ``p.vpp_tun_sa`` is created with ``p.scapy_tun_spi``.

    :param p: IPsec parameter object; modified in place
    :param encryption_type: passed through as the first positional
        argument of ``SecurityAssociation``
    :param tun_if: interface object providing ``local_ip``/``remote_ip``
    """
    # kept although unused, exactly as in the tunnel-mode variant
    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}

    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    use_esn = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)
    scapy_key = mk_scapy_crypt_key(p)

    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # the two SAs differ only in their SPI
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=scapy_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  esn_en=use_esn,
                  nat_t_header=p.nat_header)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi, **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi, **shared)
77
78
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    # Shared fixture: one IPsec tunnel interface over pg0, built from
    # self.ipv4_params, with host routes for both address families.

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # the same crypto key and the same auth key are supplied twice
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # /32 host route towards the remote IPv4 host, via the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        v4_route = VppIpRoute(self, params.remote_tun_if_host, 32,
                              [v4_path])
        v4_route.add_vpp_config()

        # /128 host route towards the remote IPv6 host, via the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        v6_route = VppIpRoute(self, params.remote_tun_if_host6, 128,
                              [v6_path])
        v6_route.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
122
123
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    # Shared fixture for UDP-encapsulated ESP tunnels. Unlike the plain
    # ESP template, the tunnel is not created in setUp(); subclasses call
    # config_network() once their parameters are final.

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check each received packet is UDP-encapsulated ESP and that
        it decrypts to the expected inner IPv4 packet.

        :param p: IPsec parameter object (not read by this method)
        :param sa: scapy SecurityAssociation used to decrypt
        :param rxs: iterable of received packets
        :raises IndexError, AssertionError: re-raised after logging the
            offending packet
        """
        for rx in rxs:
            # Reset for every packet: if this packet fails before it is
            # decrypted, the failure log must not show a packet that was
            # decrypted in an earlier iteration.
            pkt = None
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, 4500)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # dumping is best-effort and must never mask the
                        # original verification failure
                        self.logger.debug(
                            "Decrypted packet could not be dumped")
                raise

    def setUp(self):
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        # request UDP encapsulation and give scapy a NAT-T style header
        p = self.ipv4_params
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=5454, dport=4500)

    def config_network(self):
        """Create the UDP-encap tunnel over pg0 plus its host routes."""

        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi, p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key, udp_encap=True)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # host routes (IPv4 /32, IPv6 /128) pointing at the tunnel
        r = VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)])
        r.add_vpp_config()
        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
198
199
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # An 1800 byte payload with the tunnel MTU at 1500 is expected
        # to arrive as two packets (n_rx=2).
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        try:
            self.verify_tun_44(self.params[socket.AF_INET],
                               count=1, payload_size=1800, n_rx=2)
        finally:
            # put the MTU back even when the verification fails
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                           [9000, 0, 0, 0])
229
230
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        # NOTE(review): naming TemplateIpsec4TunIfEspUdp here starts the
        # lookup *after* that class, so its own setUp() (which sets
        # p.flags and p.nat_header) is not run. Confirm this is intended.
        super(TemplateIpsec4TunIfEspUdp, self).setUp()
        self.config_network()

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
243
244
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP GCM tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        # NOTE(review): naming TemplateIpsec4TunIfEspUdp here starts the
        # lookup *after* that class, so its own setUp() (which sets
        # p.flags and p.nat_header) is not run. Confirm this is intended.
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        integ = VppEnum.vl_api_ipsec_integ_alg_t
        crypto = VppEnum.vl_api_ipsec_crypto_alg_t

        # switch the shared IPv4 parameters to AES-GCM-256 with no
        # separate integrity algorithm before the tunnel is created
        params = self.ipv4_params
        params.auth_algo_vpp_id = integ.IPSEC_API_INTEG_ALG_NONE
        params.crypt_algo_vpp_id = crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256
        params.crypt_algo = "AES-GCM"
        params.auth_algo = "NULL"
        params.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        params.salt = 0

        self.config_network()
262
263
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No body of its own: the fixture comes from TemplateIpsec4TunIfEsp
    # and the test methods from IpsecTcpTests.
267
268
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    # Shared fixture: one IPv6 IPsec tunnel interface over pg0, built
    # from self.ipv6_params, with host routes for both address families.

    encryption_type = ESP

    def setUp(self):
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # the same crypto key and the same auth key are supplied twice
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # /128 host route towards the remote IPv6 host, via the tunnel
        v6_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        v6_route = VppIpRoute(self, params.remote_tun_if_host, 128,
                              [v6_path])
        v6_route.add_vpp_config()

        # /32 host route towards the remote IPv4 host, via the tunnel
        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        v4_route = VppIpRoute(self, params.remote_tun_if_host4, 32,
                              [v4_path])
        v4_route.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
303
304
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
320
321
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # node names read by the inherited test code
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
327
328
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # node names read by the inherited test code
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
334
335
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv4_neighbors()

        # SA ids and SPIs that must be unique per tunnel
        per_tunnel_fields = ("scapy_tun_sa_id", "scapy_tun_spi",
                             "vpp_tun_sa_id", "vpp_tun_spi",
                             "scapy_tra_sa_id", "scapy_tra_spi",
                             "vpp_tra_sa_id", "vpp_tra_spi")

        for idx in range(10):
            # shallow copy of the base parameters, one per tunnel
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (idx + 1)
            for field in per_tunnel_fields:
                setattr(p, field, getattr(p, field) + idx)
            p.tun_dst = self.pg0.remote_hosts[idx].ip4

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                p.vpp_tun_spi, p.scapy_tun_spi,
                p.crypt_algo_vpp_id,
                p.crypt_key, p.crypt_key,
                p.auth_algo_vpp_id,
                p.auth_key, p.auth_key,
                dst=p.tun_dst)
            p.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip4()
            config_tun_params(p, self.encryption_type, tunnel)
            self.multi_params.append(p)

            # host route for this tunnel's remote host
            host_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [host_path]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)
            rx_stats = p.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = p.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interface """
        # encrypted packets for every tunnel, sent in on the tunnel side
        encrypted = []
        for p in self.multi_params:
            encrypted.extend(
                self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                      src=p.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4))
        rxs = self.send_and_expect(self.tun_if, encrypted, self.pg1)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # plain packets for every tunnel, sent in on pg1
        plain = []
        for p in self.multi_params:
            plain.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                       dst=p.remote_tun_if_host))
        rxs = self.send_and_expect(self.pg1, plain, self.tun_if)

        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
416
417
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create the tunnel interface and host route described by *p*."""
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        host_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [host_path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what config_network() created for *p*."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Give the tunnel in *p* a new key and new SPIs.

        New SAs are stored on ``p`` as ``tun_sa_in`` / ``tun_sa_out``
        and bound to the existing tunnel interface.
        """
        #
        # change the key and the SPI
        #
        p.crypt_key = b'X' + p.crypt_key[1:]
        for field in ("scapy_tun_spi", "scapy_tun_sa_id",
                      "vpp_tun_spi", "vpp_tun_sa_id"):
            setattr(p, field, getattr(p, field) + 1)
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, p.tun_if)

        def make_sa(sa_id, spi):
            # both SAs share algorithms, keys, flags and salt
            return VppIpsecSA(self, sa_id, spi,
                              p.auth_algo_vpp_id, p.auth_key,
                              p.crypt_algo_vpp_id, p.crypt_key,
                              self.vpp_esp_protocol,
                              flags=p.flags,
                              salt=p.salt)

        p.tun_sa_in = make_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
        p.tun_sa_out = make_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        # despite the attribute names, tun_sa_in (scapy SPI) is bound
        # with is_outbound=1 and tun_sa_out (VPP SPI) with is_outbound=0
        for sa, outbound in ((p.tun_sa_in, 1), (p.tun_sa_out, 0)):
            self.vapi.ipsec_tunnel_if_set_sa(
                sw_if_index=p.tun_if.sw_if_index,
                sa_id=sa.id,
                is_outbound=outbound)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        key_128 = b"JPjyOWBeVEQiMe7h"
        key_192 = b"JPjyOWBeVEQiMe7hJPjyOWBe"
        key_256 = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"

        # foreach crypto algorithm; one row per algorithm, columns in
        # the order given by algo_fields
        algo_fields = ('vpp-crypto', 'vpp-integ',
                       'scapy-crypto', 'scapy-integ',
                       'key', 'salt')
        algo_rows = [
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_128, 3333),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_192, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL", key_256, 9999),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_128, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_192, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96", key_256, 0),
            (crypto.IPSEC_API_CRYPTO_ALG_NONE,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "NULL", "HMAC-SHA1-96", key_256, 0),
        ]
        algos = [dict(zip(algo_fields, row)) for row in algo_rows]

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            #
            # loop through each of the algorithms
            #
            for algo in algos:
                # shallow copy so the base parameters stay untouched
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = algo['vpp-integ']
                p.crypt_algo_vpp_id = algo['vpp-crypto']
                p.crypt_algo = algo['scapy-crypto']
                p.auth_algo = algo['scapy-integ']
                p.crypt_key = algo['key']
                p.salt = algo['salt']

                self.config_network(p)

                self.verify_tun_44(p, count=127)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], 127)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], 127)

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                # tear down the tunnel, then the SAs added by rekey()
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
602
603
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Force *p* to NULL crypto/integrity, then build the tunnel
        interface and host route from it."""
        integ = VppEnum.vl_api_ipsec_integ_alg_t
        crypto = VppEnum.vl_api_ipsec_crypto_alg_t

        # no integrity algorithm, empty key
        p.auth_algo_vpp_id = integ.IPSEC_API_INTEG_ALG_NONE
        p.auth_algo = 'NULL'
        p.auth_key = []

        # no encryption algorithm, empty key
        p.crypt_algo_vpp_id = crypto.IPSEC_API_CRYPTO_ALG_NONE
        p.crypt_algo = 'NULL'
        p.crypt_key = []

        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            p.vpp_tun_spi, p.scapy_tun_spi,
            p.crypt_algo_vpp_id,
            p.crypt_key, p.crypt_key,
            p.auth_algo_vpp_id,
            p.auth_key, p.auth_key,
            salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        host_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [host_path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what config_network() created for *p*."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """ IPSec SA with NULL algos """
        params = self.ipv4_params

        self.config_network(params)

        # the test asserts that these packets produce no replies
        plain = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                              dst=params.remote_tun_if_host)
        self.send_and_assert_no_replies(self.pg1, plain)

        self.unconfig_network(params)
666
667
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        # SA ids and SPIs that must be unique per tunnel
        per_tunnel_fields = ("scapy_tun_sa_id", "scapy_tun_spi",
                             "vpp_tun_sa_id", "vpp_tun_spi",
                             "scapy_tra_sa_id", "scapy_tra_spi",
                             "vpp_tra_sa_id", "vpp_tra_spi")

        for idx in range(10):
            # shallow copy of the base parameters, one per tunnel
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1111::%d" % (idx + 1)
            for field in per_tunnel_fields:
                setattr(p, field, getattr(p, field) + idx)

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                p.vpp_tun_spi, p.scapy_tun_spi,
                p.crypt_algo_vpp_id,
                p.crypt_key, p.crypt_key,
                p.auth_algo_vpp_id,
                p.auth_key, p.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            p.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(p, self.encryption_type, tunnel)
            self.multi_params.append(p)

            # host route for this tunnel's remote host
            host_path = VppRoutePath(tunnel.remote_ip6, 0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)
            host_route = VppIpRoute(self, p.remote_tun_if_host, 128,
                                    [host_path])
            host_route.add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_66(p, count=127)
            rx_stats = p.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = p.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)
728
729
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    # A GRE TEB tunnel between pg0's local and remote IPv4 addresses is
    # protected with a pair of ESP SAs and bridged with pg1 in bridge
    # domain 1.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC used for the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / ``sa.encrypt(...)`` of
        IP(pg0 remote -> pg0 local) / GRE / Ether(dst=omac) /
        IP(1.1.1.1 -> 1.1.1.2) / UDP / ``payload_size`` bytes of b'X'.
        ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to omac / 1.1.1.2."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its layers.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet: a failure inside sa.decrypt() must not
            # log the packet decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw load as an IP packet
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # NOTE(review): the two trailing addresses are presumably the
        # SA's tunnel source/destination - confirm against VppIpsecSA.
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        # keep the CLI output in the test log instead of discarding it
        self.logger.info(self.vapi.cli("sh adj"))
        self.logger.info(self.vapi.cli("sh ipsec tun"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
839
840
class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
                               IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    # Same layout as the plain GRE TEB tunnel test, except that the
    # bridge's second port is the dot1q sub-interface pg1.11, which has
    # an L2_POP_1 VLAN tag rewrite applied.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC used for the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / ``sa.encrypt(...)`` of
        IP(pg0 remote -> pg0 local) / GRE / Ether(dst=omac) /
        IP(1.1.1.1 -> 1.1.1.2) / UDP / ``payload_size`` bytes of b'X'.
        The inner frame carries no VLAN tag. ``p``, ``src`` and ``dst``
        are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain frames tagged with VLAN 11.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                Dot1Q(vlan=11) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame: dst omac, VLAN 11, IP dst 1.1.1.2."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its layers.

        The inner Ethernet frame must not carry a Dot1Q header. On
        IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet: a failure inside sa.decrypt() must not
            # log the packet decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw load as an IP packet
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create pg1.11, the SAs, the protected tunnel and the bridge."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # dot1q sub-interface for VLAN 11 with an L2_POP_1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11)
        self.pg1_11.admin_up()

        # NOTE(review): the two trailing addresses are presumably the
        # SA's tunnel source/destination - confirm against VppIpsecSA.
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with the VLAN sub-interface
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Undo the tunnel IPv4 config, base teardown, then drop pg1.11."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
959
960
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    # A GRE TEB tunnel over pg0, bridged with pg1, protected by SAs that
    # are created without tunnel addresses; the scapy-side parameters
    # are prepared with config_tra_params().
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC used for the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / ``sa.encrypt(...)`` of
        IP(pg0 remote -> pg0 local) / GRE / Ether(dst=omac) /
        IP(1.1.1.1 -> 1.1.1.2) / UDP / ``payload_size`` bytes of b'X'.
        ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to omac / 1.1.1.2."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its layers.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet: a failure inside sa.decrypt() must not
            # log the packet decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw load as an IP packet
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # no tunnel addresses are passed to the SAs in this variant
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1064
1065
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
                                 IpsecTun4Tests):
    """ Ipsec GRE TEB UDP ESP - Tra tests """
    # A GRE TEB tunnel over pg0, bridged with pg1, protected by SAs that
    # have the UDP_ENCAP flag set and UDP ports 5454 -> 4545.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # destination MAC used for the bridged (inner) Ethernet frames
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / ``sa.encrypt(...)`` of
        IP(pg0 remote -> pg0 local) / GRE / Ether(dst=omac) /
        IP(1.1.1.1 -> 1.1.1.2) / UDP / ``payload_size`` bytes of b'X'.
        ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain L2 frames addressed to ``omac``.

        ``sw_intf``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to omac / 1.1.1.2."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP header, then decrypt and check each packet.

        Every received packet must carry UDP with sport 5454 and dport
        4545. It is then decrypted with ``sa`` and its layers checked.
        On IndexError or AssertionError during that second step the
        received packet (and the decrypted one, when decryption got
        that far) is logged at debug level before the exception is
        re-raised.
        """
        for rx in rxs:
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # Reset per packet: a failure inside sa.decrypt() must not
            # log the packet decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw load as an IP packet
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create UDP-encap SAs, the protected tunnel and the bridge."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        # enable UDP encapsulation on both the VPP and the scapy side
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags,
                                  udp_src=5454,
                                  udp_dst=4545)
        p.tun_sa_out.add_vpp_config()

        # the inbound SA additionally carries the IS_INBOUND flag
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=(p.flags |
                                        VppEnum.vl_api_ipsec_sad_flags_t.
                                        IPSEC_API_SAD_FLAG_IS_INBOUND),
                                 udp_src=5454,
                                 udp_dst=4545)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # bridge the tunnel with pg1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1185
1186
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    # A GRE tunnel between pg0's local and remote IPv4 addresses is
    # protected with a pair of ESP SAs; 1.1.1.2/32 is routed via the
    # tunnel.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / ``sa.encrypt(...)`` of
        IP(pg0 remote -> pg0 local) / GRE / IP(pg1 local -> pg1 remote)
        / UDP / ``payload_size`` bytes of b'X'. ``p``, ``src`` and
        ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its layers.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet: a failure inside sa.decrypt() must not
            # log the packet decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw load as an IP packet
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the SAs, the protected GRE tunnel and the route."""
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is added to this bridge domain in this
        # class - looks like a leftover from the TEB tests; confirm
        # before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # NOTE(review): the two trailing addresses are presumably the
        # SA's tunnel source/destination - confirm against VppIpsecSA.
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1.1.1.2 via the tunnel's remote address
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1288
1289
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    # A GRE tunnel over pg0 protected by SAs that are created without
    # tunnel addresses; the scapy-side parameters are prepared with
    # config_tra_params(). 1.1.1.2/32 is routed via the tunnel.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / ``sa.encrypt(...)`` of
        IP(pg0 remote -> pg0 local) / GRE / IP(pg1 local -> pg1 remote)
        / UDP / ``payload_size`` bytes of b'X'. ``p``, ``src`` and
        ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Build ``count`` encrypted frames with no IP header after GRE.

        The GRE header is followed directly by UDP. ``src`` and ``dst``
        are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` plain frames from 1.1.1.1 to 1.1.1.2.

        ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its layers.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet: a failure inside sa.decrypt() must not
            # log the packet decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-parse the raw load as an IP packet
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the SAs, the protected GRE tunnel and the route."""
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # no tunnel addresses are passed to the SAs in this variant
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1.1.1.2 via the tunnel's remote address
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        """ ESP GRE packet without an inner IP header gets no reply """
        p = self.ipv4_params
        # src/dst are ignored by gen_encrypt_non_ip_pkts()
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip6)
        self.send_and_assert_no_replies(self.tun_if, tx)
        # the single packet sent must be counted by the decrypt node
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1402
1403
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE ESP - TRA tests """
    # IPv6 variant: a GRE tunnel between pg0's local and remote IPv6
    # addresses, protected by SAs created without tunnel addresses.
    # 1::2/128 is routed via the tunnel.
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted frames to inject on ``sw_intf``.

        Each frame is Ether / ``sa.encrypt(...)`` of
        IPv6(pg0 remote -> pg0 local) / GRE /
        IPv6(pg1 local -> pg1 remote) / UDP / ``payload_size`` bytes of
        b'X'. ``p``, ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` plain frames from 1::1 to 1::2.

        ``src`` and ``dst`` are accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its layers.

        On IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # Reset per packet: a failure inside sa.decrypt() must not
            # log the packet decrypted in a previous iteration.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # re-parse the raw load as an IPv6 packet
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create the SAs, the protected GRE tunnel and the IPv6 route."""
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): no port is added to this bridge domain in this
        # class - looks like a leftover from the TEB tests; confirm
        # before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # no tunnel addresses are passed to the SAs in this variant
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # send traffic for 1::2 via the tunnel's remote address
        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the base teardown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1501
1502
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """ Ipsec mGRE ESP v4 TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` frames holding an `sa`-encrypted GRE packet.

        The plaintext is IP(p.tun_dst -> pg0 local) / GRE /
        IP(pg1 local -> pg1 remote) / UDP / `payload_size` bytes of b'X'.
        The `src` and `dst` arguments are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=p.tun_dst,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain IP/UDP frames addressed to `dst`.

        The IP source is fixed at 1.1.1.1; the `src` argument is not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame with `sa` and check the GRE payload.

        The decrypted packet must have valid checksums, contain a GRE layer
        and carry an inner IP destination equal to p.remote_tun_if_host.
        On IndexError/AssertionError the packets are logged at debug level
        and the original exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort diagnostics only: `pkt` is unbound when
                    # decryption itself failed.  Catch Exception rather than
                    # everything so KeyboardInterrupt/SystemExit propagate.
                    pass
                raise

    def setUp(self):
        """Create the multipoint GRE tunnel and one protected next-hop.

        Each configured next-hop gets its own copy of the IPv4 parameters
        (stored in self.multi_params) with SA ids and SPIs offset by the
        next-hop index.
        """
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   "0.0.0.0",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        # NOTE(review): only one next-hop is configured here although N_NHS
        # remote hosts are generated; the v6 sibling class loops over all
        # N_NHS - confirm this is intentional.
        for ii in range(1):
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            p.teib = VppTeib(self, p.tun_if,
                             p.tun_if.remote_hosts[ii].ip4,
                             self.pg0.remote_hosts[ii].ip4).add_vpp_config()
            # set after config_tra_params() so this value is the one used
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Call unconfig_ip4() on the tunnel, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # remove the TEIB, run the dropped-traffic check, then restore
            # the TEIB and verify traffic again
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
1633
1634
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """ Ipsec mGRE ESP v6 TRA tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return `count` frames holding an `sa`-encrypted GRE packet.

        The plaintext is IPv6(p.tun_dst -> pg0 local) / GRE /
        IPv6(pg1 local -> pg1 remote) / UDP / `payload_size` bytes of b'X'.
        The `src` and `dst` arguments are not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=p.tun_dst,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return `count` plain IPv6/UDP frames addressed to `dst`.

        The IPv6 source is fixed at 1::1; the `src` argument is not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each frame with `sa` and check the GRE payload.

        The decrypted packet must have valid checksums, contain a GRE layer
        and carry an inner IPv6 destination equal to p.remote_tun_if_host.
        On IndexError/AssertionError the packets are logged at debug level
        and the original exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort diagnostics only: `pkt` is unbound when
                    # decryption itself failed.  Catch Exception rather than
                    # everything so KeyboardInterrupt/SystemExit propagate.
                    pass
                raise

    def setUp(self):
        """Create the multipoint GRE tunnel and N_NHS protected next-hops.

        Each next-hop gets its own copy of the IPv6 parameters (stored in
        self.multi_params) with SA ids and SPIs offset by the next-hop index.
        """
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   "::",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            p = copy.copy(self.ipv6_params)

            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(self, p.tun_if,
                    p.tun_if.remote_hosts[ii].ip6,
                    p.tun_dst).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 128,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
                                     p.tun_if.sw_if_index)]).add_vpp_config()
            # assigned again after config_tra_params(), which presumably
            # resets p.tun_dst - TODO confirm before removing
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        # NOTE(review): hard-coded adjacency index; looks like a debugging aid
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Call unconfig_ip6() on the tunnel, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
1766
1767
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create and add the out/in SAs without tunnel endpoint addresses."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm/key arguments shared by both SAs
        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *crypto,
            flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *crypto,
            flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and add the out/in SAs with tunnel endpoint addresses.

        The outbound SA is given (local, remote) of self.tun_if for the
        address family in p.addr_type; the inbound SA gets them swapped.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)
        local = self.tun_if.local_addr[p.addr_type]
        remote = self.tun_if.remote_addr[p.addr_type]

        out_args = crypto + (local, remote)
        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *out_args,
            flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        in_args = crypto + (remote, local)
        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *in_args,
            flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with p.tun_sa_out and the single p.tun_sa_in."""
        p.tun_protect = VppIpsecTunProtect(
            self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPIP tunnel over pg0 plus v4 and v6 host routes.

        Only the IPv4 route is kept on `p` (as p.route); the IPv6 route is
        added but not stored.
        """
        p.tun_if = VppIpIpTunInterface(
            self, self.pg0, self.pg0.local_ip4, self.pg0.remote_ip4)
        tunnel = p.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(tunnel.remote_ip6,
                               0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        VppIpRoute(self, p.remote_tun_if_host6, 128,
                   [v6_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove p.route and then the tunnel interface."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection held in p.tun_protect."""
        protect = p.tun_protect
        protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1850
1851
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base tearDown; each test cleans up after itself."""
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        params = self.ipv4_params

        def check_counters(expected):
            # rx first, then tx, on the tunnel interface
            self.assertEqual(
                params.tun_if.get_rx_stats()['packets'], expected)
            self.assertEqual(
                params.tun_if.get_tx_stats()['packets'], expected)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

        self.verify_tun_44(params, count=127)
        check_counters(127)

        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(params, count=127)
        check_counters(254)

        # rekey - create new SAs and update the tunnel protection
        rekeyed = copy.copy(params)
        rekeyed.crypt_key = b'X' + params.crypt_key[1:]
        rekeyed.scapy_tun_spi = rekeyed.scapy_tun_spi + 100
        rekeyed.scapy_tun_sa_id = rekeyed.scapy_tun_sa_id + 1
        rekeyed.vpp_tun_spi = rekeyed.vpp_tun_spi + 100
        rekeyed.vpp_tun_sa_id = rekeyed.vpp_tun_sa_id + 1
        # the copy is shallow, so this is the same tun_if object as params'
        rekeyed.tun_if.local_spi = params.vpp_tun_spi
        rekeyed.tun_if.remote_spi = params.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(params)

        self.verify_tun_44(rekeyed, count=127)
        check_counters(381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(params)
1911
1912
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        """Configure the tunnel with UDP-encap SAs on ports 4500/4500."""
        super(TestIpsec4TunProtectUdp, self).setUp()

        self.tun_if = self.pg0

        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        """Remove protection, SAs and network, then run the base tearDown."""
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP ports of every frame, then run the parent checks."""
        # ensure encrypted packets are received with the default UDP ports
        for rx in rxs:
            udp = rx[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        params = self.ipv4_params

        self.verify_tun_44(params, count=127)

        rx_stats = params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], 127)
        tx_stats = params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        params = self.ipv4_params
        self.verify_keepalive(params)
1959
1960
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base tearDown; the test cleans up after itself."""
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` frames holding an `sa`-encrypted IP-in-IP packet.

        The header passed to sa.encrypt() runs from sw_intf's remote to its
        local IPv4 address and wraps IP(src -> dst) / UDP / payload.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain IP(src -> dst)/UDP frames."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of each decrypted frame."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame with `sa` and check both IP headers.

        The first IP header must run from pg0's local to pg0's remote
        address, and the IP header nested below it must be destined to
        p.remote_tun_if_host.  On IndexError/AssertionError the packets are
        logged at debug level and the original exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # Best-effort diagnostics only: `pkt` is unbound when
                    # decryption itself failed.  Catch Exception rather than
                    # everything so KeyboardInterrupt/SystemExit propagate.
                    pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # also add an output features on the tunnel and physical interface
        # so we test they still work
        r_all = AclRule(True,
                        src_prefix="0.0.0.0/0",
                        dst_prefix="0.0.0.0/0",
                        proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (copy.copy is shallow: np.tun_if is the same object as p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2073
2074
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec4TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base tearDown; the test cleans up after itself."""
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames whose first IP dst is 5.5.5.5.

        That header's source is sw_intf's remote IPv4 address; it wraps
        IP(src -> dst) / UDP / `payload_size` bytes of b'X'.
        """
        frames = []
        for _ in range(count):
            plain = (IP(src=sw_intf.remote_ip4,
                        dst="5.5.5.5") /
                     IP(src=src, dst=dst) /
                     UDP(sport=1144, dport=2233) /
                     Raw(b'X' * payload_size))
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / sa.encrypt(plain))
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # no replies are expected for these packets
        bogus = self.gen_encrypt_pkts(params, params.scapy_tun_sa,
                                      self.tun_if,
                                      src=params.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2121
2122
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    def config_sa_tra(self, p):
        """Create and add the out/in SAs without tunnel endpoint addresses."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # algorithm/key arguments shared by both SAs
        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)

        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *crypto)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *crypto)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create and add the out/in SAs with tunnel endpoint addresses.

        The outbound SA is given (local, remote) of self.tun_if for the
        address family in p.addr_type; the inbound SA gets them swapped.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        crypto = (p.auth_algo_vpp_id, p.auth_key,
                  p.crypt_algo_vpp_id, p.crypt_key,
                  self.vpp_esp_protocol)
        local = self.tun_if.local_addr[p.addr_type]
        remote = self.tun_if.remote_addr[p.addr_type]

        out_args = crypto + (local, remote)
        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi, *out_args)
        p.tun_sa_out.add_vpp_config()

        in_args = crypto + (remote, local)
        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi, *in_args)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with p.tun_sa_out and the single p.tun_sa_in."""
        p.tun_protect = VppIpsecTunProtect(
            self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPIP tunnel over pg0 plus v6 and v4 host routes.

        Only the IPv6 route is kept on `p` (as p.route); the IPv4 route is
        added but not stored.
        """
        p.tun_if = VppIpIpTunInterface(
            self, self.pg0, self.pg0.local_ip6, self.pg0.remote_ip6)
        tunnel = p.tun_if
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        v6_path = VppRoutePath(tunnel.remote_ip6,
                               0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

        v4_path = VppRoutePath(tunnel.remote_ip4, 0xffffffff)
        VppIpRoute(self, p.remote_tun_if_host4, 32,
                   [v4_path]).add_vpp_config()

    def unconfig_network(self, p):
        """Remove p.route and then the tunnel interface."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection held in p.tun_protect."""
        protect = p.tun_protect
        protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA and then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
2196
2197
2198 class TestIpsec6TunProtect(TemplateIpsec,
2199                            TemplateIpsec6TunProtect,
2200                            IpsecTun6):
2201     """ IPsec IPv6 Tunnel protect - transport mode"""
2202
2203     encryption_type = ESP
2204     tun6_encrypt_node_name = "esp6-encrypt-tun"
2205     tun6_decrypt_node_name = "esp6-decrypt-tun"
2206
2207     def setUp(self):
2208         super(TestIpsec6TunProtect, self).setUp()
2209
2210         self.tun_if = self.pg0
2211
2212     def tearDown(self):
2213         super(TestIpsec6TunProtect, self).tearDown()
2214
2215     def test_tun_66(self):
2216         """IPSEC tunnel protect 6o6"""
2217
2218         p = self.ipv6_params
2219
2220         self.config_network(p)
2221         self.config_sa_tra(p)
2222         self.config_protect(p)
2223
2224         self.verify_tun_66(p, count=127)
2225         c = p.tun_if.get_rx_stats()
2226         self.assertEqual(c['packets'], 127)
2227         c = p.tun_if.get_tx_stats()
2228         self.assertEqual(c['packets'], 127)
2229
2230         # rekey - create new SAs and update the tunnel protection
2231         np = copy.copy(p)
2232         np.crypt_key = b'X' + p.crypt_key[1:]
2233         np.scapy_tun_spi += 100
2234         np.scapy_tun_sa_id += 1
2235         np.vpp_tun_spi += 100
2236         np.vpp_tun_sa_id += 1
2237         np.tun_if.local_spi = p.vpp_tun_spi
2238         np.tun_if.remote_spi = p.scapy_tun_spi
2239
2240         self.config_sa_tra(np)
2241         self.config_protect(np)
2242         self.unconfig_sa(p)
2243
2244         self.verify_tun_66(np, count=127)
2245         c = p.tun_if.get_rx_stats()
2246         self.assertEqual(c['packets'], 254)
2247         c = p.tun_if.get_tx_stats()
2248         self.assertEqual(c['packets'], 254)
2249
2250         # bounce the interface state
2251         p.tun_if.admin_down()
2252         self.verify_drop_tun_66(np, count=127)
2253         node = ('/err/ipsec6-tun-input/%s' %
2254                 'ipsec packets received on disabled interface')
2255         self.assertEqual(127, self.statistics.get_err_counter(node))
2256         p.tun_if.admin_up()
2257         self.verify_tun_66(np, count=127)
2258
2259         # 3 phase rekey
2260         #  1) add two input SAs [old, new]
2261         #  2) swap output SA to [new]
2262         #  3) use only [new] input SA
2263         np3 = copy.copy(np)
2264         np3.crypt_key = b'Z' + p.crypt_key[1:]
2265         np3.scapy_tun_spi += 100
2266         np3.scapy_tun_sa_id += 1
2267         np3.vpp_tun_spi += 100
2268         np3.vpp_tun_sa_id += 1
2269         np3.tun_if.local_spi = p.vpp_tun_spi
2270         np3.tun_if.remote_spi = p.scapy_tun_spi
2271
2272         self.config_sa_tra(np3)
2273
2274         # step 1;
2275         p.tun_protect.update_vpp_config(np.tun_sa_out,
2276                                         [np.tun_sa_in, np3.tun_sa_in])
2277         self.verify_tun_66(np, np, count=127)
2278         self.verify_tun_66(np3, np, count=127)
2279
2280         # step 2;
2281         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2282                                         [np.tun_sa_in, np3.tun_sa_in])
2283         self.verify_tun_66(np, np3, count=127)
2284         self.verify_tun_66(np3, np3, count=127)
2285
2286         # step 1;
2287         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2288                                         [np3.tun_sa_in])
2289         self.verify_tun_66(np3, np3, count=127)
2290         self.verify_drop_tun_66(np, count=127)
2291
2292         c = p.tun_if.get_rx_stats()
2293         self.assertEqual(c['packets'], 127*9)
2294         c = p.tun_if.get_tx_stats()
2295         self.assertEqual(c['packets'], 127*8)
2296         self.unconfig_sa(np)
2297
2298         # teardown
2299         self.unconfig_protect(np3)
2300         self.unconfig_sa(np3)
2301         self.unconfig_network(p)
2302
2303     def test_tun_46(self):
2304         """IPSEC tunnel protect 4o6"""
2305
2306         p = self.ipv6_params
2307
2308         self.config_network(p)
2309         self.config_sa_tra(p)
2310         self.config_protect(p)
2311
2312         self.verify_tun_46(p, count=127)
2313         c = p.tun_if.get_rx_stats()
2314         self.assertEqual(c['packets'], 127)
2315         c = p.tun_if.get_tx_stats()
2316         self.assertEqual(c['packets'], 127)
2317
2318         # teardown
2319         self.unconfig_protect(p)
2320         self.unconfig_sa(p)
2321         self.unconfig_network(p)
2322
2323
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` Ethernet frames carrying an encrypted payload.

        The payload handed to ``sa.encrypt()`` is IPv6-in-IPv6: an outer
        header from ``sw_intf.remote_ip6`` to ``sw_intf.local_ip6`` around
        an inner ``src`` -> ``dst`` UDP packet with ``payload_size`` bytes
        of data.  ``p`` is not used by this implementation.
        """
        # sa.encrypt() is called once per frame rather than once overall
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear-text IPv6/UDP frames from ``src`` to ``dst``.

        Each frame carries ``payload_size`` bytes of data and uses the MAC
        addresses of ``sw_intf``.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check the addresses and checksums of every packet in ``rxs``.

        Each packet must be destined to ``self.pg1.remote_ip6`` and sourced
        from ``p.remote_tun_if_host``.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` with ``sa`` and check its headers.

        The outer IPv6 header must go from ``self.pg0.local_ip6`` to
        ``self.pg0.remote_ip6`` and the encapsulated IPv6 header must be
        destined to ``p.remote_tun_if_host``.  On ``IndexError`` or
        ``AssertionError`` the offending packets are logged at debug level
        and the exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # no IPv6 layer was dissected; re-parse the raw bytes
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: pkt is unbound when
                    # decrypt() itself failed.  Unlike a bare except this
                    # lets KeyboardInterrupt/SystemExit propagate.
                    pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (copy.copy() is shallow, so np.tun_if is the same object as
        # p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # the counters keep accumulating across the rekey
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2425
2426
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted frames with a bogus outer destination.

        ``p`` is not used by this implementation.
        """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            # the IP destination of the revealed packet does not match
            # that assigned to the tunnel
            clear = (IPv6(src=sw_intf.remote_ip6,
                          dst="5::5") /
                     IPv6(src=src, dst=dst) /
                     UDP(sport=1144, dport=2233) /
                     Raw(b'X' * payload_size))
            frames.append(l2 / sa.encrypt(clear))
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # send 63 packets with the mismatching header; none may come back
        bogus = self.gen_encrypt_pkts6(params, params.scapy_tun_sa,
                                       self.tun_if,
                                       src=params.remote_tun_if_host,
                                       dst=self.pg1.remote_ip6,
                                       count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown, in the reverse order of the set-up
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2474
2475
# When the module is executed directly, run its tests through unittest
# with the VppTestRunner imported from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)