5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key
13 from vpp_ipsec_tun_interface import VppIpsecTunInterface
14 from vpp_gre_interface import VppGreInterface
15 from vpp_ipip_tun_interface import VppIpIpTunInterface
16 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
17 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
18 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_papi import VppEnum
23 def config_tun_params(p, encryption_type, tun_if):
24 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
25 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
26 IPSEC_API_SAD_FLAG_USE_ESN))
27 crypt_key = mk_scapy_crypt_key(p)
28 p.scapy_tun_sa = SecurityAssociation(
29 encryption_type, spi=p.vpp_tun_spi,
30 crypt_algo=p.crypt_algo,
32 auth_algo=p.auth_algo, auth_key=p.auth_key,
33 tunnel_header=ip_class_by_addr_type[p.addr_type](
36 nat_t_header=p.nat_header,
38 p.vpp_tun_sa = SecurityAssociation(
39 encryption_type, spi=p.scapy_tun_spi,
40 crypt_algo=p.crypt_algo,
42 auth_algo=p.auth_algo, auth_key=p.auth_key,
43 tunnel_header=ip_class_by_addr_type[p.addr_type](
46 nat_t_header=p.nat_header,
50 def config_tra_params(p, encryption_type, tun_if):
51 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
52 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
53 IPSEC_API_SAD_FLAG_USE_ESN))
54 crypt_key = mk_scapy_crypt_key(p)
55 p.scapy_tun_sa = SecurityAssociation(
56 encryption_type, spi=p.vpp_tun_spi,
57 crypt_algo=p.crypt_algo,
59 auth_algo=p.auth_algo, auth_key=p.auth_key,
61 p.vpp_tun_sa = SecurityAssociation(
62 encryption_type, spi=p.scapy_tun_spi,
63 crypt_algo=p.crypt_algo,
65 auth_algo=p.auth_algo, auth_key=p.auth_key,
69 class TemplateIpsec4TunIfEsp(TemplateIpsec):
70 """ IPsec tunnel interface tests """
76 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
79 def tearDownClass(cls):
80 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
83 super(TemplateIpsec4TunIfEsp, self).setUp()
85 self.tun_if = self.pg0
89 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
90 p.scapy_tun_spi, p.crypt_algo_vpp_id,
91 p.crypt_key, p.crypt_key,
92 p.auth_algo_vpp_id, p.auth_key,
94 p.tun_if.add_vpp_config()
98 config_tun_params(p, self.encryption_type, p.tun_if)
100 r = VppIpRoute(self, p.remote_tun_if_host, 32,
101 [VppRoutePath(p.tun_if.remote_ip4,
104 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
105 [VppRoutePath(p.tun_if.remote_ip6,
107 proto=DpoProto.DPO_PROTO_IP6)])
111 super(TemplateIpsec4TunIfEsp, self).tearDown()
114 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
115 """ IPsec UDP tunnel interface tests """
117 tun4_encrypt_node_name = "esp4-encrypt-tun"
118 tun4_decrypt_node_name = "esp4-decrypt-tun"
119 encryption_type = ESP
123 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
126 def tearDownClass(cls):
127 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
130 super(TemplateIpsec4TunIfEspUdp, self).setUp()
132 self.tun_if = self.pg0
135 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
136 IPSEC_API_SAD_FLAG_UDP_ENCAP)
137 p.nat_header = UDP(sport=5454, dport=4500)
139 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
140 p.scapy_tun_spi, p.crypt_algo_vpp_id,
141 p.crypt_key, p.crypt_key,
142 p.auth_algo_vpp_id, p.auth_key,
143 p.auth_key, udp_encap=True)
144 p.tun_if.add_vpp_config()
146 p.tun_if.config_ip4()
147 p.tun_if.config_ip6()
148 config_tun_params(p, self.encryption_type, p.tun_if)
150 r = VppIpRoute(self, p.remote_tun_if_host, 32,
151 [VppRoutePath(p.tun_if.remote_ip4,
154 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
155 [VppRoutePath(p.tun_if.remote_ip6,
157 proto=DpoProto.DPO_PROTO_IP6)])
161 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
164 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
165 """ Ipsec ESP - TUN tests """
166 tun4_encrypt_node_name = "esp4-encrypt-tun"
167 tun4_decrypt_node_name = "esp4-decrypt-tun"
def test_tun_basic64(self):
    """ipsec 6o4 tunnel basic test

    Sets the IPv4 encrypt node name on this instance and then runs
    verify_tun_64 for a single packet using the AF_INET parameters.
    """
    self.tun4_encrypt_node_name = "esp4-encrypt-tun"

    # The parameter sets are keyed by address family.
    ipv4_case = self.params[socket.AF_INET]
    self.verify_tun_64(ipv4_case, count=1)
def test_tun_burst64(self):
    """ipsec 6o4 tunnel burst test

    Sets the IPv4 encrypt node name on this instance and then runs
    verify_tun_64 with count=257 using the AF_INET parameters.
    """
    # NOTE: the summary line previously read "basic test", duplicating
    # the description of the single-packet 6o4 test.
    self.tun4_encrypt_node_name = "esp4-encrypt-tun"

    self.verify_tun_64(self.params[socket.AF_INET], count=257)
181 def test_tun_basic_frag44(self):
182 """ ipsec 4o4 tunnel frag basic test """
183 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
187 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
189 self.verify_tun_44(self.params[socket.AF_INET],
190 count=1, payload_size=1800, n_rx=2)
191 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests

    Derives from TemplateIpsec4TunIfEspUdp and IpsecTun4Tests and adds
    one keepalive test on top of whatever those bases provide.
    """

    # Name of the IPv4 tunnel input node for this class.
    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        ipv4_case = self.ipv4_params
        self.verify_keepalive(ipv4_case)
205 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
206 """ Ipsec ESP - TCP tests """
210 class TemplateIpsec6TunIfEsp(TemplateIpsec):
211 """ IPsec tunnel interface tests """
213 encryption_type = ESP
216 super(TemplateIpsec6TunIfEsp, self).setUp()
218 self.tun_if = self.pg0
221 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
222 p.scapy_tun_spi, p.crypt_algo_vpp_id,
223 p.crypt_key, p.crypt_key,
224 p.auth_algo_vpp_id, p.auth_key,
225 p.auth_key, is_ip6=True)
226 p.tun_if.add_vpp_config()
228 p.tun_if.config_ip6()
229 p.tun_if.config_ip4()
230 config_tun_params(p, self.encryption_type, p.tun_if)
232 r = VppIpRoute(self, p.remote_tun_if_host, 128,
233 [VppRoutePath(p.tun_if.remote_ip6,
235 proto=DpoProto.DPO_PROTO_IP6)])
237 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
238 [VppRoutePath(p.tun_if.remote_ip4,
243 super(TemplateIpsec6TunIfEsp, self).tearDown()
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests

    Derives from TemplateIpsec6TunIfEsp and IpsecTun6Tests and adds two
    4o6 tests that differ only in the packet count they pass on.
    """

    # Names of the IPv6 tunnel encrypt / decrypt nodes for this class.
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        # The parameter sets are keyed by address family.
        ipv6_case = self.params[socket.AF_INET6]
        self.verify_tun_46(ipv6_case, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        ipv6_case = self.params[socket.AF_INET6]
        self.verify_tun_46(ipv6_case, count=257)
262 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
263 """ IPsec IPv4 Multi Tunnel interface """
265 encryption_type = ESP
266 tun4_encrypt_node_name = "esp4-encrypt-tun"
267 tun4_decrypt_node_name = "esp4-decrypt-tun"
270 super(TestIpsec4MultiTunIfEsp, self).setUp()
272 self.tun_if = self.pg0
274 self.multi_params = []
275 self.pg0.generate_remote_hosts(10)
276 self.pg0.configure_ipv4_neighbors()
279 p = copy.copy(self.ipv4_params)
281 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
282 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
283 p.scapy_tun_spi = p.scapy_tun_spi + ii
284 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
285 p.vpp_tun_spi = p.vpp_tun_spi + ii
287 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
288 p.scapy_tra_spi = p.scapy_tra_spi + ii
289 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
290 p.vpp_tra_spi = p.vpp_tra_spi + ii
292 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
295 p.crypt_key, p.crypt_key,
296 p.auth_algo_vpp_id, p.auth_key,
298 dst=self.pg0.remote_hosts[ii].ip4)
299 p.tun_if.add_vpp_config()
301 p.tun_if.config_ip4()
302 config_tun_params(p, self.encryption_type, p.tun_if)
303 self.multi_params.append(p)
305 VppIpRoute(self, p.remote_tun_if_host, 32,
306 [VppRoutePath(p.tun_if.remote_ip4,
307 0xffffffff)]).add_vpp_config()
310 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces

    For every parameter set in self.multi_params, runs verify_tun_44
    with 127 packets and then checks that the tunnel interface's rx and
    tx 'packets' counters both equal 127.
    """
    expected = 127
    for params in self.multi_params:
        self.verify_tun_44(params, count=expected)
        tunnel = params.tun_if
        # rx is read and checked before tx is read, as a failing rx
        # check must stop the test before the tx counter is fetched.
        for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
            self.assertEqual(read_stats()['packets'], expected)
322 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
323 """ IPsec IPv4 Tunnel interface all Algos """
325 encryption_type = ESP
326 tun4_encrypt_node_name = "esp4-encrypt-tun"
327 tun4_decrypt_node_name = "esp4-decrypt-tun"
329 def config_network(self, p):
331 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
334 p.crypt_key, p.crypt_key,
335 p.auth_algo_vpp_id, p.auth_key,
338 p.tun_if.add_vpp_config()
340 p.tun_if.config_ip4()
341 config_tun_params(p, self.encryption_type, p.tun_if)
342 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
343 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
345 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
346 [VppRoutePath(p.tun_if.remote_ip4,
348 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Undo the per-run network setup recorded on p.

    Calls unconfig_ip4 on p.tun_if, removes p.tun_if from VPP, and
    finally removes p.route.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()

    route = p.route
    route.remove_vpp_config()
356 super(TestIpsec4TunIfEspAll, self).setUp()
358 self.tun_if = self.pg0
361 super(TestIpsec4TunIfEspAll, self).tearDown()
365 # change the key and the SPI
367 p.crypt_key = b'X' + p.crypt_key[1:]
369 p.scapy_tun_sa_id += 1
372 p.tun_if.local_spi = p.vpp_tun_spi
373 p.tun_if.remote_spi = p.scapy_tun_spi
375 config_tun_params(p, self.encryption_type, p.tun_if)
377 p.tun_sa_in = VppIpsecSA(self,
384 self.vpp_esp_protocol,
387 p.tun_sa_out = VppIpsecSA(self,
394 self.vpp_esp_protocol,
397 p.tun_sa_in.add_vpp_config()
398 p.tun_sa_out.add_vpp_config()
400 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
401 sa_id=p.tun_sa_in.id,
403 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
404 sa_id=p.tun_sa_out.id,
406 self.logger.info(self.vapi.cli("sh ipsec sa"))
408 def test_tun_44(self):
409 """IPSEC tunnel all algos """
411 # foreach VPP crypto engine
412 engines = ["ia32", "ipsecmb", "openssl"]
414 # foreach crypto algorithm
415 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
416 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
417 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
418 IPSEC_API_INTEG_ALG_NONE),
419 'scapy-crypto': "AES-GCM",
420 'scapy-integ': "NULL",
421 'key': b"JPjyOWBeVEQiMe7h",
423 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
424 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
425 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
426 IPSEC_API_INTEG_ALG_NONE),
427 'scapy-crypto': "AES-GCM",
428 'scapy-integ': "NULL",
429 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
431 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
432 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
433 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
434 IPSEC_API_INTEG_ALG_NONE),
435 'scapy-crypto': "AES-GCM",
436 'scapy-integ': "NULL",
437 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
439 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
440 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
441 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
442 IPSEC_API_INTEG_ALG_SHA1_96),
443 'scapy-crypto': "AES-CBC",
444 'scapy-integ': "HMAC-SHA1-96",
446 'key': b"JPjyOWBeVEQiMe7h"},
447 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
448 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
449 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
450 IPSEC_API_INTEG_ALG_SHA1_96),
451 'scapy-crypto': "AES-CBC",
452 'scapy-integ': "HMAC-SHA1-96",
454 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
455 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
456 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
457 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
458 IPSEC_API_INTEG_ALG_SHA1_96),
459 'scapy-crypto': "AES-CBC",
460 'scapy-integ': "HMAC-SHA1-96",
462 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
463 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
464 IPSEC_API_CRYPTO_ALG_NONE),
465 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
466 IPSEC_API_INTEG_ALG_SHA1_96),
467 'scapy-crypto': "NULL",
468 'scapy-integ': "HMAC-SHA1-96",
470 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
472 for engine in engines:
473 self.vapi.cli("set crypto handler all %s" % engine)
476 # loop through each of the algorithms
479 # with self.subTest(algo=algo['scapy']):
481 p = copy.copy(self.ipv4_params)
482 p.auth_algo_vpp_id = algo['vpp-integ']
483 p.crypt_algo_vpp_id = algo['vpp-crypto']
484 p.crypt_algo = algo['scapy-crypto']
485 p.auth_algo = algo['scapy-integ']
486 p.crypt_key = algo['key']
487 p.salt = algo['salt']
489 self.config_network(p)
491 self.verify_tun_44(p, count=127)
492 c = p.tun_if.get_rx_stats()
493 self.assertEqual(c['packets'], 127)
494 c = p.tun_if.get_tx_stats()
495 self.assertEqual(c['packets'], 127)
501 self.verify_tun_44(p, count=127)
503 self.unconfig_network(p)
504 p.tun_sa_out.remove_vpp_config()
505 p.tun_sa_in.remove_vpp_config()
508 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
509 """ IPsec IPv6 Multi Tunnel interface """
511 encryption_type = ESP
512 tun6_encrypt_node_name = "esp6-encrypt-tun"
513 tun6_decrypt_node_name = "esp6-decrypt-tun"
516 super(TestIpsec6MultiTunIfEsp, self).setUp()
518 self.tun_if = self.pg0
520 self.multi_params = []
521 self.pg0.generate_remote_hosts(10)
522 self.pg0.configure_ipv6_neighbors()
525 p = copy.copy(self.ipv6_params)
527 p.remote_tun_if_host = "1111::%d" % (ii + 1)
528 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
529 p.scapy_tun_spi = p.scapy_tun_spi + ii
530 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
531 p.vpp_tun_spi = p.vpp_tun_spi + ii
533 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
534 p.scapy_tra_spi = p.scapy_tra_spi + ii
535 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
536 p.vpp_tra_spi = p.vpp_tra_spi + ii
538 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
541 p.crypt_key, p.crypt_key,
542 p.auth_algo_vpp_id, p.auth_key,
543 p.auth_key, is_ip6=True,
544 dst=self.pg0.remote_hosts[ii].ip6)
545 p.tun_if.add_vpp_config()
547 p.tun_if.config_ip6()
548 config_tun_params(p, self.encryption_type, p.tun_if)
549 self.multi_params.append(p)
551 r = VppIpRoute(self, p.remote_tun_if_host, 128,
552 [VppRoutePath(p.tun_if.remote_ip6,
554 proto=DpoProto.DPO_PROTO_IP6)])
558 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces

    For every parameter set in self.multi_params, runs verify_tun_66
    with 127 packets and then checks that the tunnel interface's rx and
    tx 'packets' counters both equal 127.
    """
    expected = 127
    for params in self.multi_params:
        self.verify_tun_66(params, count=expected)
        tunnel = params.tun_if
        # rx is read and checked before tx is read.
        for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
            self.assertEqual(read_stats()['packets'], expected)
570 class TestIpsecGreTebIfEsp(TemplateIpsec,
572 """ Ipsec GRE TEB ESP - TUN tests """
573 tun4_encrypt_node_name = "esp4-encrypt-tun"
574 tun4_decrypt_node_name = "esp4-decrypt-tun"
575 encryption_type = ESP
576 omac = "00:11:22:33:44:55"
578 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
580 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
581 sa.encrypt(IP(src=self.pg0.remote_ip4,
582 dst=self.pg0.local_ip4) /
584 Ether(dst=self.omac) /
585 IP(src="1.1.1.1", dst="1.1.1.2") /
586 UDP(sport=1144, dport=2233) /
587 Raw(b'X' * payload_size))
588 for i in range(count)]
590 def gen_pkts(self, sw_intf, src, dst, count=1,
592 return [Ether(dst=self.omac) /
593 IP(src="1.1.1.1", dst="1.1.1.2") /
594 UDP(sport=1144, dport=2233) /
595 Raw(b'X' * payload_size)
596 for i in range(count)]
598 def verify_decrypted(self, p, rxs):
600 self.assert_equal(rx[Ether].dst, self.omac)
601 self.assert_equal(rx[IP].dst, "1.1.1.2")
603 def verify_encrypted(self, p, sa, rxs):
606 pkt = sa.decrypt(rx[IP])
607 if not pkt.haslayer(IP):
608 pkt = IP(pkt[Raw].load)
609 self.assert_packet_checksums_valid(pkt)
610 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
611 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
612 self.assertTrue(pkt.haslayer(GRE))
614 self.assertEqual(e[Ether].dst, self.omac)
615 self.assertEqual(e[IP].dst, "1.1.1.2")
616 except (IndexError, AssertionError):
617 self.logger.debug(ppp("Unexpected packet:", rx))
619 self.logger.debug(ppp("Decrypted packet:", pkt))
625 super(TestIpsecGreTebIfEsp, self).setUp()
627 self.tun_if = self.pg0
631 bd1 = VppBridgeDomain(self, 1)
634 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
635 p.auth_algo_vpp_id, p.auth_key,
636 p.crypt_algo_vpp_id, p.crypt_key,
637 self.vpp_esp_protocol,
640 p.tun_sa_out.add_vpp_config()
642 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
643 p.auth_algo_vpp_id, p.auth_key,
644 p.crypt_algo_vpp_id, p.crypt_key,
645 self.vpp_esp_protocol,
648 p.tun_sa_in.add_vpp_config()
650 p.tun_if = VppGreInterface(self,
653 type=(VppEnum.vl_api_gre_tunnel_type_t.
654 GRE_API_TUNNEL_TYPE_TEB))
655 p.tun_if.add_vpp_config()
657 p.tun_protect = VppIpsecTunProtect(self,
662 p.tun_protect.add_vpp_config()
665 p.tun_if.config_ip4()
666 config_tun_params(p, self.encryption_type, p.tun_if)
668 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
669 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
671 self.vapi.cli("clear ipsec sa")
675 p.tun_if.unconfig_ip4()
676 super(TestIpsecGreTebIfEsp, self).tearDown()
679 class TestIpsecGreTebIfEspTra(TemplateIpsec,
681 """ Ipsec GRE TEB ESP - Tra tests """
682 tun4_encrypt_node_name = "esp4-encrypt-tun"
683 tun4_decrypt_node_name = "esp4-decrypt-tun"
684 encryption_type = ESP
685 omac = "00:11:22:33:44:55"
687 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
689 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
690 sa.encrypt(IP(src=self.pg0.remote_ip4,
691 dst=self.pg0.local_ip4) /
693 Ether(dst=self.omac) /
694 IP(src="1.1.1.1", dst="1.1.1.2") /
695 UDP(sport=1144, dport=2233) /
696 Raw(b'X' * payload_size))
697 for i in range(count)]
699 def gen_pkts(self, sw_intf, src, dst, count=1,
701 return [Ether(dst=self.omac) /
702 IP(src="1.1.1.1", dst="1.1.1.2") /
703 UDP(sport=1144, dport=2233) /
704 Raw(b'X' * payload_size)
705 for i in range(count)]
707 def verify_decrypted(self, p, rxs):
709 self.assert_equal(rx[Ether].dst, self.omac)
710 self.assert_equal(rx[IP].dst, "1.1.1.2")
712 def verify_encrypted(self, p, sa, rxs):
715 pkt = sa.decrypt(rx[IP])
716 if not pkt.haslayer(IP):
717 pkt = IP(pkt[Raw].load)
718 self.assert_packet_checksums_valid(pkt)
719 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
720 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
721 self.assertTrue(pkt.haslayer(GRE))
723 self.assertEqual(e[Ether].dst, self.omac)
724 self.assertEqual(e[IP].dst, "1.1.1.2")
725 except (IndexError, AssertionError):
726 self.logger.debug(ppp("Unexpected packet:", rx))
728 self.logger.debug(ppp("Decrypted packet:", pkt))
734 super(TestIpsecGreTebIfEspTra, self).setUp()
736 self.tun_if = self.pg0
740 bd1 = VppBridgeDomain(self, 1)
743 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
744 p.auth_algo_vpp_id, p.auth_key,
745 p.crypt_algo_vpp_id, p.crypt_key,
746 self.vpp_esp_protocol)
747 p.tun_sa_out.add_vpp_config()
749 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
750 p.auth_algo_vpp_id, p.auth_key,
751 p.crypt_algo_vpp_id, p.crypt_key,
752 self.vpp_esp_protocol)
753 p.tun_sa_in.add_vpp_config()
755 p.tun_if = VppGreInterface(self,
758 type=(VppEnum.vl_api_gre_tunnel_type_t.
759 GRE_API_TUNNEL_TYPE_TEB))
760 p.tun_if.add_vpp_config()
762 p.tun_protect = VppIpsecTunProtect(self,
767 p.tun_protect.add_vpp_config()
770 p.tun_if.config_ip4()
771 config_tra_params(p, self.encryption_type, p.tun_if)
773 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
774 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
776 self.vapi.cli("clear ipsec sa")
780 p.tun_if.unconfig_ip4()
781 super(TestIpsecGreTebIfEspTra, self).tearDown()
784 class TestIpsecGreIfEsp(TemplateIpsec,
786 """ Ipsec GRE ESP - TUN tests """
787 tun4_encrypt_node_name = "esp4-encrypt-tun"
788 tun4_decrypt_node_name = "esp4-decrypt-tun"
789 encryption_type = ESP
791 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
793 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
794 sa.encrypt(IP(src=self.pg0.remote_ip4,
795 dst=self.pg0.local_ip4) /
797 IP(src=self.pg1.local_ip4,
798 dst=self.pg1.remote_ip4) /
799 UDP(sport=1144, dport=2233) /
800 Raw(b'X' * payload_size))
801 for i in range(count)]
803 def gen_pkts(self, sw_intf, src, dst, count=1,
805 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
806 IP(src="1.1.1.1", dst="1.1.1.2") /
807 UDP(sport=1144, dport=2233) /
808 Raw(b'X' * payload_size)
809 for i in range(count)]
811 def verify_decrypted(self, p, rxs):
813 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
814 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
816 def verify_encrypted(self, p, sa, rxs):
819 pkt = sa.decrypt(rx[IP])
820 if not pkt.haslayer(IP):
821 pkt = IP(pkt[Raw].load)
822 self.assert_packet_checksums_valid(pkt)
823 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
824 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
825 self.assertTrue(pkt.haslayer(GRE))
827 self.assertEqual(e[IP].dst, "1.1.1.2")
828 except (IndexError, AssertionError):
829 self.logger.debug(ppp("Unexpected packet:", rx))
831 self.logger.debug(ppp("Decrypted packet:", pkt))
837 super(TestIpsecGreIfEsp, self).setUp()
839 self.tun_if = self.pg0
843 bd1 = VppBridgeDomain(self, 1)
846 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
847 p.auth_algo_vpp_id, p.auth_key,
848 p.crypt_algo_vpp_id, p.crypt_key,
849 self.vpp_esp_protocol,
852 p.tun_sa_out.add_vpp_config()
854 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
855 p.auth_algo_vpp_id, p.auth_key,
856 p.crypt_algo_vpp_id, p.crypt_key,
857 self.vpp_esp_protocol,
860 p.tun_sa_in.add_vpp_config()
862 p.tun_if = VppGreInterface(self,
865 p.tun_if.add_vpp_config()
867 p.tun_protect = VppIpsecTunProtect(self,
871 p.tun_protect.add_vpp_config()
874 p.tun_if.config_ip4()
875 config_tun_params(p, self.encryption_type, p.tun_if)
877 VppIpRoute(self, "1.1.1.2", 32,
878 [VppRoutePath(p.tun_if.remote_ip4,
879 0xffffffff)]).add_vpp_config()
883 p.tun_if.unconfig_ip4()
884 super(TestIpsecGreIfEsp, self).tearDown()
887 class TestIpsecGreIfEspTra(TemplateIpsec,
889 """ Ipsec GRE ESP - TRA tests """
890 tun4_encrypt_node_name = "esp4-encrypt-tun"
891 tun4_decrypt_node_name = "esp4-decrypt-tun"
892 encryption_type = ESP
894 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
896 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
897 sa.encrypt(IP(src=self.pg0.remote_ip4,
898 dst=self.pg0.local_ip4) /
900 IP(src=self.pg1.local_ip4,
901 dst=self.pg1.remote_ip4) /
902 UDP(sport=1144, dport=2233) /
903 Raw(b'X' * payload_size))
904 for i in range(count)]
906 def gen_pkts(self, sw_intf, src, dst, count=1,
908 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
909 IP(src="1.1.1.1", dst="1.1.1.2") /
910 UDP(sport=1144, dport=2233) /
911 Raw(b'X' * payload_size)
912 for i in range(count)]
914 def verify_decrypted(self, p, rxs):
916 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
917 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
919 def verify_encrypted(self, p, sa, rxs):
922 pkt = sa.decrypt(rx[IP])
923 if not pkt.haslayer(IP):
924 pkt = IP(pkt[Raw].load)
925 self.assert_packet_checksums_valid(pkt)
926 self.assertTrue(pkt.haslayer(GRE))
928 self.assertEqual(e[IP].dst, "1.1.1.2")
929 except (IndexError, AssertionError):
930 self.logger.debug(ppp("Unexpected packet:", rx))
932 self.logger.debug(ppp("Decrypted packet:", pkt))
938 super(TestIpsecGreIfEspTra, self).setUp()
940 self.tun_if = self.pg0
944 bd1 = VppBridgeDomain(self, 1)
947 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
948 p.auth_algo_vpp_id, p.auth_key,
949 p.crypt_algo_vpp_id, p.crypt_key,
950 self.vpp_esp_protocol)
951 p.tun_sa_out.add_vpp_config()
953 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
954 p.auth_algo_vpp_id, p.auth_key,
955 p.crypt_algo_vpp_id, p.crypt_key,
956 self.vpp_esp_protocol)
957 p.tun_sa_in.add_vpp_config()
959 p.tun_if = VppGreInterface(self,
962 p.tun_if.add_vpp_config()
964 p.tun_protect = VppIpsecTunProtect(self,
968 p.tun_protect.add_vpp_config()
971 p.tun_if.config_ip4()
972 config_tra_params(p, self.encryption_type, p.tun_if)
974 VppIpRoute(self, "1.1.1.2", 32,
975 [VppRoutePath(p.tun_if.remote_ip4,
976 0xffffffff)]).add_vpp_config()
980 p.tun_if.unconfig_ip4()
981 super(TestIpsecGreIfEspTra, self).tearDown()
984 class TemplateIpsec4TunProtect(object):
985 """ IPsec IPv4 Tunnel protect """
987 encryption_type = ESP
988 tun4_encrypt_node_name = "esp4-encrypt-tun"
989 tun4_decrypt_node_name = "esp4-decrypt-tun"
990 tun4_input_node = "ipsec4-tun-input"
992 def config_sa_tra(self, p):
993 config_tun_params(p, self.encryption_type, p.tun_if)
995 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
996 p.auth_algo_vpp_id, p.auth_key,
997 p.crypt_algo_vpp_id, p.crypt_key,
998 self.vpp_esp_protocol,
1000 p.tun_sa_out.add_vpp_config()
1002 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1003 p.auth_algo_vpp_id, p.auth_key,
1004 p.crypt_algo_vpp_id, p.crypt_key,
1005 self.vpp_esp_protocol,
1007 p.tun_sa_in.add_vpp_config()
1009 def config_sa_tun(self, p):
1010 config_tun_params(p, self.encryption_type, p.tun_if)
1012 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1013 p.auth_algo_vpp_id, p.auth_key,
1014 p.crypt_algo_vpp_id, p.crypt_key,
1015 self.vpp_esp_protocol,
1016 self.tun_if.remote_addr[p.addr_type],
1017 self.tun_if.local_addr[p.addr_type],
1019 p.tun_sa_out.add_vpp_config()
1021 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1022 p.auth_algo_vpp_id, p.auth_key,
1023 p.crypt_algo_vpp_id, p.crypt_key,
1024 self.vpp_esp_protocol,
1025 self.tun_if.remote_addr[p.addr_type],
1026 self.tun_if.local_addr[p.addr_type],
1028 p.tun_sa_in.add_vpp_config()
1030 def config_protect(self, p):
1031 p.tun_protect = VppIpsecTunProtect(self,
1035 p.tun_protect.add_vpp_config()
1037 def config_network(self, p):
1038 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1040 self.pg0.remote_ip4)
1041 p.tun_if.add_vpp_config()
1043 p.tun_if.config_ip4()
1044 p.tun_if.config_ip6()
1046 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1047 [VppRoutePath(p.tun_if.remote_ip4,
1049 p.route.add_vpp_config()
1050 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1051 [VppRoutePath(p.tun_if.remote_ip6,
1053 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Remove the network objects recorded on p from VPP.

    p.route is removed first, then p.tun_if.
    """
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel protection object held in p.tun_protect from VPP."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs recorded on p from VPP.

    p.tun_sa_out is removed first, then p.tun_sa_in.
    """
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
1068 class TestIpsec4TunProtect(TemplateIpsec,
1069 TemplateIpsec4TunProtect,
1071 """ IPsec IPv4 Tunnel protect - transport mode"""
1074 super(TestIpsec4TunProtect, self).setUp()
1076 self.tun_if = self.pg0
1079 super(TestIpsec4TunProtect, self).tearDown()
1081 def test_tun_44(self):
1082 """IPSEC tunnel protect"""
1084 p = self.ipv4_params
1086 self.config_network(p)
1087 self.config_sa_tra(p)
1088 self.config_protect(p)
1090 self.verify_tun_44(p, count=127)
1091 c = p.tun_if.get_rx_stats()
1092 self.assertEqual(c['packets'], 127)
1093 c = p.tun_if.get_tx_stats()
1094 self.assertEqual(c['packets'], 127)
1096 self.vapi.cli("clear ipsec sa")
1097 self.verify_tun_64(p, count=127)
1098 c = p.tun_if.get_rx_stats()
1099 self.assertEqual(c['packets'], 254)
1100 c = p.tun_if.get_tx_stats()
1101 self.assertEqual(c['packets'], 254)
1103 # rekey - create new SAs and update the tunnel protection
1105 np.crypt_key = b'X' + p.crypt_key[1:]
1106 np.scapy_tun_spi += 100
1107 np.scapy_tun_sa_id += 1
1108 np.vpp_tun_spi += 100
1109 np.vpp_tun_sa_id += 1
1110 np.tun_if.local_spi = p.vpp_tun_spi
1111 np.tun_if.remote_spi = p.scapy_tun_spi
1113 self.config_sa_tra(np)
1114 self.config_protect(np)
1117 self.verify_tun_44(np, count=127)
1118 c = p.tun_if.get_rx_stats()
1119 self.assertEqual(c['packets'], 381)
1120 c = p.tun_if.get_tx_stats()
1121 self.assertEqual(c['packets'], 381)
1124 self.unconfig_protect(np)
1125 self.unconfig_sa(np)
1126 self.unconfig_network(p)
1129 class TestIpsec4TunProtectUdp(TemplateIpsec,
1130 TemplateIpsec4TunProtect,
1132 """ IPsec IPv4 Tunnel protect - transport mode"""
1135 super(TestIpsec4TunProtectUdp, self).setUp()
1137 self.tun_if = self.pg0
1139 p = self.ipv4_params
1140 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1141 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1142 p.nat_header = UDP(sport=5454, dport=4500)
1143 self.config_network(p)
1144 self.config_sa_tra(p)
1145 self.config_protect(p)
1148 p = self.ipv4_params
1149 self.unconfig_protect(p)
1151 self.unconfig_network(p)
1152 super(TestIpsec4TunProtectUdp, self).tearDown()
def test_tun_44(self):
    """IPSEC UDP tunnel protect

    Runs verify_tun_44 with 127 packets on self.ipv4_params and then
    checks that the tunnel interface's rx and tx 'packets' counters
    both equal 127.
    """
    expected = 127
    params = self.ipv4_params

    self.verify_tun_44(params, count=expected)

    tunnel = params.tun_if
    # rx is read and checked before tx is read.
    for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
        self.assertEqual(read_stats()['packets'], expected)
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # Delegates to verify_keepalive with the IPv4 parameter set.
    ipv4_case = self.ipv4_params
    self.verify_keepalive(ipv4_case)
1170 class TestIpsec4TunProtectTun(TemplateIpsec,
1171 TemplateIpsec4TunProtect,
1173 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1175 encryption_type = ESP
1176 tun4_encrypt_node_name = "esp4-encrypt-tun"
1177 tun4_decrypt_node_name = "esp4-decrypt-tun"
1180 super(TestIpsec4TunProtectTun, self).setUp()
1182 self.tun_if = self.pg0
1185 super(TestIpsec4TunProtectTun, self).tearDown()
1187 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1189 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1190 sa.encrypt(IP(src=sw_intf.remote_ip4,
1191 dst=sw_intf.local_ip4) /
1192 IP(src=src, dst=dst) /
1193 UDP(sport=1144, dport=2233) /
1194 Raw(b'X' * payload_size))
1195 for i in range(count)]
1197 def gen_pkts(self, sw_intf, src, dst, count=1,
1199 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1200 IP(src=src, dst=dst) /
1201 UDP(sport=1144, dport=2233) /
1202 Raw(b'X' * payload_size)
1203 for i in range(count)]
1205 def verify_decrypted(self, p, rxs):
1207 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1208 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1209 self.assert_packet_checksums_valid(rx)
1211 def verify_encrypted(self, p, sa, rxs):
1214 pkt = sa.decrypt(rx[IP])
1215 if not pkt.haslayer(IP):
1216 pkt = IP(pkt[Raw].load)
1217 self.assert_packet_checksums_valid(pkt)
1218 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1219 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1220 inner = pkt[IP].payload
1221 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1223 except (IndexError, AssertionError):
1224 self.logger.debug(ppp("Unexpected packet:", rx))
1226 self.logger.debug(ppp("Decrypted packet:", pkt))
1231 def test_tun_44(self):
1232 """IPSEC tunnel protect """
1234 p = self.ipv4_params
1236 self.config_network(p)
1237 self.config_sa_tun(p)
1238 self.config_protect(p)
1240 self.verify_tun_44(p, count=127)
1242 c = p.tun_if.get_rx_stats()
1243 self.assertEqual(c['packets'], 127)
1244 c = p.tun_if.get_tx_stats()
1245 self.assertEqual(c['packets'], 127)
1247 # rekey - create new SAs and update the tunnel protection
1249 np.crypt_key = b'X' + p.crypt_key[1:]
1250 np.scapy_tun_spi += 100
1251 np.scapy_tun_sa_id += 1
1252 np.vpp_tun_spi += 100
1253 np.vpp_tun_sa_id += 1
1254 np.tun_if.local_spi = p.vpp_tun_spi
1255 np.tun_if.remote_spi = p.scapy_tun_spi
1257 self.config_sa_tun(np)
1258 self.config_protect(np)
1261 self.verify_tun_44(np, count=127)
1262 c = p.tun_if.get_rx_stats()
1263 self.assertEqual(c['packets'], 254)
1264 c = p.tun_if.get_tx_stats()
1265 self.assertEqual(c['packets'], 254)
1268 self.unconfig_protect(np)
1269 self.unconfig_sa(np)
1270 self.unconfig_network(p)
1273 class TemplateIpsec6TunProtect(object):
1274 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Create the pair of SAs for p without tunnel endpoint addresses.

    First refreshes the scapy-side state via config_tun_params, then
    builds p.tun_sa_out from the scapy SA id/SPI and p.tun_sa_in from
    the VPP SA id/SPI, adding each to VPP right after it is created.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    def make_sa(sa_id, spi):
        # Both SAs share the auth/crypto settings and the ESP protocol;
        # only the id and SPI differ.
        return VppIpsecSA(self, sa_id, spi,
                          p.auth_algo_vpp_id, p.auth_key,
                          p.crypt_algo_vpp_id, p.crypt_key,
                          self.vpp_esp_protocol)

    p.tun_sa_out = make_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
    p.tun_sa_out.add_vpp_config()

    p.tun_sa_in = make_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
    p.tun_sa_in.add_vpp_config()
1291 def config_sa_tun(self, p):
1292 config_tun_params(p, self.encryption_type, p.tun_if)
1294 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1295 p.auth_algo_vpp_id, p.auth_key,
1296 p.crypt_algo_vpp_id, p.crypt_key,
1297 self.vpp_esp_protocol,
1298 self.tun_if.remote_addr[p.addr_type],
1299 self.tun_if.local_addr[p.addr_type])
1300 p.tun_sa_out.add_vpp_config()
1302 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1303 p.auth_algo_vpp_id, p.auth_key,
1304 p.crypt_algo_vpp_id, p.crypt_key,
1305 self.vpp_esp_protocol,
1306 self.tun_if.remote_addr[p.addr_type],
1307 self.tun_if.local_addr[p.addr_type])
1308 p.tun_sa_in.add_vpp_config()
1310 def config_protect(self, p):
1311 p.tun_protect = VppIpsecTunProtect(self,
1315 p.tun_protect.add_vpp_config()
1317 def config_network(self, p):
1318 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1320 self.pg0.remote_ip6)
1321 p.tun_if.add_vpp_config()
1323 p.tun_if.config_ip6()
1324 p.tun_if.config_ip4()
1326 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1327 [VppRoutePath(p.tun_if.remote_ip6,
1329 proto=DpoProto.DPO_PROTO_IP6)])
1330 p.route.add_vpp_config()
1331 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1332 [VppRoutePath(p.tun_if.remote_ip4,
1336 def unconfig_network(self, p):
1337 p.route.remove_vpp_config()
1338 p.tun_if.remove_vpp_config()
1340 def unconfig_protect(self, p):
1341 p.tun_protect.remove_vpp_config()
1343 def unconfig_sa(self, p):
1344 p.tun_sa_out.remove_vpp_config()
1345 p.tun_sa_in.remove_vpp_config()
# IPv6 tunnel-protect tests using SAs created by config_sa_tra (no tunnel
# endpoint addresses on the SAs).
# NOTE(review): the end of the base-class list and the `def setUp` /
# `def tearDown` lines are not visible in this listing; only their bodies
# appear below.
1348 class TestIpsec6TunProtect(TemplateIpsec,
1349 TemplateIpsec6TunProtect,
1351 """ IPsec IPv6 Tunnel protect - transport mode"""
1353 encryption_type = ESP
1354 tun6_encrypt_node_name = "esp6-encrypt-tun"
1355 tun6_decrypt_node_name = "esp6-decrypt-tun"
1358 super(TestIpsec6TunProtect, self).setUp()
# self.tun_if is set to pg0 for the duration of the test.
1360 self.tun_if = self.pg0
1363 super(TestIpsec6TunProtect, self).tearDown()
# IPv6-over-IPv6 test: initial traffic, a rekey, then a staged SA rollover
# in which the protection temporarily accepts two input SAs.
1365 def test_tun_66(self):
1366 """IPSEC tunnel protect"""
1368 p = self.ipv6_params
1370 self.config_network(p)
1371 self.config_sa_tra(p)
1372 self.config_protect(p)
# First traffic run; 127 packets expected in each direction.
1374 self.verify_tun_66(p, count=127)
1375 c = p.tun_if.get_rx_stats()
1376 self.assertEqual(c['packets'], 127)
1377 c = p.tun_if.get_tx_stats()
1378 self.assertEqual(c['packets'], 127)
1380 # rekey - create new SAs and update the tunnel protection
# NOTE(review): `np` is not defined in the visible lines; presumably a copy
# of `p` -- confirm against the complete file.
1382 np.crypt_key = b'X' + p.crypt_key[1:]
1383 np.scapy_tun_spi += 100
1384 np.scapy_tun_sa_id += 1
1385 np.vpp_tun_spi += 100
1386 np.vpp_tun_sa_id += 1
1387 np.tun_if.local_spi = p.vpp_tun_spi
1388 np.tun_if.remote_spi = p.scapy_tun_spi
1390 self.config_sa_tra(np)
1391 self.config_protect(np)
# Second traffic run with the new SAs; counters must reach 254 (2 * 127).
1394 self.verify_tun_66(np, count=127)
1395 c = p.tun_if.get_rx_stats()
1396 self.assertEqual(c['packets'], 254)
1397 c = p.tun_if.get_tx_stats()
1398 self.assertEqual(c['packets'], 254)
1401 # 1) add two input SAs [old, new]
1402 # 2) swap output SA to [new]
1403 # 3) use only [new] input SA
# NOTE(review): `np3` is not defined in the visible lines; presumably
# another copied parameter set -- confirm against the complete file.
1405 np3.crypt_key = b'Z' + p.crypt_key[1:]
1406 np3.scapy_tun_spi += 100
1407 np3.scapy_tun_sa_id += 1
1408 np3.vpp_tun_spi += 100
1409 np3.vpp_tun_sa_id += 1
1410 np3.tun_if.local_spi = p.vpp_tun_spi
1411 np3.tun_if.remote_spi = p.scapy_tun_spi
1413 self.config_sa_tra(np3)
# Step 1: keep np's output SA and accept both np's and np3's input SAs.
# Both calls pass np as the second parameter set.
1416 p.tun_protect.update_vpp_config(np.tun_sa_out,
1417 [np.tun_sa_in, np3.tun_sa_in])
1418 self.verify_tun_66(np, np, count=127)
1419 self.verify_tun_66(np3, np, count=127)
# Step 2: switch the output SA to np3 while still accepting both input SAs.
1422 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1423 [np.tun_sa_in, np3.tun_sa_in])
1424 self.verify_tun_66(np, np3, count=127)
1425 self.verify_tun_66(np3, np3, count=127)
# Step 3: traffic using np3 must pass and traffic using np must be dropped.
# NOTE(review): the second argument of this update call is not visible in
# this listing.
1428 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1430 self.verify_tun_66(np3, np3, count=127)
1431 self.verify_drop_tun_66(np, count=127)
# Seven verify_tun_66 runs of 127 packets were made in total; the
# verify_drop_tun_66 run is not included in the expected count.
1433 c = p.tun_if.get_rx_stats()
1434 self.assertEqual(c['packets'], 127*7)
1435 c = p.tun_if.get_tx_stats()
1436 self.assertEqual(c['packets'], 127*7)
1437 self.unconfig_sa(np)
# Teardown of the remaining protection, SAs and network objects.
1440 self.unconfig_protect(np3)
1441 self.unconfig_sa(np3)
1442 self.unconfig_network(p)
# Uses the same IPv6 parameter set and setup as test_tun_66, but verifies
# with verify_tun_46 instead.
1444 def test_tun_46(self):
1445 """IPSEC tunnel protect"""
1447 p = self.ipv6_params
1449 self.config_network(p)
1450 self.config_sa_tra(p)
1451 self.config_protect(p)
1453 self.verify_tun_46(p, count=127)
1454 c = p.tun_if.get_rx_stats()
1455 self.assertEqual(c['packets'], 127)
1456 c = p.tun_if.get_tx_stats()
1457 self.assertEqual(c['packets'], 127)
# Teardown.
# NOTE(review): no SA removal is visible between these two calls (the
# embedded numbering skips a line) -- confirm the SAs are cleaned up.
1460 self.unconfig_protect(p)
1462 self.unconfig_network(p)
# IPv6 tunnel-protect tests using SAs created by config_sa_tun (SAs carry
# tunnel endpoint addresses). The packet generators and verifiers defined
# here build and check an extra outer IPv6 header.
# NOTE(review): the end of the base-class list, the `def setUp` /
# `def tearDown` lines, the continuation lines of two signatures, and the
# loop / `try:` lines of the verifiers are not visible in this listing.
1465 class TestIpsec6TunProtectTun(TemplateIpsec,
1466 TemplateIpsec6TunProtect,
1468 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1470 encryption_type = ESP
1471 tun6_encrypt_node_name = "esp6-encrypt-tun"
1472 tun6_decrypt_node_name = "esp6-decrypt-tun"
1475 super(TestIpsec6TunProtectTun, self).setUp()
# self.tun_if is set to pg0 for the duration of the test.
1477 self.tun_if = self.pg0
1480 super(TestIpsec6TunProtectTun, self).tearDown()
# Build `count` Ethernet frames whose payload is produced by sa.encrypt()
# from: an outer IPv6 header (interface remote -> interface local address),
# an inner IPv6 header (src -> dst), UDP 1166 -> 2233, and `payload_size`
# bytes of b'X'.
1482 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1484 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1485 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1486 dst=sw_intf.local_ip6) /
1487 IPv6(src=src, dst=dst) /
1488 UDP(sport=1166, dport=2233) /
1489 Raw(b'X' * payload_size))
1490 for i in range(count)]
# Build `count` unencrypted Ethernet / IPv6 (src -> dst) / UDP 1166 -> 2233
# frames with `payload_size` bytes of b'X'.
1492 def gen_pkts6(self, sw_intf, src, dst, count=1,
1494 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1495 IPv6(src=src, dst=dst) /
1496 UDP(sport=1166, dport=2233) /
1497 Raw(b'X' * payload_size)
1498 for i in range(count)]
# Check a received packet: IPv6 destination is pg1's remote address, IPv6
# source is p.remote_tun_if_host, and checksums are valid.
1500 def verify_decrypted6(self, p, rxs):
1502 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1503 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1504 self.assert_packet_checksums_valid(rx)
# Decrypt a received packet with sa.decrypt (re-parsing the Raw payload as
# IPv6 if no IPv6 layer is found), then check that the decrypted packet is
# addressed from pg0's local to pg0's remote IPv6 address and that a nested
# IPv6 layer is destined to p.remote_tun_if_host. On IndexError or
# AssertionError the received and decrypted packets are logged at debug
# level.
1506 def verify_encrypted6(self, p, sa, rxs):
1509 pkt = sa.decrypt(rx[IPv6])
1510 if not pkt.haslayer(IPv6):
1511 pkt = IPv6(pkt[Raw].load)
1512 self.assert_packet_checksums_valid(pkt)
1513 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1514 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1515 inner = pkt[IPv6].payload
1516 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1518 except (IndexError, AssertionError):
1519 self.logger.debug(ppp("Unexpected packet:", rx))
1521 self.logger.debug(ppp("Decrypted packet:", pkt))
# IPv6-over-IPv6 test: initial traffic, then a rekey with a second SA pair
# and a second traffic run.
1526 def test_tun_66(self):
1527 """IPSEC tunnel protect """
1529 p = self.ipv6_params
1531 self.config_network(p)
1532 self.config_sa_tun(p)
1533 self.config_protect(p)
# First traffic run; 127 packets expected in each direction.
1535 self.verify_tun_66(p, count=127)
1537 c = p.tun_if.get_rx_stats()
1538 self.assertEqual(c['packets'], 127)
1539 c = p.tun_if.get_tx_stats()
1540 self.assertEqual(c['packets'], 127)
1542 # rekey - create new SAs and update the tunnel protection
# NOTE(review): `np` is not defined in the visible lines; presumably a copy
# of `p` -- confirm against the complete file.
1544 np.crypt_key = b'X' + p.crypt_key[1:]
1545 np.scapy_tun_spi += 100
1546 np.scapy_tun_sa_id += 1
1547 np.vpp_tun_spi += 100
1548 np.vpp_tun_sa_id += 1
1549 np.tun_if.local_spi = p.vpp_tun_spi
1550 np.tun_if.remote_spi = p.scapy_tun_spi
1552 self.config_sa_tun(np)
1553 self.config_protect(np)
# Second traffic run with the new SAs; counters must reach 254 (2 * 127).
1556 self.verify_tun_66(np, count=127)
1557 c = p.tun_if.get_rx_stats()
1558 self.assertEqual(c['packets'], 254)
1559 c = p.tun_if.get_tx_stats()
1560 self.assertEqual(c['packets'], 254)
# Teardown: remove np's protection and SAs, then p's network objects.
1563 self.unconfig_protect(np)
1564 self.unconfig_sa(np)
1565 self.unconfig_network(p)
# When executed as a script, run the tests in this module through unittest
# using the framework's VppTestRunner.
1568 if __name__ == '__main__':
1569 unittest.main(testRunner=VppTestRunner)