5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
21 from vpp_papi import VppEnum
24 def config_tun_params(p, encryption_type, tun_if):
25 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
26 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
27 IPSEC_API_SAD_FLAG_USE_ESN))
28 crypt_key = mk_scapy_crypt_key(p)
29 p.scapy_tun_sa = SecurityAssociation(
30 encryption_type, spi=p.vpp_tun_spi,
31 crypt_algo=p.crypt_algo,
33 auth_algo=p.auth_algo, auth_key=p.auth_key,
34 tunnel_header=ip_class_by_addr_type[p.addr_type](
37 nat_t_header=p.nat_header,
39 p.vpp_tun_sa = SecurityAssociation(
40 encryption_type, spi=p.scapy_tun_spi,
41 crypt_algo=p.crypt_algo,
43 auth_algo=p.auth_algo, auth_key=p.auth_key,
44 tunnel_header=ip_class_by_addr_type[p.addr_type](
47 nat_t_header=p.nat_header,
51 def config_tra_params(p, encryption_type, tun_if):
52 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
53 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
54 IPSEC_API_SAD_FLAG_USE_ESN))
55 crypt_key = mk_scapy_crypt_key(p)
56 p.scapy_tun_sa = SecurityAssociation(
57 encryption_type, spi=p.vpp_tun_spi,
58 crypt_algo=p.crypt_algo,
60 auth_algo=p.auth_algo, auth_key=p.auth_key,
62 p.vpp_tun_sa = SecurityAssociation(
63 encryption_type, spi=p.scapy_tun_spi,
64 crypt_algo=p.crypt_algo,
66 auth_algo=p.auth_algo, auth_key=p.auth_key,
70 class TemplateIpsec4TunIfEsp(TemplateIpsec):
71 """ IPsec tunnel interface tests """
77 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
80 def tearDownClass(cls):
81 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
84 super(TemplateIpsec4TunIfEsp, self).setUp()
86 self.tun_if = self.pg0
90 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
91 p.scapy_tun_spi, p.crypt_algo_vpp_id,
92 p.crypt_key, p.crypt_key,
93 p.auth_algo_vpp_id, p.auth_key,
95 p.tun_if.add_vpp_config()
99 config_tun_params(p, self.encryption_type, p.tun_if)
101 r = VppIpRoute(self, p.remote_tun_if_host, 32,
102 [VppRoutePath(p.tun_if.remote_ip4,
105 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
106 [VppRoutePath(p.tun_if.remote_ip6,
108 proto=DpoProto.DPO_PROTO_IP6)])
112 super(TemplateIpsec4TunIfEsp, self).tearDown()
115 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
116 """ IPsec UDP tunnel interface tests """
118 tun4_encrypt_node_name = "esp4-encrypt-tun"
119 tun4_decrypt_node_name = "esp4-decrypt-tun"
120 encryption_type = ESP
124 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
127 def tearDownClass(cls):
128 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
131 super(TemplateIpsec4TunIfEspUdp, self).setUp()
133 self.tun_if = self.pg0
136 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
137 IPSEC_API_SAD_FLAG_UDP_ENCAP)
138 p.nat_header = UDP(sport=5454, dport=4500)
140 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
141 p.scapy_tun_spi, p.crypt_algo_vpp_id,
142 p.crypt_key, p.crypt_key,
143 p.auth_algo_vpp_id, p.auth_key,
144 p.auth_key, udp_encap=True)
145 p.tun_if.add_vpp_config()
147 p.tun_if.config_ip4()
148 p.tun_if.config_ip6()
149 config_tun_params(p, self.encryption_type, p.tun_if)
151 r = VppIpRoute(self, p.remote_tun_if_host, 32,
152 [VppRoutePath(p.tun_if.remote_ip4,
155 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
156 [VppRoutePath(p.tun_if.remote_ip6,
158 proto=DpoProto.DPO_PROTO_IP6)])
162 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
165 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
166 """ Ipsec ESP - TUN tests """
167 tun4_encrypt_node_name = "esp4-encrypt-tun"
168 tun4_decrypt_node_name = "esp4-decrypt-tun"
170 def test_tun_basic64(self):
171 """ ipsec 6o4 tunnel basic test """
172 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
174 self.verify_tun_64(self.params[socket.AF_INET], count=1)
176 def test_tun_burst64(self):
177 """ ipsec 6o4 tunnel burst test """
178 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
180 self.verify_tun_64(self.params[socket.AF_INET], count=257)
182 def test_tun_basic_frag44(self):
183 """ ipsec 4o4 tunnel frag basic test """
184 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
188 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
190 self.verify_tun_44(self.params[socket.AF_INET],
191 count=1, payload_size=1800, n_rx=2)
192 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests

    Runs the IpsecTun4Tests mixin on top of the UDP-encapsulated tunnel
    template and adds one NAT keepalive check of its own.
    """

    # Name of the tunnel input node; presumably read by the inherited
    # test helpers -- confirm against template_ipsec.
    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        v4_params = self.ipv4_params
        self.verify_keepalive(v4_params)
206 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
207 """ Ipsec ESP - TCP tests """
211 class TemplateIpsec6TunIfEsp(TemplateIpsec):
212 """ IPsec tunnel interface tests """
214 encryption_type = ESP
217 super(TemplateIpsec6TunIfEsp, self).setUp()
219 self.tun_if = self.pg0
222 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
223 p.scapy_tun_spi, p.crypt_algo_vpp_id,
224 p.crypt_key, p.crypt_key,
225 p.auth_algo_vpp_id, p.auth_key,
226 p.auth_key, is_ip6=True)
227 p.tun_if.add_vpp_config()
229 p.tun_if.config_ip6()
230 p.tun_if.config_ip4()
231 config_tun_params(p, self.encryption_type, p.tun_if)
233 r = VppIpRoute(self, p.remote_tun_if_host, 128,
234 [VppRoutePath(p.tun_if.remote_ip6,
236 proto=DpoProto.DPO_PROTO_IP6)])
238 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
239 [VppRoutePath(p.tun_if.remote_ip4,
244 super(TemplateIpsec6TunIfEsp, self).tearDown()
247 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
249 """ Ipsec ESP - TUN tests """
250 tun6_encrypt_node_name = "esp6-encrypt-tun"
251 tun6_decrypt_node_name = "esp6-decrypt-tun"
def test_tun_basic46(self):
    """ ipsec 4o6 tunnel basic test

    Sets the IPv6 encrypt node name on the instance, then runs
    verify_tun_46 with a single packet using the AF_INET6 parameter set.
    """
    self.tun6_encrypt_node_name = "esp6-encrypt-tun"
    v6_params = self.params[socket.AF_INET6]
    self.verify_tun_46(v6_params, count=1)
def test_tun_burst46(self):
    """ ipsec 4o6 tunnel burst test

    Same flow as the basic 4o6 test, but pushes 257 packets through
    verify_tun_46 using the AF_INET6 parameter set.
    """
    self.tun6_encrypt_node_name = "esp6-encrypt-tun"
    v6_params = self.params[socket.AF_INET6]
    self.verify_tun_46(v6_params, count=257)
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
                                IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests

    Pairs the IPv6 ESP tunnel-interface template with the
    IpsecTun6HandoffTests mixin; the class body only supplies the
    encrypt/decrypt node names.
    """
    # Presumably consumed by the inherited tests to locate counters --
    # confirm against template_ipsec.
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
                                IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests

    Pairs the IPv4 ESP tunnel-interface template with the
    IpsecTun4HandoffTests mixin; the class body only supplies the
    encrypt/decrypt node names.
    """
    # Presumably consumed by the inherited tests to locate counters --
    # confirm against template_ipsec.
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
278 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
279 """ IPsec IPv4 Multi Tunnel interface """
281 encryption_type = ESP
282 tun4_encrypt_node_name = "esp4-encrypt-tun"
283 tun4_decrypt_node_name = "esp4-decrypt-tun"
286 super(TestIpsec4MultiTunIfEsp, self).setUp()
288 self.tun_if = self.pg0
290 self.multi_params = []
291 self.pg0.generate_remote_hosts(10)
292 self.pg0.configure_ipv4_neighbors()
295 p = copy.copy(self.ipv4_params)
297 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
298 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
299 p.scapy_tun_spi = p.scapy_tun_spi + ii
300 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
301 p.vpp_tun_spi = p.vpp_tun_spi + ii
303 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
304 p.scapy_tra_spi = p.scapy_tra_spi + ii
305 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
306 p.vpp_tra_spi = p.vpp_tra_spi + ii
307 p.tun_dst = self.pg0.remote_hosts[ii].ip4
309 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
312 p.crypt_key, p.crypt_key,
313 p.auth_algo_vpp_id, p.auth_key,
316 p.tun_if.add_vpp_config()
318 p.tun_if.config_ip4()
319 config_tun_params(p, self.encryption_type, p.tun_if)
320 self.multi_params.append(p)
322 VppIpRoute(self, p.remote_tun_if_host, 32,
323 [VppRoutePath(p.tun_if.remote_ip4,
324 0xffffffff)]).add_vpp_config()
327 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces

    For each parameter set in self.multi_params, run verify_tun_44 with
    127 packets, then check that the tunnel interface reports 127
    packets in both its rx and its tx statistics.
    """
    expected = 127
    for tun_params in self.multi_params:
        self.verify_tun_44(tun_params, count=expected)
        rx_stats = tun_params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], expected)
        tx_stats = tun_params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], expected)
338 def test_tun_rr_44(self):
339 """ Round-robin packets across multiple interfaces """
341 for p in self.multi_params:
342 tx = tx + self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
343 src=p.remote_tun_if_host,
344 dst=self.pg1.remote_ip4)
345 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
347 for rx, p in zip(rxs, self.multi_params):
348 self.verify_decrypted(p, [rx])
351 for p in self.multi_params:
352 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
353 dst=p.remote_tun_if_host)
354 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
356 for rx, p in zip(rxs, self.multi_params):
357 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
360 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
361 """ IPsec IPv4 Tunnel interface all Algos """
363 encryption_type = ESP
364 tun4_encrypt_node_name = "esp4-encrypt-tun"
365 tun4_decrypt_node_name = "esp4-decrypt-tun"
367 def config_network(self, p):
369 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
372 p.crypt_key, p.crypt_key,
373 p.auth_algo_vpp_id, p.auth_key,
376 p.tun_if.add_vpp_config()
378 p.tun_if.config_ip4()
379 config_tun_params(p, self.encryption_type, p.tun_if)
380 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
381 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
383 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
384 [VppRoutePath(p.tun_if.remote_ip4,
386 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the tunnel interface and route held on *p*.

    Drops the tunnel's IPv4 configuration, removes the tunnel from VPP,
    and finally removes the route stored in p.route.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()
    p.route.remove_vpp_config()
394 super(TestIpsec4TunIfEspAll, self).setUp()
396 self.tun_if = self.pg0
399 super(TestIpsec4TunIfEspAll, self).tearDown()
403 # change the key and the SPI
405 p.crypt_key = b'X' + p.crypt_key[1:]
407 p.scapy_tun_sa_id += 1
410 p.tun_if.local_spi = p.vpp_tun_spi
411 p.tun_if.remote_spi = p.scapy_tun_spi
413 config_tun_params(p, self.encryption_type, p.tun_if)
415 p.tun_sa_in = VppIpsecSA(self,
422 self.vpp_esp_protocol,
425 p.tun_sa_out = VppIpsecSA(self,
432 self.vpp_esp_protocol,
435 p.tun_sa_in.add_vpp_config()
436 p.tun_sa_out.add_vpp_config()
438 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
439 sa_id=p.tun_sa_in.id,
441 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
442 sa_id=p.tun_sa_out.id,
444 self.logger.info(self.vapi.cli("sh ipsec sa"))
446 def test_tun_44(self):
447 """IPSEC tunnel all algos """
449 # foreach VPP crypto engine
450 engines = ["ia32", "ipsecmb", "openssl"]
452 # foreach crypto algorithm
453 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
454 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
455 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
456 IPSEC_API_INTEG_ALG_NONE),
457 'scapy-crypto': "AES-GCM",
458 'scapy-integ': "NULL",
459 'key': b"JPjyOWBeVEQiMe7h",
461 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
462 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
463 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
464 IPSEC_API_INTEG_ALG_NONE),
465 'scapy-crypto': "AES-GCM",
466 'scapy-integ': "NULL",
467 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
469 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
470 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
471 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
472 IPSEC_API_INTEG_ALG_NONE),
473 'scapy-crypto': "AES-GCM",
474 'scapy-integ': "NULL",
475 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
477 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
478 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
479 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
480 IPSEC_API_INTEG_ALG_SHA1_96),
481 'scapy-crypto': "AES-CBC",
482 'scapy-integ': "HMAC-SHA1-96",
484 'key': b"JPjyOWBeVEQiMe7h"},
485 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
486 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
487 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
488 IPSEC_API_INTEG_ALG_SHA1_96),
489 'scapy-crypto': "AES-CBC",
490 'scapy-integ': "HMAC-SHA1-96",
492 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
493 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
494 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
495 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
496 IPSEC_API_INTEG_ALG_SHA1_96),
497 'scapy-crypto': "AES-CBC",
498 'scapy-integ': "HMAC-SHA1-96",
500 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
501 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
502 IPSEC_API_CRYPTO_ALG_NONE),
503 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
504 IPSEC_API_INTEG_ALG_SHA1_96),
505 'scapy-crypto': "NULL",
506 'scapy-integ': "HMAC-SHA1-96",
508 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
510 for engine in engines:
511 self.vapi.cli("set crypto handler all %s" % engine)
514 # loop through each of the algorithms
517 # with self.subTest(algo=algo['scapy']):
519 p = copy.copy(self.ipv4_params)
520 p.auth_algo_vpp_id = algo['vpp-integ']
521 p.crypt_algo_vpp_id = algo['vpp-crypto']
522 p.crypt_algo = algo['scapy-crypto']
523 p.auth_algo = algo['scapy-integ']
524 p.crypt_key = algo['key']
525 p.salt = algo['salt']
527 self.config_network(p)
529 self.verify_tun_44(p, count=127)
530 c = p.tun_if.get_rx_stats()
531 self.assertEqual(c['packets'], 127)
532 c = p.tun_if.get_tx_stats()
533 self.assertEqual(c['packets'], 127)
539 self.verify_tun_44(p, count=127)
541 self.unconfig_network(p)
542 p.tun_sa_out.remove_vpp_config()
543 p.tun_sa_in.remove_vpp_config()
546 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
547 """ IPsec IPv4 Tunnel interface no Algos """
549 encryption_type = ESP
550 tun4_encrypt_node_name = "esp4-encrypt-tun"
551 tun4_decrypt_node_name = "esp4-decrypt-tun"
553 def config_network(self, p):
555 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
556 IPSEC_API_INTEG_ALG_NONE)
560 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
561 IPSEC_API_CRYPTO_ALG_NONE)
562 p.crypt_algo = 'NULL'
565 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
568 p.crypt_key, p.crypt_key,
569 p.auth_algo_vpp_id, p.auth_key,
572 p.tun_if.add_vpp_config()
574 p.tun_if.config_ip4()
575 config_tun_params(p, self.encryption_type, p.tun_if)
576 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
577 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
579 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
580 [VppRoutePath(p.tun_if.remote_ip4,
582 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the tunnel interface and route held on *p*.

    Drops the tunnel's IPv4 configuration, removes the tunnel from VPP,
    and finally removes the route stored in p.route.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()
    p.route.remove_vpp_config()
590 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
592 self.tun_if = self.pg0
595 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
597 def test_tun_44(self):
600 self.config_network(p)
602 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
603 dst=p.remote_tun_if_host)
604 self.send_and_assert_no_replies(self.pg1, tx)
606 self.unconfig_network(p)
609 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
610 """ IPsec IPv6 Multi Tunnel interface """
612 encryption_type = ESP
613 tun6_encrypt_node_name = "esp6-encrypt-tun"
614 tun6_decrypt_node_name = "esp6-decrypt-tun"
617 super(TestIpsec6MultiTunIfEsp, self).setUp()
619 self.tun_if = self.pg0
621 self.multi_params = []
622 self.pg0.generate_remote_hosts(10)
623 self.pg0.configure_ipv6_neighbors()
626 p = copy.copy(self.ipv6_params)
628 p.remote_tun_if_host = "1111::%d" % (ii + 1)
629 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
630 p.scapy_tun_spi = p.scapy_tun_spi + ii
631 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
632 p.vpp_tun_spi = p.vpp_tun_spi + ii
634 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
635 p.scapy_tra_spi = p.scapy_tra_spi + ii
636 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
637 p.vpp_tra_spi = p.vpp_tra_spi + ii
639 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
642 p.crypt_key, p.crypt_key,
643 p.auth_algo_vpp_id, p.auth_key,
644 p.auth_key, is_ip6=True,
645 dst=self.pg0.remote_hosts[ii].ip6)
646 p.tun_if.add_vpp_config()
648 p.tun_if.config_ip6()
649 config_tun_params(p, self.encryption_type, p.tun_if)
650 self.multi_params.append(p)
652 r = VppIpRoute(self, p.remote_tun_if_host, 128,
653 [VppRoutePath(p.tun_if.remote_ip6,
655 proto=DpoProto.DPO_PROTO_IP6)])
659 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces

    For each parameter set in self.multi_params, run verify_tun_66 with
    127 packets, then check that the tunnel interface reports 127
    packets in both its rx and its tx statistics.
    """
    expected = 127
    for tun_params in self.multi_params:
        self.verify_tun_66(tun_params, count=expected)
        rx_stats = tun_params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], expected)
        tx_stats = tun_params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], expected)
671 class TestIpsecGreTebIfEsp(TemplateIpsec,
673 """ Ipsec GRE TEB ESP - TUN tests """
674 tun4_encrypt_node_name = "esp4-encrypt-tun"
675 tun4_decrypt_node_name = "esp4-decrypt-tun"
676 encryption_type = ESP
677 omac = "00:11:22:33:44:55"
679 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
681 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
682 sa.encrypt(IP(src=self.pg0.remote_ip4,
683 dst=self.pg0.local_ip4) /
685 Ether(dst=self.omac) /
686 IP(src="1.1.1.1", dst="1.1.1.2") /
687 UDP(sport=1144, dport=2233) /
688 Raw(b'X' * payload_size))
689 for i in range(count)]
691 def gen_pkts(self, sw_intf, src, dst, count=1,
693 return [Ether(dst=self.omac) /
694 IP(src="1.1.1.1", dst="1.1.1.2") /
695 UDP(sport=1144, dport=2233) /
696 Raw(b'X' * payload_size)
697 for i in range(count)]
699 def verify_decrypted(self, p, rxs):
701 self.assert_equal(rx[Ether].dst, self.omac)
702 self.assert_equal(rx[IP].dst, "1.1.1.2")
704 def verify_encrypted(self, p, sa, rxs):
707 pkt = sa.decrypt(rx[IP])
708 if not pkt.haslayer(IP):
709 pkt = IP(pkt[Raw].load)
710 self.assert_packet_checksums_valid(pkt)
711 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
712 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
713 self.assertTrue(pkt.haslayer(GRE))
715 self.assertEqual(e[Ether].dst, self.omac)
716 self.assertEqual(e[IP].dst, "1.1.1.2")
717 except (IndexError, AssertionError):
718 self.logger.debug(ppp("Unexpected packet:", rx))
720 self.logger.debug(ppp("Decrypted packet:", pkt))
726 super(TestIpsecGreTebIfEsp, self).setUp()
728 self.tun_if = self.pg0
732 bd1 = VppBridgeDomain(self, 1)
735 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
736 p.auth_algo_vpp_id, p.auth_key,
737 p.crypt_algo_vpp_id, p.crypt_key,
738 self.vpp_esp_protocol,
741 p.tun_sa_out.add_vpp_config()
743 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
744 p.auth_algo_vpp_id, p.auth_key,
745 p.crypt_algo_vpp_id, p.crypt_key,
746 self.vpp_esp_protocol,
749 p.tun_sa_in.add_vpp_config()
751 p.tun_if = VppGreInterface(self,
754 type=(VppEnum.vl_api_gre_tunnel_type_t.
755 GRE_API_TUNNEL_TYPE_TEB))
756 p.tun_if.add_vpp_config()
758 p.tun_protect = VppIpsecTunProtect(self,
763 p.tun_protect.add_vpp_config()
766 p.tun_if.config_ip4()
767 config_tun_params(p, self.encryption_type, p.tun_if)
769 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
770 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
772 self.vapi.cli("clear ipsec sa")
776 p.tun_if.unconfig_ip4()
777 super(TestIpsecGreTebIfEsp, self).tearDown()
780 class TestIpsecGreTebIfEspTra(TemplateIpsec,
782 """ Ipsec GRE TEB ESP - Tra tests """
783 tun4_encrypt_node_name = "esp4-encrypt-tun"
784 tun4_decrypt_node_name = "esp4-decrypt-tun"
785 encryption_type = ESP
786 omac = "00:11:22:33:44:55"
788 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
790 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
791 sa.encrypt(IP(src=self.pg0.remote_ip4,
792 dst=self.pg0.local_ip4) /
794 Ether(dst=self.omac) /
795 IP(src="1.1.1.1", dst="1.1.1.2") /
796 UDP(sport=1144, dport=2233) /
797 Raw(b'X' * payload_size))
798 for i in range(count)]
800 def gen_pkts(self, sw_intf, src, dst, count=1,
802 return [Ether(dst=self.omac) /
803 IP(src="1.1.1.1", dst="1.1.1.2") /
804 UDP(sport=1144, dport=2233) /
805 Raw(b'X' * payload_size)
806 for i in range(count)]
808 def verify_decrypted(self, p, rxs):
810 self.assert_equal(rx[Ether].dst, self.omac)
811 self.assert_equal(rx[IP].dst, "1.1.1.2")
813 def verify_encrypted(self, p, sa, rxs):
816 pkt = sa.decrypt(rx[IP])
817 if not pkt.haslayer(IP):
818 pkt = IP(pkt[Raw].load)
819 self.assert_packet_checksums_valid(pkt)
820 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
821 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
822 self.assertTrue(pkt.haslayer(GRE))
824 self.assertEqual(e[Ether].dst, self.omac)
825 self.assertEqual(e[IP].dst, "1.1.1.2")
826 except (IndexError, AssertionError):
827 self.logger.debug(ppp("Unexpected packet:", rx))
829 self.logger.debug(ppp("Decrypted packet:", pkt))
835 super(TestIpsecGreTebIfEspTra, self).setUp()
837 self.tun_if = self.pg0
841 bd1 = VppBridgeDomain(self, 1)
844 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
845 p.auth_algo_vpp_id, p.auth_key,
846 p.crypt_algo_vpp_id, p.crypt_key,
847 self.vpp_esp_protocol)
848 p.tun_sa_out.add_vpp_config()
850 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
851 p.auth_algo_vpp_id, p.auth_key,
852 p.crypt_algo_vpp_id, p.crypt_key,
853 self.vpp_esp_protocol)
854 p.tun_sa_in.add_vpp_config()
856 p.tun_if = VppGreInterface(self,
859 type=(VppEnum.vl_api_gre_tunnel_type_t.
860 GRE_API_TUNNEL_TYPE_TEB))
861 p.tun_if.add_vpp_config()
863 p.tun_protect = VppIpsecTunProtect(self,
868 p.tun_protect.add_vpp_config()
871 p.tun_if.config_ip4()
872 config_tra_params(p, self.encryption_type, p.tun_if)
874 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
875 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
877 self.vapi.cli("clear ipsec sa")
881 p.tun_if.unconfig_ip4()
882 super(TestIpsecGreTebIfEspTra, self).tearDown()
885 class TestIpsecGreIfEsp(TemplateIpsec,
887 """ Ipsec GRE ESP - TUN tests """
888 tun4_encrypt_node_name = "esp4-encrypt-tun"
889 tun4_decrypt_node_name = "esp4-decrypt-tun"
890 encryption_type = ESP
892 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
894 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
895 sa.encrypt(IP(src=self.pg0.remote_ip4,
896 dst=self.pg0.local_ip4) /
898 IP(src=self.pg1.local_ip4,
899 dst=self.pg1.remote_ip4) /
900 UDP(sport=1144, dport=2233) /
901 Raw(b'X' * payload_size))
902 for i in range(count)]
904 def gen_pkts(self, sw_intf, src, dst, count=1,
906 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
907 IP(src="1.1.1.1", dst="1.1.1.2") /
908 UDP(sport=1144, dport=2233) /
909 Raw(b'X' * payload_size)
910 for i in range(count)]
912 def verify_decrypted(self, p, rxs):
914 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
915 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
917 def verify_encrypted(self, p, sa, rxs):
920 pkt = sa.decrypt(rx[IP])
921 if not pkt.haslayer(IP):
922 pkt = IP(pkt[Raw].load)
923 self.assert_packet_checksums_valid(pkt)
924 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
925 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
926 self.assertTrue(pkt.haslayer(GRE))
928 self.assertEqual(e[IP].dst, "1.1.1.2")
929 except (IndexError, AssertionError):
930 self.logger.debug(ppp("Unexpected packet:", rx))
932 self.logger.debug(ppp("Decrypted packet:", pkt))
938 super(TestIpsecGreIfEsp, self).setUp()
940 self.tun_if = self.pg0
944 bd1 = VppBridgeDomain(self, 1)
947 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
948 p.auth_algo_vpp_id, p.auth_key,
949 p.crypt_algo_vpp_id, p.crypt_key,
950 self.vpp_esp_protocol,
953 p.tun_sa_out.add_vpp_config()
955 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
956 p.auth_algo_vpp_id, p.auth_key,
957 p.crypt_algo_vpp_id, p.crypt_key,
958 self.vpp_esp_protocol,
961 p.tun_sa_in.add_vpp_config()
963 p.tun_if = VppGreInterface(self,
966 p.tun_if.add_vpp_config()
968 p.tun_protect = VppIpsecTunProtect(self,
972 p.tun_protect.add_vpp_config()
975 p.tun_if.config_ip4()
976 config_tun_params(p, self.encryption_type, p.tun_if)
978 VppIpRoute(self, "1.1.1.2", 32,
979 [VppRoutePath(p.tun_if.remote_ip4,
980 0xffffffff)]).add_vpp_config()
984 p.tun_if.unconfig_ip4()
985 super(TestIpsecGreIfEsp, self).tearDown()
988 class TestIpsecGreIfEspTra(TemplateIpsec,
990 """ Ipsec GRE ESP - TRA tests """
991 tun4_encrypt_node_name = "esp4-encrypt-tun"
992 tun4_decrypt_node_name = "esp4-decrypt-tun"
993 encryption_type = ESP
995 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
997 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
998 sa.encrypt(IP(src=self.pg0.remote_ip4,
999 dst=self.pg0.local_ip4) /
1001 IP(src=self.pg1.local_ip4,
1002 dst=self.pg1.remote_ip4) /
1003 UDP(sport=1144, dport=2233) /
1004 Raw(b'X' * payload_size))
1005 for i in range(count)]
1007 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1009 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1010 sa.encrypt(IP(src=self.pg0.remote_ip4,
1011 dst=self.pg0.local_ip4) /
1013 UDP(sport=1144, dport=2233) /
1014 Raw(b'X' * payload_size))
1015 for i in range(count)]
1017 def gen_pkts(self, sw_intf, src, dst, count=1,
1019 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1020 IP(src="1.1.1.1", dst="1.1.1.2") /
1021 UDP(sport=1144, dport=2233) /
1022 Raw(b'X' * payload_size)
1023 for i in range(count)]
1025 def verify_decrypted(self, p, rxs):
1027 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1028 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1030 def verify_encrypted(self, p, sa, rxs):
1033 pkt = sa.decrypt(rx[IP])
1034 if not pkt.haslayer(IP):
1035 pkt = IP(pkt[Raw].load)
1036 self.assert_packet_checksums_valid(pkt)
1037 self.assertTrue(pkt.haslayer(GRE))
1039 self.assertEqual(e[IP].dst, "1.1.1.2")
1040 except (IndexError, AssertionError):
1041 self.logger.debug(ppp("Unexpected packet:", rx))
1043 self.logger.debug(ppp("Decrypted packet:", pkt))
1049 super(TestIpsecGreIfEspTra, self).setUp()
1051 self.tun_if = self.pg0
1053 p = self.ipv4_params
1055 bd1 = VppBridgeDomain(self, 1)
1056 bd1.add_vpp_config()
1058 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1059 p.auth_algo_vpp_id, p.auth_key,
1060 p.crypt_algo_vpp_id, p.crypt_key,
1061 self.vpp_esp_protocol)
1062 p.tun_sa_out.add_vpp_config()
1064 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1065 p.auth_algo_vpp_id, p.auth_key,
1066 p.crypt_algo_vpp_id, p.crypt_key,
1067 self.vpp_esp_protocol)
1068 p.tun_sa_in.add_vpp_config()
1070 p.tun_if = VppGreInterface(self,
1072 self.pg0.remote_ip4)
1073 p.tun_if.add_vpp_config()
1075 p.tun_protect = VppIpsecTunProtect(self,
1079 p.tun_protect.add_vpp_config()
1082 p.tun_if.config_ip4()
1083 config_tra_params(p, self.encryption_type, p.tun_if)
1085 VppIpRoute(self, "1.1.1.2", 32,
1086 [VppRoutePath(p.tun_if.remote_ip4,
1087 0xffffffff)]).add_vpp_config()
1090 p = self.ipv4_params
1091 p.tun_if.unconfig_ip4()
1092 super(TestIpsecGreIfEspTra, self).tearDown()
def test_gre_non_ip(self):
    # Sends the packets built by gen_encrypt_non_ip_pkts into the tunnel
    # interface, expects nothing to come back, and expects exactly one
    # "unsupported payload" error counted on the tun4 decrypt node.
    #
    # Documented with comments rather than a docstring so that the
    # description reported for this test stays as it was (assuming the
    # base class follows unittest's shortDescription() -- confirm).
    params = self.ipv4_params
    pkts = self.gen_encrypt_non_ip_pkts(params.scapy_tun_sa, self.tun_if,
                                        src=params.remote_tun_if_host,
                                        dst=self.pg1.remote_ip6)
    self.send_and_assert_no_replies(self.tun_if, pkts)
    counter_path = ('/err/%s/unsupported payload' %
                    self.tun4_decrypt_node_name)
    error_count = self.statistics.get_err_counter(counter_path)
    self.assertEqual(1, error_count)
1105 class TestIpsecGre6IfEspTra(TemplateIpsec,
1107 """ Ipsec GRE ESP - TRA tests """
1108 tun6_encrypt_node_name = "esp6-encrypt-tun"
1109 tun6_decrypt_node_name = "esp6-decrypt-tun"
1110 encryption_type = ESP
1112 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1114 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1115 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1116 dst=self.pg0.local_ip6) /
1118 IPv6(src=self.pg1.local_ip6,
1119 dst=self.pg1.remote_ip6) /
1120 UDP(sport=1144, dport=2233) /
1121 Raw(b'X' * payload_size))
1122 for i in range(count)]
1124 def gen_pkts6(self, sw_intf, src, dst, count=1,
1126 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1127 IPv6(src="1::1", dst="1::2") /
1128 UDP(sport=1144, dport=2233) /
1129 Raw(b'X' * payload_size)
1130 for i in range(count)]
1132 def verify_decrypted6(self, p, rxs):
1134 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1135 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1137 def verify_encrypted6(self, p, sa, rxs):
1140 pkt = sa.decrypt(rx[IPv6])
1141 if not pkt.haslayer(IPv6):
1142 pkt = IPv6(pkt[Raw].load)
1143 self.assert_packet_checksums_valid(pkt)
1144 self.assertTrue(pkt.haslayer(GRE))
1146 self.assertEqual(e[IPv6].dst, "1::2")
1147 except (IndexError, AssertionError):
1148 self.logger.debug(ppp("Unexpected packet:", rx))
1150 self.logger.debug(ppp("Decrypted packet:", pkt))
1156 super(TestIpsecGre6IfEspTra, self).setUp()
1158 self.tun_if = self.pg0
1160 p = self.ipv6_params
1162 bd1 = VppBridgeDomain(self, 1)
1163 bd1.add_vpp_config()
1165 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1166 p.auth_algo_vpp_id, p.auth_key,
1167 p.crypt_algo_vpp_id, p.crypt_key,
1168 self.vpp_esp_protocol)
1169 p.tun_sa_out.add_vpp_config()
1171 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1172 p.auth_algo_vpp_id, p.auth_key,
1173 p.crypt_algo_vpp_id, p.crypt_key,
1174 self.vpp_esp_protocol)
1175 p.tun_sa_in.add_vpp_config()
1177 p.tun_if = VppGreInterface(self,
1179 self.pg0.remote_ip6)
1180 p.tun_if.add_vpp_config()
1182 p.tun_protect = VppIpsecTunProtect(self,
1186 p.tun_protect.add_vpp_config()
1189 p.tun_if.config_ip6()
1190 config_tra_params(p, self.encryption_type, p.tun_if)
1192 r = VppIpRoute(self, "1::2", 128,
1193 [VppRoutePath(p.tun_if.remote_ip6,
1195 proto=DpoProto.DPO_PROTO_IP6)])
1199 p = self.ipv6_params
1200 p.tun_if.unconfig_ip6()
1201 super(TestIpsecGre6IfEspTra, self).tearDown()
1204 class TemplateIpsec4TunProtect(object):
1205 """ IPsec IPv4 Tunnel protect """
1207 encryption_type = ESP
1208 tun4_encrypt_node_name = "esp4-encrypt-tun"
1209 tun4_decrypt_node_name = "esp4-decrypt-tun"
1210 tun4_input_node = "ipsec4-tun-input"
1212 def config_sa_tra(self, p):
1213 config_tun_params(p, self.encryption_type, p.tun_if)
1215 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1216 p.auth_algo_vpp_id, p.auth_key,
1217 p.crypt_algo_vpp_id, p.crypt_key,
1218 self.vpp_esp_protocol,
1220 p.tun_sa_out.add_vpp_config()
1222 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1223 p.auth_algo_vpp_id, p.auth_key,
1224 p.crypt_algo_vpp_id, p.crypt_key,
1225 self.vpp_esp_protocol,
1227 p.tun_sa_in.add_vpp_config()
1229 def config_sa_tun(self, p):
1230 config_tun_params(p, self.encryption_type, p.tun_if)
1232 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1233 p.auth_algo_vpp_id, p.auth_key,
1234 p.crypt_algo_vpp_id, p.crypt_key,
1235 self.vpp_esp_protocol,
1236 self.tun_if.remote_addr[p.addr_type],
1237 self.tun_if.local_addr[p.addr_type],
1239 p.tun_sa_out.add_vpp_config()
1241 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1242 p.auth_algo_vpp_id, p.auth_key,
1243 p.crypt_algo_vpp_id, p.crypt_key,
1244 self.vpp_esp_protocol,
1245 self.tun_if.remote_addr[p.addr_type],
1246 self.tun_if.local_addr[p.addr_type],
1248 p.tun_sa_in.add_vpp_config()
1250 def config_protect(self, p):
1251 p.tun_protect = VppIpsecTunProtect(self,
1255 p.tun_protect.add_vpp_config()
1257 def config_network(self, p):
1258 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1260 self.pg0.remote_ip4)
1261 p.tun_if.add_vpp_config()
1263 p.tun_if.config_ip4()
1264 p.tun_if.config_ip6()
1266 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1267 [VppRoutePath(p.tun_if.remote_ip4,
1269 p.route.add_vpp_config()
1270 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1271 [VppRoutePath(p.tun_if.remote_ip6,
1273 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Remove the route and the tunnel interface stored on ``p``.

    ``p.route`` is removed first, then ``p.tun_if``; both are the
    attributes populated by ``config_network``.
    """
    # Look each attribute up just before use so the removal order
    # (route, then interface) is exactly sequential.
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object stored in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on ``p``: ``tun_sa_out`` first, then
    ``tun_sa_in``.
    """
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
1288 class TestIpsec4TunProtect(TemplateIpsec,
1289 TemplateIpsec4TunProtect,
1291 """ IPsec IPv4 Tunnel protect - transport mode"""
1294 super(TestIpsec4TunProtect, self).setUp()
1296 self.tun_if = self.pg0
1299 super(TestIpsec4TunProtect, self).tearDown()
1301 def test_tun_44(self):
1302 """IPSEC tunnel protect"""
1304 p = self.ipv4_params
1306 self.config_network(p)
1307 self.config_sa_tra(p)
1308 self.config_protect(p)
1310 self.verify_tun_44(p, count=127)
1311 c = p.tun_if.get_rx_stats()
1312 self.assertEqual(c['packets'], 127)
1313 c = p.tun_if.get_tx_stats()
1314 self.assertEqual(c['packets'], 127)
1316 self.vapi.cli("clear ipsec sa")
1317 self.verify_tun_64(p, count=127)
1318 c = p.tun_if.get_rx_stats()
1319 self.assertEqual(c['packets'], 254)
1320 c = p.tun_if.get_tx_stats()
1321 self.assertEqual(c['packets'], 254)
1323 # rekey - create new SAs and update the tunnel protection
1325 np.crypt_key = b'X' + p.crypt_key[1:]
1326 np.scapy_tun_spi += 100
1327 np.scapy_tun_sa_id += 1
1328 np.vpp_tun_spi += 100
1329 np.vpp_tun_sa_id += 1
1330 np.tun_if.local_spi = p.vpp_tun_spi
1331 np.tun_if.remote_spi = p.scapy_tun_spi
1333 self.config_sa_tra(np)
1334 self.config_protect(np)
1337 self.verify_tun_44(np, count=127)
1338 c = p.tun_if.get_rx_stats()
1339 self.assertEqual(c['packets'], 381)
1340 c = p.tun_if.get_tx_stats()
1341 self.assertEqual(c['packets'], 381)
1344 self.unconfig_protect(np)
1345 self.unconfig_sa(np)
1346 self.unconfig_network(p)
1349 class TestIpsec4TunProtectUdp(TemplateIpsec,
1350 TemplateIpsec4TunProtect,
1352 """ IPsec IPv4 Tunnel protect - transport mode"""
1355 super(TestIpsec4TunProtectUdp, self).setUp()
1357 self.tun_if = self.pg0
1359 p = self.ipv4_params
1360 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1361 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1362 p.nat_header = UDP(sport=5454, dport=4500)
1363 self.config_network(p)
1364 self.config_sa_tra(p)
1365 self.config_protect(p)
1368 p = self.ipv4_params
1369 self.unconfig_protect(p)
1371 self.unconfig_network(p)
1372 super(TestIpsec4TunProtectUdp, self).tearDown()
def test_tun_44(self):
    """IPSEC UDP tunnel protect"""

    params = self.ipv4_params

    # Run the 4-over-4 check with 127 packets, then require that the
    # tunnel interface counted exactly that many in each direction.
    self.verify_tun_44(params, count=127)
    for stats_getter in ("get_rx_stats", "get_tx_stats"):
        counters = getattr(params.tun_if, stats_getter)()
        self.assertEqual(counters['packets'], 127)
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # Delegates entirely to the inherited keepalive check, using the
    # IPv4 parameter set.
    check_keepalive = self.verify_keepalive
    check_keepalive(self.ipv4_params)
1390 class TestIpsec4TunProtectTun(TemplateIpsec,
1391 TemplateIpsec4TunProtect,
1393 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1395 encryption_type = ESP
1396 tun4_encrypt_node_name = "esp4-encrypt-tun"
1397 tun4_decrypt_node_name = "esp4-decrypt-tun"
1400 super(TestIpsec4TunProtectTun, self).setUp()
1402 self.tun_if = self.pg0
1405 super(TestIpsec4TunProtectTun, self).tearDown()
1407 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1409 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1410 sa.encrypt(IP(src=sw_intf.remote_ip4,
1411 dst=sw_intf.local_ip4) /
1412 IP(src=src, dst=dst) /
1413 UDP(sport=1144, dport=2233) /
1414 Raw(b'X' * payload_size))
1415 for i in range(count)]
1417 def gen_pkts(self, sw_intf, src, dst, count=1,
1419 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1420 IP(src=src, dst=dst) /
1421 UDP(sport=1144, dport=2233) /
1422 Raw(b'X' * payload_size)
1423 for i in range(count)]
1425 def verify_decrypted(self, p, rxs):
1427 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1428 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1429 self.assert_packet_checksums_valid(rx)
1431 def verify_encrypted(self, p, sa, rxs):
1434 pkt = sa.decrypt(rx[IP])
1435 if not pkt.haslayer(IP):
1436 pkt = IP(pkt[Raw].load)
1437 self.assert_packet_checksums_valid(pkt)
1438 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1439 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1440 inner = pkt[IP].payload
1441 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1443 except (IndexError, AssertionError):
1444 self.logger.debug(ppp("Unexpected packet:", rx))
1446 self.logger.debug(ppp("Decrypted packet:", pkt))
1451 def test_tun_44(self):
1452 """IPSEC tunnel protect """
1454 p = self.ipv4_params
1456 self.config_network(p)
1457 self.config_sa_tun(p)
1458 self.config_protect(p)
1460 self.verify_tun_44(p, count=127)
1462 c = p.tun_if.get_rx_stats()
1463 self.assertEqual(c['packets'], 127)
1464 c = p.tun_if.get_tx_stats()
1465 self.assertEqual(c['packets'], 127)
1467 # rekey - create new SAs and update the tunnel protection
1469 np.crypt_key = b'X' + p.crypt_key[1:]
1470 np.scapy_tun_spi += 100
1471 np.scapy_tun_sa_id += 1
1472 np.vpp_tun_spi += 100
1473 np.vpp_tun_sa_id += 1
1474 np.tun_if.local_spi = p.vpp_tun_spi
1475 np.tun_if.remote_spi = p.scapy_tun_spi
1477 self.config_sa_tun(np)
1478 self.config_protect(np)
1481 self.verify_tun_44(np, count=127)
1482 c = p.tun_if.get_rx_stats()
1483 self.assertEqual(c['packets'], 254)
1484 c = p.tun_if.get_tx_stats()
1485 self.assertEqual(c['packets'], 254)
1488 self.unconfig_protect(np)
1489 self.unconfig_sa(np)
1490 self.unconfig_network(p)
1493 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
1494 TemplateIpsec4TunProtect,
1496 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
1498 encryption_type = ESP
1499 tun4_encrypt_node_name = "esp4-encrypt-tun"
1500 tun4_decrypt_node_name = "esp4-decrypt-tun"
1503 super(TestIpsec4TunProtectTunDrop, self).setUp()
1505 self.tun_if = self.pg0
1508 super(TestIpsec4TunProtectTunDrop, self).tearDown()
1510 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1512 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1513 sa.encrypt(IP(src=sw_intf.remote_ip4,
1515 IP(src=src, dst=dst) /
1516 UDP(sport=1144, dport=2233) /
1517 Raw(b'X' * payload_size))
1518 for i in range(count)]
1520 def test_tun_drop_44(self):
1521 """IPSEC tunnel protect bogus tunnel header """
1523 p = self.ipv4_params
1525 self.config_network(p)
1526 self.config_sa_tun(p)
1527 self.config_protect(p)
1529 tx = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
1530 src=p.remote_tun_if_host,
1531 dst=self.pg1.remote_ip4,
1533 self.send_and_assert_no_replies(self.tun_if, tx)
1536 self.unconfig_protect(p)
1538 self.unconfig_network(p)
1541 class TemplateIpsec6TunProtect(object):
1542 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Set up scapy SA state for ``p`` and add two VPP SAs that carry
    no tunnel endpoint addresses (the "tra" variant).

    After ``config_tun_params`` runs, one ``VppIpsecSA`` is built from
    ``p.scapy_tun_sa_id``/``p.scapy_tun_spi`` and stored as
    ``p.tun_sa_out``, and one from ``p.vpp_tun_sa_id``/``p.vpp_tun_spi``
    stored as ``p.tun_sa_in``.  Each SA is added to VPP right after it
    is created.  Both share the auth/crypto algorithm ids and keys
    from ``p``.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    # (attribute that receives the SA, SA-id attribute, SPI attribute)
    sa_layout = (
        ("tun_sa_out", "scapy_tun_sa_id", "scapy_tun_spi"),
        ("tun_sa_in", "vpp_tun_sa_id", "vpp_tun_spi"),
    )
    for target, id_attr, spi_attr in sa_layout:
        sa = VppIpsecSA(self, getattr(p, id_attr), getattr(p, spi_attr),
                        p.auth_algo_vpp_id, p.auth_key,
                        p.crypt_algo_vpp_id, p.crypt_key,
                        self.vpp_esp_protocol)
        setattr(p, target, sa)
        sa.add_vpp_config()
def config_sa_tun(self, p):
    """Set up scapy SA state for ``p`` and add two VPP SAs that carry
    tunnel endpoint addresses (the "tun" variant).

    Works like ``config_sa_tra`` but additionally passes
    ``self.tun_if.remote_addr[p.addr_type]`` followed by
    ``self.tun_if.local_addr[p.addr_type]`` to each ``VppIpsecSA``.
    Both the SA stored as ``p.tun_sa_out`` and the one stored as
    ``p.tun_sa_in`` receive the addresses in that same order.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    # (attribute that receives the SA, SA-id attribute, SPI attribute)
    sa_layout = (
        ("tun_sa_out", "scapy_tun_sa_id", "scapy_tun_spi"),
        ("tun_sa_in", "vpp_tun_sa_id", "vpp_tun_spi"),
    )
    for target, id_attr, spi_attr in sa_layout:
        sa = VppIpsecSA(self, getattr(p, id_attr), getattr(p, spi_attr),
                        p.auth_algo_vpp_id, p.auth_key,
                        p.crypt_algo_vpp_id, p.crypt_key,
                        self.vpp_esp_protocol,
                        self.tun_if.remote_addr[p.addr_type],
                        self.tun_if.local_addr[p.addr_type])
        setattr(p, target, sa)
        sa.add_vpp_config()
