5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
19 from vpp_papi import VppEnum
22 class TemplateIpsec4TunIfEsp(TemplateIpsec):
23 """ IPsec tunnel interface tests """
29 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
32 def tearDownClass(cls):
33 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
36 super(TemplateIpsec4TunIfEsp, self).setUp()
38 self.tun_if = self.pg0
42 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
43 p.scapy_tun_spi, p.crypt_algo_vpp_id,
44 p.crypt_key, p.crypt_key,
45 p.auth_algo_vpp_id, p.auth_key,
47 p.tun_if.add_vpp_config()
52 r = VppIpRoute(self, p.remote_tun_if_host, 32,
53 [VppRoutePath(p.tun_if.remote_ip4,
56 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
57 [VppRoutePath(p.tun_if.remote_ip6,
59 proto=DpoProto.DPO_PROTO_IP6)])
63 super(TemplateIpsec4TunIfEsp, self).tearDown()
66 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
67 """ IPsec UDP tunnel interface tests """
69 tun4_encrypt_node_name = "esp4-encrypt-tun"
70 tun4_decrypt_node_name = "esp4-decrypt"
75 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
78 def tearDownClass(cls):
79 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
82 super(TemplateIpsec4TunIfEspUdp, self).setUp()
84 self.tun_if = self.pg0
87 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
88 IPSEC_API_SAD_FLAG_UDP_ENCAP)
89 p.nat_header = UDP(sport=5454, dport=4500)
91 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
92 p.scapy_tun_spi, p.crypt_algo_vpp_id,
93 p.crypt_key, p.crypt_key,
94 p.auth_algo_vpp_id, p.auth_key,
95 p.auth_key, udp_encap=True)
96 p.tun_if.add_vpp_config()
101 r = VppIpRoute(self, p.remote_tun_if_host, 32,
102 [VppRoutePath(p.tun_if.remote_ip4,
105 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
106 [VppRoutePath(p.tun_if.remote_ip6,
108 proto=DpoProto.DPO_PROTO_IP6)])
112 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
115 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
116 """ Ipsec ESP - TUN tests """
117 tun4_encrypt_node_name = "esp4-encrypt-tun"
118 tun4_decrypt_node_name = "esp4-decrypt"
def test_tun_basic64(self):
    """ipsec 6o4 tunnel basic test

    Runs verify_tun_64 with a single packet against the IPv4 tunnel
    parameters.
    """
    # Override the class-level encrypt node name on this instance only;
    # presumably because the inner traffic is IPv6 in the 6o4 case.
    self.tun4_encrypt_node_name = "esp6-encrypt-tun"

    v4_params = self.params[socket.AF_INET]
    self.verify_tun_64(v4_params, count=1)
def test_tun_burst64(self):
    """ipsec 6o4 tunnel burst test

    Runs verify_tun_64 with a burst of 257 packets against the IPv4
    tunnel parameters.  The summary line used to read "basic test",
    which duplicated the description of test_tun_basic64; it now names
    the burst case, matching test_tun_burst46.
    """
    # Override the class-level encrypt node name on this instance only;
    # presumably because the inner traffic is IPv6 in the 6o4 case.
    self.tun4_encrypt_node_name = "esp6-encrypt-tun"

    self.verify_tun_64(self.params[socket.AF_INET], count=257)
132 def test_tun_basic_frag44(self):
133 """ ipsec 4o4 tunnel frag basic test """
134 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
138 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
140 self.verify_tun_44(self.params[socket.AF_INET],
141 count=1, payload_size=1800, n_rx=2)
142 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """Ipsec ESP UDP tests

    Combines the UDP-encapsulated tunnel template with the shared IPv4
    tunnel test mix-in and adds a NAT keepalive check.
    """

    # Name of the input graph node; consumed by code outside this class.
    tun4_input_node = "ipsec4-if-input"

    def test_keepalive(self):
        """IPSEC NAT Keepalive"""
        keepalive_params = self.ipv4_params
        self.verify_keepalive(keepalive_params)
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """Ipsec ESP - TCP tests

    No members are defined in the lines shown here; the test cases come
    from combining the IPv4 ESP tunnel template with IpsecTcpTests.
    """
161 class TemplateIpsec6TunIfEsp(TemplateIpsec):
162 """ IPsec tunnel interface tests """
164 encryption_type = ESP
167 super(TemplateIpsec6TunIfEsp, self).setUp()
169 self.tun_if = self.pg0
172 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
173 p.scapy_tun_spi, p.crypt_algo_vpp_id,
174 p.crypt_key, p.crypt_key,
175 p.auth_algo_vpp_id, p.auth_key,
176 p.auth_key, is_ip6=True)
177 tun_if.add_vpp_config()
182 r = VppIpRoute(self, p.remote_tun_if_host, 128,
183 [VppRoutePath(tun_if.remote_ip6,
185 proto=DpoProto.DPO_PROTO_IP6)])
187 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
188 [VppRoutePath(tun_if.remote_ip4,
193 super(TemplateIpsec6TunIfEsp, self).tearDown()
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """Ipsec ESP - TUN tests

    IPv6 tunnel-interface tests; adds the 4o6 cases on top of the
    shared IpsecTun6Tests mix-in.
    """

    # Graph node names read by code outside this class.
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def test_tun_basic46(self):
        """ipsec 4o6 tunnel basic test"""
        # Instance-level override of the encrypt node name; presumably
        # because the inner traffic is IPv4 in the 4o6 case.
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ipsec 4o6 tunnel burst test"""
        # Same override as the basic case, with a 257-packet burst.
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
212 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
213 """ IPsec IPv4 Multi Tunnel interface """
215 encryption_type = ESP
216 tun4_encrypt_node_name = "esp4-encrypt-tun"
217 tun4_decrypt_node_name = "esp4-decrypt"
220 super(TestIpsec4MultiTunIfEsp, self).setUp()
222 self.tun_if = self.pg0
224 self.multi_params = []
227 p = copy.copy(self.ipv4_params)
229 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
230 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
231 p.scapy_tun_spi = p.scapy_tun_spi + ii
232 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
233 p.vpp_tun_spi = p.vpp_tun_spi + ii
235 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
236 p.scapy_tra_spi = p.scapy_tra_spi + ii
237 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
238 p.vpp_tra_spi = p.vpp_tra_spi + ii
240 config_tun_params(p, self.encryption_type, self.tun_if)
241 self.multi_params.append(p)
243 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
246 p.crypt_key, p.crypt_key,
247 p.auth_algo_vpp_id, p.auth_key,
249 p.tun_if.add_vpp_config()
251 p.tun_if.config_ip4()
253 VppIpRoute(self, p.remote_tun_if_host, 32,
254 [VppRoutePath(p.tun_if.remote_ip4,
255 0xffffffff)]).add_vpp_config()
258 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces

    For every parameter set in self.multi_params, verify a 127-packet
    run and then check that the tunnel interface reports exactly that
    many packets in both its rx and tx counters.
    """
    burst = 127
    for tunnel_params in self.multi_params:
        self.verify_tun_44(tunnel_params, count=burst)

        # Counters are sampled only after the traffic has been verified.
        rx_counters = tunnel_params.tun_if.get_rx_stats()
        self.assertEqual(rx_counters["packets"], burst)
        tx_counters = tunnel_params.tun_if.get_tx_stats()
        self.assertEqual(tx_counters["packets"], burst)
270 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
271 """ IPsec IPv4 Tunnel interface all Algos """
273 encryption_type = ESP
274 tun4_encrypt_node_name = "esp4-encrypt-tun"
275 tun4_decrypt_node_name = "esp4-decrypt"
277 def config_network(self, p):
278 config_tun_params(p, self.encryption_type, self.tun_if)
280 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
283 p.crypt_key, p.crypt_key,
284 p.auth_algo_vpp_id, p.auth_key,
287 p.tun_if.add_vpp_config()
289 p.tun_if.config_ip4()
290 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
291 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
293 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
294 [VppRoutePath(p.tun_if.remote_ip4,
296 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Tear down what config_network attached to ``p``.

    Order: drop the tunnel's IPv4 configuration, remove the tunnel
    interface, then remove the route.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()

    p.route.remove_vpp_config()
304 super(TestIpsec4TunIfEspAll, self).setUp()
306 self.tun_if = self.pg0
309 super(TestIpsec4TunIfEspAll, self).tearDown()
313 # change the key and the SPI
315 p.crypt_key = 'X' + p.crypt_key[1:]
317 p.scapy_tun_sa_id += 1
320 p.tun_if.local_spi = p.vpp_tun_spi
321 p.tun_if.remote_spi = p.scapy_tun_spi
323 config_tun_params(p, self.encryption_type, self.tun_if)
325 p.tun_sa_in = VppIpsecSA(self,
332 self.vpp_esp_protocol,
333 self.tun_if.local_addr[p.addr_type],
334 self.tun_if.remote_addr[p.addr_type],
337 p.tun_sa_out = VppIpsecSA(self,
344 self.vpp_esp_protocol,
345 self.tun_if.remote_addr[p.addr_type],
346 self.tun_if.local_addr[p.addr_type],
349 p.tun_sa_in.add_vpp_config()
350 p.tun_sa_out.add_vpp_config()
352 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
353 sa_id=p.tun_sa_in.id,
355 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
356 sa_id=p.tun_sa_out.id,
358 self.logger.info(self.vapi.cli("sh ipsec sa"))
360 def test_tun_44(self):
361 """IPSEC tunnel all algos """
363 # foreach VPP crypto engine
364 engines = ["ia32", "ipsecmb", "openssl"]
366 # foreach crypto algorithm
367 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
368 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
369 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
370 IPSEC_API_INTEG_ALG_NONE),
371 'scapy-crypto': "AES-GCM",
372 'scapy-integ': "NULL",
373 'key': "JPjyOWBeVEQiMe7h",
375 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
376 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
377 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
378 IPSEC_API_INTEG_ALG_NONE),
379 'scapy-crypto': "AES-GCM",
380 'scapy-integ': "NULL",
381 'key': "JPjyOWBeVEQiMe7hJPjyOWBe",
383 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
384 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
385 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
386 IPSEC_API_INTEG_ALG_NONE),
387 'scapy-crypto': "AES-GCM",
388 'scapy-integ': "NULL",
389 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
391 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
392 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
393 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
394 IPSEC_API_INTEG_ALG_SHA1_96),
395 'scapy-crypto': "AES-CBC",
396 'scapy-integ': "HMAC-SHA1-96",
398 'key': "JPjyOWBeVEQiMe7h"},
399 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
400 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
401 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
402 IPSEC_API_INTEG_ALG_SHA1_96),
403 'scapy-crypto': "AES-CBC",
404 'scapy-integ': "HMAC-SHA1-96",
406 'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
407 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
408 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
409 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
410 IPSEC_API_INTEG_ALG_SHA1_96),
411 'scapy-crypto': "AES-CBC",
412 'scapy-integ': "HMAC-SHA1-96",
414 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
415 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
416 IPSEC_API_CRYPTO_ALG_NONE),
417 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
418 IPSEC_API_INTEG_ALG_SHA1_96),
419 'scapy-crypto': "NULL",
420 'scapy-integ': "HMAC-SHA1-96",
422 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
424 for engine in engines:
425 self.vapi.cli("set crypto handler all %s" % engine)
428 # loop through each of the algorithms
431 # with self.subTest(algo=algo['scapy']):
433 p = copy.copy(self.ipv4_params)
434 p.auth_algo_vpp_id = algo['vpp-integ']
435 p.crypt_algo_vpp_id = algo['vpp-crypto']
436 p.crypt_algo = algo['scapy-crypto']
437 p.auth_algo = algo['scapy-integ']
438 p.crypt_key = algo['key']
439 p.salt = algo['salt']
441 self.config_network(p)
443 self.verify_tun_44(p, count=127)
444 c = p.tun_if.get_rx_stats()
445 self.assertEqual(c['packets'], 127)
446 c = p.tun_if.get_tx_stats()
447 self.assertEqual(c['packets'], 127)
453 self.verify_tun_44(p, count=127)
455 self.unconfig_network(p)
456 p.tun_sa_out.remove_vpp_config()
457 p.tun_sa_in.remove_vpp_config()
460 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
461 """ IPsec IPv6 Multi Tunnel interface """
463 encryption_type = ESP
464 tun6_encrypt_node_name = "esp6-encrypt-tun"
465 tun6_decrypt_node_name = "esp6-decrypt"
468 super(TestIpsec6MultiTunIfEsp, self).setUp()
470 self.tun_if = self.pg0
472 self.multi_params = []
475 p = copy.copy(self.ipv6_params)
477 p.remote_tun_if_host = "1111::%d" % (ii + 1)
478 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
479 p.scapy_tun_spi = p.scapy_tun_spi + ii
480 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
481 p.vpp_tun_spi = p.vpp_tun_spi + ii
483 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
484 p.scapy_tra_spi = p.scapy_tra_spi + ii
485 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
486 p.vpp_tra_spi = p.vpp_tra_spi + ii
488 config_tun_params(p, self.encryption_type, self.tun_if)
489 self.multi_params.append(p)
491 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
494 p.crypt_key, p.crypt_key,
495 p.auth_algo_vpp_id, p.auth_key,
496 p.auth_key, is_ip6=True)
497 p.tun_if.add_vpp_config()
499 p.tun_if.config_ip6()
501 r = VppIpRoute(self, p.remote_tun_if_host, 128,
502 [VppRoutePath(p.tun_if.remote_ip6,
504 proto=DpoProto.DPO_PROTO_IP6)])
508 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces

    For every parameter set in self.multi_params, verify a 127-packet
    run and then check that the tunnel interface reports exactly that
    many packets in both its rx and tx counters.
    """
    burst = 127
    for tunnel_params in self.multi_params:
        self.verify_tun_66(tunnel_params, count=burst)

        # Counters are sampled only after the traffic has been verified.
        rx_counters = tunnel_params.tun_if.get_rx_stats()
        self.assertEqual(rx_counters["packets"], burst)
        tx_counters = tunnel_params.tun_if.get_tx_stats()
        self.assertEqual(tx_counters["packets"], burst)
520 class TestIpsecGreTebIfEsp(TemplateIpsec,
522 """ Ipsec GRE TEB ESP - TUN tests """
523 tun4_encrypt_node_name = "esp4-encrypt-tun"
524 tun4_decrypt_node_name = "esp4-decrypt-tun"
525 encryption_type = ESP
526 omac = "00:11:22:33:44:55"
528 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
530 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
531 sa.encrypt(IP(src=self.pg0.remote_ip4,
532 dst=self.pg0.local_ip4) /
534 Ether(dst=self.omac) /
535 IP(src="1.1.1.1", dst="1.1.1.2") /
536 UDP(sport=1144, dport=2233) /
537 Raw('X' * payload_size))
538 for i in range(count)]
540 def gen_pkts(self, sw_intf, src, dst, count=1,
542 return [Ether(dst=self.omac) /
543 IP(src="1.1.1.1", dst="1.1.1.2") /
544 UDP(sport=1144, dport=2233) /
545 Raw('X' * payload_size)
546 for i in range(count)]
548 def verify_decrypted(self, p, rxs):
550 self.assert_equal(rx[Ether].dst, self.omac)
551 self.assert_equal(rx[IP].dst, "1.1.1.2")
553 def verify_encrypted(self, p, sa, rxs):
556 pkt = sa.decrypt(rx[IP])
557 if not pkt.haslayer(IP):
558 pkt = IP(pkt[Raw].load)
559 self.assert_packet_checksums_valid(pkt)
560 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
561 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
562 self.assertTrue(pkt.haslayer(GRE))
564 self.assertEqual(e[Ether].dst, self.omac)
565 self.assertEqual(e[IP].dst, "1.1.1.2")
566 except (IndexError, AssertionError):
567 self.logger.debug(ppp("Unexpected packet:", rx))
569 self.logger.debug(ppp("Decrypted packet:", pkt))
575 super(TestIpsecGreTebIfEsp, self).setUp()
577 self.tun_if = self.pg0
581 bd1 = VppBridgeDomain(self, 1)
584 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
585 p.auth_algo_vpp_id, p.auth_key,
586 p.crypt_algo_vpp_id, p.crypt_key,
587 self.vpp_esp_protocol,
590 p.tun_sa_out.add_vpp_config()
592 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
593 p.auth_algo_vpp_id, p.auth_key,
594 p.crypt_algo_vpp_id, p.crypt_key,
595 self.vpp_esp_protocol,
598 p.tun_sa_in.add_vpp_config()
600 self.tun = VppGreInterface(self,
603 type=(VppEnum.vl_api_gre_tunnel_type_t.
604 GRE_API_TUNNEL_TYPE_TEB))
605 self.tun.add_vpp_config()
607 p.tun_protect = VppIpsecTunProtect(self,
612 p.tun_protect.add_vpp_config()
615 self.tun.config_ip4()
617 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
618 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
620 self.vapi.cli("clear ipsec sa")
623 self.tun.unconfig_ip4()
624 super(TestIpsecGreTebIfEsp, self).tearDown()
627 class TestIpsecGreIfEsp(TemplateIpsec,
629 """ Ipsec GRE ESP - TUN tests """
630 tun4_encrypt_node_name = "esp4-encrypt-tun"
631 tun4_decrypt_node_name = "esp4-decrypt-tun"
632 encryption_type = ESP
634 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
636 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
637 sa.encrypt(IP(src=self.pg0.remote_ip4,
638 dst=self.pg0.local_ip4) /
640 IP(src=self.pg1.local_ip4,
641 dst=self.pg1.remote_ip4) /
642 UDP(sport=1144, dport=2233) /
643 Raw('X' * payload_size))
644 for i in range(count)]
646 def gen_pkts(self, sw_intf, src, dst, count=1,
648 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
649 IP(src="1.1.1.1", dst="1.1.1.2") /
650 UDP(sport=1144, dport=2233) /
651 Raw('X' * payload_size)
652 for i in range(count)]
654 def verify_decrypted(self, p, rxs):
656 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
657 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
659 def verify_encrypted(self, p, sa, rxs):
662 pkt = sa.decrypt(rx[IP])
663 if not pkt.haslayer(IP):
664 pkt = IP(pkt[Raw].load)
665 self.assert_packet_checksums_valid(pkt)
666 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
667 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
668 self.assertTrue(pkt.haslayer(GRE))
670 self.assertEqual(e[IP].dst, "1.1.1.2")
671 except (IndexError, AssertionError):
672 self.logger.debug(ppp("Unexpected packet:", rx))
674 self.logger.debug(ppp("Decrypted packet:", pkt))
680 super(TestIpsecGreIfEsp, self).setUp()
682 self.tun_if = self.pg0
686 bd1 = VppBridgeDomain(self, 1)
689 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
690 p.auth_algo_vpp_id, p.auth_key,
691 p.crypt_algo_vpp_id, p.crypt_key,
692 self.vpp_esp_protocol,
695 p.tun_sa_out.add_vpp_config()
697 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
698 p.auth_algo_vpp_id, p.auth_key,
699 p.crypt_algo_vpp_id, p.crypt_key,
700 self.vpp_esp_protocol,
703 p.tun_sa_in.add_vpp_config()
705 self.tun = VppGreInterface(self,
708 self.tun.add_vpp_config()
710 p.tun_protect = VppIpsecTunProtect(self,
714 p.tun_protect.add_vpp_config()
717 self.tun.config_ip4()
719 VppIpRoute(self, "1.1.1.2", 32,
720 [VppRoutePath(self.tun.remote_ip4,
721 0xffffffff)]).add_vpp_config()
724 self.tun.unconfig_ip4()
725 super(TestIpsecGreIfEsp, self).tearDown()
728 class TemplateIpsec4TunProtect(object):
729 """ IPsec IPv4 Tunnel protect """
731 encryption_type = ESP
732 tun4_encrypt_node_name = "esp4-encrypt-tun"
733 tun4_decrypt_node_name = "esp4-decrypt-tun"
734 tun4_input_node = "ipsec4-tun-input"
736 def config_sa_tra(self, p):
737 config_tun_params(p, self.encryption_type, self.tun_if)
739 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
740 p.auth_algo_vpp_id, p.auth_key,
741 p.crypt_algo_vpp_id, p.crypt_key,
742 self.vpp_esp_protocol,
744 p.tun_sa_out.add_vpp_config()
746 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
747 p.auth_algo_vpp_id, p.auth_key,
748 p.crypt_algo_vpp_id, p.crypt_key,
749 self.vpp_esp_protocol,
751 p.tun_sa_in.add_vpp_config()
753 def config_sa_tun(self, p):
754 config_tun_params(p, self.encryption_type, self.tun_if)
756 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
757 p.auth_algo_vpp_id, p.auth_key,
758 p.crypt_algo_vpp_id, p.crypt_key,
759 self.vpp_esp_protocol,
760 self.tun_if.remote_addr[p.addr_type],
761 self.tun_if.local_addr[p.addr_type],
763 p.tun_sa_out.add_vpp_config()
765 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
766 p.auth_algo_vpp_id, p.auth_key,
767 p.crypt_algo_vpp_id, p.crypt_key,
768 self.vpp_esp_protocol,
769 self.tun_if.remote_addr[p.addr_type],
770 self.tun_if.local_addr[p.addr_type],
772 p.tun_sa_in.add_vpp_config()
774 def config_protect(self, p):
775 p.tun_protect = VppIpsecTunProtect(self,
779 p.tun_protect.add_vpp_config()
781 def config_network(self, p):
782 p.tun_if = VppIpIpTunInterface(self, self.pg0,
785 p.tun_if.add_vpp_config()
787 p.tun_if.config_ip4()
789 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
790 [VppRoutePath(p.tun_if.remote_ip4,
792 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the route stored on ``p`` and then its tunnel interface."""
    # Attribute names are resolved one at a time so the route is fully
    # removed before the tunnel interface is even looked up.
    for attr in ("route", "tun_if"):
        getattr(p, attr).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object stored on ``p`` from VPP."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove the SAs stored on ``p``: outbound first, then inbound."""
    # Resolve each attribute only when its turn comes, so the outbound
    # SA is removed before the inbound one is looked up.
    for attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr).remove_vpp_config()
