5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
19 from vpp_papi import VppEnum
22 class TemplateIpsec4TunIfEsp(TemplateIpsec):
23 """ IPsec tunnel interface tests """
29 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
32 def tearDownClass(cls):
33 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
36 super(TemplateIpsec4TunIfEsp, self).setUp()
38 self.tun_if = self.pg0
42 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
43 p.scapy_tun_spi, p.crypt_algo_vpp_id,
44 p.crypt_key, p.crypt_key,
45 p.auth_algo_vpp_id, p.auth_key,
47 p.tun_if.add_vpp_config()
52 r = VppIpRoute(self, p.remote_tun_if_host, 32,
53 [VppRoutePath(p.tun_if.remote_ip4,
56 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
57 [VppRoutePath(p.tun_if.remote_ip6,
59 proto=DpoProto.DPO_PROTO_IP6)])
63 super(TemplateIpsec4TunIfEsp, self).tearDown()
66 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
67 """ IPsec UDP tunnel interface tests """
69 tun4_encrypt_node_name = "esp4-encrypt-tun"
70 tun4_decrypt_node_name = "esp4-decrypt"
75 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
78 def tearDownClass(cls):
79 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
82 super(TemplateIpsec4TunIfEspUdp, self).setUp()
84 self.tun_if = self.pg0
87 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
88 IPSEC_API_SAD_FLAG_UDP_ENCAP)
89 p.nat_header = UDP(sport=5454, dport=4500)
91 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
92 p.scapy_tun_spi, p.crypt_algo_vpp_id,
93 p.crypt_key, p.crypt_key,
94 p.auth_algo_vpp_id, p.auth_key,
95 p.auth_key, udp_encap=True)
96 p.tun_if.add_vpp_config()
101 r = VppIpRoute(self, p.remote_tun_if_host, 32,
102 [VppRoutePath(p.tun_if.remote_ip4,
105 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
106 [VppRoutePath(p.tun_if.remote_ip6,
108 proto=DpoProto.DPO_PROTO_IP6)])
112 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
115 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
116 """ Ipsec ESP - TUN tests """
117 tun4_encrypt_node_name = "esp4-encrypt-tun"
118 tun4_decrypt_node_name = "esp4-decrypt"
def test_tun_basic64(self):
    """ ipsec 6o4 tunnel basic test """
    # Override the encrypt node name on this instance only; the class-level
    # value stays "esp4-encrypt-tun".
    self.tun4_encrypt_node_name = "esp6-encrypt-tun"

    # Single-packet run using the parameter set registered for AF_INET.
    v4_params = self.params[socket.AF_INET]
    self.verify_tun_64(v4_params, count=1)
def test_tun_burst64(self):
    """ ipsec 6o4 tunnel burst test """
    # The description used to read "basic test" (copied from
    # test_tun_basic64); this variant sends 257 packets, so it is labelled
    # as the burst test, matching test_tun_burst46.
    #
    # Override the encrypt node name on this instance only; the class-level
    # value stays "esp4-encrypt-tun".
    self.tun4_encrypt_node_name = "esp6-encrypt-tun"

    # Same call as the basic test, but with count=257 instead of 1.
    self.verify_tun_64(self.params[socket.AF_INET], count=257)
132 def test_tun_basic_frag44(self):
133 """ ipsec 4o4 tunnel frag basic test """
134 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
138 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
140 self.verify_tun_44(self.params[socket.AF_INET],
141 count=1, payload_size=1800, n_rx=2)
142 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    # Input node name for this variant; presumably read by the inherited
    # IpsecTun4Tests helpers - confirm in template_ipsec.
    tun4_input_node = "ipsec4-if-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # Delegate to the inherited keepalive check with the IPv4 parameters.
        v4_params = self.ipv4_params
        self.verify_keepalive(v4_params)
156 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
157 """ Ipsec ESP - TCP tests """
161 class TemplateIpsec6TunIfEsp(TemplateIpsec):
162 """ IPsec tunnel interface tests """
164 encryption_type = ESP
167 super(TemplateIpsec6TunIfEsp, self).setUp()
169 self.tun_if = self.pg0
172 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
173 p.scapy_tun_spi, p.crypt_algo_vpp_id,
174 p.crypt_key, p.crypt_key,
175 p.auth_algo_vpp_id, p.auth_key,
176 p.auth_key, is_ip6=True)
177 tun_if.add_vpp_config()
182 r = VppIpRoute(self, p.remote_tun_if_host, 128,
183 [VppRoutePath(tun_if.remote_ip6,
185 proto=DpoProto.DPO_PROTO_IP6)])
187 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
188 [VppRoutePath(tun_if.remote_ip4,
193 super(TemplateIpsec6TunIfEsp, self).tearDown()
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """

    # Default node names for this class; the 4o6 tests below override the
    # encrypt node name per instance.
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        # Single-packet run using the parameter set registered for AF_INET6.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        # Same as the basic test, but with 257 packets.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
212 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
213 """ IPsec IPv4 Multi Tunnel interface """
215 encryption_type = ESP
216 tun4_encrypt_node_name = "esp4-encrypt-tun"
217 tun4_decrypt_node_name = "esp4-decrypt"
220 super(TestIpsec4MultiTunIfEsp, self).setUp()
222 self.tun_if = self.pg0
224 self.multi_params = []
227 p = copy.copy(self.ipv4_params)
229 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
230 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
231 p.scapy_tun_spi = p.scapy_tun_spi + ii
232 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
233 p.vpp_tun_spi = p.vpp_tun_spi + ii
235 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
236 p.scapy_tra_spi = p.scapy_tra_spi + ii
237 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
238 p.vpp_tra_spi = p.vpp_tra_spi + ii
240 config_tun_params(p, self.encryption_type, self.tun_if)
241 self.multi_params.append(p)
243 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
246 p.crypt_key, p.crypt_key,
247 p.auth_algo_vpp_id, p.auth_key,
249 p.tun_if.add_vpp_config()
251 p.tun_if.config_ip4()
253 VppIpRoute(self, p.remote_tun_if_host, 32,
254 [VppRoutePath(p.tun_if.remote_ip4,
255 0xffffffff)]).add_vpp_config()
258 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces """
    n_pkts = 127
    for tun_params in self.multi_params:
        # Run the 4o4 verification on this tunnel, then check that its
        # interface counters report exactly the packets just sent.
        self.verify_tun_44(tun_params, count=n_pkts)

        rx_stats = tun_params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], n_pkts)

        tx_stats = tun_params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], n_pkts)