1578 def config_protect(self, p):
1579 p.tun_protect = VppIpsecTunProtect(self,
1583 p.tun_protect.add_vpp_config()
1585 def config_network(self, p):
1586 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1588 self.pg0.remote_ip6)
1589 p.tun_if.add_vpp_config()
1591 p.tun_if.config_ip6()
1592 p.tun_if.config_ip4()
1594 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1595 [VppRoutePath(p.tun_if.remote_ip6,
1597 proto=DpoProto.DPO_PROTO_IP6)])
1598 p.route.add_vpp_config()
1599 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1600 [VppRoutePath(p.tun_if.remote_ip4,
def unconfig_network(self, p):
    """Remove ``p.route`` and then ``p.tun_if`` from VPP.

    These are the route and tunnel interface that ``config_network``
    stored on ``p``.
    """
    # Attributes are fetched one at a time so the route is fully
    # removed before the interface is even looked up.
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object held in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove ``p.tun_sa_out`` and then ``p.tun_sa_in`` from VPP."""
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
1616 class TestIpsec6TunProtect(TemplateIpsec,
1617 TemplateIpsec6TunProtect,
1619 """ IPsec IPv6 Tunnel protect - transport mode"""
1621 encryption_type = ESP
1622 tun6_encrypt_node_name = "esp6-encrypt-tun"
1623 tun6_decrypt_node_name = "esp6-decrypt-tun"
1626 super(TestIpsec6TunProtect, self).setUp()
1628 self.tun_if = self.pg0
1631 super(TestIpsec6TunProtect, self).tearDown()
1633 def test_tun_66(self):
1634 """IPSEC tunnel protect 6o6"""
1636 p = self.ipv6_params
1638 self.config_network(p)
1639 self.config_sa_tra(p)
1640 self.config_protect(p)
1642 self.verify_tun_66(p, count=127)
1643 c = p.tun_if.get_rx_stats()
1644 self.assertEqual(c['packets'], 127)
1645 c = p.tun_if.get_tx_stats()
1646 self.assertEqual(c['packets'], 127)
1648 # rekey - create new SAs and update the tunnel protection
1650 np.crypt_key = b'X' + p.crypt_key[1:]
1651 np.scapy_tun_spi += 100
1652 np.scapy_tun_sa_id += 1
1653 np.vpp_tun_spi += 100
1654 np.vpp_tun_sa_id += 1
1655 np.tun_if.local_spi = p.vpp_tun_spi
1656 np.tun_if.remote_spi = p.scapy_tun_spi
1658 self.config_sa_tra(np)
1659 self.config_protect(np)
1662 self.verify_tun_66(np, count=127)
1663 c = p.tun_if.get_rx_stats()
1664 self.assertEqual(c['packets'], 254)
1665 c = p.tun_if.get_tx_stats()
1666 self.assertEqual(c['packets'], 254)
1668 # bounce the interface state
1669 p.tun_if.admin_down()
1670 self.verify_drop_tun_66(np, count=127)
1671 node = ('/err/ipsec6-tun-input/%s' %
1672 'ipsec packets received on disabled interface')
1673 self.assertEqual(127, self.statistics.get_err_counter(node))
1675 self.verify_tun_66(np, count=127)
1678 # 1) add two input SAs [old, new]
1679 # 2) swap output SA to [new]
1680 # 3) use only [new] input SA
1682 np3.crypt_key = b'Z' + p.crypt_key[1:]
1683 np3.scapy_tun_spi += 100
1684 np3.scapy_tun_sa_id += 1
1685 np3.vpp_tun_spi += 100
1686 np3.vpp_tun_sa_id += 1
1687 np3.tun_if.local_spi = p.vpp_tun_spi
1688 np3.tun_if.remote_spi = p.scapy_tun_spi
1690 self.config_sa_tra(np3)
1693 p.tun_protect.update_vpp_config(np.tun_sa_out,
1694 [np.tun_sa_in, np3.tun_sa_in])
1695 self.verify_tun_66(np, np, count=127)
1696 self.verify_tun_66(np3, np, count=127)
1699 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1700 [np.tun_sa_in, np3.tun_sa_in])
1701 self.verify_tun_66(np, np3, count=127)
1702 self.verify_tun_66(np3, np3, count=127)
1705 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1707 self.verify_tun_66(np3, np3, count=127)
1708 self.verify_drop_tun_66(np, count=127)
1710 c = p.tun_if.get_rx_stats()
1711 self.assertEqual(c['packets'], 127*9)
1712 c = p.tun_if.get_tx_stats()
1713 self.assertEqual(c['packets'], 127*8)
1714 self.unconfig_sa(np)
1717 self.unconfig_protect(np3)
1718 self.unconfig_sa(np3)
1719 self.unconfig_network(p)
1721 def test_tun_46(self):
1722 """IPSEC tunnel protect 4o6"""
1724 p = self.ipv6_params
1726 self.config_network(p)
1727 self.config_sa_tra(p)
1728 self.config_protect(p)
1730 self.verify_tun_46(p, count=127)
1731 c = p.tun_if.get_rx_stats()
1732 self.assertEqual(c['packets'], 127)
1733 c = p.tun_if.get_tx_stats()
1734 self.assertEqual(c['packets'], 127)
1737 self.unconfig_protect(p)
1739 self.unconfig_network(p)
1742 class TestIpsec6TunProtectTun(TemplateIpsec,
1743 TemplateIpsec6TunProtect,
1745 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1747 encryption_type = ESP
1748 tun6_encrypt_node_name = "esp6-encrypt-tun"
1749 tun6_decrypt_node_name = "esp6-decrypt-tun"
1752 super(TestIpsec6TunProtectTun, self).setUp()
1754 self.tun_if = self.pg0
1757 super(TestIpsec6TunProtectTun, self).tearDown()
1759 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1761 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1762 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1763 dst=sw_intf.local_ip6) /
1764 IPv6(src=src, dst=dst) /
1765 UDP(sport=1166, dport=2233) /
1766 Raw(b'X' * payload_size))
1767 for i in range(count)]
1769 def gen_pkts6(self, sw_intf, src, dst, count=1,
1771 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1772 IPv6(src=src, dst=dst) /
1773 UDP(sport=1166, dport=2233) /
1774 Raw(b'X' * payload_size)
1775 for i in range(count)]
1777 def verify_decrypted6(self, p, rxs):
1779 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1780 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1781 self.assert_packet_checksums_valid(rx)
1783 def verify_encrypted6(self, p, sa, rxs):
1786 pkt = sa.decrypt(rx[IPv6])
1787 if not pkt.haslayer(IPv6):
1788 pkt = IPv6(pkt[Raw].load)
1789 self.assert_packet_checksums_valid(pkt)
1790 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1791 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1792 inner = pkt[IPv6].payload
1793 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1795 except (IndexError, AssertionError):
1796 self.logger.debug(ppp("Unexpected packet:", rx))
1798 self.logger.debug(ppp("Decrypted packet:", pkt))
1803 def test_tun_66(self):
1804 """IPSEC tunnel protect """
1806 p = self.ipv6_params
1808 self.config_network(p)
1809 self.config_sa_tun(p)
1810 self.config_protect(p)
1812 self.verify_tun_66(p, count=127)
1814 c = p.tun_if.get_rx_stats()
1815 self.assertEqual(c['packets'], 127)
1816 c = p.tun_if.get_tx_stats()
1817 self.assertEqual(c['packets'], 127)
1819 # rekey - create new SAs and update the tunnel protection
1821 np.crypt_key = b'X' + p.crypt_key[1:]
1822 np.scapy_tun_spi += 100
1823 np.scapy_tun_sa_id += 1
1824 np.vpp_tun_spi += 100
1825 np.vpp_tun_sa_id += 1
1826 np.tun_if.local_spi = p.vpp_tun_spi
1827 np.tun_if.remote_spi = p.scapy_tun_spi
1829 self.config_sa_tun(np)
1830 self.config_protect(np)
1833 self.verify_tun_66(np, count=127)
1834 c = p.tun_if.get_rx_stats()
1835 self.assertEqual(c['packets'], 254)
1836 c = p.tun_if.get_tx_stats()
1837 self.assertEqual(c['packets'], 254)
1840 self.unconfig_protect(np)
1841 self.unconfig_sa(np)
1842 self.unconfig_network(p)
1845 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
1846 TemplateIpsec6TunProtect,
1848 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
1850 encryption_type = ESP
1851 tun6_encrypt_node_name = "esp6-encrypt-tun"
1852 tun6_decrypt_node_name = "esp6-decrypt-tun"
1855 super(TestIpsec6TunProtectTunDrop, self).setUp()
1857 self.tun_if = self.pg0
1860 super(TestIpsec6TunProtectTunDrop, self).tearDown()
1862 def gen_encrypt_pkts5(self, sa, sw_intf, src, dst, count=1,
1864 # the IP destination of the revealed packet does not match
1865 # that assigned to the tunnel
1866 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1867 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1869 IPv6(src=src, dst=dst) /
1870 UDP(sport=1144, dport=2233) /
1871 Raw(b'X' * payload_size))
1872 for i in range(count)]
1874 def test_tun_drop_66(self):
1875 """IPSEC 6 tunnel protect bogus tunnel header """
1877 p = self.ipv6_params
1879 self.config_network(p)
1880 self.config_sa_tun(p)
1881 self.config_protect(p)
1883 tx = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
1884 src=p.remote_tun_if_host,
1885 dst=self.pg1.remote_ip6,
1887 self.send_and_assert_no_replies(self.tun_if, tx)
1889 self.unconfig_protect(p)
1891 self.unconfig_network(p)
# When executed as a script, hand the tests in this module to unittest
# using the VPP-specific runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)