806 class TestIpsec4TunProtect(TemplateIpsec,
807 TemplateIpsec4TunProtect,
809 """ IPsec IPv4 Tunnel protect - transport mode"""
812 super(TestIpsec4TunProtect, self).setUp()
814 self.tun_if = self.pg0
817 super(TestIpsec4TunProtect, self).tearDown()
819 def test_tun_44(self):
820 """IPSEC tunnel protect"""
824 self.config_network(p)
825 self.config_sa_tra(p)
826 self.config_protect(p)
828 self.verify_tun_44(p, count=127)
829 c = p.tun_if.get_rx_stats()
830 self.assertEqual(c['packets'], 127)
831 c = p.tun_if.get_tx_stats()
832 self.assertEqual(c['packets'], 127)
834 # rekey - create new SAs and update the tunnel protection
836 np.crypt_key = 'X' + p.crypt_key[1:]
837 np.scapy_tun_spi += 100
838 np.scapy_tun_sa_id += 1
839 np.vpp_tun_spi += 100
840 np.vpp_tun_sa_id += 1
841 np.tun_if.local_spi = p.vpp_tun_spi
842 np.tun_if.remote_spi = p.scapy_tun_spi
844 self.config_sa_tra(np)
845 self.config_protect(np)
848 self.verify_tun_44(np, count=127)
849 c = p.tun_if.get_rx_stats()
850 self.assertEqual(c['packets'], 254)
851 c = p.tun_if.get_tx_stats()
852 self.assertEqual(c['packets'], 254)
855 self.unconfig_protect(np)
857 self.unconfig_network(p)
860 class TestIpsec4TunProtectUdp(TemplateIpsec,
861 TemplateIpsec4TunProtect,
863 """ IPsec IPv4 Tunnel protect - transport mode"""
866 super(TestIpsec4TunProtectUdp, self).setUp()
868 self.tun_if = self.pg0
871 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
872 IPSEC_API_SAD_FLAG_UDP_ENCAP)
873 p.nat_header = UDP(sport=5454, dport=4500)
874 self.config_network(p)
875 self.config_sa_tra(p)
876 self.config_protect(p)
880 self.unconfig_protect(p)
882 self.unconfig_network(p)
883 super(TestIpsec4TunProtectUdp, self).tearDown()
885 def test_tun_44(self):
886 """IPSEC UDP tunnel protect"""
890 self.verify_tun_44(p, count=127)
891 c = p.tun_if.get_rx_stats()
892 self.assertEqual(c['packets'], 127)
893 c = p.tun_if.get_tx_stats()
894 self.assertEqual(c['packets'], 127)
def test_keepalive(self):
    """IPSEC NAT Keepalive"""
    keepalive_params = self.ipv4_params
    self.verify_keepalive(keepalive_params)
901 class TestIpsec4TunProtectTun(TemplateIpsec,
902 TemplateIpsec4TunProtect,
904 """ IPsec IPv4 Tunnel protect - tunnel mode"""
906 encryption_type = ESP
907 tun4_encrypt_node_name = "esp4-encrypt-tun"
908 tun4_decrypt_node_name = "esp4-decrypt-tun"
911 super(TestIpsec4TunProtectTun, self).setUp()
913 self.tun_if = self.pg0
916 super(TestIpsec4TunProtectTun, self).tearDown()
918 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
920 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
921 sa.encrypt(IP(src=sw_intf.remote_ip4,
922 dst=sw_intf.local_ip4) /
923 IP(src=src, dst=dst) /
924 UDP(sport=1144, dport=2233) /
925 Raw('X' * payload_size))
926 for i in range(count)]
928 def gen_pkts(self, sw_intf, src, dst, count=1,
930 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
931 IP(src=src, dst=dst) /
932 UDP(sport=1144, dport=2233) /
933 Raw('X' * payload_size)
934 for i in range(count)]
936 def verify_decrypted(self, p, rxs):
938 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
939 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
940 self.assert_packet_checksums_valid(rx)
942 def verify_encrypted(self, p, sa, rxs):
945 pkt = sa.decrypt(rx[IP])
946 if not pkt.haslayer(IP):
947 pkt = IP(pkt[Raw].load)
948 self.assert_packet_checksums_valid(pkt)
949 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
950 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
951 inner = pkt[IP].payload
952 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
954 except (IndexError, AssertionError):
955 self.logger.debug(ppp("Unexpected packet:", rx))
957 self.logger.debug(ppp("Decrypted packet:", pkt))
962 def test_tun_44(self):
963 """IPSEC tunnel protect """
967 self.config_network(p)
968 self.config_sa_tun(p)
969 self.config_protect(p)
971 self.verify_tun_44(p, count=127)
973 c = p.tun_if.get_rx_stats()
974 self.assertEqual(c['packets'], 127)
975 c = p.tun_if.get_tx_stats()
976 self.assertEqual(c['packets'], 127)
978 # rekey - create new SAs and update the tunnel protection
980 np.crypt_key = 'X' + p.crypt_key[1:]
981 np.scapy_tun_spi += 100
982 np.scapy_tun_sa_id += 1
983 np.vpp_tun_spi += 100
984 np.vpp_tun_sa_id += 1
985 np.tun_if.local_spi = p.vpp_tun_spi
986 np.tun_if.remote_spi = p.scapy_tun_spi
988 self.config_sa_tun(np)
989 self.config_protect(np)
992 self.verify_tun_44(np, count=127)
993 c = p.tun_if.get_rx_stats()
994 self.assertEqual(c['packets'], 254)
995 c = p.tun_if.get_tx_stats()
996 self.assertEqual(c['packets'], 254)
999 self.unconfig_protect(np)
1000 self.unconfig_sa(np)
1001 self.unconfig_network(p)
1004 class TemplateIpsec6TunProtect(object):
1005 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Create and install the outbound and inbound SAs for ``p``.

    No tunnel endpoint addresses are passed to the SAs here (contrast
    config_sa_tun).  Reads encryption_type, tun_if and vpp_esp_protocol
    from ``self``; stores the results as p.tun_sa_out and p.tun_sa_in.
    """
    config_tun_params(p, self.encryption_type, self.tun_if)

    def build_sa(sa_id, spi):
        # Both directions share the algorithms and keys carried by p.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol)

    # Outbound SA uses the scapy-side id/SPI.
    p.tun_sa_out = build_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out.add_vpp_config()

    # Inbound SA uses the VPP-side id/SPI.
    p.tun_sa_in = build_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in.add_vpp_config()
def config_sa_tun(self, p):
    """Create and install the outbound and inbound SAs for ``p``.

    Unlike config_sa_tra, each SA is also given the tun_if remote and
    local addresses for p.addr_type.  Reads encryption_type, tun_if and
    vpp_esp_protocol from ``self``; stores the results as p.tun_sa_out
    and p.tun_sa_in.
    """
    config_tun_params(p, self.encryption_type, self.tun_if)

    def build_sa(sa_id, spi):
        # NOTE(review): both directions receive the endpoints in the
        # same (remote, local) order - confirm this is intended.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol,
                          self.tun_if.remote_addr[p.addr_type],
                          self.tun_if.local_addr[p.addr_type])

    # Outbound SA uses the scapy-side id/SPI.
    p.tun_sa_out = build_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out.add_vpp_config()

    # Inbound SA uses the VPP-side id/SPI.
    p.tun_sa_in = build_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in.add_vpp_config()
1041 def config_protect(self, p):
1042 p.tun_protect = VppIpsecTunProtect(self,
1046 p.tun_protect.add_vpp_config()
1048 def config_network(self, p):
1049 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1051 self.pg0.remote_ip6)
1052 p.tun_if.add_vpp_config()
1054 p.tun_if.config_ip6()
1056 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1057 [VppRoutePath(p.tun_if.remote_ip6,
1059 proto=DpoProto.DPO_PROTO_IP6)])
1060 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the route stored on ``p`` and then its tunnel interface."""
    # Attribute names are resolved one at a time so the route is fully
    # removed before the tunnel interface is even looked up.
    for attr in ("route", "tun_if"):
        getattr(p, attr).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object stored on ``p`` from VPP."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove the SAs stored on ``p``: outbound first, then inbound."""
    # Resolve each attribute only when its turn comes, so the outbound
    # SA is removed before the inbound one is looked up.
    for attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr).remove_vpp_config()
