5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.inet6 import IPv6
9 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface
13 from vpp_gre_interface import VppGreInterface
14 from vpp_ipip_tun_interface import VppIpIpTunInterface
15 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
16 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
17 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
19 from vpp_papi import VppEnum
22 class TemplateIpsec4TunIfEsp(TemplateIpsec):
23 """ IPsec tunnel interface tests """
29 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
32 def tearDownClass(cls):
33 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
36 super(TemplateIpsec4TunIfEsp, self).setUp()
38 self.tun_if = self.pg0
42 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
43 p.scapy_tun_spi, p.crypt_algo_vpp_id,
44 p.crypt_key, p.crypt_key,
45 p.auth_algo_vpp_id, p.auth_key,
47 p.tun_if.add_vpp_config()
52 r = VppIpRoute(self, p.remote_tun_if_host, 32,
53 [VppRoutePath(p.tun_if.remote_ip4,
56 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
57 [VppRoutePath(p.tun_if.remote_ip6,
59 proto=DpoProto.DPO_PROTO_IP6)])
63 super(TemplateIpsec4TunIfEsp, self).tearDown()
66 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
67 """ Ipsec ESP - TUN tests """
68 tun4_encrypt_node_name = "esp4-encrypt-tun"
69 tun4_decrypt_node_name = "esp4-decrypt"
71 def test_tun_basic64(self):
72 """ ipsec 6o4 tunnel basic test """
73 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
75 self.verify_tun_64(self.params[socket.AF_INET], count=1)
77 def test_tun_burst64(self):
78 """ ipsec 6o4 tunnel basic test """
79 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
81 self.verify_tun_64(self.params[socket.AF_INET], count=257)
83 def test_tun_basic_frag44(self):
84 """ ipsec 4o4 tunnel frag basic test """
85 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
89 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
91 self.verify_tun_44(self.params[socket.AF_INET],
92 count=1, payload_size=1800, n_rx=2)
93 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
97 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
98 """ Ipsec ESP - TCP tests """
102 class TemplateIpsec6TunIfEsp(TemplateIpsec):
103 """ IPsec tunnel interface tests """
105 encryption_type = ESP
108 super(TemplateIpsec6TunIfEsp, self).setUp()
110 self.tun_if = self.pg0
113 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
114 p.scapy_tun_spi, p.crypt_algo_vpp_id,
115 p.crypt_key, p.crypt_key,
116 p.auth_algo_vpp_id, p.auth_key,
117 p.auth_key, is_ip6=True)
118 tun_if.add_vpp_config()
123 r = VppIpRoute(self, p.remote_tun_if_host, 128,
124 [VppRoutePath(tun_if.remote_ip6,
126 proto=DpoProto.DPO_PROTO_IP6)])
128 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
129 [VppRoutePath(tun_if.remote_ip4,
134 super(TemplateIpsec6TunIfEsp, self).tearDown()
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """

    # Class-level node names; the 4o6 tests below replace the encrypt
    # node name on the instance before verifying.
    tun6_decrypt_node_name = "esp6-decrypt"
    tun6_encrypt_node_name = "esp6-encrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        # Single packet, using the AF_INET6 parameter set.
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        # Same flow as the basic test, with 257 packets.
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_46(v6_params, count=257)
153 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
154 """ IPsec IPv4 Multi Tunnel interface """
156 encryption_type = ESP
157 tun4_encrypt_node_name = "esp4-encrypt-tun"
158 tun4_decrypt_node_name = "esp4-decrypt"
161 super(TestIpsec4MultiTunIfEsp, self).setUp()
163 self.tun_if = self.pg0
165 self.multi_params = []
168 p = copy.copy(self.ipv4_params)
170 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
171 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
172 p.scapy_tun_spi = p.scapy_tun_spi + ii
173 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
174 p.vpp_tun_spi = p.vpp_tun_spi + ii
176 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
177 p.scapy_tra_spi = p.scapy_tra_spi + ii
178 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
179 p.vpp_tra_spi = p.vpp_tra_spi + ii
181 config_tun_params(p, self.encryption_type, self.tun_if)
182 self.multi_params.append(p)
184 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
187 p.crypt_key, p.crypt_key,
188 p.auth_algo_vpp_id, p.auth_key,
190 p.tun_if.add_vpp_config()
192 p.tun_if.config_ip4()
194 VppIpRoute(self, p.remote_tun_if_host, 32,
195 [VppRoutePath(p.tun_if.remote_ip4,
196 0xffffffff)]).add_vpp_config()
199 super(TestIpsec4MultiTunIfEsp, self).tearDown()
201 def test_tun_44(self):
202 """Multiple IPSEC tunnel interfaces """
203 for p in self.multi_params:
204 self.verify_tun_44(p, count=127)
205 c = p.tun_if.get_rx_stats()
206 self.assertEqual(c['packets'], 127)
207 c = p.tun_if.get_tx_stats()
208 self.assertEqual(c['packets'], 127)
211 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
212 """ IPsec IPv4 Tunnel interface all Algos """
214 encryption_type = ESP
215 tun4_encrypt_node_name = "esp4-encrypt-tun"
216 tun4_decrypt_node_name = "esp4-decrypt"
218 def config_network(self, p):
219 config_tun_params(p, self.encryption_type, self.tun_if)
221 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
224 p.crypt_key, p.crypt_key,
225 p.auth_algo_vpp_id, p.auth_key,
228 p.tun_if.add_vpp_config()
230 p.tun_if.config_ip4()
231 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
232 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
234 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
235 [VppRoutePath(p.tun_if.remote_ip4,
237 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Tear down what ``config_network`` stored on ``p``.

    Order is kept as-is: unconfigure IPv4 on the tunnel interface,
    remove the tunnel interface, then remove the route in ``p.route``.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()

    p.route.remove_vpp_config()
245 super(TestIpsec4TunIfEspAll, self).setUp()
247 self.tun_if = self.pg0
250 super(TestIpsec4TunIfEspAll, self).tearDown()
254 # change the key and the SPI
256 p.crypt_key = 'X' + p.crypt_key[1:]
258 p.scapy_tun_sa_id += 1
261 p.tun_if.local_spi = p.vpp_tun_spi
262 p.tun_if.remote_spi = p.scapy_tun_spi
264 config_tun_params(p, self.encryption_type, self.tun_if)
266 p.tun_sa_in = VppIpsecSA(self,
273 self.vpp_esp_protocol,
274 self.tun_if.local_addr[p.addr_type],
275 self.tun_if.remote_addr[p.addr_type],
278 p.tun_sa_out = VppIpsecSA(self,
285 self.vpp_esp_protocol,
286 self.tun_if.remote_addr[p.addr_type],
287 self.tun_if.local_addr[p.addr_type],
290 p.tun_sa_in.add_vpp_config()
291 p.tun_sa_out.add_vpp_config()
293 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
294 sa_id=p.tun_sa_in.id,
296 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
297 sa_id=p.tun_sa_out.id,
299 self.logger.info(self.vapi.cli("sh ipsec sa"))
301 def test_tun_44(self):
302 """IPSEC tunnel all algos """
304 # foreach VPP crypto engine
305 engines = ["ia32", "ipsecmb", "openssl"]
307 # foreach crypto algorithm
308 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
309 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
310 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
311 IPSEC_API_INTEG_ALG_NONE),
312 'scapy-crypto': "AES-GCM",
313 'scapy-integ': "NULL",
314 'key': "JPjyOWBeVEQiMe7h",
316 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
317 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
318 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
319 IPSEC_API_INTEG_ALG_NONE),
320 'scapy-crypto': "AES-GCM",
321 'scapy-integ': "NULL",
322 'key': "JPjyOWBeVEQiMe7hJPjyOWBe",
324 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
325 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
326 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
327 IPSEC_API_INTEG_ALG_NONE),
328 'scapy-crypto': "AES-GCM",
329 'scapy-integ': "NULL",
330 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
332 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
333 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
334 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
335 IPSEC_API_INTEG_ALG_SHA1_96),
336 'scapy-crypto': "AES-CBC",
337 'scapy-integ': "HMAC-SHA1-96",
339 'key': "JPjyOWBeVEQiMe7h"},
340 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
341 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
342 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
343 IPSEC_API_INTEG_ALG_SHA1_96),
344 'scapy-crypto': "AES-CBC",
345 'scapy-integ': "HMAC-SHA1-96",
347 'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
348 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
349 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
350 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
351 IPSEC_API_INTEG_ALG_SHA1_96),
352 'scapy-crypto': "AES-CBC",
353 'scapy-integ': "HMAC-SHA1-96",
355 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
357 for engine in engines:
358 self.vapi.cli("set crypto handler all %s" % engine)
361 # loop through each of the algorithms
364 # with self.subTest(algo=algo['scapy']):
366 p = copy.copy(self.ipv4_params)
367 p.auth_algo_vpp_id = algo['vpp-integ']
368 p.crypt_algo_vpp_id = algo['vpp-crypto']
369 p.crypt_algo = algo['scapy-crypto']
370 p.auth_algo = algo['scapy-integ']
371 p.crypt_key = algo['key']
372 p.salt = algo['salt']
374 self.config_network(p)
376 self.verify_tun_44(p, count=127)
377 c = p.tun_if.get_rx_stats()
378 self.assertEqual(c['packets'], 127)
379 c = p.tun_if.get_tx_stats()
380 self.assertEqual(c['packets'], 127)
386 self.verify_tun_44(p, count=127)
388 self.unconfig_network(p)
389 p.tun_sa_out.remove_vpp_config()
390 p.tun_sa_in.remove_vpp_config()
393 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
394 """ IPsec IPv6 Multi Tunnel interface """
396 encryption_type = ESP
397 tun6_encrypt_node_name = "esp6-encrypt-tun"
398 tun6_decrypt_node_name = "esp6-decrypt"
401 super(TestIpsec6MultiTunIfEsp, self).setUp()
403 self.tun_if = self.pg0
405 self.multi_params = []
408 p = copy.copy(self.ipv6_params)
410 p.remote_tun_if_host = "1111::%d" % (ii + 1)
411 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
412 p.scapy_tun_spi = p.scapy_tun_spi + ii
413 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
414 p.vpp_tun_spi = p.vpp_tun_spi + ii
416 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
417 p.scapy_tra_spi = p.scapy_tra_spi + ii
418 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
419 p.vpp_tra_spi = p.vpp_tra_spi + ii
421 config_tun_params(p, self.encryption_type, self.tun_if)
422 self.multi_params.append(p)
424 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
427 p.crypt_key, p.crypt_key,
428 p.auth_algo_vpp_id, p.auth_key,
429 p.auth_key, is_ip6=True)
430 p.tun_if.add_vpp_config()
432 p.tun_if.config_ip6()
434 r = VppIpRoute(self, p.remote_tun_if_host, 128,
435 [VppRoutePath(p.tun_if.remote_ip6,
437 proto=DpoProto.DPO_PROTO_IP6)])
441 super(TestIpsec6MultiTunIfEsp, self).tearDown()
443 def test_tun_66(self):
444 """Multiple IPSEC tunnel interfaces """
445 for p in self.multi_params:
446 self.verify_tun_66(p, count=127)
447 c = p.tun_if.get_rx_stats()
448 self.assertEqual(c['packets'], 127)
449 c = p.tun_if.get_tx_stats()
450 self.assertEqual(c['packets'], 127)
453 @unittest.skipIf(is_skip_aarch64_set and is_platform_aarch64,
454 "test doesn't work on aarch64")
455 class TestIpsecGreTebIfEsp(TemplateIpsec,
457 """ Ipsec GRE TEB ESP - TUN tests """
458 tun4_encrypt_node_name = "esp4-encrypt-tun"
459 tun4_decrypt_node_name = "esp4-decrypt-tun"
460 encryption_type = ESP
461 omac = "00:11:22:33:44:55"
463 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
465 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
466 sa.encrypt(IP(src=self.pg0.remote_ip4,
467 dst=self.pg0.local_ip4) /
469 Ether(dst=self.omac) /
470 IP(src="1.1.1.1", dst="1.1.1.2") /
471 UDP(sport=1144, dport=2233) /
472 Raw('X' * payload_size))
473 for i in range(count)]
475 def gen_pkts(self, sw_intf, src, dst, count=1,
477 return [Ether(dst=self.omac) /
478 IP(src="1.1.1.1", dst="1.1.1.2") /
479 UDP(sport=1144, dport=2233) /
480 Raw('X' * payload_size)
481 for i in range(count)]
483 def verify_decrypted(self, p, rxs):
485 self.assert_equal(rx[Ether].dst, self.omac)
486 self.assert_equal(rx[IP].dst, "1.1.1.2")
488 def verify_encrypted(self, p, sa, rxs):
491 pkt = sa.decrypt(rx[IP])
492 if not pkt.haslayer(IP):
493 pkt = IP(pkt[Raw].load)
494 self.assert_packet_checksums_valid(pkt)
495 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
496 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
497 self.assertTrue(pkt.haslayer(GRE))
499 self.assertEqual(e[Ether].dst, self.omac)
500 self.assertEqual(e[IP].dst, "1.1.1.2")
501 except (IndexError, AssertionError):
502 self.logger.debug(ppp("Unexpected packet:", rx))
504 self.logger.debug(ppp("Decrypted packet:", pkt))
510 super(TestIpsecGreTebIfEsp, self).setUp()
512 self.tun_if = self.pg0
516 bd1 = VppBridgeDomain(self, 1)
519 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
520 p.auth_algo_vpp_id, p.auth_key,
521 p.crypt_algo_vpp_id, p.crypt_key,
522 self.vpp_esp_protocol,
525 p.tun_sa_out.add_vpp_config()
527 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
528 p.auth_algo_vpp_id, p.auth_key,
529 p.crypt_algo_vpp_id, p.crypt_key,
530 self.vpp_esp_protocol,
533 p.tun_sa_in.add_vpp_config()
535 self.tun = VppGreInterface(self,
538 type=(VppEnum.vl_api_gre_tunnel_type_t.
539 GRE_API_TUNNEL_TYPE_TEB))
540 self.tun.add_vpp_config()
542 p.tun_protect = VppIpsecTunProtect(self,
547 p.tun_protect.add_vpp_config()
550 self.tun.config_ip4()
552 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
553 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
555 self.vapi.cli("clear ipsec sa")
558 self.tun.unconfig_ip4()
559 super(TestIpsecGreTebIfEsp, self).tearDown()
562 class TestIpsecGreIfEsp(TemplateIpsec,
564 """ Ipsec GRE ESP - TUN tests """
565 tun4_encrypt_node_name = "esp4-encrypt-tun"
566 tun4_decrypt_node_name = "esp4-decrypt-tun"
567 encryption_type = ESP
569 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
571 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
572 sa.encrypt(IP(src=self.pg0.remote_ip4,
573 dst=self.pg0.local_ip4) /
575 IP(src=self.pg1.local_ip4,
576 dst=self.pg1.remote_ip4) /
577 UDP(sport=1144, dport=2233) /
578 Raw('X' * payload_size))
579 for i in range(count)]
581 def gen_pkts(self, sw_intf, src, dst, count=1,
583 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
584 IP(src="1.1.1.1", dst="1.1.1.2") /
585 UDP(sport=1144, dport=2233) /
586 Raw('X' * payload_size)
587 for i in range(count)]
589 def verify_decrypted(self, p, rxs):
591 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
592 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
594 def verify_encrypted(self, p, sa, rxs):
597 pkt = sa.decrypt(rx[IP])
598 if not pkt.haslayer(IP):
599 pkt = IP(pkt[Raw].load)
600 self.assert_packet_checksums_valid(pkt)
601 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
602 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
603 self.assertTrue(pkt.haslayer(GRE))
605 self.assertEqual(e[IP].dst, "1.1.1.2")
606 except (IndexError, AssertionError):
607 self.logger.debug(ppp("Unexpected packet:", rx))
609 self.logger.debug(ppp("Decrypted packet:", pkt))
615 super(TestIpsecGreIfEsp, self).setUp()
617 self.tun_if = self.pg0
621 bd1 = VppBridgeDomain(self, 1)
624 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
625 p.auth_algo_vpp_id, p.auth_key,
626 p.crypt_algo_vpp_id, p.crypt_key,
627 self.vpp_esp_protocol,
630 p.tun_sa_out.add_vpp_config()
632 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
633 p.auth_algo_vpp_id, p.auth_key,
634 p.crypt_algo_vpp_id, p.crypt_key,
635 self.vpp_esp_protocol,
638 p.tun_sa_in.add_vpp_config()
640 self.tun = VppGreInterface(self,
643 self.tun.add_vpp_config()
645 p.tun_protect = VppIpsecTunProtect(self,
649 p.tun_protect.add_vpp_config()
652 self.tun.config_ip4()
654 VppIpRoute(self, "1.1.1.2", 32,
655 [VppRoutePath(self.tun.remote_ip4,
656 0xffffffff)]).add_vpp_config()
659 self.tun.unconfig_ip4()
660 super(TestIpsecGreIfEsp, self).tearDown()
663 class TemplateIpsec4TunProtect(object):
664 """ IPsec IPv4 Tunnel protect """
def config_sa_tra(self, p):
    """Create and add the pair of SAs used by the transport-mode test.

    Calls ``config_tun_params`` for ``p`` first, then builds
    ``p.tun_sa_out`` from the scapy SA id/SPI and ``p.tun_sa_in`` from
    the VPP SA id/SPI. Both share the auth/crypto algorithm, keys and
    ``self.vpp_esp_protocol``; no extra address arguments are passed.
    """
    config_tun_params(p, self.encryption_type, self.tun_if)

    def build_sa(sa_id, spi):
        # Attributes are read at call time, once per SA.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol)

    p.tun_sa_out = build_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out.add_vpp_config()

    p.tun_sa_in = build_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in.add_vpp_config()
def config_sa_tun(self, p):
    """Create and add the pair of SAs used by the tunnel-mode test.

    Same as ``config_sa_tra`` except that each SA additionally receives
    ``self.tun_if.remote_addr[p.addr_type]`` followed by
    ``self.tun_if.local_addr[p.addr_type]``.
    """
    # NOTE(review): both SAs get the addresses in the same
    # (remote, local) order - confirm the inbound SA is not meant to
    # mirror the outbound one.
    config_tun_params(p, self.encryption_type, self.tun_if)

    def build_sa(sa_id, spi):
        # Attributes are read at call time, once per SA.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol,
                          self.tun_if.remote_addr[p.addr_type],
                          self.tun_if.local_addr[p.addr_type])

    p.tun_sa_out = build_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out.add_vpp_config()

    p.tun_sa_in = build_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in.add_vpp_config()
700 def config_protect(self, p):
701 p.tun_protect = VppIpsecTunProtect(self,
705 p.tun_protect.add_vpp_config()
707 def config_network(self, p):
708 p.tun_if = VppIpIpTunInterface(self, self.pg0,
711 p.tun_if.add_vpp_config()
713 p.tun_if.config_ip4()
715 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
716 [VppRoutePath(p.tun_if.remote_ip4,
718 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the route in ``p.route``, then the tunnel in ``p.tun_if``."""
    route, tunnel = p.route, p.tun_if
    route.remove_vpp_config()
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Call ``remove_vpp_config()`` on the object in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on ``p``: ``tun_sa_out`` first, then
    ``tun_sa_in``."""
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
732 class TestIpsec4TunProtect(TemplateIpsec,
733 TemplateIpsec4TunProtect,
735 """ IPsec IPv4 Tunnel protect - transport mode"""
737 encryption_type = ESP
738 tun4_encrypt_node_name = "esp4-encrypt-tun"
739 tun4_decrypt_node_name = "esp4-decrypt-tun"
742 super(TestIpsec4TunProtect, self).setUp()
744 self.tun_if = self.pg0
747 super(TestIpsec4TunProtect, self).tearDown()
749 def test_tun_44(self):
750 """IPSEC tunnel protect"""
754 self.config_network(p)
755 self.config_sa_tra(p)
756 self.config_protect(p)
758 self.verify_tun_44(p, count=127)
759 c = p.tun_if.get_rx_stats()
760 self.assertEqual(c['packets'], 127)
761 c = p.tun_if.get_tx_stats()
762 self.assertEqual(c['packets'], 127)
764 # rekey - create new SAs and update the tunnel protection
766 np.crypt_key = 'X' + p.crypt_key[1:]
767 np.scapy_tun_spi += 100
768 np.scapy_tun_sa_id += 1
769 np.vpp_tun_spi += 100
770 np.vpp_tun_sa_id += 1
771 np.tun_if.local_spi = p.vpp_tun_spi
772 np.tun_if.remote_spi = p.scapy_tun_spi
774 self.config_sa_tra(np)
775 self.config_protect(np)
778 self.verify_tun_44(np, count=127)
779 c = p.tun_if.get_rx_stats()
780 self.assertEqual(c['packets'], 254)
781 c = p.tun_if.get_tx_stats()
782 self.assertEqual(c['packets'], 254)
785 self.unconfig_protect(np)
787 self.unconfig_network(p)
790 class TestIpsec4TunProtectTun(TemplateIpsec,
791 TemplateIpsec4TunProtect,
793 """ IPsec IPv4 Tunnel protect - tunnel mode"""
795 encryption_type = ESP
796 tun4_encrypt_node_name = "esp4-encrypt-tun"
797 tun4_decrypt_node_name = "esp4-decrypt-tun"
800 super(TestIpsec4TunProtectTun, self).setUp()
802 self.tun_if = self.pg0
805 super(TestIpsec4TunProtectTun, self).tearDown()
807 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
809 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
810 sa.encrypt(IP(src=sw_intf.remote_ip4,
811 dst=sw_intf.local_ip4) /
812 IP(src=src, dst=dst) /
813 UDP(sport=1144, dport=2233) /
814 Raw('X' * payload_size))
815 for i in range(count)]
817 def gen_pkts(self, sw_intf, src, dst, count=1,
819 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
820 IP(src=src, dst=dst) /
821 UDP(sport=1144, dport=2233) /
822 Raw('X' * payload_size)
823 for i in range(count)]
825 def verify_decrypted(self, p, rxs):
827 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
828 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
829 self.assert_packet_checksums_valid(rx)
831 def verify_encrypted(self, p, sa, rxs):
834 pkt = sa.decrypt(rx[IP])
835 if not pkt.haslayer(IP):
836 pkt = IP(pkt[Raw].load)
837 self.assert_packet_checksums_valid(pkt)
838 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
839 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
840 inner = pkt[IP].payload
841 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
843 except (IndexError, AssertionError):
844 self.logger.debug(ppp("Unexpected packet:", rx))
846 self.logger.debug(ppp("Decrypted packet:", pkt))
851 def test_tun_44(self):
852 """IPSEC tunnel protect """
856 self.config_network(p)
857 self.config_sa_tun(p)
858 self.config_protect(p)
860 self.verify_tun_44(p, count=127)
862 c = p.tun_if.get_rx_stats()
863 self.assertEqual(c['packets'], 127)
864 c = p.tun_if.get_tx_stats()
865 self.assertEqual(c['packets'], 127)
867 # rekey - create new SAs and update the tunnel protection
869 np.crypt_key = 'X' + p.crypt_key[1:]
870 np.scapy_tun_spi += 100
871 np.scapy_tun_sa_id += 1
872 np.vpp_tun_spi += 100
873 np.vpp_tun_sa_id += 1
874 np.tun_if.local_spi = p.vpp_tun_spi
875 np.tun_if.remote_spi = p.scapy_tun_spi
877 self.config_sa_tun(np)
878 self.config_protect(np)
881 self.verify_tun_44(np, count=127)
882 c = p.tun_if.get_rx_stats()
883 self.assertEqual(c['packets'], 254)
884 c = p.tun_if.get_tx_stats()
885 self.assertEqual(c['packets'], 254)
888 self.unconfig_protect(np)
890 self.unconfig_network(p)
893 class TemplateIpsec6TunProtect(object):
894 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Create and add the pair of SAs used by the transport-mode test.

    Calls ``config_tun_params`` for ``p`` first, then builds
    ``p.tun_sa_out`` from the scapy SA id/SPI and ``p.tun_sa_in`` from
    the VPP SA id/SPI. Both share the auth/crypto algorithm, keys and
    ``self.vpp_esp_protocol``; no extra address arguments are passed.
    """
    config_tun_params(p, self.encryption_type, self.tun_if)

    def build_sa(sa_id, spi):
        # Attributes are read at call time, once per SA.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol)

    p.tun_sa_out = build_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out.add_vpp_config()

    p.tun_sa_in = build_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in.add_vpp_config()
def config_sa_tun(self, p):
    """Create and add the pair of SAs used by the tunnel-mode test.

    Same as ``config_sa_tra`` except that each SA additionally receives
    ``self.tun_if.remote_addr[p.addr_type]`` followed by
    ``self.tun_if.local_addr[p.addr_type]``.
    """
    # NOTE(review): both SAs get the addresses in the same
    # (remote, local) order - confirm the inbound SA is not meant to
    # mirror the outbound one.
    config_tun_params(p, self.encryption_type, self.tun_if)

    def build_sa(sa_id, spi):
        # Attributes are read at call time, once per SA.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol,
                          self.tun_if.remote_addr[p.addr_type],
                          self.tun_if.local_addr[p.addr_type])

    p.tun_sa_out = build_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out.add_vpp_config()

    p.tun_sa_in = build_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in.add_vpp_config()
930 def config_protect(self, p):
931 p.tun_protect = VppIpsecTunProtect(self,
935 p.tun_protect.add_vpp_config()
937 def config_network(self, p):
938 p.tun_if = VppIpIpTunInterface(self, self.pg0,
941 p.tun_if.add_vpp_config()
943 p.tun_if.config_ip6()
945 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
946 [VppRoutePath(p.tun_if.remote_ip6,
948 proto=DpoProto.DPO_PROTO_IP6)])
949 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove the route in ``p.route``, then the tunnel in ``p.tun_if``."""
    route, tunnel = p.route, p.tun_if
    route.remove_vpp_config()
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Call ``remove_vpp_config()`` on the object in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on ``p``: ``tun_sa_out`` first, then
    ``tun_sa_in``."""
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
963 class TestIpsec6TunProtect(TemplateIpsec,
964 TemplateIpsec6TunProtect,
966 """ IPsec IPv6 Tunnel protect - transport mode"""
968 encryption_type = ESP
969 tun6_encrypt_node_name = "esp6-encrypt-tun"
970 tun6_decrypt_node_name = "esp6-decrypt-tun"
973 super(TestIpsec6TunProtect, self).setUp()
975 self.tun_if = self.pg0
978 super(TestIpsec6TunProtect, self).tearDown()
980 def test_tun_66(self):
981 """IPSEC tunnel protect"""
985 self.config_network(p)
986 self.config_sa_tra(p)
987 self.config_protect(p)
989 self.verify_tun_66(p, count=127)
990 c = p.tun_if.get_rx_stats()
991 self.assertEqual(c['packets'], 127)
992 c = p.tun_if.get_tx_stats()
993 self.assertEqual(c['packets'], 127)
995 # rekey - create new SAs and update the tunnel protection
997 np.crypt_key = 'X' + p.crypt_key[1:]
998 np.scapy_tun_spi += 100
999 np.scapy_tun_sa_id += 1
1000 np.vpp_tun_spi += 100
1001 np.vpp_tun_sa_id += 1
1002 np.tun_if.local_spi = p.vpp_tun_spi
1003 np.tun_if.remote_spi = p.scapy_tun_spi
1005 self.config_sa_tra(np)
1006 self.config_protect(np)
1009 self.verify_tun_66(np, count=127)
1010 c = p.tun_if.get_rx_stats()
1011 self.assertEqual(c['packets'], 254)
1012 c = p.tun_if.get_tx_stats()
1013 self.assertEqual(c['packets'], 254)
1016 # 1) add two input SAs [old, new]
1017 # 2) swap output SA to [new]
1018 # 3) use only [new] input SA
1020 np3.crypt_key = 'Z' + p.crypt_key[1:]
1021 np3.scapy_tun_spi += 100
1022 np3.scapy_tun_sa_id += 1
1023 np3.vpp_tun_spi += 100
1024 np3.vpp_tun_sa_id += 1
1025 np3.tun_if.local_spi = p.vpp_tun_spi
1026 np3.tun_if.remote_spi = p.scapy_tun_spi
1028 self.config_sa_tra(np3)
1031 p.tun_protect.update_vpp_config(np.tun_sa_out,
1032 [np.tun_sa_in, np3.tun_sa_in])
1033 self.verify_tun_66(np, np, count=127)
1034 self.verify_tun_66(np3, np, count=127)
1037 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1038 [np.tun_sa_in, np3.tun_sa_in])
1039 self.verify_tun_66(np, np3, count=127)
1040 self.verify_tun_66(np3, np3, count=127)
1043 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1045 self.verify_tun_66(np3, np3, count=127)
1046 self.verify_drop_tun_66(np, count=127)
1048 c = p.tun_if.get_rx_stats()
1049 self.assertEqual(c['packets'], 127*7)
1050 c = p.tun_if.get_tx_stats()
1051 self.assertEqual(c['packets'], 127*7)
1052 self.unconfig_sa(np)
1055 self.unconfig_protect(np3)
1056 self.unconfig_sa(np3)
1057 self.unconfig_network(p)
1060 class TestIpsec6TunProtectTun(TemplateIpsec,
1061 TemplateIpsec6TunProtect,
1063 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1065 encryption_type = ESP
1066 tun6_encrypt_node_name = "esp6-encrypt-tun"
1067 tun6_decrypt_node_name = "esp6-decrypt-tun"
1070 super(TestIpsec6TunProtectTun, self).setUp()
1072 self.tun_if = self.pg0
1075 super(TestIpsec6TunProtectTun, self).tearDown()
1077 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1079 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1080 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1081 dst=sw_intf.local_ip6) /
1082 IPv6(src=src, dst=dst) /
1083 UDP(sport=1166, dport=2233) /
1084 Raw('X' * payload_size))
1085 for i in range(count)]
1087 def gen_pkts6(self, sw_intf, src, dst, count=1,
1089 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1090 IPv6(src=src, dst=dst) /
1091 UDP(sport=1166, dport=2233) /
1092 Raw('X' * payload_size)
1093 for i in range(count)]
1095 def verify_decrypted6(self, p, rxs):
1097 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1098 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1099 self.assert_packet_checksums_valid(rx)
1101 def verify_encrypted6(self, p, sa, rxs):
1104 pkt = sa.decrypt(rx[IPv6])
1105 if not pkt.haslayer(IPv6):
1106 pkt = IPv6(pkt[Raw].load)
1107 self.assert_packet_checksums_valid(pkt)
1108 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1109 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1110 inner = pkt[IPv6].payload
1111 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1113 except (IndexError, AssertionError):
1114 self.logger.debug(ppp("Unexpected packet:", rx))
1116 self.logger.debug(ppp("Decrypted packet:", pkt))
1121 def test_tun_66(self):
1122 """IPSEC tunnel protect """
1124 p = self.ipv6_params
1126 self.config_network(p)
1127 self.config_sa_tun(p)
1128 self.config_protect(p)
1130 self.verify_tun_66(p, count=127)
1132 c = p.tun_if.get_rx_stats()
1133 self.assertEqual(c['packets'], 127)
1134 c = p.tun_if.get_tx_stats()
1135 self.assertEqual(c['packets'], 127)
1137 # rekey - create new SAs and update the tunnel protection
1139 np.crypt_key = 'X' + p.crypt_key[1:]
1140 np.scapy_tun_spi += 100
1141 np.scapy_tun_sa_id += 1
1142 np.vpp_tun_spi += 100
1143 np.vpp_tun_sa_id += 1
1144 np.tun_if.local_spi = p.vpp_tun_spi
1145 np.tun_if.remote_spi = p.scapy_tun_spi
1147 self.config_sa_tun(np)
1148 self.config_protect(np)
1151 self.verify_tun_66(np, count=127)
1152 c = p.tun_if.get_rx_stats()
1153 self.assertEqual(c['packets'], 254)
1154 c = p.tun_if.get_tx_stats()
1155 self.assertEqual(c['packets'], 254)
1158 self.unconfig_protect(np)
1159 self.unconfig_sa(np)
1160 self.unconfig_network(p)
# Script entry point: when executed directly, hand the tests in this
# module to unittest using the framework's VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)