270 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
271 """ IPsec IPv4 Tunnel interface all Algos """
273 encryption_type = ESP
274 tun4_encrypt_node_name = "esp4-encrypt-tun"
275 tun4_decrypt_node_name = "esp4-decrypt"
277 def config_network(self, p):
278 config_tun_params(p, self.encryption_type, self.tun_if)
280 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
283 p.crypt_key, p.crypt_key,
284 p.auth_algo_vpp_id, p.auth_key,
287 p.tun_if.add_vpp_config()
289 p.tun_if.config_ip4()
290 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
291 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
293 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
294 [VppRoutePath(p.tun_if.remote_ip4,
296 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Undo config_network(): tear down the tunnel interface, then the route.

    :param p: parameter object carrying the ``tun_if`` and ``route``
        objects that config_network() stored on it.
    """
    tunnel = p.tun_if
    # Order is kept as-is: drop the IPv4 config, remove the tunnel
    # interface, and only then remove the route.
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()
    p.route.remove_vpp_config()
304 super(TestIpsec4TunIfEspAll, self).setUp()
306 self.tun_if = self.pg0
309 super(TestIpsec4TunIfEspAll, self).tearDown()
313 # change the key and the SPI
315 p.crypt_key = 'X' + p.crypt_key[1:]
317 p.scapy_tun_sa_id += 1
320 p.tun_if.local_spi = p.vpp_tun_spi
321 p.tun_if.remote_spi = p.scapy_tun_spi
323 config_tun_params(p, self.encryption_type, self.tun_if)
325 p.tun_sa_in = VppIpsecSA(self,
332 self.vpp_esp_protocol,
333 self.tun_if.local_addr[p.addr_type],
334 self.tun_if.remote_addr[p.addr_type],
337 p.tun_sa_out = VppIpsecSA(self,
344 self.vpp_esp_protocol,
345 self.tun_if.remote_addr[p.addr_type],
346 self.tun_if.local_addr[p.addr_type],
349 p.tun_sa_in.add_vpp_config()
350 p.tun_sa_out.add_vpp_config()
352 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
353 sa_id=p.tun_sa_in.id,
355 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
356 sa_id=p.tun_sa_out.id,
358 self.logger.info(self.vapi.cli("sh ipsec sa"))
360 def test_tun_44(self):
361 """IPSEC tunnel all algos """
363 # foreach VPP crypto engine
364 engines = ["ia32", "ipsecmb", "openssl"]
366 # foreach crypto algorithm
367 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
368 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
369 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
370 IPSEC_API_INTEG_ALG_NONE),
371 'scapy-crypto': "AES-GCM",
372 'scapy-integ': "NULL",
373 'key': "JPjyOWBeVEQiMe7h",
375 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
376 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
377 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
378 IPSEC_API_INTEG_ALG_NONE),
379 'scapy-crypto': "AES-GCM",
380 'scapy-integ': "NULL",
381 'key': "JPjyOWBeVEQiMe7hJPjyOWBe",
383 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
384 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
385 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
386 IPSEC_API_INTEG_ALG_NONE),
387 'scapy-crypto': "AES-GCM",
388 'scapy-integ': "NULL",
389 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
391 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
392 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
393 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
394 IPSEC_API_INTEG_ALG_SHA1_96),
395 'scapy-crypto': "AES-CBC",
396 'scapy-integ': "HMAC-SHA1-96",
398 'key': "JPjyOWBeVEQiMe7h"},
399 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
400 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
401 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
402 IPSEC_API_INTEG_ALG_SHA1_96),
403 'scapy-crypto': "AES-CBC",
404 'scapy-integ': "HMAC-SHA1-96",
406 'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
407 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
408 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
409 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
410 IPSEC_API_INTEG_ALG_SHA1_96),
411 'scapy-crypto': "AES-CBC",
412 'scapy-integ': "HMAC-SHA1-96",
414 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
416 for engine in engines:
417 self.vapi.cli("set crypto handler all %s" % engine)
420 # loop through each of the algorithms
423 # with self.subTest(algo=algo['scapy']):
425 p = copy.copy(self.ipv4_params)
426 p.auth_algo_vpp_id = algo['vpp-integ']
427 p.crypt_algo_vpp_id = algo['vpp-crypto']
428 p.crypt_algo = algo['scapy-crypto']
429 p.auth_algo = algo['scapy-integ']
430 p.crypt_key = algo['key']
431 p.salt = algo['salt']
433 self.config_network(p)
435 self.verify_tun_44(p, count=127)
436 c = p.tun_if.get_rx_stats()
437 self.assertEqual(c['packets'], 127)
438 c = p.tun_if.get_tx_stats()
439 self.assertEqual(c['packets'], 127)
445 self.verify_tun_44(p, count=127)
447 self.unconfig_network(p)
448 p.tun_sa_out.remove_vpp_config()
449 p.tun_sa_in.remove_vpp_config()
452 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
453 """ IPsec IPv6 Multi Tunnel interface """
455 encryption_type = ESP
456 tun6_encrypt_node_name = "esp6-encrypt-tun"
457 tun6_decrypt_node_name = "esp6-decrypt"
460 super(TestIpsec6MultiTunIfEsp, self).setUp()
462 self.tun_if = self.pg0
464 self.multi_params = []
467 p = copy.copy(self.ipv6_params)
469 p.remote_tun_if_host = "1111::%d" % (ii + 1)
470 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
471 p.scapy_tun_spi = p.scapy_tun_spi + ii
472 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
473 p.vpp_tun_spi = p.vpp_tun_spi + ii
475 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
476 p.scapy_tra_spi = p.scapy_tra_spi + ii
477 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
478 p.vpp_tra_spi = p.vpp_tra_spi + ii
480 config_tun_params(p, self.encryption_type, self.tun_if)
481 self.multi_params.append(p)
483 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
486 p.crypt_key, p.crypt_key,
487 p.auth_algo_vpp_id, p.auth_key,
488 p.auth_key, is_ip6=True)
489 p.tun_if.add_vpp_config()
491 p.tun_if.config_ip6()
493 r = VppIpRoute(self, p.remote_tun_if_host, 128,
494 [VppRoutePath(p.tun_if.remote_ip6,
496 proto=DpoProto.DPO_PROTO_IP6)])
500 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces """
    n_pkts = 127
    for tun_params in self.multi_params:
        # Run the 6o6 verification on this tunnel, then check that its
        # interface counters report exactly the packets just sent.
        self.verify_tun_66(tun_params, count=n_pkts)

        rx_stats = tun_params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], n_pkts)

        tx_stats = tun_params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], n_pkts)