1074 class TestIpsec6TunProtect(TemplateIpsec,
1075 TemplateIpsec6TunProtect,
1077 """ IPsec IPv6 Tunnel protect - transport mode"""
1079 encryption_type = ESP
1080 tun6_encrypt_node_name = "esp6-encrypt-tun"
1081 tun6_decrypt_node_name = "esp6-decrypt-tun"
1084 super(TestIpsec6TunProtect, self).setUp()
1086 self.tun_if = self.pg0
1089 super(TestIpsec6TunProtect, self).tearDown()
1091 def test_tun_66(self):
1092 """IPSEC tunnel protect"""
1094 p = self.ipv6_params
1096 self.config_network(p)
1097 self.config_sa_tra(p)
1098 self.config_protect(p)
1100 self.verify_tun_66(p, count=127)
1101 c = p.tun_if.get_rx_stats()
1102 self.assertEqual(c['packets'], 127)
1103 c = p.tun_if.get_tx_stats()
1104 self.assertEqual(c['packets'], 127)
1106 # rekey - create new SAs and update the tunnel protection
1108 np.crypt_key = 'X' + p.crypt_key[1:]
1109 np.scapy_tun_spi += 100
1110 np.scapy_tun_sa_id += 1
1111 np.vpp_tun_spi += 100
1112 np.vpp_tun_sa_id += 1
1113 np.tun_if.local_spi = p.vpp_tun_spi
1114 np.tun_if.remote_spi = p.scapy_tun_spi
1116 self.config_sa_tra(np)
1117 self.config_protect(np)
1120 self.verify_tun_66(np, count=127)
1121 c = p.tun_if.get_rx_stats()
1122 self.assertEqual(c['packets'], 254)
1123 c = p.tun_if.get_tx_stats()
1124 self.assertEqual(c['packets'], 254)
1127 # 1) add two input SAs [old, new]
1128 # 2) swap output SA to [new]
1129 # 3) use only [new] input SA
1131 np3.crypt_key = 'Z' + p.crypt_key[1:]
1132 np3.scapy_tun_spi += 100
1133 np3.scapy_tun_sa_id += 1
1134 np3.vpp_tun_spi += 100
1135 np3.vpp_tun_sa_id += 1
1136 np3.tun_if.local_spi = p.vpp_tun_spi
1137 np3.tun_if.remote_spi = p.scapy_tun_spi
1139 self.config_sa_tra(np3)
1142 p.tun_protect.update_vpp_config(np.tun_sa_out,
1143 [np.tun_sa_in, np3.tun_sa_in])
1144 self.verify_tun_66(np, np, count=127)
1145 self.verify_tun_66(np3, np, count=127)
1148 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1149 [np.tun_sa_in, np3.tun_sa_in])
1150 self.verify_tun_66(np, np3, count=127)
1151 self.verify_tun_66(np3, np3, count=127)
1154 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1156 self.verify_tun_66(np3, np3, count=127)
1157 self.verify_drop_tun_66(np, count=127)
1159 c = p.tun_if.get_rx_stats()
1160 self.assertEqual(c['packets'], 127*7)
1161 c = p.tun_if.get_tx_stats()
1162 self.assertEqual(c['packets'], 127*7)
1163 self.unconfig_sa(np)
1166 self.unconfig_protect(np3)
1167 self.unconfig_sa(np3)
1168 self.unconfig_network(p)
1171 class TestIpsec6TunProtectTun(TemplateIpsec,
1172 TemplateIpsec6TunProtect,
1174 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1176 encryption_type = ESP
1177 tun6_encrypt_node_name = "esp6-encrypt-tun"
1178 tun6_decrypt_node_name = "esp6-decrypt-tun"
1181 super(TestIpsec6TunProtectTun, self).setUp()
1183 self.tun_if = self.pg0
1186 super(TestIpsec6TunProtectTun, self).tearDown()
1188 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1190 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1191 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1192 dst=sw_intf.local_ip6) /
1193 IPv6(src=src, dst=dst) /
1194 UDP(sport=1166, dport=2233) /
1195 Raw('X' * payload_size))
1196 for i in range(count)]
1198 def gen_pkts6(self, sw_intf, src, dst, count=1,
1200 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1201 IPv6(src=src, dst=dst) /
1202 UDP(sport=1166, dport=2233) /
1203 Raw('X' * payload_size)
1204 for i in range(count)]
1206 def verify_decrypted6(self, p, rxs):
1208 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1209 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1210 self.assert_packet_checksums_valid(rx)
1212 def verify_encrypted6(self, p, sa, rxs):
1215 pkt = sa.decrypt(rx[IPv6])
1216 if not pkt.haslayer(IPv6):
1217 pkt = IPv6(pkt[Raw].load)
1218 self.assert_packet_checksums_valid(pkt)
1219 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1220 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1221 inner = pkt[IPv6].payload
1222 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1224 except (IndexError, AssertionError):
1225 self.logger.debug(ppp("Unexpected packet:", rx))
1227 self.logger.debug(ppp("Decrypted packet:", pkt))
1232 def test_tun_66(self):
1233 """IPSEC tunnel protect """
1235 p = self.ipv6_params
1237 self.config_network(p)
1238 self.config_sa_tun(p)
1239 self.config_protect(p)
1241 self.verify_tun_66(p, count=127)
1243 c = p.tun_if.get_rx_stats()
1244 self.assertEqual(c['packets'], 127)
1245 c = p.tun_if.get_tx_stats()
1246 self.assertEqual(c['packets'], 127)
1248 # rekey - create new SAs and update the tunnel protection
1250 np.crypt_key = 'X' + p.crypt_key[1:]
1251 np.scapy_tun_spi += 100
1252 np.scapy_tun_sa_id += 1
1253 np.vpp_tun_spi += 100
1254 np.vpp_tun_sa_id += 1
1255 np.tun_if.local_spi = p.vpp_tun_spi
1256 np.tun_if.remote_spi = p.scapy_tun_spi
1258 self.config_sa_tun(np)
1259 self.config_protect(np)
1262 self.verify_tun_66(np, count=127)
1263 c = p.tun_if.get_rx_stats()
1264 self.assertEqual(c['packets'], 254)
1265 c = p.tun_if.get_tx_stats()
1266 self.assertEqual(c['packets'], 254)
1269 self.unconfig_protect(np)
1270 self.unconfig_sa(np)
1271 self.unconfig_network(p)
if __name__ == "__main__":
    # Run as a script: hand this module's tests to unittest, using the
    # VppTestRunner imported from framework as the runner.
    unittest.main(testRunner=VppTestRunner)