5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
21 from vpp_papi import VppEnum
24 def config_tun_params(p, encryption_type, tun_if):
25 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
26 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
27 IPSEC_API_SAD_FLAG_USE_ESN))
28 crypt_key = mk_scapy_crypt_key(p)
29 p.tun_dst = tun_if.remote_ip
30 p.tun_src = tun_if.local_ip
31 p.scapy_tun_sa = SecurityAssociation(
32 encryption_type, spi=p.vpp_tun_spi,
33 crypt_algo=p.crypt_algo,
35 auth_algo=p.auth_algo, auth_key=p.auth_key,
36 tunnel_header=ip_class_by_addr_type[p.addr_type](
39 nat_t_header=p.nat_header,
41 p.vpp_tun_sa = SecurityAssociation(
42 encryption_type, spi=p.scapy_tun_spi,
43 crypt_algo=p.crypt_algo,
45 auth_algo=p.auth_algo, auth_key=p.auth_key,
46 tunnel_header=ip_class_by_addr_type[p.addr_type](
49 nat_t_header=p.nat_header,
53 def config_tra_params(p, encryption_type, tun_if):
54 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
55 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
56 IPSEC_API_SAD_FLAG_USE_ESN))
57 crypt_key = mk_scapy_crypt_key(p)
58 p.tun_dst = tun_if.remote_ip
59 p.tun_src = tun_if.local_ip
60 p.scapy_tun_sa = SecurityAssociation(
61 encryption_type, spi=p.vpp_tun_spi,
62 crypt_algo=p.crypt_algo,
64 auth_algo=p.auth_algo, auth_key=p.auth_key,
66 p.vpp_tun_sa = SecurityAssociation(
67 encryption_type, spi=p.scapy_tun_spi,
68 crypt_algo=p.crypt_algo,
70 auth_algo=p.auth_algo, auth_key=p.auth_key,
74 class TemplateIpsec4TunIfEsp(TemplateIpsec):
75 """ IPsec tunnel interface tests """
81 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
84 def tearDownClass(cls):
85 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
88 super(TemplateIpsec4TunIfEsp, self).setUp()
90 self.tun_if = self.pg0
94 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
95 p.scapy_tun_spi, p.crypt_algo_vpp_id,
96 p.crypt_key, p.crypt_key,
97 p.auth_algo_vpp_id, p.auth_key,
99 p.tun_if.add_vpp_config()
101 p.tun_if.config_ip4()
102 p.tun_if.config_ip6()
103 config_tun_params(p, self.encryption_type, p.tun_if)
105 r = VppIpRoute(self, p.remote_tun_if_host, 32,
106 [VppRoutePath(p.tun_if.remote_ip4,
109 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
110 [VppRoutePath(p.tun_if.remote_ip6,
112 proto=DpoProto.DPO_PROTO_IP6)])
116 super(TemplateIpsec4TunIfEsp, self).tearDown()
119 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
120 """ IPsec UDP tunnel interface tests """
122 tun4_encrypt_node_name = "esp4-encrypt-tun"
123 tun4_decrypt_node_name = "esp4-decrypt-tun"
124 encryption_type = ESP
128 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
131 def tearDownClass(cls):
132 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
135 super(TemplateIpsec4TunIfEspUdp, self).setUp()
137 self.tun_if = self.pg0
140 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
141 IPSEC_API_SAD_FLAG_UDP_ENCAP)
142 p.nat_header = UDP(sport=5454, dport=4500)
144 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
145 p.scapy_tun_spi, p.crypt_algo_vpp_id,
146 p.crypt_key, p.crypt_key,
147 p.auth_algo_vpp_id, p.auth_key,
148 p.auth_key, udp_encap=True)
149 p.tun_if.add_vpp_config()
151 p.tun_if.config_ip4()
152 p.tun_if.config_ip6()
153 config_tun_params(p, self.encryption_type, p.tun_if)
155 r = VppIpRoute(self, p.remote_tun_if_host, 32,
156 [VppRoutePath(p.tun_if.remote_ip4,
159 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
160 [VppRoutePath(p.tun_if.remote_ip6,
162 proto=DpoProto.DPO_PROTO_IP6)])
166 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
169 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
170 """ Ipsec ESP - TUN tests """
171 tun4_encrypt_node_name = "esp4-encrypt-tun"
172 tun4_decrypt_node_name = "esp4-decrypt-tun"
174 def test_tun_basic64(self):
175 """ ipsec 6o4 tunnel basic test """
176 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
178 self.verify_tun_64(self.params[socket.AF_INET], count=1)
180 def test_tun_burst64(self):
181 """ ipsec 6o4 tunnel burst test """
182 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
184 self.verify_tun_64(self.params[socket.AF_INET], count=257)
186 def test_tun_basic_frag44(self):
187 """ ipsec 4o4 tunnel frag basic test """
188 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
192 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
194 self.verify_tun_44(self.params[socket.AF_INET],
195 count=1, payload_size=1800, n_rx=2)
196 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    # Name of the IPv4 tunnel input graph node for this suite
    # (presumably read by the inherited IpsecTun4Tests checks -- confirm).
    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # Run the keepalive verification helper against the IPv4 parameter set.
        v4_params = self.ipv4_params
        self.verify_keepalive(v4_params)
210 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
211 """ Ipsec ESP - TCP tests """
215 class TemplateIpsec6TunIfEsp(TemplateIpsec):
216 """ IPsec tunnel interface tests """
218 encryption_type = ESP
221 super(TemplateIpsec6TunIfEsp, self).setUp()
223 self.tun_if = self.pg0
226 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
227 p.scapy_tun_spi, p.crypt_algo_vpp_id,
228 p.crypt_key, p.crypt_key,
229 p.auth_algo_vpp_id, p.auth_key,
230 p.auth_key, is_ip6=True)
231 p.tun_if.add_vpp_config()
233 p.tun_if.config_ip6()
234 p.tun_if.config_ip4()
235 config_tun_params(p, self.encryption_type, p.tun_if)
237 r = VppIpRoute(self, p.remote_tun_if_host, 128,
238 [VppRoutePath(p.tun_if.remote_ip6,
240 proto=DpoProto.DPO_PROTO_IP6)])
242 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
243 [VppRoutePath(p.tun_if.remote_ip4,
248 super(TemplateIpsec6TunIfEsp, self).tearDown()
251 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
253 """ Ipsec ESP - TUN tests """
254 tun6_encrypt_node_name = "esp6-encrypt-tun"
255 tun6_decrypt_node_name = "esp6-decrypt-tun"
def test_tun_basic46(self):
    """ ipsec 4o6 tunnel basic test """
    # Set the encrypt node name on the instance before the verification
    # helper runs.
    self.tun6_encrypt_node_name = "esp6-encrypt-tun"
    v6_params = self.params[socket.AF_INET6]
    # A single packet is enough for the basic variant.
    self.verify_tun_46(v6_params, count=1)
def test_tun_burst46(self):
    """ ipsec 4o6 tunnel burst test """
    # Set the encrypt node name on the instance before the verification
    # helper runs.
    self.tun6_encrypt_node_name = "esp6-encrypt-tun"
    v6_params = self.params[socket.AF_INET6]
    # The burst variant pushes 257 packets through the tunnel.
    self.verify_tun_46(v6_params, count=257)
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
                                IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """

    # Graph node names for the IPv6 ESP tunnel data path. The test cases
    # themselves come from the base classes (presumably they look these
    # names up -- confirm against IpsecTun6HandoffTests).
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
                                IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """

    # Graph node names for the IPv4 ESP tunnel data path. The test cases
    # themselves come from the base classes (presumably they look these
    # names up -- confirm against IpsecTun4HandoffTests).
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
282 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
283 """ IPsec IPv4 Multi Tunnel interface """
285 encryption_type = ESP
286 tun4_encrypt_node_name = "esp4-encrypt-tun"
287 tun4_decrypt_node_name = "esp4-decrypt-tun"
290 super(TestIpsec4MultiTunIfEsp, self).setUp()
292 self.tun_if = self.pg0
294 self.multi_params = []
295 self.pg0.generate_remote_hosts(10)
296 self.pg0.configure_ipv4_neighbors()
299 p = copy.copy(self.ipv4_params)
301 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
302 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
303 p.scapy_tun_spi = p.scapy_tun_spi + ii
304 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
305 p.vpp_tun_spi = p.vpp_tun_spi + ii
307 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
308 p.scapy_tra_spi = p.scapy_tra_spi + ii
309 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
310 p.vpp_tra_spi = p.vpp_tra_spi + ii
311 p.tun_dst = self.pg0.remote_hosts[ii].ip4
313 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
316 p.crypt_key, p.crypt_key,
317 p.auth_algo_vpp_id, p.auth_key,
320 p.tun_if.add_vpp_config()
322 p.tun_if.config_ip4()
323 config_tun_params(p, self.encryption_type, p.tun_if)
324 self.multi_params.append(p)
326 VppIpRoute(self, p.remote_tun_if_host, 32,
327 [VppRoutePath(p.tun_if.remote_ip4,
328 0xffffffff)]).add_vpp_config()
331 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces """
    expected_pkts = 127
    for tun_params in self.multi_params:
        self.verify_tun_44(tun_params, count=expected_pkts)

        # NOTE(review): the extract lost its indentation; the counter
        # checks are kept inside the loop because they read the loop
        # variable -- confirm against the original file.
        tunnel = tun_params.tun_if
        rx_stats = tunnel.get_rx_stats()
        self.assertEqual(rx_stats['packets'], expected_pkts)
        tx_stats = tunnel.get_tx_stats()
        self.assertEqual(tx_stats['packets'], expected_pkts)
342 def test_tun_rr_44(self):
343 """ Round-robin packets across multiple interfaces """
345 for p in self.multi_params:
346 tx = tx + self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
347 src=p.remote_tun_if_host,
348 dst=self.pg1.remote_ip4)
349 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
351 for rx, p in zip(rxs, self.multi_params):
352 self.verify_decrypted(p, [rx])
355 for p in self.multi_params:
356 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
357 dst=p.remote_tun_if_host)
358 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
360 for rx, p in zip(rxs, self.multi_params):
361 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
364 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
365 """ IPsec IPv4 Tunnel interface all Algos """
367 encryption_type = ESP
368 tun4_encrypt_node_name = "esp4-encrypt-tun"
369 tun4_decrypt_node_name = "esp4-decrypt-tun"
371 def config_network(self, p):
373 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
376 p.crypt_key, p.crypt_key,
377 p.auth_algo_vpp_id, p.auth_key,
380 p.tun_if.add_vpp_config()
382 p.tun_if.config_ip4()
383 config_tun_params(p, self.encryption_type, p.tun_if)
384 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
385 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
387 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
388 [VppRoutePath(p.tun_if.remote_ip4,
390 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the tunnel interface and route held on the params object *p*.

    The IPv4 configuration is removed from the tunnel first, then the
    tunnel itself, and finally the route stored in ``p.route``.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()
    p.route.remove_vpp_config()
398 super(TestIpsec4TunIfEspAll, self).setUp()
400 self.tun_if = self.pg0
403 super(TestIpsec4TunIfEspAll, self).tearDown()
407 # change the key and the SPI
409 p.crypt_key = b'X' + p.crypt_key[1:]
411 p.scapy_tun_sa_id += 1
414 p.tun_if.local_spi = p.vpp_tun_spi
415 p.tun_if.remote_spi = p.scapy_tun_spi
417 config_tun_params(p, self.encryption_type, p.tun_if)
419 p.tun_sa_in = VppIpsecSA(self,
426 self.vpp_esp_protocol,
429 p.tun_sa_out = VppIpsecSA(self,
436 self.vpp_esp_protocol,
439 p.tun_sa_in.add_vpp_config()
440 p.tun_sa_out.add_vpp_config()
442 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
443 sa_id=p.tun_sa_in.id,
445 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
446 sa_id=p.tun_sa_out.id,
448 self.logger.info(self.vapi.cli("sh ipsec sa"))
450 def test_tun_44(self):
451 """IPSEC tunnel all algos """
453 # foreach VPP crypto engine
454 engines = ["ia32", "ipsecmb", "openssl"]
456 # foreach crypto algorithm
457 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
458 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
459 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
460 IPSEC_API_INTEG_ALG_NONE),
461 'scapy-crypto': "AES-GCM",
462 'scapy-integ': "NULL",
463 'key': b"JPjyOWBeVEQiMe7h",
465 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
466 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
467 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
468 IPSEC_API_INTEG_ALG_NONE),
469 'scapy-crypto': "AES-GCM",
470 'scapy-integ': "NULL",
471 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
473 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
474 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
475 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
476 IPSEC_API_INTEG_ALG_NONE),
477 'scapy-crypto': "AES-GCM",
478 'scapy-integ': "NULL",
479 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
481 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
482 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
483 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
484 IPSEC_API_INTEG_ALG_SHA1_96),
485 'scapy-crypto': "AES-CBC",
486 'scapy-integ': "HMAC-SHA1-96",
488 'key': b"JPjyOWBeVEQiMe7h"},
489 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
490 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
491 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
492 IPSEC_API_INTEG_ALG_SHA1_96),
493 'scapy-crypto': "AES-CBC",
494 'scapy-integ': "HMAC-SHA1-96",
496 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
497 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
498 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
499 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
500 IPSEC_API_INTEG_ALG_SHA1_96),
501 'scapy-crypto': "AES-CBC",
502 'scapy-integ': "HMAC-SHA1-96",
504 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
505 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
506 IPSEC_API_CRYPTO_ALG_NONE),
507 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
508 IPSEC_API_INTEG_ALG_SHA1_96),
509 'scapy-crypto': "NULL",
510 'scapy-integ': "HMAC-SHA1-96",
512 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
514 for engine in engines:
515 self.vapi.cli("set crypto handler all %s" % engine)
518 # loop through each of the algorithms
521 # with self.subTest(algo=algo['scapy']):
523 p = copy.copy(self.ipv4_params)
524 p.auth_algo_vpp_id = algo['vpp-integ']
525 p.crypt_algo_vpp_id = algo['vpp-crypto']
526 p.crypt_algo = algo['scapy-crypto']
527 p.auth_algo = algo['scapy-integ']
528 p.crypt_key = algo['key']
529 p.salt = algo['salt']
531 self.config_network(p)
533 self.verify_tun_44(p, count=127)
534 c = p.tun_if.get_rx_stats()
535 self.assertEqual(c['packets'], 127)
536 c = p.tun_if.get_tx_stats()
537 self.assertEqual(c['packets'], 127)
543 self.verify_tun_44(p, count=127)
545 self.unconfig_network(p)
546 p.tun_sa_out.remove_vpp_config()
547 p.tun_sa_in.remove_vpp_config()
550 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
551 """ IPsec IPv4 Tunnel interface no Algos """
553 encryption_type = ESP
554 tun4_encrypt_node_name = "esp4-encrypt-tun"
555 tun4_decrypt_node_name = "esp4-decrypt-tun"
557 def config_network(self, p):
559 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
560 IPSEC_API_INTEG_ALG_NONE)
564 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
565 IPSEC_API_CRYPTO_ALG_NONE)
566 p.crypt_algo = 'NULL'
569 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
572 p.crypt_key, p.crypt_key,
573 p.auth_algo_vpp_id, p.auth_key,
576 p.tun_if.add_vpp_config()
578 p.tun_if.config_ip4()
579 config_tun_params(p, self.encryption_type, p.tun_if)
580 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
581 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
583 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
584 [VppRoutePath(p.tun_if.remote_ip4,
586 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the tunnel interface and route held on the params object *p*.

    The IPv4 configuration is removed from the tunnel first, then the
    tunnel itself, and finally the route stored in ``p.route``.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()
    p.route.remove_vpp_config()
594 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
596 self.tun_if = self.pg0
599 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
601 def test_tun_44(self):
604 self.config_network(p)
606 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
607 dst=p.remote_tun_if_host)
608 self.send_and_assert_no_replies(self.pg1, tx)
610 self.unconfig_network(p)
613 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
614 """ IPsec IPv6 Multi Tunnel interface """
616 encryption_type = ESP
617 tun6_encrypt_node_name = "esp6-encrypt-tun"
618 tun6_decrypt_node_name = "esp6-decrypt-tun"
621 super(TestIpsec6MultiTunIfEsp, self).setUp()
623 self.tun_if = self.pg0
625 self.multi_params = []
626 self.pg0.generate_remote_hosts(10)
627 self.pg0.configure_ipv6_neighbors()
630 p = copy.copy(self.ipv6_params)
632 p.remote_tun_if_host = "1111::%d" % (ii + 1)
633 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
634 p.scapy_tun_spi = p.scapy_tun_spi + ii
635 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
636 p.vpp_tun_spi = p.vpp_tun_spi + ii
638 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
639 p.scapy_tra_spi = p.scapy_tra_spi + ii
640 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
641 p.vpp_tra_spi = p.vpp_tra_spi + ii
643 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
646 p.crypt_key, p.crypt_key,
647 p.auth_algo_vpp_id, p.auth_key,
648 p.auth_key, is_ip6=True,
649 dst=self.pg0.remote_hosts[ii].ip6)
650 p.tun_if.add_vpp_config()
652 p.tun_if.config_ip6()
653 config_tun_params(p, self.encryption_type, p.tun_if)
654 self.multi_params.append(p)
656 r = VppIpRoute(self, p.remote_tun_if_host, 128,
657 [VppRoutePath(p.tun_if.remote_ip6,
659 proto=DpoProto.DPO_PROTO_IP6)])
663 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces """
    expected_pkts = 127
    for tun_params in self.multi_params:
        self.verify_tun_66(tun_params, count=expected_pkts)

        # NOTE(review): the extract lost its indentation; the counter
        # checks are kept inside the loop because they read the loop
        # variable -- confirm against the original file.
        tunnel = tun_params.tun_if
        rx_stats = tunnel.get_rx_stats()
        self.assertEqual(rx_stats['packets'], expected_pkts)
        tx_stats = tunnel.get_tx_stats()
        self.assertEqual(tx_stats['packets'], expected_pkts)
675 class TestIpsecGreTebIfEsp(TemplateIpsec,
677 """ Ipsec GRE TEB ESP - TUN tests """
678 tun4_encrypt_node_name = "esp4-encrypt-tun"
679 tun4_decrypt_node_name = "esp4-decrypt-tun"
680 encryption_type = ESP
681 omac = "00:11:22:33:44:55"
683 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
685 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
686 sa.encrypt(IP(src=self.pg0.remote_ip4,
687 dst=self.pg0.local_ip4) /
689 Ether(dst=self.omac) /
690 IP(src="1.1.1.1", dst="1.1.1.2") /
691 UDP(sport=1144, dport=2233) /
692 Raw(b'X' * payload_size))
693 for i in range(count)]
695 def gen_pkts(self, sw_intf, src, dst, count=1,
697 return [Ether(dst=self.omac) /
698 IP(src="1.1.1.1", dst="1.1.1.2") /
699 UDP(sport=1144, dport=2233) /
700 Raw(b'X' * payload_size)
701 for i in range(count)]
703 def verify_decrypted(self, p, rxs):
705 self.assert_equal(rx[Ether].dst, self.omac)
706 self.assert_equal(rx[IP].dst, "1.1.1.2")
708 def verify_encrypted(self, p, sa, rxs):
711 pkt = sa.decrypt(rx[IP])
712 if not pkt.haslayer(IP):
713 pkt = IP(pkt[Raw].load)
714 self.assert_packet_checksums_valid(pkt)
715 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
716 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
717 self.assertTrue(pkt.haslayer(GRE))
719 self.assertEqual(e[Ether].dst, self.omac)
720 self.assertEqual(e[IP].dst, "1.1.1.2")
721 except (IndexError, AssertionError):
722 self.logger.debug(ppp("Unexpected packet:", rx))
724 self.logger.debug(ppp("Decrypted packet:", pkt))
730 super(TestIpsecGreTebIfEsp, self).setUp()
732 self.tun_if = self.pg0
736 bd1 = VppBridgeDomain(self, 1)
739 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
740 p.auth_algo_vpp_id, p.auth_key,
741 p.crypt_algo_vpp_id, p.crypt_key,
742 self.vpp_esp_protocol,
745 p.tun_sa_out.add_vpp_config()
747 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
748 p.auth_algo_vpp_id, p.auth_key,
749 p.crypt_algo_vpp_id, p.crypt_key,
750 self.vpp_esp_protocol,
753 p.tun_sa_in.add_vpp_config()
755 p.tun_if = VppGreInterface(self,
758 type=(VppEnum.vl_api_gre_tunnel_type_t.
759 GRE_API_TUNNEL_TYPE_TEB))
760 p.tun_if.add_vpp_config()
762 p.tun_protect = VppIpsecTunProtect(self,
767 p.tun_protect.add_vpp_config()
770 p.tun_if.config_ip4()
771 config_tun_params(p, self.encryption_type, p.tun_if)
773 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
774 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
776 self.vapi.cli("clear ipsec sa")
780 p.tun_if.unconfig_ip4()
781 super(TestIpsecGreTebIfEsp, self).tearDown()
784 class TestIpsecGreTebIfEspTra(TemplateIpsec,
786 """ Ipsec GRE TEB ESP - Tra tests """
787 tun4_encrypt_node_name = "esp4-encrypt-tun"
788 tun4_decrypt_node_name = "esp4-decrypt-tun"
789 encryption_type = ESP
790 omac = "00:11:22:33:44:55"
792 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
794 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
795 sa.encrypt(IP(src=self.pg0.remote_ip4,
796 dst=self.pg0.local_ip4) /
798 Ether(dst=self.omac) /
799 IP(src="1.1.1.1", dst="1.1.1.2") /
800 UDP(sport=1144, dport=2233) /
801 Raw(b'X' * payload_size))
802 for i in range(count)]
804 def gen_pkts(self, sw_intf, src, dst, count=1,
806 return [Ether(dst=self.omac) /
807 IP(src="1.1.1.1", dst="1.1.1.2") /
808 UDP(sport=1144, dport=2233) /
809 Raw(b'X' * payload_size)
810 for i in range(count)]
812 def verify_decrypted(self, p, rxs):
814 self.assert_equal(rx[Ether].dst, self.omac)
815 self.assert_equal(rx[IP].dst, "1.1.1.2")
817 def verify_encrypted(self, p, sa, rxs):
820 pkt = sa.decrypt(rx[IP])
821 if not pkt.haslayer(IP):
822 pkt = IP(pkt[Raw].load)
823 self.assert_packet_checksums_valid(pkt)
824 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
825 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
826 self.assertTrue(pkt.haslayer(GRE))
828 self.assertEqual(e[Ether].dst, self.omac)
829 self.assertEqual(e[IP].dst, "1.1.1.2")
830 except (IndexError, AssertionError):
831 self.logger.debug(ppp("Unexpected packet:", rx))
833 self.logger.debug(ppp("Decrypted packet:", pkt))
839 super(TestIpsecGreTebIfEspTra, self).setUp()
841 self.tun_if = self.pg0
845 bd1 = VppBridgeDomain(self, 1)
848 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
849 p.auth_algo_vpp_id, p.auth_key,
850 p.crypt_algo_vpp_id, p.crypt_key,
851 self.vpp_esp_protocol)
852 p.tun_sa_out.add_vpp_config()
854 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
855 p.auth_algo_vpp_id, p.auth_key,
856 p.crypt_algo_vpp_id, p.crypt_key,
857 self.vpp_esp_protocol)
858 p.tun_sa_in.add_vpp_config()
860 p.tun_if = VppGreInterface(self,
863 type=(VppEnum.vl_api_gre_tunnel_type_t.
864 GRE_API_TUNNEL_TYPE_TEB))
865 p.tun_if.add_vpp_config()
867 p.tun_protect = VppIpsecTunProtect(self,
872 p.tun_protect.add_vpp_config()
875 p.tun_if.config_ip4()
876 config_tra_params(p, self.encryption_type, p.tun_if)
878 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
879 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
881 self.vapi.cli("clear ipsec sa")
885 p.tun_if.unconfig_ip4()
886 super(TestIpsecGreTebIfEspTra, self).tearDown()
889 class TestIpsecGreIfEsp(TemplateIpsec,
891 """ Ipsec GRE ESP - TUN tests """
892 tun4_encrypt_node_name = "esp4-encrypt-tun"
893 tun4_decrypt_node_name = "esp4-decrypt-tun"
894 encryption_type = ESP
896 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
898 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
899 sa.encrypt(IP(src=self.pg0.remote_ip4,
900 dst=self.pg0.local_ip4) /
902 IP(src=self.pg1.local_ip4,
903 dst=self.pg1.remote_ip4) /
904 UDP(sport=1144, dport=2233) /
905 Raw(b'X' * payload_size))
906 for i in range(count)]
908 def gen_pkts(self, sw_intf, src, dst, count=1,
910 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
911 IP(src="1.1.1.1", dst="1.1.1.2") /
912 UDP(sport=1144, dport=2233) /
913 Raw(b'X' * payload_size)
914 for i in range(count)]
916 def verify_decrypted(self, p, rxs):
918 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
919 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
921 def verify_encrypted(self, p, sa, rxs):
924 pkt = sa.decrypt(rx[IP])
925 if not pkt.haslayer(IP):
926 pkt = IP(pkt[Raw].load)
927 self.assert_packet_checksums_valid(pkt)
928 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
929 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
930 self.assertTrue(pkt.haslayer(GRE))
932 self.assertEqual(e[IP].dst, "1.1.1.2")
933 except (IndexError, AssertionError):
934 self.logger.debug(ppp("Unexpected packet:", rx))
936 self.logger.debug(ppp("Decrypted packet:", pkt))
942 super(TestIpsecGreIfEsp, self).setUp()
944 self.tun_if = self.pg0
948 bd1 = VppBridgeDomain(self, 1)
951 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
952 p.auth_algo_vpp_id, p.auth_key,
953 p.crypt_algo_vpp_id, p.crypt_key,
954 self.vpp_esp_protocol,
957 p.tun_sa_out.add_vpp_config()
959 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
960 p.auth_algo_vpp_id, p.auth_key,
961 p.crypt_algo_vpp_id, p.crypt_key,
962 self.vpp_esp_protocol,
965 p.tun_sa_in.add_vpp_config()
967 p.tun_if = VppGreInterface(self,
970 p.tun_if.add_vpp_config()
972 p.tun_protect = VppIpsecTunProtect(self,
976 p.tun_protect.add_vpp_config()
979 p.tun_if.config_ip4()
980 config_tun_params(p, self.encryption_type, p.tun_if)
982 VppIpRoute(self, "1.1.1.2", 32,
983 [VppRoutePath(p.tun_if.remote_ip4,
984 0xffffffff)]).add_vpp_config()
988 p.tun_if.unconfig_ip4()
989 super(TestIpsecGreIfEsp, self).tearDown()
992 class TestIpsecGreIfEspTra(TemplateIpsec,
994 """ Ipsec GRE ESP - TRA tests """
995 tun4_encrypt_node_name = "esp4-encrypt-tun"
996 tun4_decrypt_node_name = "esp4-decrypt-tun"
997 encryption_type = ESP
999 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1001 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1002 sa.encrypt(IP(src=self.pg0.remote_ip4,
1003 dst=self.pg0.local_ip4) /
1005 IP(src=self.pg1.local_ip4,
1006 dst=self.pg1.remote_ip4) /
1007 UDP(sport=1144, dport=2233) /
1008 Raw(b'X' * payload_size))
1009 for i in range(count)]
1011 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1013 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1014 sa.encrypt(IP(src=self.pg0.remote_ip4,
1015 dst=self.pg0.local_ip4) /
1017 UDP(sport=1144, dport=2233) /
1018 Raw(b'X' * payload_size))
1019 for i in range(count)]
1021 def gen_pkts(self, sw_intf, src, dst, count=1,
1023 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1024 IP(src="1.1.1.1", dst="1.1.1.2") /
1025 UDP(sport=1144, dport=2233) /
1026 Raw(b'X' * payload_size)
1027 for i in range(count)]
1029 def verify_decrypted(self, p, rxs):
1031 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1032 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1034 def verify_encrypted(self, p, sa, rxs):
1037 pkt = sa.decrypt(rx[IP])
1038 if not pkt.haslayer(IP):
1039 pkt = IP(pkt[Raw].load)
1040 self.assert_packet_checksums_valid(pkt)
1041 self.assertTrue(pkt.haslayer(GRE))
1043 self.assertEqual(e[IP].dst, "1.1.1.2")
1044 except (IndexError, AssertionError):
1045 self.logger.debug(ppp("Unexpected packet:", rx))
1047 self.logger.debug(ppp("Decrypted packet:", pkt))
1053 super(TestIpsecGreIfEspTra, self).setUp()
1055 self.tun_if = self.pg0
1057 p = self.ipv4_params
1059 bd1 = VppBridgeDomain(self, 1)
1060 bd1.add_vpp_config()
1062 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1063 p.auth_algo_vpp_id, p.auth_key,
1064 p.crypt_algo_vpp_id, p.crypt_key,
1065 self.vpp_esp_protocol)
1066 p.tun_sa_out.add_vpp_config()
1068 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1069 p.auth_algo_vpp_id, p.auth_key,
1070 p.crypt_algo_vpp_id, p.crypt_key,
1071 self.vpp_esp_protocol)
1072 p.tun_sa_in.add_vpp_config()
1074 p.tun_if = VppGreInterface(self,
1076 self.pg0.remote_ip4)
1077 p.tun_if.add_vpp_config()
1079 p.tun_protect = VppIpsecTunProtect(self,
1083 p.tun_protect.add_vpp_config()
1086 p.tun_if.config_ip4()
1087 config_tra_params(p, self.encryption_type, p.tun_if)
1089 VppIpRoute(self, "1.1.1.2", 32,
1090 [VppRoutePath(p.tun_if.remote_ip4,
1091 0xffffffff)]).add_vpp_config()
1094 p = self.ipv4_params
1095 p.tun_if.unconfig_ip4()
1096 super(TestIpsecGreIfEspTra, self).tearDown()
def test_gre_non_ip(self):
    # Build encrypted packets with the non-IP generator of this class and
    # inject them on the tunnel-facing interface; nothing may come back.
    params = self.ipv4_params
    encrypted = self.gen_encrypt_non_ip_pkts(
        params.scapy_tun_sa,
        self.tun_if,
        src=params.remote_tun_if_host,
        dst=self.pg1.remote_ip6)
    self.send_and_assert_no_replies(self.tun_if, encrypted)

    # The decrypt node's 'unsupported payload' error counter must read 1.
    counter_path = '/err/%s/unsupported payload' % self.tun4_decrypt_node_name
    observed = self.statistics.get_err_counter(counter_path)
    self.assertEqual(1, observed)
1109 class TestIpsecGre6IfEspTra(TemplateIpsec,
1111 """ Ipsec GRE ESP - TRA tests """
1112 tun6_encrypt_node_name = "esp6-encrypt-tun"
1113 tun6_decrypt_node_name = "esp6-decrypt-tun"
1114 encryption_type = ESP
1116 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1118 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1119 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1120 dst=self.pg0.local_ip6) /
1122 IPv6(src=self.pg1.local_ip6,
1123 dst=self.pg1.remote_ip6) /
1124 UDP(sport=1144, dport=2233) /
1125 Raw(b'X' * payload_size))
1126 for i in range(count)]
1128 def gen_pkts6(self, sw_intf, src, dst, count=1,
1130 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1131 IPv6(src="1::1", dst="1::2") /
1132 UDP(sport=1144, dport=2233) /
1133 Raw(b'X' * payload_size)
1134 for i in range(count)]
1136 def verify_decrypted6(self, p, rxs):
1138 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1139 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1141 def verify_encrypted6(self, p, sa, rxs):
1144 pkt = sa.decrypt(rx[IPv6])
1145 if not pkt.haslayer(IPv6):
1146 pkt = IPv6(pkt[Raw].load)
1147 self.assert_packet_checksums_valid(pkt)
1148 self.assertTrue(pkt.haslayer(GRE))
1150 self.assertEqual(e[IPv6].dst, "1::2")
1151 except (IndexError, AssertionError):
1152 self.logger.debug(ppp("Unexpected packet:", rx))
1154 self.logger.debug(ppp("Decrypted packet:", pkt))
1160 super(TestIpsecGre6IfEspTra, self).setUp()
1162 self.tun_if = self.pg0
1164 p = self.ipv6_params
1166 bd1 = VppBridgeDomain(self, 1)
1167 bd1.add_vpp_config()
1169 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1170 p.auth_algo_vpp_id, p.auth_key,
1171 p.crypt_algo_vpp_id, p.crypt_key,
1172 self.vpp_esp_protocol)
1173 p.tun_sa_out.add_vpp_config()
1175 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1176 p.auth_algo_vpp_id, p.auth_key,
1177 p.crypt_algo_vpp_id, p.crypt_key,
1178 self.vpp_esp_protocol)
1179 p.tun_sa_in.add_vpp_config()
1181 p.tun_if = VppGreInterface(self,
1183 self.pg0.remote_ip6)
1184 p.tun_if.add_vpp_config()
1186 p.tun_protect = VppIpsecTunProtect(self,
1190 p.tun_protect.add_vpp_config()
1193 p.tun_if.config_ip6()
1194 config_tra_params(p, self.encryption_type, p.tun_if)
1196 r = VppIpRoute(self, "1::2", 128,
1197 [VppRoutePath(p.tun_if.remote_ip6,
1199 proto=DpoProto.DPO_PROTO_IP6)])
1203 p = self.ipv6_params
1204 p.tun_if.unconfig_ip6()
1205 super(TestIpsecGre6IfEspTra, self).tearDown()
1208 class TemplateIpsec4TunProtect(object):
1209 """ IPsec IPv4 Tunnel protect """
1211 encryption_type = ESP
1212 tun4_encrypt_node_name = "esp4-encrypt-tun"
1213 tun4_decrypt_node_name = "esp4-decrypt-tun"
1214 tun4_input_node = "ipsec4-tun-input"
1216 def config_sa_tra(self, p):
1217 config_tun_params(p, self.encryption_type, p.tun_if)
1219 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1220 p.auth_algo_vpp_id, p.auth_key,
1221 p.crypt_algo_vpp_id, p.crypt_key,
1222 self.vpp_esp_protocol,
1224 p.tun_sa_out.add_vpp_config()
1226 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1227 p.auth_algo_vpp_id, p.auth_key,
1228 p.crypt_algo_vpp_id, p.crypt_key,
1229 self.vpp_esp_protocol,
1231 p.tun_sa_in.add_vpp_config()
1233 def config_sa_tun(self, p):
1234 config_tun_params(p, self.encryption_type, p.tun_if)
1236 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1237 p.auth_algo_vpp_id, p.auth_key,
1238 p.crypt_algo_vpp_id, p.crypt_key,
1239 self.vpp_esp_protocol,
1240 self.tun_if.local_addr[p.addr_type],
1241 self.tun_if.remote_addr[p.addr_type],
1243 p.tun_sa_out.add_vpp_config()
1245 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1246 p.auth_algo_vpp_id, p.auth_key,
1247 p.crypt_algo_vpp_id, p.crypt_key,
1248 self.vpp_esp_protocol,
1249 self.tun_if.remote_addr[p.addr_type],
1250 self.tun_if.local_addr[p.addr_type],
1252 p.tun_sa_in.add_vpp_config()
# Create a VppIpsecTunProtect object, store it on p.tun_protect and call
# add_vpp_config() on it.
# NOTE(review): lines 1256-1258 (the remaining constructor arguments) are
# missing from this listing - confirm which interface and SAs are passed.
1254 def config_protect(self, p):
1255 p.tun_protect = VppIpsecTunProtect(self,
1259 p.tun_protect.add_vpp_config()
# Create the IP-in-IP tunnel interface on pg0 (stored on p.tun_if), give it
# IPv4 and IPv6 configuration, and build host routes towards the remote
# tunnel hosts through the tunnel.
# NOTE(review): the embedded numbering skips 1263, 1266, 1272, 1276 and
# 1278, so part of the constructor/route arguments and possibly whole
# statements are not visible here - confirm against the complete file.
1261 def config_network(self, p):
1262 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1264 self.pg0.remote_ip4)
1265 p.tun_if.add_vpp_config()
1267 p.tun_if.config_ip4()
1268 p.tun_if.config_ip6()
# /32 route to the remote IPv4 host; kept on p.route so that
# unconfig_network can remove it.
1270 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1271 [VppRoutePath(p.tun_if.remote_ip4,
1273 p.route.add_vpp_config()
# /128 route to the remote IPv6 host; held only in the local name r.
1274 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1275 [VppRoutePath(p.tun_if.remote_ip6,
1277 proto=DpoProto.DPO_PROTO_IP6)])
# Undo config_network: remove the route stored on p.route first, then the
# tunnel interface stored on p.tun_if.
1280 def unconfig_network(self, p):
1281 p.route.remove_vpp_config()
1282 p.tun_if.remove_vpp_config()
# Undo config_protect: remove the tunnel protection stored on p.tun_protect.
1284 def unconfig_protect(self, p):
1285 p.tun_protect.remove_vpp_config()
# Undo config_sa_tra/config_sa_tun: remove the outbound SA, then the
# inbound SA, both as stored on p.
1287 def unconfig_sa(self, p):
1288 p.tun_sa_out.remove_vpp_config()
1289 p.tun_sa_in.remove_vpp_config()
1292 class TestIpsec4TunProtect(TemplateIpsec,
1293 TemplateIpsec4TunProtect,
1295 """ IPsec IPv4 Tunnel protect - transport mode"""
1298 super(TestIpsec4TunProtect, self).setUp()
1300 self.tun_if = self.pg0
1303 super(TestIpsec4TunProtect, self).tearDown()
# Transport-mode tunnel-protect test: configure network, SAs and
# protection, verify traffic and interface counters, then rekey with a new
# SA pair and verify again before tearing everything down.
# NOTE(review): the embedded numbering skips 1328, 1339-1340 and 1346-1347;
# in particular the statement that creates `np` is not visible here.
1305 def test_tun_44(self):
1306 """IPSEC tunnel protect"""
1308 p = self.ipv4_params
1310 self.config_network(p)
1311 self.config_sa_tra(p)
1312 self.config_protect(p)
# First pass: 127 packets expected on both rx and tx tunnel counters.
1314 self.verify_tun_44(p, count=127)
1315 c = p.tun_if.get_rx_stats()
1316 self.assertEqual(c['packets'], 127)
1317 c = p.tun_if.get_tx_stats()
1318 self.assertEqual(c['packets'], 127)
# Second pass uses verify_tun_64 after clearing the SA state via the CLI;
# the interface counters are cumulative, hence 254.
1320 self.vapi.cli("clear ipsec sa")
1321 self.verify_tun_64(p, count=127)
1322 c = p.tun_if.get_rx_stats()
1323 self.assertEqual(c['packets'], 254)
1324 c = p.tun_if.get_tx_stats()
1325 self.assertEqual(c['packets'], 254)
1327 # rekey - create new SAs and update the tunnel protection
# New key differs from the old one in its first byte only; SPIs are moved
# by 100 and SA ids by 1 so they do not collide with the original SAs.
1329 np.crypt_key = b'X' + p.crypt_key[1:]
1330 np.scapy_tun_spi += 100
1331 np.scapy_tun_sa_id += 1
1332 np.vpp_tun_spi += 100
1333 np.vpp_tun_sa_id += 1
1334 np.tun_if.local_spi = p.vpp_tun_spi
1335 np.tun_if.remote_spi = p.scapy_tun_spi
1337 self.config_sa_tra(np)
1338 self.config_protect(np)
# Third pass with the new SAs: cumulative counters reach 381.
1341 self.verify_tun_44(np, count=127)
1342 c = p.tun_if.get_rx_stats()
1343 self.assertEqual(c['packets'], 381)
1344 c = p.tun_if.get_tx_stats()
1345 self.assertEqual(c['packets'], 381)
# Teardown: protection and SAs are removed through np, the network
# through p.
1348 self.unconfig_protect(np)
1349 self.unconfig_sa(np)
1350 self.unconfig_network(p)
1353 class TestIpsec4TunProtectUdp(TemplateIpsec,
1354 TemplateIpsec4TunProtect,
1356 """ IPsec IPv4 Tunnel protect - transport mode"""
1359 super(TestIpsec4TunProtectUdp, self).setUp()
1361 self.tun_if = self.pg0
1363 p = self.ipv4_params
1364 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1365 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1366 p.nat_header = UDP(sport=5454, dport=4500)
1367 self.config_network(p)
1368 self.config_sa_tra(p)
1369 self.config_protect(p)
1372 p = self.ipv4_params
1373 self.unconfig_protect(p)
1375 self.unconfig_network(p)
1376 super(TestIpsec4TunProtectUdp, self).tearDown()
# UDP-encapsulated variant: run verify_tun_44 with 127 packets and check
# that both the rx and tx counters of the tunnel interface report 127.
# No configuration is done in this method; it relies on state already
# present on self.ipv4_params.
1378 def test_tun_44(self):
1379 """IPSEC UDP tunnel protect"""
1381 p = self.ipv4_params
1383 self.verify_tun_44(p, count=127)
1384 c = p.tun_if.get_rx_stats()
1385 self.assertEqual(c['packets'], 127)
1386 c = p.tun_if.get_tx_stats()
1387 self.assertEqual(c['packets'], 127)
# Delegate to verify_keepalive() with the IPv4 parameter set.
# verify_keepalive is not defined in this part of the file.
1389 def test_keepalive(self):
1390 """ IPSEC NAT Keepalive """
1391 self.verify_keepalive(self.ipv4_params)
1394 class TestIpsec4TunProtectTun(TemplateIpsec,
1395 TemplateIpsec4TunProtect,
1397 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1399 encryption_type = ESP
1400 tun4_encrypt_node_name = "esp4-encrypt-tun"
1401 tun4_decrypt_node_name = "esp4-decrypt-tun"
1404 super(TestIpsec4TunProtectTun, self).setUp()
1406 self.tun_if = self.pg0
1409 super(TestIpsec4TunProtectTun, self).tearDown()
# Build `count` Ethernet frames whose payload is sa.encrypt() applied to an
# outer IP header (sw_intf remote -> local) wrapping an inner IP/UDP/Raw
# packet from src to dst.
# NOTE(review): line 1412 (the end of the signature) is missing from this
# listing; payload_size is used below, so it is presumably declared there.
1411 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1413 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1414 sa.encrypt(IP(src=sw_intf.remote_ip4,
1415 dst=sw_intf.local_ip4) /
1416 IP(src=src, dst=dst) /
1417 UDP(sport=1144, dport=2233) /
1418 Raw(b'X' * payload_size))
1419 for i in range(count)]
# Build `count` unencrypted Ethernet/IP/UDP/Raw frames from src to dst,
# using sw_intf's remote MAC as source and local MAC as destination.
# NOTE(review): line 1422 (the end of the signature) is missing from this
# listing; payload_size is used below, so it is presumably declared there.
1421 def gen_pkts(self, sw_intf, src, dst, count=1,
1423 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1424 IP(src=src, dst=dst) /
1425 UDP(sport=1144, dport=2233) /
1426 Raw(b'X' * payload_size)
1427 for i in range(count)]
# Check a decrypted packet: destination must be pg1's remote IPv4 address,
# source must be p.remote_tun_if_host, and its checksums must be valid.
# NOTE(review): line 1430 is missing from this listing; `rx` is presumably
# bound there by a loop over rxs - confirm against the complete file.
1429 def verify_decrypted(self, p, rxs):
1431 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1432 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1433 self.assert_packet_checksums_valid(rx)
# Decrypt a received packet with sa and check the outer header (pg0 local
# -> pg0 remote) and the destination of the encapsulated packet.
# NOTE(review): the embedded numbering skips 1436-1437, 1446, 1449 and
# 1451-1453, so the loop/try headers and whatever follows the logging calls
# are not visible here - confirm against the complete file.
1435 def verify_encrypted(self, p, sa, rxs):
1438 pkt = sa.decrypt(rx[IP])
# If decryption did not yield an IP layer, re-parse the raw payload as IP.
1439 if not pkt.haslayer(IP):
1440 pkt = IP(pkt[Raw].load)
1441 self.assert_packet_checksums_valid(pkt)
1442 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1443 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1444 inner = pkt[IP].payload
1445 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
# On a missing layer or failed assertion the offending packets are logged
# at debug level.
1447 except (IndexError, AssertionError):
1448 self.logger.debug(ppp("Unexpected packet:", rx))
1450 self.logger.debug(ppp("Decrypted packet:", pkt))
# Tunnel-mode tunnel-protect test: configure network, tunnel-mode SAs and
# protection, verify traffic and counters, rekey, verify again, tear down.
# NOTE(review): the embedded numbering skips 1472, 1483-1484 and 1490-1491;
# in particular the statement that creates `np` is not visible here.
1455 def test_tun_44(self):
1456 """IPSEC tunnel protect """
1458 p = self.ipv4_params
1460 self.config_network(p)
1461 self.config_sa_tun(p)
1462 self.config_protect(p)
# First pass: 127 packets expected on both rx and tx tunnel counters.
1464 self.verify_tun_44(p, count=127)
1466 c = p.tun_if.get_rx_stats()
1467 self.assertEqual(c['packets'], 127)
1468 c = p.tun_if.get_tx_stats()
1469 self.assertEqual(c['packets'], 127)
1471 # rekey - create new SAs and update the tunnel protection
# New key differs in its first byte only; SPIs move by 100, SA ids by 1.
1473 np.crypt_key = b'X' + p.crypt_key[1:]
1474 np.scapy_tun_spi += 100
1475 np.scapy_tun_sa_id += 1
1476 np.vpp_tun_spi += 100
1477 np.vpp_tun_sa_id += 1
1478 np.tun_if.local_spi = p.vpp_tun_spi
1479 np.tun_if.remote_spi = p.scapy_tun_spi
1481 self.config_sa_tun(np)
1482 self.config_protect(np)
# Second pass with the new SAs: cumulative counters reach 254.
1485 self.verify_tun_44(np, count=127)
1486 c = p.tun_if.get_rx_stats()
1487 self.assertEqual(c['packets'], 254)
1488 c = p.tun_if.get_tx_stats()
1489 self.assertEqual(c['packets'], 254)
# Teardown: protection and SAs are removed through np, the network
# through p.
1492 self.unconfig_protect(np)
1493 self.unconfig_sa(np)
1494 self.unconfig_network(p)
1497 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
1498 TemplateIpsec4TunProtect,
1500 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
1502 encryption_type = ESP
1503 tun4_encrypt_node_name = "esp4-encrypt-tun"
1504 tun4_decrypt_node_name = "esp4-decrypt-tun"
1507 super(TestIpsec4TunProtectTunDrop, self).setUp()
1509 self.tun_if = self.pg0
1512 super(TestIpsec4TunProtectTunDrop, self).tearDown()
# Build `count` Ethernet frames carrying sa.encrypt() of an outer IP header
# wrapping an inner IP/UDP/Raw packet from src to dst.
# NOTE(review): lines 1515 (end of the signature) and 1518 (the rest of the
# outer IP header, including its destination) are missing from this
# listing. The test docstring speaks of a "bogus tunnel header", so the
# hidden destination presumably differs from the tunnel's - confirm.
1514 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1516 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1517 sa.encrypt(IP(src=sw_intf.remote_ip4,
1519 IP(src=src, dst=dst) /
1520 UDP(sport=1144, dport=2233) /
1521 Raw(b'X' * payload_size))
1522 for i in range(count)]
# Configure a protected tunnel, send encrypted packets built by
# gen_encrypt_pkts and assert that nothing is received in reply.
# NOTE(review): the embedded numbering skips 1536, 1538-1539 and 1541, so
# the last gen_encrypt_pkts argument(s) and part of the teardown are not
# visible here - confirm against the complete file.
1524 def test_tun_drop_44(self):
1525 """IPSEC tunnel protect bogus tunnel header """
1527 p = self.ipv4_params
1529 self.config_network(p)
1530 self.config_sa_tun(p)
1531 self.config_protect(p)
1533 tx = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
1534 src=p.remote_tun_if_host,
1535 dst=self.pg1.remote_ip4,
1537 self.send_and_assert_no_replies(self.tun_if, tx)
1540 self.unconfig_protect(p)
1542 self.unconfig_network(p)
1545 class TemplateIpsec6TunProtect(object):
1546 """ IPsec IPv6 Tunnel protect """
# Prepare the Scapy-side SA state for p via config_tun_params(), then create
# the outbound and inbound VPP SAs and call add_vpp_config() on each.
# No tunnel endpoint addresses are passed to VppIpsecSA here (compare
# config_sa_tun).
1548 def config_sa_tra(self, p):
1549 config_tun_params(p, self.encryption_type, p.tun_if)
# Outbound SA: uses the scapy_tun_* SA id and SPI; stored on p.tun_sa_out.
1551 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1552 p.auth_algo_vpp_id, p.auth_key,
1553 p.crypt_algo_vpp_id, p.crypt_key,
1554 self.vpp_esp_protocol)
1555 p.tun_sa_out.add_vpp_config()
# Inbound SA: uses the vpp_tun_* SA id and SPI; stored on p.tun_sa_in.
1557 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1558 p.auth_algo_vpp_id, p.auth_key,
1559 p.crypt_algo_vpp_id, p.crypt_key,
1560 self.vpp_esp_protocol)
1561 p.tun_sa_in.add_vpp_config()
# Same as config_sa_tra, but each SA is additionally given a pair of
# endpoint addresses taken from self.tun_if for p.addr_type.
1563 def config_sa_tun(self, p):
1564 config_tun_params(p, self.encryption_type, p.tun_if)
# Outbound SA: endpoints passed as local address, then remote address.
1566 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1567 p.auth_algo_vpp_id, p.auth_key,
1568 p.crypt_algo_vpp_id, p.crypt_key,
1569 self.vpp_esp_protocol,
1570 self.tun_if.local_addr[p.addr_type],
1571 self.tun_if.remote_addr[p.addr_type])
1572 p.tun_sa_out.add_vpp_config()
# Inbound SA: endpoints passed in the opposite order (remote, then local).
1574 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1575 p.auth_algo_vpp_id, p.auth_key,
1576 p.crypt_algo_vpp_id, p.crypt_key,
1577 self.vpp_esp_protocol,
1578 self.tun_if.remote_addr[p.addr_type],
1579 self.tun_if.local_addr[p.addr_type])
1580 p.tun_sa_in.add_vpp_config()
# Create a VppIpsecTunProtect object, store it on p.tun_protect and call
# add_vpp_config() on it.
# NOTE(review): lines 1584-1586 (the remaining constructor arguments) are
# missing from this listing - confirm which interface and SAs are passed.
1582 def config_protect(self, p):
1583 p.tun_protect = VppIpsecTunProtect(self,
1587 p.tun_protect.add_vpp_config()
# Create the IP-in-IP tunnel interface on pg0 (stored on p.tun_if), give it
# IPv6 and IPv4 configuration, and build host routes towards the remote
# tunnel hosts through the tunnel.
# NOTE(review): the embedded numbering skips 1591, 1594, 1600 and
# 1605-1606, so part of the constructor/route arguments and possibly whole
# statements are not visible here - confirm against the complete file.
1589 def config_network(self, p):
1590 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1592 self.pg0.remote_ip6)
1593 p.tun_if.add_vpp_config()
1595 p.tun_if.config_ip6()
1596 p.tun_if.config_ip4()
# /128 route to the remote IPv6 host; kept on p.route so that
# unconfig_network can remove it.
1598 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1599 [VppRoutePath(p.tun_if.remote_ip6,
1601 proto=DpoProto.DPO_PROTO_IP6)])
1602 p.route.add_vpp_config()
# /32 route to the remote IPv4 host; held only in the local name r.
1603 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1604 [VppRoutePath(p.tun_if.remote_ip4,
# Undo config_network: remove the route stored on p.route first, then the
# tunnel interface stored on p.tun_if.
1608 def unconfig_network(self, p):
1609 p.route.remove_vpp_config()
1610 p.tun_if.remove_vpp_config()
# Undo config_protect: remove the tunnel protection stored on p.tun_protect.
1612 def unconfig_protect(self, p):
1613 p.tun_protect.remove_vpp_config()
# Undo config_sa_tra/config_sa_tun: remove the outbound SA, then the
# inbound SA, both as stored on p.
1615 def unconfig_sa(self, p):
1616 p.tun_sa_out.remove_vpp_config()
1617 p.tun_sa_in.remove_vpp_config()
1620 class TestIpsec6TunProtect(TemplateIpsec,
1621 TemplateIpsec6TunProtect,
1623 """ IPsec IPv6 Tunnel protect - transport mode"""
1625 encryption_type = ESP
1626 tun6_encrypt_node_name = "esp6-encrypt-tun"
1627 tun6_decrypt_node_name = "esp6-decrypt-tun"
1630 super(TestIpsec6TunProtect, self).setUp()
1632 self.tun_if = self.pg0
1635 super(TestIpsec6TunProtect, self).tearDown()
# IPv6 transport-mode tunnel-protect test. Phases: initial traffic, rekey,
# interface admin-down drop check, then a staged rekey in which input and
# output SAs are swapped step by step through update_vpp_config().
# NOTE(review): the embedded numbering skips 1653, 1664-1665, 1678,
# 1680-1681, 1685, 1695-1696, 1701-1702, 1707-1708, 1710 and 1719-1720; the
# statements that create `np` and `np3`, whatever re-enables the interface
# after admin_down(), and the last update_vpp_config argument are not
# visible here - confirm against the complete file.
1637 def test_tun_66(self):
1638 """IPSEC tunnel protect 6o6"""
1640 p = self.ipv6_params
1642 self.config_network(p)
1643 self.config_sa_tra(p)
1644 self.config_protect(p)
# Initial traffic: 127 packets expected on both rx and tx counters.
1646 self.verify_tun_66(p, count=127)
1647 c = p.tun_if.get_rx_stats()
1648 self.assertEqual(c['packets'], 127)
1649 c = p.tun_if.get_tx_stats()
1650 self.assertEqual(c['packets'], 127)
1652 # rekey - create new SAs and update the tunnel protection
# New key differs in its first byte only; SPIs move by 100, SA ids by 1.
1654 np.crypt_key = b'X' + p.crypt_key[1:]
1655 np.scapy_tun_spi += 100
1656 np.scapy_tun_sa_id += 1
1657 np.vpp_tun_spi += 100
1658 np.vpp_tun_sa_id += 1
1659 np.tun_if.local_spi = p.vpp_tun_spi
1660 np.tun_if.remote_spi = p.scapy_tun_spi
1662 self.config_sa_tra(np)
1663 self.config_protect(np)
# Traffic with the new SAs: cumulative counters reach 254.
1666 self.verify_tun_66(np, count=127)
1667 c = p.tun_if.get_rx_stats()
1668 self.assertEqual(c['packets'], 254)
1669 c = p.tun_if.get_tx_stats()
1670 self.assertEqual(c['packets'], 254)
1672 # bounce the interface state
# While the interface is down, all 127 packets must be counted under the
# ipsec6-tun-input "disabled interface" error counter.
1673 p.tun_if.admin_down()
1674 self.verify_drop_tun_66(np, count=127)
1675 node = ('/err/ipsec6-tun-input/%s' %
1676 'ipsec packets received on disabled interface')
1677 self.assertEqual(127, self.statistics.get_err_counter(node))
1679 self.verify_tun_66(np, count=127)
1682 # 1) add two input SAs [old, new]
1683 # 2) swap output SA to [new]
1684 # 3) use only [new] input SA
# np3 is a further key generation: first key byte b'Z', SPIs +100, ids +1.
1686 np3.crypt_key = b'Z' + p.crypt_key[1:]
1687 np3.scapy_tun_spi += 100
1688 np3.scapy_tun_sa_id += 1
1689 np3.vpp_tun_spi += 100
1690 np3.vpp_tun_sa_id += 1
1691 np3.tun_if.local_spi = p.vpp_tun_spi
1692 np3.tun_if.remote_spi = p.scapy_tun_spi
1694 self.config_sa_tra(np3)
# Step 1: output SA stays np's, both np and np3 input SAs are accepted.
1697 p.tun_protect.update_vpp_config(np.tun_sa_out,
1698 [np.tun_sa_in, np3.tun_sa_in])
1699 self.verify_tun_66(np, np, count=127)
1700 self.verify_tun_66(np3, np, count=127)
# Step 2: output SA switches to np3's, both input SAs still accepted.
1703 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1704 [np.tun_sa_in, np3.tun_sa_in])
1705 self.verify_tun_66(np, np3, count=127)
1706 self.verify_tun_66(np3, np3, count=127)
# Step 3: after this update np3 traffic still passes while np traffic is
# expected to be dropped.
1709 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1711 self.verify_tun_66(np3, np3, count=127)
1712 self.verify_drop_tun_66(np, count=127)
# Final cumulative totals expected on the tunnel interface.
1714 c = p.tun_if.get_rx_stats()
1715 self.assertEqual(c['packets'], 127*9)
1716 c = p.tun_if.get_tx_stats()
1717 self.assertEqual(c['packets'], 127*8)
1718 self.unconfig_sa(np)
# Teardown of the remaining (np3) protection and SAs, then the network.
1721 self.unconfig_protect(np3)
1722 self.unconfig_sa(np3)
1723 self.unconfig_network(p)
# IPv4-over-IPv6 check: configure network, transport-mode SAs and
# protection, run verify_tun_46 with 127 packets and check both counters.
# NOTE(review): the embedded numbering skips 1739-1740 and 1742, so part of
# the teardown is not visible here - confirm against the complete file.
1725 def test_tun_46(self):
1726 """IPSEC tunnel protect 4o6"""
1728 p = self.ipv6_params
1730 self.config_network(p)
1731 self.config_sa_tra(p)
1732 self.config_protect(p)
1734 self.verify_tun_46(p, count=127)
1735 c = p.tun_if.get_rx_stats()
1736 self.assertEqual(c['packets'], 127)
1737 c = p.tun_if.get_tx_stats()
1738 self.assertEqual(c['packets'], 127)
1741 self.unconfig_protect(p)
1743 self.unconfig_network(p)
1746 class TestIpsec6TunProtectTun(TemplateIpsec,
1747 TemplateIpsec6TunProtect,
1749 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1751 encryption_type = ESP
1752 tun6_encrypt_node_name = "esp6-encrypt-tun"
1753 tun6_decrypt_node_name = "esp6-decrypt-tun"
1756 super(TestIpsec6TunProtectTun, self).setUp()
1758 self.tun_if = self.pg0
1761 super(TestIpsec6TunProtectTun, self).tearDown()
# Build `count` Ethernet frames whose payload is sa.encrypt() applied to an
# outer IPv6 header (sw_intf remote -> local) wrapping an inner
# IPv6/UDP/Raw packet from src to dst.
# NOTE(review): line 1764 (the end of the signature) is missing from this
# listing; payload_size is used below, so it is presumably declared there.
1763 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1765 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1766 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1767 dst=sw_intf.local_ip6) /
1768 IPv6(src=src, dst=dst) /
1769 UDP(sport=1166, dport=2233) /
1770 Raw(b'X' * payload_size))
1771 for i in range(count)]
# Build `count` unencrypted Ethernet/IPv6/UDP/Raw frames from src to dst,
# using sw_intf's remote MAC as source and local MAC as destination.
# NOTE(review): line 1774 (the end of the signature) is missing from this
# listing; payload_size is used below, so it is presumably declared there.
1773 def gen_pkts6(self, sw_intf, src, dst, count=1,
1775 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1776 IPv6(src=src, dst=dst) /
1777 UDP(sport=1166, dport=2233) /
1778 Raw(b'X' * payload_size)
1779 for i in range(count)]
# Check a decrypted packet: destination must be pg1's remote IPv6 address,
# source must be p.remote_tun_if_host, and its checksums must be valid.
# NOTE(review): line 1782 is missing from this listing; `rx` is presumably
# bound there by a loop over rxs - confirm against the complete file.
1781 def verify_decrypted6(self, p, rxs):
1783 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1784 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1785 self.assert_packet_checksums_valid(rx)
# Decrypt a received packet with sa and check the outer IPv6 header (pg0
# local -> pg0 remote) and the destination of the encapsulated packet.
# NOTE(review): the embedded numbering skips 1788-1789, 1798, 1801 and
# 1803-1805, so the loop/try headers and whatever follows the logging calls
# are not visible here - confirm against the complete file.
1787 def verify_encrypted6(self, p, sa, rxs):
1790 pkt = sa.decrypt(rx[IPv6])
# If decryption did not yield an IPv6 layer, re-parse the raw payload.
1791 if not pkt.haslayer(IPv6):
1792 pkt = IPv6(pkt[Raw].load)
1793 self.assert_packet_checksums_valid(pkt)
1794 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1795 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1796 inner = pkt[IPv6].payload
1797 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
# On a missing layer or failed assertion the offending packets are logged
# at debug level.
1799 except (IndexError, AssertionError):
1800 self.logger.debug(ppp("Unexpected packet:", rx))
1802 self.logger.debug(ppp("Decrypted packet:", pkt))
# IPv6 tunnel-mode tunnel-protect test: configure network, tunnel-mode SAs
# and protection, verify traffic and counters, rekey, verify again, tear
# down.
# NOTE(review): the embedded numbering skips 1824, 1835-1836 and 1842-1843;
# in particular the statement that creates `np` is not visible here.
1807 def test_tun_66(self):
1808 """IPSEC tunnel protect """
1810 p = self.ipv6_params
1812 self.config_network(p)
1813 self.config_sa_tun(p)
1814 self.config_protect(p)
# First pass: 127 packets expected on both rx and tx tunnel counters.
1816 self.verify_tun_66(p, count=127)
1818 c = p.tun_if.get_rx_stats()
1819 self.assertEqual(c['packets'], 127)
1820 c = p.tun_if.get_tx_stats()
1821 self.assertEqual(c['packets'], 127)
1823 # rekey - create new SAs and update the tunnel protection
# New key differs in its first byte only; SPIs move by 100, SA ids by 1.
1825 np.crypt_key = b'X' + p.crypt_key[1:]
1826 np.scapy_tun_spi += 100
1827 np.scapy_tun_sa_id += 1
1828 np.vpp_tun_spi += 100
1829 np.vpp_tun_sa_id += 1
1830 np.tun_if.local_spi = p.vpp_tun_spi
1831 np.tun_if.remote_spi = p.scapy_tun_spi
1833 self.config_sa_tun(np)
1834 self.config_protect(np)
# Second pass with the new SAs: cumulative counters reach 254.
1837 self.verify_tun_66(np, count=127)
1838 c = p.tun_if.get_rx_stats()
1839 self.assertEqual(c['packets'], 254)
1840 c = p.tun_if.get_tx_stats()
1841 self.assertEqual(c['packets'], 254)
# Teardown: protection and SAs are removed through np, the network
# through p.
1844 self.unconfig_protect(np)
1845 self.unconfig_sa(np)
1846 self.unconfig_network(p)
1849 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
1850 TemplateIpsec6TunProtect,
1852 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
1854 encryption_type = ESP
1855 tun6_encrypt_node_name = "esp6-encrypt-tun"
1856 tun6_decrypt_node_name = "esp6-decrypt-tun"
1859 super(TestIpsec6TunProtectTunDrop, self).setUp()
1861 self.tun_if = self.pg0
1864 super(TestIpsec6TunProtectTunDrop, self).tearDown()
# Build `count` Ethernet frames carrying sa.encrypt() of an outer IPv6
# header wrapping an inner IPv6/UDP/Raw packet from src to dst.
# NOTE(review): test_tun_drop_66 in this class calls self.gen_encrypt_pkts6,
# not gen_encrypt_pkts5, so nothing visible here uses this method. If it is
# meant to replace the generator used by that test, the trailing "5" looks
# like a typo for "6" - confirm before renaming.
# NOTE(review): lines 1867 (end of the signature) and 1872 (the rest of the
# outer IPv6 header, including its destination) are missing from this
# listing.
1866 def gen_encrypt_pkts5(self, sa, sw_intf, src, dst, count=1,
1868 # the IP destination of the revealed packet does not match
1869 # that assigned to the tunnel
1870 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1871 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1873 IPv6(src=src, dst=dst) /
1874 UDP(sport=1144, dport=2233) /
1875 Raw(b'X' * payload_size))
1876 for i in range(count)]
# Configure a protected IPv6 tunnel, send encrypted packets built by
# gen_encrypt_pkts6 and assert that nothing is received in reply.
# NOTE(review): the packets come from gen_encrypt_pkts6; this class only
# defines gen_encrypt_pkts5, so the generator used here is resolved from
# elsewhere in the class hierarchy - confirm this is intended.
# NOTE(review): the embedded numbering skips 1890, 1892 and 1894, so the
# last gen_encrypt_pkts6 argument(s) and part of the teardown are not
# visible here.
1878 def test_tun_drop_66(self):
1879 """IPSEC 6 tunnel protect bogus tunnel header """
1881 p = self.ipv6_params
1883 self.config_network(p)
1884 self.config_sa_tun(p)
1885 self.config_protect(p)
1887 tx = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
1888 src=p.remote_tun_if_host,
1889 dst=self.pg1.remote_ip6,
1891 self.send_and_assert_no_replies(self.tun_if, tx)
1893 self.unconfig_protect(p)
1895 self.unconfig_network(p)
# Script entry point: when the module is executed directly, run its tests
# through unittest using VppTestRunner as the test runner.
1898 if __name__ == '__main__':
1899 unittest.main(testRunner=VppTestRunner)