5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
19 from vpp_papi import VppEnum
22 class TemplateIpsec4TunIfEsp(TemplateIpsec):
23 """ IPsec tunnel interface tests """
29 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
32 def tearDownClass(cls):
33 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
36 super(TemplateIpsec4TunIfEsp, self).setUp()
38 self.tun_if = self.pg0
42 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
43 p.scapy_tun_spi, p.crypt_algo_vpp_id,
44 p.crypt_key, p.crypt_key,
45 p.auth_algo_vpp_id, p.auth_key,
47 p.tun_if.add_vpp_config()
52 r = VppIpRoute(self, p.remote_tun_if_host, 32,
53 [VppRoutePath(p.tun_if.remote_ip4,
56 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
57 [VppRoutePath(p.tun_if.remote_ip6,
59 proto=DpoProto.DPO_PROTO_IP6)])
63 super(TemplateIpsec4TunIfEsp, self).tearDown()
66 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
67 """ IPsec UDP tunnel interface tests """
69 tun4_encrypt_node_name = "esp4-encrypt-tun"
70 tun4_decrypt_node_name = "esp4-decrypt"
75 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
78 def tearDownClass(cls):
79 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
82 super(TemplateIpsec4TunIfEspUdp, self).setUp()
84 self.tun_if = self.pg0
87 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
88 IPSEC_API_SAD_FLAG_UDP_ENCAP)
89 p.nat_header = UDP(sport=5454, dport=4500)
91 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
92 p.scapy_tun_spi, p.crypt_algo_vpp_id,
93 p.crypt_key, p.crypt_key,
94 p.auth_algo_vpp_id, p.auth_key,
95 p.auth_key, udp_encap=True)
96 p.tun_if.add_vpp_config()
101 r = VppIpRoute(self, p.remote_tun_if_host, 32,
102 [VppRoutePath(p.tun_if.remote_ip4,
105 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
106 [VppRoutePath(p.tun_if.remote_ip6,
108 proto=DpoProto.DPO_PROTO_IP6)])
112 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
115 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
116 """ Ipsec ESP - TUN tests """
117 tun4_encrypt_node_name = "esp4-encrypt-tun"
118 tun4_decrypt_node_name = "esp4-decrypt"
def test_tun_basic64(self):
    """ ipsec 6o4 tunnel basic test """
    # The payload is IPv6 inside the IPv4 tunnel, so the encrypt node
    # name is overridden on this instance before traffic is sent
    # (presumably consulted by verify_tun_64 -- confirm in the template).
    self.tun4_encrypt_node_name = "esp6-encrypt-tun"

    ipv4_tunnel_params = self.params[socket.AF_INET]
    self.verify_tun_64(ipv4_tunnel_params, count=1)
def test_tun_burst64(self):
    """ ipsec 6o4 tunnel burst test """
    # The payload is IPv6 inside the IPv4 tunnel, so the encrypt node
    # name is overridden on this instance before traffic is sent
    # (presumably consulted by verify_tun_64 -- confirm in the template).
    self.tun4_encrypt_node_name = "esp6-encrypt-tun"

    # 257 packets, as opposed to the single packet of the basic variant.
    self.verify_tun_64(self.params[socket.AF_INET], count=257)
132 def test_tun_basic_frag44(self):
133 """ ipsec 4o4 tunnel frag basic test """
134 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
138 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
140 self.verify_tun_44(self.params[socket.AF_INET],
141 count=1, payload_size=1800, n_rx=2)
142 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    # Graph node name made available to the inherited test helpers.
    # NOTE(review): presumably the input node expected for this
    # tunnel type -- confirm against the IpsecTun4Tests template.
    tun4_input_node = "ipsec4-if-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # Run the keepalive check against the IPv4 parameter set.
        keepalive_params = self.ipv4_params
        self.verify_keepalive(keepalive_params)
156 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
157 """ Ipsec ESP - TCP tests """
161 class TemplateIpsec6TunIfEsp(TemplateIpsec):
162 """ IPsec tunnel interface tests """
164 encryption_type = ESP
167 super(TemplateIpsec6TunIfEsp, self).setUp()
169 self.tun_if = self.pg0
172 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
173 p.scapy_tun_spi, p.crypt_algo_vpp_id,
174 p.crypt_key, p.crypt_key,
175 p.auth_algo_vpp_id, p.auth_key,
176 p.auth_key, is_ip6=True)
177 tun_if.add_vpp_config()
182 r = VppIpRoute(self, p.remote_tun_if_host, 128,
183 [VppRoutePath(tun_if.remote_ip6,
185 proto=DpoProto.DPO_PROTO_IP6)])
187 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
188 [VppRoutePath(tun_if.remote_ip4,
193 super(TemplateIpsec6TunIfEsp, self).tearDown()
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def _verify_4o6(self, count):
        """Send *count* packets through verify_tun_46 on the IPv6 params.

        The encrypt node name is overridden on the instance first,
        shadowing the class-level "esp6-encrypt-tun" value.
        """
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        ipv6_tunnel_params = self.params[socket.AF_INET6]
        self.verify_tun_46(ipv6_tunnel_params, count=count)

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self._verify_4o6(1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self._verify_4o6(257)
212 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
213 """ IPsec IPv4 Multi Tunnel interface """
215 encryption_type = ESP
216 tun4_encrypt_node_name = "esp4-encrypt-tun"
217 tun4_decrypt_node_name = "esp4-decrypt"
220 super(TestIpsec4MultiTunIfEsp, self).setUp()
222 self.tun_if = self.pg0
224 self.multi_params = []
227 p = copy.copy(self.ipv4_params)
229 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
230 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
231 p.scapy_tun_spi = p.scapy_tun_spi + ii
232 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
233 p.vpp_tun_spi = p.vpp_tun_spi + ii
235 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
236 p.scapy_tra_spi = p.scapy_tra_spi + ii
237 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
238 p.vpp_tra_spi = p.vpp_tra_spi + ii
240 config_tun_params(p, self.encryption_type, self.tun_if)
241 self.multi_params.append(p)
243 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
246 p.crypt_key, p.crypt_key,
247 p.auth_algo_vpp_id, p.auth_key,
249 p.tun_if.add_vpp_config()
251 p.tun_if.config_ip4()
253 VppIpRoute(self, p.remote_tun_if_host, 32,
254 [VppRoutePath(p.tun_if.remote_ip4,
255 0xffffffff)]).add_vpp_config()
258 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces """
    # Each tunnel's interface counters must equal the burst size in
    # both directions once its traffic has been verified.
    burst = 127
    for tunnel_params in self.multi_params:
        self.verify_tun_44(tunnel_params, count=burst)

        rx_stats = tunnel_params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], burst)

        tx_stats = tunnel_params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], burst)
270 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
271 """ IPsec IPv4 Tunnel interface all Algos """
273 encryption_type = ESP
274 tun4_encrypt_node_name = "esp4-encrypt-tun"
275 tun4_decrypt_node_name = "esp4-decrypt"
277 def config_network(self, p):
278 config_tun_params(p, self.encryption_type, self.tun_if)
280 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
283 p.crypt_key, p.crypt_key,
284 p.auth_algo_vpp_id, p.auth_key,
287 p.tun_if.add_vpp_config()
289 p.tun_if.config_ip4()
290 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
291 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
293 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
294 [VppRoutePath(p.tun_if.remote_ip4,
296 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the tunnel interface and route stored on *p*.

    The tunnel's IPv4 configuration is removed first, then the tunnel
    interface itself, and finally the route held in ``p.route``.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()

    route = p.route
    route.remove_vpp_config()
304 super(TestIpsec4TunIfEspAll, self).setUp()
306 self.tun_if = self.pg0
309 super(TestIpsec4TunIfEspAll, self).tearDown()
313 # change the key and the SPI
315 p.crypt_key = 'X' + p.crypt_key[1:]
317 p.scapy_tun_sa_id += 1
320 p.tun_if.local_spi = p.vpp_tun_spi
321 p.tun_if.remote_spi = p.scapy_tun_spi
323 config_tun_params(p, self.encryption_type, self.tun_if)
325 p.tun_sa_in = VppIpsecSA(self,
332 self.vpp_esp_protocol,
333 self.tun_if.local_addr[p.addr_type],
334 self.tun_if.remote_addr[p.addr_type],
337 p.tun_sa_out = VppIpsecSA(self,
344 self.vpp_esp_protocol,
345 self.tun_if.remote_addr[p.addr_type],
346 self.tun_if.local_addr[p.addr_type],
349 p.tun_sa_in.add_vpp_config()
350 p.tun_sa_out.add_vpp_config()
352 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
353 sa_id=p.tun_sa_in.id,
355 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
356 sa_id=p.tun_sa_out.id,
358 self.logger.info(self.vapi.cli("sh ipsec sa"))
360 def test_tun_44(self):
361 """IPSEC tunnel all algos """
363 # foreach VPP crypto engine
364 engines = ["ia32", "ipsecmb", "openssl"]
366 # foreach crypto algorithm
367 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
368 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
369 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
370 IPSEC_API_INTEG_ALG_NONE),
371 'scapy-crypto': "AES-GCM",
372 'scapy-integ': "NULL",
373 'key': "JPjyOWBeVEQiMe7h",
375 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
376 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
377 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
378 IPSEC_API_INTEG_ALG_NONE),
379 'scapy-crypto': "AES-GCM",
380 'scapy-integ': "NULL",
381 'key': "JPjyOWBeVEQiMe7hJPjyOWBe",
383 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
384 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
385 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
386 IPSEC_API_INTEG_ALG_NONE),
387 'scapy-crypto': "AES-GCM",
388 'scapy-integ': "NULL",
389 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
391 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
392 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
393 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
394 IPSEC_API_INTEG_ALG_SHA1_96),
395 'scapy-crypto': "AES-CBC",
396 'scapy-integ': "HMAC-SHA1-96",
398 'key': "JPjyOWBeVEQiMe7h"},
399 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
400 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
401 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
402 IPSEC_API_INTEG_ALG_SHA1_96),
403 'scapy-crypto': "AES-CBC",
404 'scapy-integ': "HMAC-SHA1-96",
406 'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
407 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
408 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
409 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
410 IPSEC_API_INTEG_ALG_SHA1_96),
411 'scapy-crypto': "AES-CBC",
412 'scapy-integ': "HMAC-SHA1-96",
414 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
415 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
416 IPSEC_API_CRYPTO_ALG_NONE),
417 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
418 IPSEC_API_INTEG_ALG_SHA1_96),
419 'scapy-crypto': "NULL",
420 'scapy-integ': "HMAC-SHA1-96",
422 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
424 for engine in engines:
425 self.vapi.cli("set crypto handler all %s" % engine)
428 # loop through each of the algorithms
431 # with self.subTest(algo=algo['scapy']):
433 p = copy.copy(self.ipv4_params)
434 p.auth_algo_vpp_id = algo['vpp-integ']
435 p.crypt_algo_vpp_id = algo['vpp-crypto']
436 p.crypt_algo = algo['scapy-crypto']
437 p.auth_algo = algo['scapy-integ']
438 p.crypt_key = algo['key']
439 p.salt = algo['salt']
441 self.config_network(p)
443 self.verify_tun_44(p, count=127)
444 c = p.tun_if.get_rx_stats()
445 self.assertEqual(c['packets'], 127)
446 c = p.tun_if.get_tx_stats()
447 self.assertEqual(c['packets'], 127)
453 self.verify_tun_44(p, count=127)
455 self.unconfig_network(p)
456 p.tun_sa_out.remove_vpp_config()
457 p.tun_sa_in.remove_vpp_config()
460 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
461 """ IPsec IPv6 Multi Tunnel interface """
463 encryption_type = ESP
464 tun6_encrypt_node_name = "esp6-encrypt-tun"
465 tun6_decrypt_node_name = "esp6-decrypt"
468 super(TestIpsec6MultiTunIfEsp, self).setUp()
470 self.tun_if = self.pg0
472 self.multi_params = []
475 p = copy.copy(self.ipv6_params)
477 p.remote_tun_if_host = "1111::%d" % (ii + 1)
478 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
479 p.scapy_tun_spi = p.scapy_tun_spi + ii
480 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
481 p.vpp_tun_spi = p.vpp_tun_spi + ii
483 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
484 p.scapy_tra_spi = p.scapy_tra_spi + ii
485 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
486 p.vpp_tra_spi = p.vpp_tra_spi + ii
488 config_tun_params(p, self.encryption_type, self.tun_if)
489 self.multi_params.append(p)
491 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
494 p.crypt_key, p.crypt_key,
495 p.auth_algo_vpp_id, p.auth_key,
496 p.auth_key, is_ip6=True)
497 p.tun_if.add_vpp_config()
499 p.tun_if.config_ip6()
501 r = VppIpRoute(self, p.remote_tun_if_host, 128,
502 [VppRoutePath(p.tun_if.remote_ip6,
504 proto=DpoProto.DPO_PROTO_IP6)])
508 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces """
    # Each tunnel's interface counters must equal the burst size in
    # both directions once its traffic has been verified.
    burst = 127
    for tunnel_params in self.multi_params:
        self.verify_tun_66(tunnel_params, count=burst)

        rx_stats = tunnel_params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], burst)

        tx_stats = tunnel_params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], burst)