512 class TestIpsecGreTebIfEsp(TemplateIpsec,
514 """ Ipsec GRE TEB ESP - TUN tests """
515 tun4_encrypt_node_name = "esp4-encrypt-tun"
516 tun4_decrypt_node_name = "esp4-decrypt-tun"
517 encryption_type = ESP
518 omac = "00:11:22:33:44:55"
520 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
522 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
523 sa.encrypt(IP(src=self.pg0.remote_ip4,
524 dst=self.pg0.local_ip4) /
526 Ether(dst=self.omac) /
527 IP(src="1.1.1.1", dst="1.1.1.2") /
528 UDP(sport=1144, dport=2233) /
529 Raw('X' * payload_size))
530 for i in range(count)]
532 def gen_pkts(self, sw_intf, src, dst, count=1,
534 return [Ether(dst=self.omac) /
535 IP(src="1.1.1.1", dst="1.1.1.2") /
536 UDP(sport=1144, dport=2233) /
537 Raw('X' * payload_size)
538 for i in range(count)]
540 def verify_decrypted(self, p, rxs):
542 self.assert_equal(rx[Ether].dst, self.omac)
543 self.assert_equal(rx[IP].dst, "1.1.1.2")
545 def verify_encrypted(self, p, sa, rxs):
548 pkt = sa.decrypt(rx[IP])
549 if not pkt.haslayer(IP):
550 pkt = IP(pkt[Raw].load)
551 self.assert_packet_checksums_valid(pkt)
552 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
553 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
554 self.assertTrue(pkt.haslayer(GRE))
556 self.assertEqual(e[Ether].dst, self.omac)
557 self.assertEqual(e[IP].dst, "1.1.1.2")
558 except (IndexError, AssertionError):
559 self.logger.debug(ppp("Unexpected packet:", rx))
561 self.logger.debug(ppp("Decrypted packet:", pkt))
567 super(TestIpsecGreTebIfEsp, self).setUp()
569 self.tun_if = self.pg0
573 bd1 = VppBridgeDomain(self, 1)
576 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
577 p.auth_algo_vpp_id, p.auth_key,
578 p.crypt_algo_vpp_id, p.crypt_key,
579 self.vpp_esp_protocol,
582 p.tun_sa_out.add_vpp_config()
584 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
585 p.auth_algo_vpp_id, p.auth_key,
586 p.crypt_algo_vpp_id, p.crypt_key,
587 self.vpp_esp_protocol,
590 p.tun_sa_in.add_vpp_config()
592 self.tun = VppGreInterface(self,
595 type=(VppEnum.vl_api_gre_tunnel_type_t.
596 GRE_API_TUNNEL_TYPE_TEB))
597 self.tun.add_vpp_config()
599 p.tun_protect = VppIpsecTunProtect(self,
604 p.tun_protect.add_vpp_config()
607 self.tun.config_ip4()
609 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
610 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
612 self.vapi.cli("clear ipsec sa")
615 self.tun.unconfig_ip4()
616 super(TestIpsecGreTebIfEsp, self).tearDown()
619 class TestIpsecGreIfEsp(TemplateIpsec,
621 """ Ipsec GRE ESP - TUN tests """
622 tun4_encrypt_node_name = "esp4-encrypt-tun"
623 tun4_decrypt_node_name = "esp4-decrypt-tun"
624 encryption_type = ESP
626 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
628 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
629 sa.encrypt(IP(src=self.pg0.remote_ip4,
630 dst=self.pg0.local_ip4) /
632 IP(src=self.pg1.local_ip4,
633 dst=self.pg1.remote_ip4) /
634 UDP(sport=1144, dport=2233) /
635 Raw('X' * payload_size))
636 for i in range(count)]
638 def gen_pkts(self, sw_intf, src, dst, count=1,
640 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
641 IP(src="1.1.1.1", dst="1.1.1.2") /
642 UDP(sport=1144, dport=2233) /
643 Raw('X' * payload_size)
644 for i in range(count)]
646 def verify_decrypted(self, p, rxs):
648 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
649 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
651 def verify_encrypted(self, p, sa, rxs):
654 pkt = sa.decrypt(rx[IP])
655 if not pkt.haslayer(IP):
656 pkt = IP(pkt[Raw].load)
657 self.assert_packet_checksums_valid(pkt)
658 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
659 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
660 self.assertTrue(pkt.haslayer(GRE))
662 self.assertEqual(e[IP].dst, "1.1.1.2")
663 except (IndexError, AssertionError):
664 self.logger.debug(ppp("Unexpected packet:", rx))
666 self.logger.debug(ppp("Decrypted packet:", pkt))
672 super(TestIpsecGreIfEsp, self).setUp()
674 self.tun_if = self.pg0
678 bd1 = VppBridgeDomain(self, 1)
681 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
682 p.auth_algo_vpp_id, p.auth_key,
683 p.crypt_algo_vpp_id, p.crypt_key,
684 self.vpp_esp_protocol,
687 p.tun_sa_out.add_vpp_config()
689 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
690 p.auth_algo_vpp_id, p.auth_key,
691 p.crypt_algo_vpp_id, p.crypt_key,
692 self.vpp_esp_protocol,
695 p.tun_sa_in.add_vpp_config()
697 self.tun = VppGreInterface(self,
700 self.tun.add_vpp_config()
702 p.tun_protect = VppIpsecTunProtect(self,
706 p.tun_protect.add_vpp_config()
709 self.tun.config_ip4()
711 VppIpRoute(self, "1.1.1.2", 32,
712 [VppRoutePath(self.tun.remote_ip4,
713 0xffffffff)]).add_vpp_config()
716 self.tun.unconfig_ip4()
717 super(TestIpsecGreIfEsp, self).tearDown()
720 class TemplateIpsec4TunProtect(object):
721 """ IPsec IPv4 Tunnel protect """
723 encryption_type = ESP
724 tun4_encrypt_node_name = "esp4-encrypt-tun"
725 tun4_decrypt_node_name = "esp4-decrypt-tun"
726 tun4_input_node = "ipsec4-tun-input"
728 def config_sa_tra(self, p):
729 config_tun_params(p, self.encryption_type, self.tun_if)
731 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
732 p.auth_algo_vpp_id, p.auth_key,
733 p.crypt_algo_vpp_id, p.crypt_key,
734 self.vpp_esp_protocol,
736 p.tun_sa_out.add_vpp_config()
738 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
739 p.auth_algo_vpp_id, p.auth_key,
740 p.crypt_algo_vpp_id, p.crypt_key,
741 self.vpp_esp_protocol,
743 p.tun_sa_in.add_vpp_config()
745 def config_sa_tun(self, p):
746 config_tun_params(p, self.encryption_type, self.tun_if)
748 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
749 p.auth_algo_vpp_id, p.auth_key,
750 p.crypt_algo_vpp_id, p.crypt_key,
751 self.vpp_esp_protocol,
752 self.tun_if.remote_addr[p.addr_type],
753 self.tun_if.local_addr[p.addr_type],
755 p.tun_sa_out.add_vpp_config()
757 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
758 p.auth_algo_vpp_id, p.auth_key,
759 p.crypt_algo_vpp_id, p.crypt_key,
760 self.vpp_esp_protocol,
761 self.tun_if.remote_addr[p.addr_type],
762 self.tun_if.local_addr[p.addr_type],
764 p.tun_sa_in.add_vpp_config()
766 def config_protect(self, p):
767 p.tun_protect = VppIpsecTunProtect(self,
771 p.tun_protect.add_vpp_config()
773 def config_network(self, p):
774 p.tun_if = VppIpIpTunInterface(self, self.pg0,
777 p.tun_if.add_vpp_config()
779 p.tun_if.config_ip4()
781 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
782 [VppRoutePath(p.tun_if.remote_ip4,
784 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the route and then the tunnel interface stored on ``p``.

    :param p: parameter object carrying ``route`` and ``tun_if``.
    """
    # The route is removed first, then the interface.
    route, tunnel = p.route, p.tun_if
    route.remove_vpp_config()
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object stored on ``p``.

    :param p: parameter object carrying ``tun_protect``.
    """
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on ``p``, ``tun_sa_out`` first.

    :param p: parameter object carrying ``tun_sa_out`` and ``tun_sa_in``.
    """
    for sa in (p.tun_sa_out, p.tun_sa_in):
        sa.remove_vpp_config()
798 class TestIpsec4TunProtect(TemplateIpsec,
799 TemplateIpsec4TunProtect,
801 """ IPsec IPv4 Tunnel protect - transport mode"""
804 super(TestIpsec4TunProtect, self).setUp()
806 self.tun_if = self.pg0
809 super(TestIpsec4TunProtect, self).tearDown()
811 def test_tun_44(self):
812 """IPSEC tunnel protect"""
816 self.config_network(p)
817 self.config_sa_tra(p)
818 self.config_protect(p)
820 self.verify_tun_44(p, count=127)
821 c = p.tun_if.get_rx_stats()
822 self.assertEqual(c['packets'], 127)
823 c = p.tun_if.get_tx_stats()
824 self.assertEqual(c['packets'], 127)
826 # rekey - create new SAs and update the tunnel protection
828 np.crypt_key = 'X' + p.crypt_key[1:]
829 np.scapy_tun_spi += 100
830 np.scapy_tun_sa_id += 1
831 np.vpp_tun_spi += 100
832 np.vpp_tun_sa_id += 1
833 np.tun_if.local_spi = p.vpp_tun_spi
834 np.tun_if.remote_spi = p.scapy_tun_spi
836 self.config_sa_tra(np)
837 self.config_protect(np)
840 self.verify_tun_44(np, count=127)
841 c = p.tun_if.get_rx_stats()
842 self.assertEqual(c['packets'], 254)
843 c = p.tun_if.get_tx_stats()
844 self.assertEqual(c['packets'], 254)
847 self.unconfig_protect(np)
849 self.unconfig_network(p)
852 class TestIpsec4TunProtectUdp(TemplateIpsec,
853 TemplateIpsec4TunProtect,
855 """ IPsec IPv4 Tunnel protect - transport mode"""
858 super(TestIpsec4TunProtectUdp, self).setUp()
860 self.tun_if = self.pg0
863 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
864 IPSEC_API_SAD_FLAG_UDP_ENCAP)
865 p.nat_header = UDP(sport=5454, dport=4500)
866 self.config_network(p)
867 self.config_sa_tra(p)
868 self.config_protect(p)
872 self.unconfig_protect(p)
874 self.unconfig_network(p)
875 super(TestIpsec4TunProtectUdp, self).tearDown()
877 def test_tun_44(self):
878 """IPSEC UDP tunnel protect"""
882 self.verify_tun_44(p, count=127)
883 c = p.tun_if.get_rx_stats()
884 self.assertEqual(c['packets'], 127)
885 c = p.tun_if.get_tx_stats()
886 self.assertEqual(c['packets'], 127)
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # Delegate to the inherited keepalive check with the IPv4 parameters.
    v4_params = self.ipv4_params
    self.verify_keepalive(v4_params)
893 class TestIpsec4TunProtectTun(TemplateIpsec,
894 TemplateIpsec4TunProtect,
896 """ IPsec IPv4 Tunnel protect - tunnel mode"""
898 encryption_type = ESP
899 tun4_encrypt_node_name = "esp4-encrypt-tun"
900 tun4_decrypt_node_name = "esp4-decrypt-tun"
903 super(TestIpsec4TunProtectTun, self).setUp()
905 self.tun_if = self.pg0
908 super(TestIpsec4TunProtectTun, self).tearDown()
910 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
912 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
913 sa.encrypt(IP(src=sw_intf.remote_ip4,
914 dst=sw_intf.local_ip4) /
915 IP(src=src, dst=dst) /
916 UDP(sport=1144, dport=2233) /
917 Raw('X' * payload_size))
918 for i in range(count)]
920 def gen_pkts(self, sw_intf, src, dst, count=1,
922 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
923 IP(src=src, dst=dst) /
924 UDP(sport=1144, dport=2233) /
925 Raw('X' * payload_size)
926 for i in range(count)]
928 def verify_decrypted(self, p, rxs):
930 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
931 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
932 self.assert_packet_checksums_valid(rx)
934 def verify_encrypted(self, p, sa, rxs):
937 pkt = sa.decrypt(rx[IP])
938 if not pkt.haslayer(IP):
939 pkt = IP(pkt[Raw].load)
940 self.assert_packet_checksums_valid(pkt)
941 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
942 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
943 inner = pkt[IP].payload
944 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
946 except (IndexError, AssertionError):
947 self.logger.debug(ppp("Unexpected packet:", rx))
949 self.logger.debug(ppp("Decrypted packet:", pkt))
954 def test_tun_44(self):
955 """IPSEC tunnel protect """
959 self.config_network(p)
960 self.config_sa_tun(p)
961 self.config_protect(p)
963 self.verify_tun_44(p, count=127)
965 c = p.tun_if.get_rx_stats()
966 self.assertEqual(c['packets'], 127)
967 c = p.tun_if.get_tx_stats()
968 self.assertEqual(c['packets'], 127)
970 # rekey - create new SAs and update the tunnel protection
972 np.crypt_key = 'X' + p.crypt_key[1:]
973 np.scapy_tun_spi += 100
974 np.scapy_tun_sa_id += 1
975 np.vpp_tun_spi += 100
976 np.vpp_tun_sa_id += 1
977 np.tun_if.local_spi = p.vpp_tun_spi
978 np.tun_if.remote_spi = p.scapy_tun_spi
980 self.config_sa_tun(np)
981 self.config_protect(np)
984 self.verify_tun_44(np, count=127)
985 c = p.tun_if.get_rx_stats()
986 self.assertEqual(c['packets'], 254)
987 c = p.tun_if.get_tx_stats()
988 self.assertEqual(c['packets'], 254)
991 self.unconfig_protect(np)
993 self.unconfig_network(p)
996 class TemplateIpsec6TunProtect(object):
997 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Create and install ``p.tun_sa_out`` and ``p.tun_sa_in`` with no
    tunnel endpoint addresses.

    :param p: parameter object supplying SA ids, SPIs, algorithms and
        keys; the two new SAs are stored back on it.
    """
    config_tun_params(p, self.encryption_type, self.tun_if)

    def make_sa(sa_id, spi):
        # Both SAs share algorithms, keys and protocol; only the SA id
        # and SPI differ.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol)

    # "out" SA uses the scapy-side id/SPI.
    sa_out = make_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out = sa_out
    sa_out.add_vpp_config()

    # "in" SA uses the VPP-side id/SPI.
    sa_in = make_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in = sa_in
    sa_in.add_vpp_config()
def config_sa_tun(self, p):
    """Create and install ``p.tun_sa_out`` and ``p.tun_sa_in`` with
    tunnel endpoint addresses.

    :param p: parameter object supplying SA ids, SPIs, algorithms, keys
        and ``addr_type``; the two new SAs are stored back on it.
    """
    config_tun_params(p, self.encryption_type, self.tun_if)

    def make_sa(sa_id, spi):
        # Both SAs are built with the same address pair, in the same
        # (remote, local) order, taken from self.tun_if.
        # NOTE(review): confirm that the "in" SA is meant to use the same
        # order as the "out" SA.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol,
                          self.tun_if.remote_addr[p.addr_type],
                          self.tun_if.local_addr[p.addr_type])

    # "out" SA uses the scapy-side id/SPI.
    sa_out = make_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out = sa_out
    sa_out.add_vpp_config()

    # "in" SA uses the VPP-side id/SPI.
    sa_in = make_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in = sa_in
    sa_in.add_vpp_config()
1033 def config_protect(self, p):
1034 p.tun_protect = VppIpsecTunProtect(self,
1038 p.tun_protect.add_vpp_config()
1040 def config_network(self, p):
1041 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1043 self.pg0.remote_ip6)
1044 p.tun_if.add_vpp_config()
1046 p.tun_if.config_ip6()
1048 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1049 [VppRoutePath(p.tun_if.remote_ip6,
1051 proto=DpoProto.DPO_PROTO_IP6)])
1052 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the route and then the tunnel interface stored on ``p``.

    :param p: parameter object carrying ``route`` and ``tun_if``.
    """
    # The route is removed first, then the interface.
    route, tunnel = p.route, p.tun_if
    route.remove_vpp_config()
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object stored on ``p``.

    :param p: parameter object carrying ``tun_protect``.
    """
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on ``p``, ``tun_sa_out`` first.

    :param p: parameter object carrying ``tun_sa_out`` and ``tun_sa_in``.
    """
    for sa in (p.tun_sa_out, p.tun_sa_in):
        sa.remove_vpp_config()
1066 class TestIpsec6TunProtect(TemplateIpsec,
1067 TemplateIpsec6TunProtect,
1069 """ IPsec IPv6 Tunnel protect - transport mode"""
1071 encryption_type = ESP
1072 tun6_encrypt_node_name = "esp6-encrypt-tun"
1073 tun6_decrypt_node_name = "esp6-decrypt-tun"
1076 super(TestIpsec6TunProtect, self).setUp()
1078 self.tun_if = self.pg0
1081 super(TestIpsec6TunProtect, self).tearDown()
1083 def test_tun_66(self):
1084 """IPSEC tunnel protect"""
1086 p = self.ipv6_params
1088 self.config_network(p)
1089 self.config_sa_tra(p)
1090 self.config_protect(p)
1092 self.verify_tun_66(p, count=127)
1093 c = p.tun_if.get_rx_stats()
1094 self.assertEqual(c['packets'], 127)
1095 c = p.tun_if.get_tx_stats()
1096 self.assertEqual(c['packets'], 127)
1098 # rekey - create new SAs and update the tunnel protection
1100 np.crypt_key = 'X' + p.crypt_key[1:]
1101 np.scapy_tun_spi += 100
1102 np.scapy_tun_sa_id += 1
1103 np.vpp_tun_spi += 100
1104 np.vpp_tun_sa_id += 1
1105 np.tun_if.local_spi = p.vpp_tun_spi
1106 np.tun_if.remote_spi = p.scapy_tun_spi
1108 self.config_sa_tra(np)
1109 self.config_protect(np)
1112 self.verify_tun_66(np, count=127)
1113 c = p.tun_if.get_rx_stats()
1114 self.assertEqual(c['packets'], 254)
1115 c = p.tun_if.get_tx_stats()
1116 self.assertEqual(c['packets'], 254)
1119 # 1) add two input SAs [old, new]
1120 # 2) swap output SA to [new]
1121 # 3) use only [new] input SA
1123 np3.crypt_key = 'Z' + p.crypt_key[1:]
1124 np3.scapy_tun_spi += 100
1125 np3.scapy_tun_sa_id += 1
1126 np3.vpp_tun_spi += 100
1127 np3.vpp_tun_sa_id += 1
1128 np3.tun_if.local_spi = p.vpp_tun_spi
1129 np3.tun_if.remote_spi = p.scapy_tun_spi
1131 self.config_sa_tra(np3)
1134 p.tun_protect.update_vpp_config(np.tun_sa_out,
1135 [np.tun_sa_in, np3.tun_sa_in])
1136 self.verify_tun_66(np, np, count=127)
1137 self.verify_tun_66(np3, np, count=127)
1140 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1141 [np.tun_sa_in, np3.tun_sa_in])
1142 self.verify_tun_66(np, np3, count=127)
1143 self.verify_tun_66(np3, np3, count=127)
1146 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1148 self.verify_tun_66(np3, np3, count=127)
1149 self.verify_drop_tun_66(np, count=127)
1151 c = p.tun_if.get_rx_stats()
1152 self.assertEqual(c['packets'], 127*7)
1153 c = p.tun_if.get_tx_stats()
1154 self.assertEqual(c['packets'], 127*7)
1155 self.unconfig_sa(np)
1158 self.unconfig_protect(np3)
1159 self.unconfig_sa(np3)
1160 self.unconfig_network(p)
1163 class TestIpsec6TunProtectTun(TemplateIpsec,
1164 TemplateIpsec6TunProtect,
1166 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1168 encryption_type = ESP
1169 tun6_encrypt_node_name = "esp6-encrypt-tun"
1170 tun6_decrypt_node_name = "esp6-decrypt-tun"
1173 super(TestIpsec6TunProtectTun, self).setUp()
1175 self.tun_if = self.pg0
1178 super(TestIpsec6TunProtectTun, self).tearDown()
1180 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1182 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1183 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1184 dst=sw_intf.local_ip6) /
1185 IPv6(src=src, dst=dst) /
1186 UDP(sport=1166, dport=2233) /
1187 Raw('X' * payload_size))
1188 for i in range(count)]
1190 def gen_pkts6(self, sw_intf, src, dst, count=1,
1192 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1193 IPv6(src=src, dst=dst) /
1194 UDP(sport=1166, dport=2233) /
1195 Raw('X' * payload_size)
1196 for i in range(count)]
1198 def verify_decrypted6(self, p, rxs):
1200 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1201 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1202 self.assert_packet_checksums_valid(rx)
1204 def verify_encrypted6(self, p, sa, rxs):
1207 pkt = sa.decrypt(rx[IPv6])
1208 if not pkt.haslayer(IPv6):
1209 pkt = IPv6(pkt[Raw].load)
1210 self.assert_packet_checksums_valid(pkt)
1211 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1212 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1213 inner = pkt[IPv6].payload
1214 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1216 except (IndexError, AssertionError):
1217 self.logger.debug(ppp("Unexpected packet:", rx))
1219 self.logger.debug(ppp("Decrypted packet:", pkt))
1224 def test_tun_66(self):
1225 """IPSEC tunnel protect """
1227 p = self.ipv6_params
1229 self.config_network(p)
1230 self.config_sa_tun(p)
1231 self.config_protect(p)
1233 self.verify_tun_66(p, count=127)
1235 c = p.tun_if.get_rx_stats()
1236 self.assertEqual(c['packets'], 127)
1237 c = p.tun_if.get_tx_stats()
1238 self.assertEqual(c['packets'], 127)
1240 # rekey - create new SAs and update the tunnel protection
1242 np.crypt_key = 'X' + p.crypt_key[1:]
1243 np.scapy_tun_spi += 100
1244 np.scapy_tun_sa_id += 1
1245 np.vpp_tun_spi += 100
1246 np.vpp_tun_sa_id += 1
1247 np.tun_if.local_spi = p.vpp_tun_spi
1248 np.tun_if.remote_spi = p.scapy_tun_spi
1250 self.config_sa_tun(np)
1251 self.config_protect(np)
1254 self.verify_tun_66(np, count=127)
1255 c = p.tun_if.get_rx_stats()
1256 self.assertEqual(c['packets'], 254)
1257 c = p.tun_if.get_tx_stats()
1258 self.assertEqual(c['packets'], 254)
1261 self.unconfig_protect(np)
1262 self.unconfig_sa(np)
1263 self.unconfig_network(p)
1266 if __name__ == '__main__':
1267 unittest.main(testRunner=VppTestRunner)