520 class TestIpsecGreTebIfEsp(TemplateIpsec,
522 """ Ipsec GRE TEB ESP - TUN tests """
523 tun4_encrypt_node_name = "esp4-encrypt-tun"
524 tun4_decrypt_node_name = "esp4-decrypt-tun"
525 encryption_type = ESP
526 omac = "00:11:22:33:44:55"
528 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
530 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
531 sa.encrypt(IP(src=self.pg0.remote_ip4,
532 dst=self.pg0.local_ip4) /
534 Ether(dst=self.omac) /
535 IP(src="1.1.1.1", dst="1.1.1.2") /
536 UDP(sport=1144, dport=2233) /
537 Raw('X' * payload_size))
538 for i in range(count)]
540 def gen_pkts(self, sw_intf, src, dst, count=1,
542 return [Ether(dst=self.omac) /
543 IP(src="1.1.1.1", dst="1.1.1.2") /
544 UDP(sport=1144, dport=2233) /
545 Raw('X' * payload_size)
546 for i in range(count)]
548 def verify_decrypted(self, p, rxs):
550 self.assert_equal(rx[Ether].dst, self.omac)
551 self.assert_equal(rx[IP].dst, "1.1.1.2")
553 def verify_encrypted(self, p, sa, rxs):
556 pkt = sa.decrypt(rx[IP])
557 if not pkt.haslayer(IP):
558 pkt = IP(pkt[Raw].load)
559 self.assert_packet_checksums_valid(pkt)
560 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
561 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
562 self.assertTrue(pkt.haslayer(GRE))
564 self.assertEqual(e[Ether].dst, self.omac)
565 self.assertEqual(e[IP].dst, "1.1.1.2")
566 except (IndexError, AssertionError):
567 self.logger.debug(ppp("Unexpected packet:", rx))
569 self.logger.debug(ppp("Decrypted packet:", pkt))
575 super(TestIpsecGreTebIfEsp, self).setUp()
577 self.tun_if = self.pg0
581 bd1 = VppBridgeDomain(self, 1)
584 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
585 p.auth_algo_vpp_id, p.auth_key,
586 p.crypt_algo_vpp_id, p.crypt_key,
587 self.vpp_esp_protocol,
590 p.tun_sa_out.add_vpp_config()
592 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
593 p.auth_algo_vpp_id, p.auth_key,
594 p.crypt_algo_vpp_id, p.crypt_key,
595 self.vpp_esp_protocol,
598 p.tun_sa_in.add_vpp_config()
600 self.tun = VppGreInterface(self,
603 type=(VppEnum.vl_api_gre_tunnel_type_t.
604 GRE_API_TUNNEL_TYPE_TEB))
605 self.tun.add_vpp_config()
607 p.tun_protect = VppIpsecTunProtect(self,
612 p.tun_protect.add_vpp_config()
615 self.tun.config_ip4()
617 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
618 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
620 self.vapi.cli("clear ipsec sa")
623 self.tun.unconfig_ip4()
624 super(TestIpsecGreTebIfEsp, self).tearDown()
627 class TestIpsecGreIfEsp(TemplateIpsec,
629 """ Ipsec GRE ESP - TUN tests """
630 tun4_encrypt_node_name = "esp4-encrypt-tun"
631 tun4_decrypt_node_name = "esp4-decrypt-tun"
632 encryption_type = ESP
634 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
636 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
637 sa.encrypt(IP(src=self.pg0.remote_ip4,
638 dst=self.pg0.local_ip4) /
640 IP(src=self.pg1.local_ip4,
641 dst=self.pg1.remote_ip4) /
642 UDP(sport=1144, dport=2233) /
643 Raw('X' * payload_size))
644 for i in range(count)]
646 def gen_pkts(self, sw_intf, src, dst, count=1,
648 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
649 IP(src="1.1.1.1", dst="1.1.1.2") /
650 UDP(sport=1144, dport=2233) /
651 Raw('X' * payload_size)
652 for i in range(count)]
654 def verify_decrypted(self, p, rxs):
656 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
657 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
659 def verify_encrypted(self, p, sa, rxs):
662 pkt = sa.decrypt(rx[IP])
663 if not pkt.haslayer(IP):
664 pkt = IP(pkt[Raw].load)
665 self.assert_packet_checksums_valid(pkt)
666 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
667 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
668 self.assertTrue(pkt.haslayer(GRE))
670 self.assertEqual(e[IP].dst, "1.1.1.2")
671 except (IndexError, AssertionError):
672 self.logger.debug(ppp("Unexpected packet:", rx))
674 self.logger.debug(ppp("Decrypted packet:", pkt))
680 super(TestIpsecGreIfEsp, self).setUp()
682 self.tun_if = self.pg0
686 bd1 = VppBridgeDomain(self, 1)
689 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
690 p.auth_algo_vpp_id, p.auth_key,
691 p.crypt_algo_vpp_id, p.crypt_key,
692 self.vpp_esp_protocol,
695 p.tun_sa_out.add_vpp_config()
697 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
698 p.auth_algo_vpp_id, p.auth_key,
699 p.crypt_algo_vpp_id, p.crypt_key,
700 self.vpp_esp_protocol,
703 p.tun_sa_in.add_vpp_config()
705 self.tun = VppGreInterface(self,
708 self.tun.add_vpp_config()
710 p.tun_protect = VppIpsecTunProtect(self,
714 p.tun_protect.add_vpp_config()
717 self.tun.config_ip4()
719 VppIpRoute(self, "1.1.1.2", 32,
720 [VppRoutePath(self.tun.remote_ip4,
721 0xffffffff)]).add_vpp_config()
724 self.tun.unconfig_ip4()
725 super(TestIpsecGreIfEsp, self).tearDown()
728 class TemplateIpsec4TunProtect(object):
729 """ IPsec IPv4 Tunnel protect """
731 encryption_type = ESP
732 tun4_encrypt_node_name = "esp4-encrypt-tun"
733 tun4_decrypt_node_name = "esp4-decrypt-tun"
734 tun4_input_node = "ipsec4-tun-input"
736 def config_sa_tra(self, p):
737 config_tun_params(p, self.encryption_type, self.tun_if)
739 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
740 p.auth_algo_vpp_id, p.auth_key,
741 p.crypt_algo_vpp_id, p.crypt_key,
742 self.vpp_esp_protocol,
744 p.tun_sa_out.add_vpp_config()
746 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
747 p.auth_algo_vpp_id, p.auth_key,
748 p.crypt_algo_vpp_id, p.crypt_key,
749 self.vpp_esp_protocol,
751 p.tun_sa_in.add_vpp_config()
753 def config_sa_tun(self, p):
754 config_tun_params(p, self.encryption_type, self.tun_if)
756 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
757 p.auth_algo_vpp_id, p.auth_key,
758 p.crypt_algo_vpp_id, p.crypt_key,
759 self.vpp_esp_protocol,
760 self.tun_if.remote_addr[p.addr_type],
761 self.tun_if.local_addr[p.addr_type],
763 p.tun_sa_out.add_vpp_config()
765 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
766 p.auth_algo_vpp_id, p.auth_key,
767 p.crypt_algo_vpp_id, p.crypt_key,
768 self.vpp_esp_protocol,
769 self.tun_if.remote_addr[p.addr_type],
770 self.tun_if.local_addr[p.addr_type],
772 p.tun_sa_in.add_vpp_config()
774 def config_protect(self, p):
775 p.tun_protect = VppIpsecTunProtect(self,
779 p.tun_protect.add_vpp_config()
781 def config_network(self, p):
782 p.tun_if = VppIpIpTunInterface(self, self.pg0,
785 p.tun_if.add_vpp_config()
787 p.tun_if.config_ip4()
788 p.tun_if.config_ip6()
790 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
791 [VppRoutePath(p.tun_if.remote_ip4,
793 p.route.add_vpp_config()
794 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
795 [VppRoutePath(p.tun_if.remote_ip6,
797 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Remove the route held in ``p.route``, then the tunnel interface."""
    route = p.route
    route.remove_vpp_config()

    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel protection object held in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs held on *p*: ``tun_sa_out`` first, then ``tun_sa_in``."""
    # Attributes are looked up one at a time so that each lookup happens
    # only after the previous SA has been removed.
    for sa_attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, sa_attr).remove_vpp_config()
812 class TestIpsec4TunProtect(TemplateIpsec,
813 TemplateIpsec4TunProtect,
815 """ IPsec IPv4 Tunnel protect - transport mode"""
818 super(TestIpsec4TunProtect, self).setUp()
820 self.tun_if = self.pg0
823 super(TestIpsec4TunProtect, self).tearDown()
825 def test_tun_44(self):
826 """IPSEC tunnel protect"""
830 self.config_network(p)
831 self.config_sa_tra(p)
832 self.config_protect(p)
834 self.verify_tun_44(p, count=127)
835 c = p.tun_if.get_rx_stats()
836 self.assertEqual(c['packets'], 127)
837 c = p.tun_if.get_tx_stats()
838 self.assertEqual(c['packets'], 127)
840 self.vapi.cli("clear ipsec sa")
841 self.verify_tun_64(p, count=127)
842 c = p.tun_if.get_rx_stats()
843 self.assertEqual(c['packets'], 254)
844 c = p.tun_if.get_tx_stats()
845 self.assertEqual(c['packets'], 254)
847 # rekey - create new SAs and update the tunnel protection
849 np.crypt_key = 'X' + p.crypt_key[1:]
850 np.scapy_tun_spi += 100
851 np.scapy_tun_sa_id += 1
852 np.vpp_tun_spi += 100
853 np.vpp_tun_sa_id += 1
854 np.tun_if.local_spi = p.vpp_tun_spi
855 np.tun_if.remote_spi = p.scapy_tun_spi
857 self.config_sa_tra(np)
858 self.config_protect(np)
861 self.verify_tun_44(np, count=127)
862 c = p.tun_if.get_rx_stats()
863 self.assertEqual(c['packets'], 381)
864 c = p.tun_if.get_tx_stats()
865 self.assertEqual(c['packets'], 381)
868 self.unconfig_protect(np)
870 self.unconfig_network(p)
873 class TestIpsec4TunProtectUdp(TemplateIpsec,
874 TemplateIpsec4TunProtect,
876 """ IPsec IPv4 Tunnel protect - transport mode"""
879 super(TestIpsec4TunProtectUdp, self).setUp()
881 self.tun_if = self.pg0
884 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
885 IPSEC_API_SAD_FLAG_UDP_ENCAP)
886 p.nat_header = UDP(sport=5454, dport=4500)
887 self.config_network(p)
888 self.config_sa_tra(p)
889 self.config_protect(p)
893 self.unconfig_protect(p)
895 self.unconfig_network(p)
896 super(TestIpsec4TunProtectUdp, self).tearDown()
898 def test_tun_44(self):
899 """IPSEC UDP tunnel protect"""
903 self.verify_tun_44(p, count=127)
904 c = p.tun_if.get_rx_stats()
905 self.assertEqual(c['packets'], 127)
906 c = p.tun_if.get_tx_stats()
907 self.assertEqual(c['packets'], 127)
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # Run the keepalive check against the IPv4 parameter set.
    keepalive_params = self.ipv4_params
    self.verify_keepalive(keepalive_params)
914 class TestIpsec4TunProtectTun(TemplateIpsec,
915 TemplateIpsec4TunProtect,
917 """ IPsec IPv4 Tunnel protect - tunnel mode"""
919 encryption_type = ESP
920 tun4_encrypt_node_name = "esp4-encrypt-tun"
921 tun4_decrypt_node_name = "esp4-decrypt-tun"
924 super(TestIpsec4TunProtectTun, self).setUp()
926 self.tun_if = self.pg0
929 super(TestIpsec4TunProtectTun, self).tearDown()
931 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
933 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
934 sa.encrypt(IP(src=sw_intf.remote_ip4,
935 dst=sw_intf.local_ip4) /
936 IP(src=src, dst=dst) /
937 UDP(sport=1144, dport=2233) /
938 Raw('X' * payload_size))
939 for i in range(count)]
941 def gen_pkts(self, sw_intf, src, dst, count=1,
943 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
944 IP(src=src, dst=dst) /
945 UDP(sport=1144, dport=2233) /
946 Raw('X' * payload_size)
947 for i in range(count)]
949 def verify_decrypted(self, p, rxs):
951 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
952 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
953 self.assert_packet_checksums_valid(rx)
955 def verify_encrypted(self, p, sa, rxs):
958 pkt = sa.decrypt(rx[IP])
959 if not pkt.haslayer(IP):
960 pkt = IP(pkt[Raw].load)
961 self.assert_packet_checksums_valid(pkt)
962 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
963 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
964 inner = pkt[IP].payload
965 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
967 except (IndexError, AssertionError):
968 self.logger.debug(ppp("Unexpected packet:", rx))
970 self.logger.debug(ppp("Decrypted packet:", pkt))
975 def test_tun_44(self):
976 """IPSEC tunnel protect """
980 self.config_network(p)
981 self.config_sa_tun(p)
982 self.config_protect(p)
984 self.verify_tun_44(p, count=127)
986 c = p.tun_if.get_rx_stats()
987 self.assertEqual(c['packets'], 127)
988 c = p.tun_if.get_tx_stats()
989 self.assertEqual(c['packets'], 127)
991 # rekey - create new SAs and update the tunnel protection
993 np.crypt_key = 'X' + p.crypt_key[1:]
994 np.scapy_tun_spi += 100
995 np.scapy_tun_sa_id += 1
996 np.vpp_tun_spi += 100
997 np.vpp_tun_sa_id += 1
998 np.tun_if.local_spi = p.vpp_tun_spi
999 np.tun_if.remote_spi = p.scapy_tun_spi
1001 self.config_sa_tun(np)
1002 self.config_protect(np)
1005 self.verify_tun_44(np, count=127)
1006 c = p.tun_if.get_rx_stats()
1007 self.assertEqual(c['packets'], 254)
1008 c = p.tun_if.get_tx_stats()
1009 self.assertEqual(c['packets'], 254)
1012 self.unconfig_protect(np)
1013 self.unconfig_sa(np)
1014 self.unconfig_network(p)
1017 class TemplateIpsec6TunProtect(object):
1018 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Create and install the two SAs for *p* without tunnel endpoints.

    ``p.tun_sa_out`` is built from the scapy-side SA id/SPI and
    ``p.tun_sa_in`` from the VPP-side SA id/SPI.  Both share the
    integrity/crypto algorithms and keys found on *p*.  Unlike
    config_sa_tun(), no tunnel source/destination address is passed.
    """
    config_tun_params(p, self.encryption_type, self.tun_if)

    # (attribute on p to store the SA, attribute holding the SA id,
    #  attribute holding the SPI) -- processed in this order.
    sa_layout = (
        ("tun_sa_out", "scapy_tun_sa_id", "scapy_tun_spi"),
        ("tun_sa_in", "vpp_tun_sa_id", "vpp_tun_spi"),
    )
    for sa_attr, id_attr, spi_attr in sa_layout:
        sa = VppIpsecSA(self, getattr(p, id_attr), getattr(p, spi_attr),
                        p.auth_algo_vpp_id, p.auth_key,
                        p.crypt_algo_vpp_id, p.crypt_key,
                        self.vpp_esp_protocol)
        # Store the SA on p before installing it.
        setattr(p, sa_attr, sa)
        sa.add_vpp_config()
def config_sa_tun(self, p):
    """Create and install the two SAs for *p* with tunnel endpoints.

    ``p.tun_sa_out`` is built from the scapy-side SA id/SPI and
    ``p.tun_sa_in`` from the VPP-side SA id/SPI.  Both SAs are given
    ``self.tun_if.remote_addr[p.addr_type]`` followed by
    ``self.tun_if.local_addr[p.addr_type]`` as their two address
    arguments.
    """
    config_tun_params(p, self.encryption_type, self.tun_if)

    # (attribute on p to store the SA, attribute holding the SA id,
    #  attribute holding the SPI) -- processed in this order.
    sa_layout = (
        ("tun_sa_out", "scapy_tun_sa_id", "scapy_tun_spi"),
        ("tun_sa_in", "vpp_tun_sa_id", "vpp_tun_spi"),
    )
    for sa_attr, id_attr, spi_attr in sa_layout:
        sa = VppIpsecSA(self, getattr(p, id_attr), getattr(p, spi_attr),
                        p.auth_algo_vpp_id, p.auth_key,
                        p.crypt_algo_vpp_id, p.crypt_key,
                        self.vpp_esp_protocol,
                        self.tun_if.remote_addr[p.addr_type],
                        self.tun_if.local_addr[p.addr_type])
        # Store the SA on p before installing it.
        setattr(p, sa_attr, sa)
        sa.add_vpp_config()
1054 def config_protect(self, p):
1055 p.tun_protect = VppIpsecTunProtect(self,
1059 p.tun_protect.add_vpp_config()
1061 def config_network(self, p):
1062 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1064 self.pg0.remote_ip6)
1065 p.tun_if.add_vpp_config()
1067 p.tun_if.config_ip6()
1068 p.tun_if.config_ip4()
1070 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1071 [VppRoutePath(p.tun_if.remote_ip6,
1073 proto=DpoProto.DPO_PROTO_IP6)])
1074 p.route.add_vpp_config()
1075 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1076 [VppRoutePath(p.tun_if.remote_ip4,
def unconfig_network(self, p):
    """Remove the route held in ``p.route``, then the tunnel interface."""
    route = p.route
    route.remove_vpp_config()

    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel protection object held in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs held on *p*: ``tun_sa_out`` first, then ``tun_sa_in``."""
    # Attributes are looked up one at a time so that each lookup happens
    # only after the previous SA has been removed.
    for sa_attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, sa_attr).remove_vpp_config()
1092 class TestIpsec6TunProtect(TemplateIpsec,
1093 TemplateIpsec6TunProtect,
1095 """ IPsec IPv6 Tunnel protect - transport mode"""
1097 encryption_type = ESP
1098 tun6_encrypt_node_name = "esp6-encrypt-tun"
1099 tun6_decrypt_node_name = "esp6-decrypt-tun"
1102 super(TestIpsec6TunProtect, self).setUp()
1104 self.tun_if = self.pg0
1107 super(TestIpsec6TunProtect, self).tearDown()
1109 def test_tun_66(self):
1110 """IPSEC tunnel protect"""
1112 p = self.ipv6_params
1114 self.config_network(p)
1115 self.config_sa_tra(p)
1116 self.config_protect(p)
1118 self.verify_tun_66(p, count=127)
1119 c = p.tun_if.get_rx_stats()
1120 self.assertEqual(c['packets'], 127)
1121 c = p.tun_if.get_tx_stats()
1122 self.assertEqual(c['packets'], 127)
1124 # rekey - create new SAs and update the tunnel protection
1126 np.crypt_key = 'X' + p.crypt_key[1:]
1127 np.scapy_tun_spi += 100
1128 np.scapy_tun_sa_id += 1
1129 np.vpp_tun_spi += 100
1130 np.vpp_tun_sa_id += 1
1131 np.tun_if.local_spi = p.vpp_tun_spi
1132 np.tun_if.remote_spi = p.scapy_tun_spi
1134 self.config_sa_tra(np)
1135 self.config_protect(np)
1138 self.verify_tun_66(np, count=127)
1139 c = p.tun_if.get_rx_stats()
1140 self.assertEqual(c['packets'], 254)
1141 c = p.tun_if.get_tx_stats()
1142 self.assertEqual(c['packets'], 254)
1145 # 1) add two input SAs [old, new]
1146 # 2) swap output SA to [new]
1147 # 3) use only [new] input SA
1149 np3.crypt_key = 'Z' + p.crypt_key[1:]
1150 np3.scapy_tun_spi += 100
1151 np3.scapy_tun_sa_id += 1
1152 np3.vpp_tun_spi += 100
1153 np3.vpp_tun_sa_id += 1
1154 np3.tun_if.local_spi = p.vpp_tun_spi
1155 np3.tun_if.remote_spi = p.scapy_tun_spi
1157 self.config_sa_tra(np3)
1160 p.tun_protect.update_vpp_config(np.tun_sa_out,
1161 [np.tun_sa_in, np3.tun_sa_in])
1162 self.verify_tun_66(np, np, count=127)
1163 self.verify_tun_66(np3, np, count=127)
1166 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1167 [np.tun_sa_in, np3.tun_sa_in])
1168 self.verify_tun_66(np, np3, count=127)
1169 self.verify_tun_66(np3, np3, count=127)
1172 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1174 self.verify_tun_66(np3, np3, count=127)
1175 self.verify_drop_tun_66(np, count=127)
1177 c = p.tun_if.get_rx_stats()
1178 self.assertEqual(c['packets'], 127*7)
1179 c = p.tun_if.get_tx_stats()
1180 self.assertEqual(c['packets'], 127*7)
1181 self.unconfig_sa(np)
1184 self.unconfig_protect(np3)
1185 self.unconfig_sa(np3)
1186 self.unconfig_network(p)
1188 def test_tun_46(self):
1189 """IPSEC tunnel protect"""
1191 p = self.ipv6_params
1193 self.config_network(p)
1194 self.config_sa_tra(p)
1195 self.config_protect(p)
1197 self.verify_tun_46(p, count=127)
1198 c = p.tun_if.get_rx_stats()
1199 self.assertEqual(c['packets'], 127)
1200 c = p.tun_if.get_tx_stats()
1201 self.assertEqual(c['packets'], 127)
1204 self.unconfig_protect(p)
1206 self.unconfig_network(p)
1209 class TestIpsec6TunProtectTun(TemplateIpsec,
1210 TemplateIpsec6TunProtect,
1212 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1214 encryption_type = ESP
1215 tun6_encrypt_node_name = "esp6-encrypt-tun"
1216 tun6_decrypt_node_name = "esp6-decrypt-tun"
1219 super(TestIpsec6TunProtectTun, self).setUp()
1221 self.tun_if = self.pg0
1224 super(TestIpsec6TunProtectTun, self).tearDown()
1226 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1228 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1229 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1230 dst=sw_intf.local_ip6) /
1231 IPv6(src=src, dst=dst) /
1232 UDP(sport=1166, dport=2233) /
1233 Raw('X' * payload_size))
1234 for i in range(count)]
1236 def gen_pkts6(self, sw_intf, src, dst, count=1,
1238 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1239 IPv6(src=src, dst=dst) /
1240 UDP(sport=1166, dport=2233) /
1241 Raw('X' * payload_size)
1242 for i in range(count)]
1244 def verify_decrypted6(self, p, rxs):
1246 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1247 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1248 self.assert_packet_checksums_valid(rx)
1250 def verify_encrypted6(self, p, sa, rxs):
1253 pkt = sa.decrypt(rx[IPv6])
1254 if not pkt.haslayer(IPv6):
1255 pkt = IPv6(pkt[Raw].load)
1256 self.assert_packet_checksums_valid(pkt)
1257 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1258 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1259 inner = pkt[IPv6].payload
1260 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1262 except (IndexError, AssertionError):
1263 self.logger.debug(ppp("Unexpected packet:", rx))
1265 self.logger.debug(ppp("Decrypted packet:", pkt))
1270 def test_tun_66(self):
1271 """IPSEC tunnel protect """
1273 p = self.ipv6_params
1275 self.config_network(p)
1276 self.config_sa_tun(p)
1277 self.config_protect(p)
1279 self.verify_tun_66(p, count=127)
1281 c = p.tun_if.get_rx_stats()
1282 self.assertEqual(c['packets'], 127)
1283 c = p.tun_if.get_tx_stats()
1284 self.assertEqual(c['packets'], 127)
1286 # rekey - create new SAs and update the tunnel protection
1288 np.crypt_key = 'X' + p.crypt_key[1:]
1289 np.scapy_tun_spi += 100
1290 np.scapy_tun_sa_id += 1
1291 np.vpp_tun_spi += 100
1292 np.vpp_tun_sa_id += 1
1293 np.tun_if.local_spi = p.vpp_tun_spi
1294 np.tun_if.remote_spi = p.scapy_tun_spi
1296 self.config_sa_tun(np)
1297 self.config_protect(np)
1300 self.verify_tun_66(np, count=127)
1301 c = p.tun_if.get_rx_stats()
1302 self.assertEqual(c['packets'], 254)
1303 c = p.tun_if.get_tx_stats()
1304 self.assertEqual(c['packets'], 254)
1307 self.unconfig_protect(np)
1308 self.unconfig_sa(np)
1309 self.unconfig_network(p)
1312 if __name__ == '__main__':
1313 unittest.main(testRunner=VppTestRunner)