5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
21 from vpp_papi import VppEnum
24 def config_tun_params(p, encryption_type, tun_if):
25 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
26 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
27 IPSEC_API_SAD_FLAG_USE_ESN))
28 crypt_key = mk_scapy_crypt_key(p)
29 p.scapy_tun_sa = SecurityAssociation(
30 encryption_type, spi=p.vpp_tun_spi,
31 crypt_algo=p.crypt_algo,
33 auth_algo=p.auth_algo, auth_key=p.auth_key,
34 tunnel_header=ip_class_by_addr_type[p.addr_type](
37 nat_t_header=p.nat_header,
39 p.vpp_tun_sa = SecurityAssociation(
40 encryption_type, spi=p.scapy_tun_spi,
41 crypt_algo=p.crypt_algo,
43 auth_algo=p.auth_algo, auth_key=p.auth_key,
44 tunnel_header=ip_class_by_addr_type[p.addr_type](
47 nat_t_header=p.nat_header,
51 def config_tra_params(p, encryption_type, tun_if):
52 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
53 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
54 IPSEC_API_SAD_FLAG_USE_ESN))
55 crypt_key = mk_scapy_crypt_key(p)
56 p.scapy_tun_sa = SecurityAssociation(
57 encryption_type, spi=p.vpp_tun_spi,
58 crypt_algo=p.crypt_algo,
60 auth_algo=p.auth_algo, auth_key=p.auth_key,
62 p.vpp_tun_sa = SecurityAssociation(
63 encryption_type, spi=p.scapy_tun_spi,
64 crypt_algo=p.crypt_algo,
66 auth_algo=p.auth_algo, auth_key=p.auth_key,
70 class TemplateIpsec4TunIfEsp(TemplateIpsec):
71 """ IPsec tunnel interface tests """
77 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
80 def tearDownClass(cls):
81 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
84 super(TemplateIpsec4TunIfEsp, self).setUp()
86 self.tun_if = self.pg0
90 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
91 p.scapy_tun_spi, p.crypt_algo_vpp_id,
92 p.crypt_key, p.crypt_key,
93 p.auth_algo_vpp_id, p.auth_key,
95 p.tun_if.add_vpp_config()
99 config_tun_params(p, self.encryption_type, p.tun_if)
101 r = VppIpRoute(self, p.remote_tun_if_host, 32,
102 [VppRoutePath(p.tun_if.remote_ip4,
105 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
106 [VppRoutePath(p.tun_if.remote_ip6,
108 proto=DpoProto.DPO_PROTO_IP6)])
112 super(TemplateIpsec4TunIfEsp, self).tearDown()
115 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
116 """ IPsec UDP tunnel interface tests """
118 tun4_encrypt_node_name = "esp4-encrypt-tun"
119 tun4_decrypt_node_name = "esp4-decrypt-tun"
120 encryption_type = ESP
124 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
127 def tearDownClass(cls):
128 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
131 super(TemplateIpsec4TunIfEspUdp, self).setUp()
133 self.tun_if = self.pg0
136 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
137 IPSEC_API_SAD_FLAG_UDP_ENCAP)
138 p.nat_header = UDP(sport=5454, dport=4500)
140 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
141 p.scapy_tun_spi, p.crypt_algo_vpp_id,
142 p.crypt_key, p.crypt_key,
143 p.auth_algo_vpp_id, p.auth_key,
144 p.auth_key, udp_encap=True)
145 p.tun_if.add_vpp_config()
147 p.tun_if.config_ip4()
148 p.tun_if.config_ip6()
149 config_tun_params(p, self.encryption_type, p.tun_if)
151 r = VppIpRoute(self, p.remote_tun_if_host, 32,
152 [VppRoutePath(p.tun_if.remote_ip4,
155 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
156 [VppRoutePath(p.tun_if.remote_ip6,
158 proto=DpoProto.DPO_PROTO_IP6)])
162 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
165 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
166 """ Ipsec ESP - TUN tests """
167 tun4_encrypt_node_name = "esp4-encrypt-tun"
168 tun4_decrypt_node_name = "esp4-decrypt-tun"
170 def test_tun_basic64(self):
171 """ ipsec 6o4 tunnel basic test """
172 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
174 self.verify_tun_64(self.params[socket.AF_INET], count=1)
176 def test_tun_burst64(self):
177 """ ipsec 6o4 tunnel basic test """
178 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
180 self.verify_tun_64(self.params[socket.AF_INET], count=257)
182 def test_tun_basic_frag44(self):
183 """ ipsec 4o4 tunnel frag basic test """
184 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
188 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
190 self.verify_tun_44(self.params[socket.AF_INET],
191 count=1, payload_size=1800, n_rx=2)
192 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
196 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
197 """ Ipsec ESP UDP tests """
199 tun4_input_node = "ipsec4-tun-input"
201 def test_keepalive(self):
202 """ IPSEC NAT Keepalive """
203 self.verify_keepalive(self.ipv4_params)
206 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
207 """ Ipsec ESP - TCP tests """
211 class TemplateIpsec6TunIfEsp(TemplateIpsec):
212 """ IPsec tunnel interface tests """
214 encryption_type = ESP
217 super(TemplateIpsec6TunIfEsp, self).setUp()
219 self.tun_if = self.pg0
222 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
223 p.scapy_tun_spi, p.crypt_algo_vpp_id,
224 p.crypt_key, p.crypt_key,
225 p.auth_algo_vpp_id, p.auth_key,
226 p.auth_key, is_ip6=True)
227 p.tun_if.add_vpp_config()
229 p.tun_if.config_ip6()
230 p.tun_if.config_ip4()
231 config_tun_params(p, self.encryption_type, p.tun_if)
233 r = VppIpRoute(self, p.remote_tun_if_host, 128,
234 [VppRoutePath(p.tun_if.remote_ip6,
236 proto=DpoProto.DPO_PROTO_IP6)])
238 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
239 [VppRoutePath(p.tun_if.remote_ip4,
244 super(TemplateIpsec6TunIfEsp, self).tearDown()
247 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
249 """ Ipsec ESP - TUN tests """
250 tun6_encrypt_node_name = "esp6-encrypt-tun"
251 tun6_decrypt_node_name = "esp6-decrypt-tun"
253 def test_tun_basic46(self):
254 """ ipsec 4o6 tunnel basic test """
255 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
256 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
258 def test_tun_burst46(self):
259 """ ipsec 4o6 tunnel burst test """
260 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
261 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
264 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
265 IpsecTun6HandoffTests):
266 """ Ipsec ESP 6 Handoff tests """
267 tun6_encrypt_node_name = "esp6-encrypt-tun"
268 tun6_decrypt_node_name = "esp6-decrypt-tun"
271 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
272 IpsecTun4HandoffTests):
273 """ Ipsec ESP 4 Handoff tests """
274 tun4_encrypt_node_name = "esp4-encrypt-tun"
275 tun4_decrypt_node_name = "esp4-decrypt-tun"
278 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
279 """ IPsec IPv4 Multi Tunnel interface """
281 encryption_type = ESP
282 tun4_encrypt_node_name = "esp4-encrypt-tun"
283 tun4_decrypt_node_name = "esp4-decrypt-tun"
286 super(TestIpsec4MultiTunIfEsp, self).setUp()
288 self.tun_if = self.pg0
290 self.multi_params = []
291 self.pg0.generate_remote_hosts(10)
292 self.pg0.configure_ipv4_neighbors()
295 p = copy.copy(self.ipv4_params)
297 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
298 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
299 p.scapy_tun_spi = p.scapy_tun_spi + ii
300 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
301 p.vpp_tun_spi = p.vpp_tun_spi + ii
303 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
304 p.scapy_tra_spi = p.scapy_tra_spi + ii
305 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
306 p.vpp_tra_spi = p.vpp_tra_spi + ii
308 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
311 p.crypt_key, p.crypt_key,
312 p.auth_algo_vpp_id, p.auth_key,
314 dst=self.pg0.remote_hosts[ii].ip4)
315 p.tun_if.add_vpp_config()
317 p.tun_if.config_ip4()
318 config_tun_params(p, self.encryption_type, p.tun_if)
319 self.multi_params.append(p)
321 VppIpRoute(self, p.remote_tun_if_host, 32,
322 [VppRoutePath(p.tun_if.remote_ip4,
323 0xffffffff)]).add_vpp_config()
326 super(TestIpsec4MultiTunIfEsp, self).tearDown()
328 def test_tun_44(self):
329 """Multiple IPSEC tunnel interfaces """
330 for p in self.multi_params:
331 self.verify_tun_44(p, count=127)
332 c = p.tun_if.get_rx_stats()
333 self.assertEqual(c['packets'], 127)
334 c = p.tun_if.get_tx_stats()
335 self.assertEqual(c['packets'], 127)
338 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
339 """ IPsec IPv4 Tunnel interface all Algos """
341 encryption_type = ESP
342 tun4_encrypt_node_name = "esp4-encrypt-tun"
343 tun4_decrypt_node_name = "esp4-decrypt-tun"
345 def config_network(self, p):
347 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
350 p.crypt_key, p.crypt_key,
351 p.auth_algo_vpp_id, p.auth_key,
354 p.tun_if.add_vpp_config()
356 p.tun_if.config_ip4()
357 config_tun_params(p, self.encryption_type, p.tun_if)
358 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
359 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
361 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
362 [VppRoutePath(p.tun_if.remote_ip4,
364 p.route.add_vpp_config()
366 def unconfig_network(self, p):
367 p.tun_if.unconfig_ip4()
368 p.tun_if.remove_vpp_config()
369 p.route.remove_vpp_config()
372 super(TestIpsec4TunIfEspAll, self).setUp()
374 self.tun_if = self.pg0
377 super(TestIpsec4TunIfEspAll, self).tearDown()
381 # change the key and the SPI
383 p.crypt_key = b'X' + p.crypt_key[1:]
385 p.scapy_tun_sa_id += 1
388 p.tun_if.local_spi = p.vpp_tun_spi
389 p.tun_if.remote_spi = p.scapy_tun_spi
391 config_tun_params(p, self.encryption_type, p.tun_if)
393 p.tun_sa_in = VppIpsecSA(self,
400 self.vpp_esp_protocol,
403 p.tun_sa_out = VppIpsecSA(self,
410 self.vpp_esp_protocol,
413 p.tun_sa_in.add_vpp_config()
414 p.tun_sa_out.add_vpp_config()
416 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
417 sa_id=p.tun_sa_in.id,
419 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
420 sa_id=p.tun_sa_out.id,
422 self.logger.info(self.vapi.cli("sh ipsec sa"))
424 def test_tun_44(self):
425 """IPSEC tunnel all algos """
427 # foreach VPP crypto engine
428 engines = ["ia32", "ipsecmb", "openssl"]
430 # foreach crypto algorithm
431 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
432 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
433 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
434 IPSEC_API_INTEG_ALG_NONE),
435 'scapy-crypto': "AES-GCM",
436 'scapy-integ': "NULL",
437 'key': b"JPjyOWBeVEQiMe7h",
439 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
440 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
441 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
442 IPSEC_API_INTEG_ALG_NONE),
443 'scapy-crypto': "AES-GCM",
444 'scapy-integ': "NULL",
445 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
447 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
448 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
449 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
450 IPSEC_API_INTEG_ALG_NONE),
451 'scapy-crypto': "AES-GCM",
452 'scapy-integ': "NULL",
453 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
455 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
456 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
457 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
458 IPSEC_API_INTEG_ALG_SHA1_96),
459 'scapy-crypto': "AES-CBC",
460 'scapy-integ': "HMAC-SHA1-96",
462 'key': b"JPjyOWBeVEQiMe7h"},
463 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
464 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
465 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
466 IPSEC_API_INTEG_ALG_SHA1_96),
467 'scapy-crypto': "AES-CBC",
468 'scapy-integ': "HMAC-SHA1-96",
470 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
471 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
472 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
473 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
474 IPSEC_API_INTEG_ALG_SHA1_96),
475 'scapy-crypto': "AES-CBC",
476 'scapy-integ': "HMAC-SHA1-96",
478 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
479 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
480 IPSEC_API_CRYPTO_ALG_NONE),
481 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
482 IPSEC_API_INTEG_ALG_SHA1_96),
483 'scapy-crypto': "NULL",
484 'scapy-integ': "HMAC-SHA1-96",
486 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
488 for engine in engines:
489 self.vapi.cli("set crypto handler all %s" % engine)
492 # loop through each of the algorithms
495 # with self.subTest(algo=algo['scapy']):
497 p = copy.copy(self.ipv4_params)
498 p.auth_algo_vpp_id = algo['vpp-integ']
499 p.crypt_algo_vpp_id = algo['vpp-crypto']
500 p.crypt_algo = algo['scapy-crypto']
501 p.auth_algo = algo['scapy-integ']
502 p.crypt_key = algo['key']
503 p.salt = algo['salt']
505 self.config_network(p)
507 self.verify_tun_44(p, count=127)
508 c = p.tun_if.get_rx_stats()
509 self.assertEqual(c['packets'], 127)
510 c = p.tun_if.get_tx_stats()
511 self.assertEqual(c['packets'], 127)
517 self.verify_tun_44(p, count=127)
519 self.unconfig_network(p)
520 p.tun_sa_out.remove_vpp_config()
521 p.tun_sa_in.remove_vpp_config()
524 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
525 """ IPsec IPv6 Multi Tunnel interface """
527 encryption_type = ESP
528 tun6_encrypt_node_name = "esp6-encrypt-tun"
529 tun6_decrypt_node_name = "esp6-decrypt-tun"
532 super(TestIpsec6MultiTunIfEsp, self).setUp()
534 self.tun_if = self.pg0
536 self.multi_params = []
537 self.pg0.generate_remote_hosts(10)
538 self.pg0.configure_ipv6_neighbors()
541 p = copy.copy(self.ipv6_params)
543 p.remote_tun_if_host = "1111::%d" % (ii + 1)
544 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
545 p.scapy_tun_spi = p.scapy_tun_spi + ii
546 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
547 p.vpp_tun_spi = p.vpp_tun_spi + ii
549 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
550 p.scapy_tra_spi = p.scapy_tra_spi + ii
551 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
552 p.vpp_tra_spi = p.vpp_tra_spi + ii
554 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
557 p.crypt_key, p.crypt_key,
558 p.auth_algo_vpp_id, p.auth_key,
559 p.auth_key, is_ip6=True,
560 dst=self.pg0.remote_hosts[ii].ip6)
561 p.tun_if.add_vpp_config()
563 p.tun_if.config_ip6()
564 config_tun_params(p, self.encryption_type, p.tun_if)
565 self.multi_params.append(p)
567 r = VppIpRoute(self, p.remote_tun_if_host, 128,
568 [VppRoutePath(p.tun_if.remote_ip6,
570 proto=DpoProto.DPO_PROTO_IP6)])
574 super(TestIpsec6MultiTunIfEsp, self).tearDown()
576 def test_tun_66(self):
577 """Multiple IPSEC tunnel interfaces """
578 for p in self.multi_params:
579 self.verify_tun_66(p, count=127)
580 c = p.tun_if.get_rx_stats()
581 self.assertEqual(c['packets'], 127)
582 c = p.tun_if.get_tx_stats()
583 self.assertEqual(c['packets'], 127)
586 class TestIpsecGreTebIfEsp(TemplateIpsec,
588 """ Ipsec GRE TEB ESP - TUN tests """
589 tun4_encrypt_node_name = "esp4-encrypt-tun"
590 tun4_decrypt_node_name = "esp4-decrypt-tun"
591 encryption_type = ESP
592 omac = "00:11:22:33:44:55"
594 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
596 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
597 sa.encrypt(IP(src=self.pg0.remote_ip4,
598 dst=self.pg0.local_ip4) /
600 Ether(dst=self.omac) /
601 IP(src="1.1.1.1", dst="1.1.1.2") /
602 UDP(sport=1144, dport=2233) /
603 Raw(b'X' * payload_size))
604 for i in range(count)]
606 def gen_pkts(self, sw_intf, src, dst, count=1,
608 return [Ether(dst=self.omac) /
609 IP(src="1.1.1.1", dst="1.1.1.2") /
610 UDP(sport=1144, dport=2233) /
611 Raw(b'X' * payload_size)
612 for i in range(count)]
614 def verify_decrypted(self, p, rxs):
616 self.assert_equal(rx[Ether].dst, self.omac)
617 self.assert_equal(rx[IP].dst, "1.1.1.2")
619 def verify_encrypted(self, p, sa, rxs):
622 pkt = sa.decrypt(rx[IP])
623 if not pkt.haslayer(IP):
624 pkt = IP(pkt[Raw].load)
625 self.assert_packet_checksums_valid(pkt)
626 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
627 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
628 self.assertTrue(pkt.haslayer(GRE))
630 self.assertEqual(e[Ether].dst, self.omac)
631 self.assertEqual(e[IP].dst, "1.1.1.2")
632 except (IndexError, AssertionError):
633 self.logger.debug(ppp("Unexpected packet:", rx))
635 self.logger.debug(ppp("Decrypted packet:", pkt))
641 super(TestIpsecGreTebIfEsp, self).setUp()
643 self.tun_if = self.pg0
647 bd1 = VppBridgeDomain(self, 1)
650 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
651 p.auth_algo_vpp_id, p.auth_key,
652 p.crypt_algo_vpp_id, p.crypt_key,
653 self.vpp_esp_protocol,
656 p.tun_sa_out.add_vpp_config()
658 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
659 p.auth_algo_vpp_id, p.auth_key,
660 p.crypt_algo_vpp_id, p.crypt_key,
661 self.vpp_esp_protocol,
664 p.tun_sa_in.add_vpp_config()
666 p.tun_if = VppGreInterface(self,
669 type=(VppEnum.vl_api_gre_tunnel_type_t.
670 GRE_API_TUNNEL_TYPE_TEB))
671 p.tun_if.add_vpp_config()
673 p.tun_protect = VppIpsecTunProtect(self,
678 p.tun_protect.add_vpp_config()
681 p.tun_if.config_ip4()
682 config_tun_params(p, self.encryption_type, p.tun_if)
684 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
685 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
687 self.vapi.cli("clear ipsec sa")
691 p.tun_if.unconfig_ip4()
692 super(TestIpsecGreTebIfEsp, self).tearDown()
695 class TestIpsecGreTebIfEspTra(TemplateIpsec,
697 """ Ipsec GRE TEB ESP - Tra tests """
698 tun4_encrypt_node_name = "esp4-encrypt-tun"
699 tun4_decrypt_node_name = "esp4-decrypt-tun"
700 encryption_type = ESP
701 omac = "00:11:22:33:44:55"
703 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
705 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
706 sa.encrypt(IP(src=self.pg0.remote_ip4,
707 dst=self.pg0.local_ip4) /
709 Ether(dst=self.omac) /
710 IP(src="1.1.1.1", dst="1.1.1.2") /
711 UDP(sport=1144, dport=2233) /
712 Raw(b'X' * payload_size))
713 for i in range(count)]
715 def gen_pkts(self, sw_intf, src, dst, count=1,
717 return [Ether(dst=self.omac) /
718 IP(src="1.1.1.1", dst="1.1.1.2") /
719 UDP(sport=1144, dport=2233) /
720 Raw(b'X' * payload_size)
721 for i in range(count)]
723 def verify_decrypted(self, p, rxs):
725 self.assert_equal(rx[Ether].dst, self.omac)
726 self.assert_equal(rx[IP].dst, "1.1.1.2")
728 def verify_encrypted(self, p, sa, rxs):
731 pkt = sa.decrypt(rx[IP])
732 if not pkt.haslayer(IP):
733 pkt = IP(pkt[Raw].load)
734 self.assert_packet_checksums_valid(pkt)
735 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
736 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
737 self.assertTrue(pkt.haslayer(GRE))
739 self.assertEqual(e[Ether].dst, self.omac)
740 self.assertEqual(e[IP].dst, "1.1.1.2")
741 except (IndexError, AssertionError):
742 self.logger.debug(ppp("Unexpected packet:", rx))
744 self.logger.debug(ppp("Decrypted packet:", pkt))
750 super(TestIpsecGreTebIfEspTra, self).setUp()
752 self.tun_if = self.pg0
756 bd1 = VppBridgeDomain(self, 1)
759 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
760 p.auth_algo_vpp_id, p.auth_key,
761 p.crypt_algo_vpp_id, p.crypt_key,
762 self.vpp_esp_protocol)
763 p.tun_sa_out.add_vpp_config()
765 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
766 p.auth_algo_vpp_id, p.auth_key,
767 p.crypt_algo_vpp_id, p.crypt_key,
768 self.vpp_esp_protocol)
769 p.tun_sa_in.add_vpp_config()
771 p.tun_if = VppGreInterface(self,
774 type=(VppEnum.vl_api_gre_tunnel_type_t.
775 GRE_API_TUNNEL_TYPE_TEB))
776 p.tun_if.add_vpp_config()
778 p.tun_protect = VppIpsecTunProtect(self,
783 p.tun_protect.add_vpp_config()
786 p.tun_if.config_ip4()
787 config_tra_params(p, self.encryption_type, p.tun_if)
789 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
790 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
792 self.vapi.cli("clear ipsec sa")
796 p.tun_if.unconfig_ip4()
797 super(TestIpsecGreTebIfEspTra, self).tearDown()
800 class TestIpsecGreIfEsp(TemplateIpsec,
802 """ Ipsec GRE ESP - TUN tests """
803 tun4_encrypt_node_name = "esp4-encrypt-tun"
804 tun4_decrypt_node_name = "esp4-decrypt-tun"
805 encryption_type = ESP
807 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
809 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
810 sa.encrypt(IP(src=self.pg0.remote_ip4,
811 dst=self.pg0.local_ip4) /
813 IP(src=self.pg1.local_ip4,
814 dst=self.pg1.remote_ip4) /
815 UDP(sport=1144, dport=2233) /
816 Raw(b'X' * payload_size))
817 for i in range(count)]
819 def gen_pkts(self, sw_intf, src, dst, count=1,
821 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
822 IP(src="1.1.1.1", dst="1.1.1.2") /
823 UDP(sport=1144, dport=2233) /
824 Raw(b'X' * payload_size)
825 for i in range(count)]
827 def verify_decrypted(self, p, rxs):
829 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
830 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
832 def verify_encrypted(self, p, sa, rxs):
835 pkt = sa.decrypt(rx[IP])
836 if not pkt.haslayer(IP):
837 pkt = IP(pkt[Raw].load)
838 self.assert_packet_checksums_valid(pkt)
839 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
840 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
841 self.assertTrue(pkt.haslayer(GRE))
843 self.assertEqual(e[IP].dst, "1.1.1.2")
844 except (IndexError, AssertionError):
845 self.logger.debug(ppp("Unexpected packet:", rx))
847 self.logger.debug(ppp("Decrypted packet:", pkt))
853 super(TestIpsecGreIfEsp, self).setUp()
855 self.tun_if = self.pg0
859 bd1 = VppBridgeDomain(self, 1)
862 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
863 p.auth_algo_vpp_id, p.auth_key,
864 p.crypt_algo_vpp_id, p.crypt_key,
865 self.vpp_esp_protocol,
868 p.tun_sa_out.add_vpp_config()
870 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
871 p.auth_algo_vpp_id, p.auth_key,
872 p.crypt_algo_vpp_id, p.crypt_key,
873 self.vpp_esp_protocol,
876 p.tun_sa_in.add_vpp_config()
878 p.tun_if = VppGreInterface(self,
881 p.tun_if.add_vpp_config()
883 p.tun_protect = VppIpsecTunProtect(self,
887 p.tun_protect.add_vpp_config()
890 p.tun_if.config_ip4()
891 config_tun_params(p, self.encryption_type, p.tun_if)
893 VppIpRoute(self, "1.1.1.2", 32,
894 [VppRoutePath(p.tun_if.remote_ip4,
895 0xffffffff)]).add_vpp_config()
899 p.tun_if.unconfig_ip4()
900 super(TestIpsecGreIfEsp, self).tearDown()
903 class TestIpsecGreIfEspTra(TemplateIpsec,
905 """ Ipsec GRE ESP - TRA tests """
906 tun4_encrypt_node_name = "esp4-encrypt-tun"
907 tun4_decrypt_node_name = "esp4-decrypt-tun"
908 encryption_type = ESP
910 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
912 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
913 sa.encrypt(IP(src=self.pg0.remote_ip4,
914 dst=self.pg0.local_ip4) /
916 IP(src=self.pg1.local_ip4,
917 dst=self.pg1.remote_ip4) /
918 UDP(sport=1144, dport=2233) /
919 Raw(b'X' * payload_size))
920 for i in range(count)]
922 def gen_pkts(self, sw_intf, src, dst, count=1,
924 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
925 IP(src="1.1.1.1", dst="1.1.1.2") /
926 UDP(sport=1144, dport=2233) /
927 Raw(b'X' * payload_size)
928 for i in range(count)]
930 def verify_decrypted(self, p, rxs):
932 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
933 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
935 def verify_encrypted(self, p, sa, rxs):
938 pkt = sa.decrypt(rx[IP])
939 if not pkt.haslayer(IP):
940 pkt = IP(pkt[Raw].load)
941 self.assert_packet_checksums_valid(pkt)
942 self.assertTrue(pkt.haslayer(GRE))
944 self.assertEqual(e[IP].dst, "1.1.1.2")
945 except (IndexError, AssertionError):
946 self.logger.debug(ppp("Unexpected packet:", rx))
948 self.logger.debug(ppp("Decrypted packet:", pkt))
954 super(TestIpsecGreIfEspTra, self).setUp()
956 self.tun_if = self.pg0
960 bd1 = VppBridgeDomain(self, 1)
963 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
964 p.auth_algo_vpp_id, p.auth_key,
965 p.crypt_algo_vpp_id, p.crypt_key,
966 self.vpp_esp_protocol)
967 p.tun_sa_out.add_vpp_config()
969 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
970 p.auth_algo_vpp_id, p.auth_key,
971 p.crypt_algo_vpp_id, p.crypt_key,
972 self.vpp_esp_protocol)
973 p.tun_sa_in.add_vpp_config()
975 p.tun_if = VppGreInterface(self,
978 p.tun_if.add_vpp_config()
980 p.tun_protect = VppIpsecTunProtect(self,
984 p.tun_protect.add_vpp_config()
987 p.tun_if.config_ip4()
988 config_tra_params(p, self.encryption_type, p.tun_if)
990 VppIpRoute(self, "1.1.1.2", 32,
991 [VppRoutePath(p.tun_if.remote_ip4,
992 0xffffffff)]).add_vpp_config()
996 p.tun_if.unconfig_ip4()
997 super(TestIpsecGreIfEspTra, self).tearDown()
1000 class TemplateIpsec4TunProtect(object):
1001 """ IPsec IPv4 Tunnel protect """
1003 encryption_type = ESP
1004 tun4_encrypt_node_name = "esp4-encrypt-tun"
1005 tun4_decrypt_node_name = "esp4-decrypt-tun"
1006 tun4_input_node = "ipsec4-tun-input"
1008 def config_sa_tra(self, p):
1009 config_tun_params(p, self.encryption_type, p.tun_if)
1011 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1012 p.auth_algo_vpp_id, p.auth_key,
1013 p.crypt_algo_vpp_id, p.crypt_key,
1014 self.vpp_esp_protocol,
1016 p.tun_sa_out.add_vpp_config()
1018 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1019 p.auth_algo_vpp_id, p.auth_key,
1020 p.crypt_algo_vpp_id, p.crypt_key,
1021 self.vpp_esp_protocol,
1023 p.tun_sa_in.add_vpp_config()
1025 def config_sa_tun(self, p):
1026 config_tun_params(p, self.encryption_type, p.tun_if)
1028 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1029 p.auth_algo_vpp_id, p.auth_key,
1030 p.crypt_algo_vpp_id, p.crypt_key,
1031 self.vpp_esp_protocol,
1032 self.tun_if.remote_addr[p.addr_type],
1033 self.tun_if.local_addr[p.addr_type],
1035 p.tun_sa_out.add_vpp_config()
1037 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1038 p.auth_algo_vpp_id, p.auth_key,
1039 p.crypt_algo_vpp_id, p.crypt_key,
1040 self.vpp_esp_protocol,
1041 self.tun_if.remote_addr[p.addr_type],
1042 self.tun_if.local_addr[p.addr_type],
1044 p.tun_sa_in.add_vpp_config()
1046 def config_protect(self, p):
1047 p.tun_protect = VppIpsecTunProtect(self,
1051 p.tun_protect.add_vpp_config()
1053 def config_network(self, p):
1054 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1056 self.pg0.remote_ip4)
1057 p.tun_if.add_vpp_config()
1059 p.tun_if.config_ip4()
1060 p.tun_if.config_ip6()
1062 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1063 [VppRoutePath(p.tun_if.remote_ip4,
1065 p.route.add_vpp_config()
1066 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1067 [VppRoutePath(p.tun_if.remote_ip6,
1069 proto=DpoProto.DPO_PROTO_IP6)])
1072 def unconfig_network(self, p):
1073 p.route.remove_vpp_config()
1074 p.tun_if.remove_vpp_config()
1076 def unconfig_protect(self, p):
1077 p.tun_protect.remove_vpp_config()
1079 def unconfig_sa(self, p):
1080 p.tun_sa_out.remove_vpp_config()
1081 p.tun_sa_in.remove_vpp_config()
1084 class TestIpsec4TunProtect(TemplateIpsec,
1085 TemplateIpsec4TunProtect,
1087 """ IPsec IPv4 Tunnel protect - transport mode"""
1090 super(TestIpsec4TunProtect, self).setUp()
1092 self.tun_if = self.pg0
1095 super(TestIpsec4TunProtect, self).tearDown()
1097 def test_tun_44(self):
1098 """IPSEC tunnel protect"""
1100 p = self.ipv4_params
1102 self.config_network(p)
1103 self.config_sa_tra(p)
1104 self.config_protect(p)
1106 self.verify_tun_44(p, count=127)
1107 c = p.tun_if.get_rx_stats()
1108 self.assertEqual(c['packets'], 127)
1109 c = p.tun_if.get_tx_stats()
1110 self.assertEqual(c['packets'], 127)
1112 self.vapi.cli("clear ipsec sa")
1113 self.verify_tun_64(p, count=127)
1114 c = p.tun_if.get_rx_stats()
1115 self.assertEqual(c['packets'], 254)
1116 c = p.tun_if.get_tx_stats()
1117 self.assertEqual(c['packets'], 254)
1119 # rekey - create new SAs and update the tunnel protection
1121 np.crypt_key = b'X' + p.crypt_key[1:]
1122 np.scapy_tun_spi += 100
1123 np.scapy_tun_sa_id += 1
1124 np.vpp_tun_spi += 100
1125 np.vpp_tun_sa_id += 1
1126 np.tun_if.local_spi = p.vpp_tun_spi
1127 np.tun_if.remote_spi = p.scapy_tun_spi
1129 self.config_sa_tra(np)
1130 self.config_protect(np)
1133 self.verify_tun_44(np, count=127)
1134 c = p.tun_if.get_rx_stats()
1135 self.assertEqual(c['packets'], 381)
1136 c = p.tun_if.get_tx_stats()
1137 self.assertEqual(c['packets'], 381)
1140 self.unconfig_protect(np)
1141 self.unconfig_sa(np)
1142 self.unconfig_network(p)
1145 class TestIpsec4TunProtectUdp(TemplateIpsec,
1146 TemplateIpsec4TunProtect,
1148 """ IPsec IPv4 Tunnel protect - transport mode"""
1151 super(TestIpsec4TunProtectUdp, self).setUp()
1153 self.tun_if = self.pg0
1155 p = self.ipv4_params
1156 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1157 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1158 p.nat_header = UDP(sport=5454, dport=4500)
1159 self.config_network(p)
1160 self.config_sa_tra(p)
1161 self.config_protect(p)
1164 p = self.ipv4_params
1165 self.unconfig_protect(p)
1167 self.unconfig_network(p)
1168 super(TestIpsec4TunProtectUdp, self).tearDown()
1170 def test_tun_44(self):
1171 """IPSEC UDP tunnel protect"""
1173 p = self.ipv4_params
1175 self.verify_tun_44(p, count=127)
1176 c = p.tun_if.get_rx_stats()
1177 self.assertEqual(c['packets'], 127)
1178 c = p.tun_if.get_tx_stats()
1179 self.assertEqual(c['packets'], 127)
1181 def test_keepalive(self):
1182 """ IPSEC NAT Keepalive """
1183 self.verify_keepalive(self.ipv4_params)
1186 class TestIpsec4TunProtectTun(TemplateIpsec,
1187 TemplateIpsec4TunProtect,
1189 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1191 encryption_type = ESP
1192 tun4_encrypt_node_name = "esp4-encrypt-tun"
1193 tun4_decrypt_node_name = "esp4-decrypt-tun"
1196 super(TestIpsec4TunProtectTun, self).setUp()
1198 self.tun_if = self.pg0
1201 super(TestIpsec4TunProtectTun, self).tearDown()
1203 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1205 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1206 sa.encrypt(IP(src=sw_intf.remote_ip4,
1207 dst=sw_intf.local_ip4) /
1208 IP(src=src, dst=dst) /
1209 UDP(sport=1144, dport=2233) /
1210 Raw(b'X' * payload_size))
1211 for i in range(count)]
1213 def gen_pkts(self, sw_intf, src, dst, count=1,
1215 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1216 IP(src=src, dst=dst) /
1217 UDP(sport=1144, dport=2233) /
1218 Raw(b'X' * payload_size)
1219 for i in range(count)]
1221 def verify_decrypted(self, p, rxs):
1223 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1224 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1225 self.assert_packet_checksums_valid(rx)
1227 def verify_encrypted(self, p, sa, rxs):
1230 pkt = sa.decrypt(rx[IP])
1231 if not pkt.haslayer(IP):
1232 pkt = IP(pkt[Raw].load)
1233 self.assert_packet_checksums_valid(pkt)
1234 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1235 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1236 inner = pkt[IP].payload
1237 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1239 except (IndexError, AssertionError):
1240 self.logger.debug(ppp("Unexpected packet:", rx))
1242 self.logger.debug(ppp("Decrypted packet:", pkt))
1247 def test_tun_44(self):
1248 """IPSEC tunnel protect """
1250 p = self.ipv4_params
1252 self.config_network(p)
1253 self.config_sa_tun(p)
1254 self.config_protect(p)
1256 self.verify_tun_44(p, count=127)
1258 c = p.tun_if.get_rx_stats()
1259 self.assertEqual(c['packets'], 127)
1260 c = p.tun_if.get_tx_stats()
1261 self.assertEqual(c['packets'], 127)
1263 # rekey - create new SAs and update the tunnel protection
1265 np.crypt_key = b'X' + p.crypt_key[1:]
1266 np.scapy_tun_spi += 100
1267 np.scapy_tun_sa_id += 1
1268 np.vpp_tun_spi += 100
1269 np.vpp_tun_sa_id += 1
1270 np.tun_if.local_spi = p.vpp_tun_spi
1271 np.tun_if.remote_spi = p.scapy_tun_spi
1273 self.config_sa_tun(np)
1274 self.config_protect(np)
1277 self.verify_tun_44(np, count=127)
1278 c = p.tun_if.get_rx_stats()
1279 self.assertEqual(c['packets'], 254)
1280 c = p.tun_if.get_tx_stats()
1281 self.assertEqual(c['packets'], 254)
1284 self.unconfig_protect(np)
1285 self.unconfig_sa(np)
1286 self.unconfig_network(p)
1289 class TemplateIpsec6TunProtect(object):
1290 """ IPsec IPv6 Tunnel protect """
1292 def config_sa_tra(self, p):
1293 config_tun_params(p, self.encryption_type, p.tun_if)
1295 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1296 p.auth_algo_vpp_id, p.auth_key,
1297 p.crypt_algo_vpp_id, p.crypt_key,
1298 self.vpp_esp_protocol)
1299 p.tun_sa_out.add_vpp_config()
1301 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1302 p.auth_algo_vpp_id, p.auth_key,
1303 p.crypt_algo_vpp_id, p.crypt_key,
1304 self.vpp_esp_protocol)
1305 p.tun_sa_in.add_vpp_config()
1307 def config_sa_tun(self, p):
1308 config_tun_params(p, self.encryption_type, p.tun_if)
1310 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1311 p.auth_algo_vpp_id, p.auth_key,
1312 p.crypt_algo_vpp_id, p.crypt_key,
1313 self.vpp_esp_protocol,
1314 self.tun_if.remote_addr[p.addr_type],
1315 self.tun_if.local_addr[p.addr_type])
1316 p.tun_sa_out.add_vpp_config()
1318 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1319 p.auth_algo_vpp_id, p.auth_key,
1320 p.crypt_algo_vpp_id, p.crypt_key,
1321 self.vpp_esp_protocol,
1322 self.tun_if.remote_addr[p.addr_type],
1323 self.tun_if.local_addr[p.addr_type])
1324 p.tun_sa_in.add_vpp_config()
1326 def config_protect(self, p):
1327 p.tun_protect = VppIpsecTunProtect(self,
1331 p.tun_protect.add_vpp_config()
1333 def config_network(self, p):
1334 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1336 self.pg0.remote_ip6)
1337 p.tun_if.add_vpp_config()
1339 p.tun_if.config_ip6()
1340 p.tun_if.config_ip4()
1342 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1343 [VppRoutePath(p.tun_if.remote_ip6,
1345 proto=DpoProto.DPO_PROTO_IP6)])
1346 p.route.add_vpp_config()
1347 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1348 [VppRoutePath(p.tun_if.remote_ip4,
1352 def unconfig_network(self, p):
1353 p.route.remove_vpp_config()
1354 p.tun_if.remove_vpp_config()
1356 def unconfig_protect(self, p):
1357 p.tun_protect.remove_vpp_config()
1359 def unconfig_sa(self, p):
1360 p.tun_sa_out.remove_vpp_config()
1361 p.tun_sa_in.remove_vpp_config()
1364 class TestIpsec6TunProtect(TemplateIpsec,
1365 TemplateIpsec6TunProtect,
1367 """ IPsec IPv6 Tunnel protect - transport mode"""
1369 encryption_type = ESP
1370 tun6_encrypt_node_name = "esp6-encrypt-tun"
1371 tun6_decrypt_node_name = "esp6-decrypt-tun"
1374 super(TestIpsec6TunProtect, self).setUp()
1376 self.tun_if = self.pg0
1379 super(TestIpsec6TunProtect, self).tearDown()
1381 def test_tun_66(self):
1382 """IPSEC tunnel protect"""
1384 p = self.ipv6_params
1386 self.config_network(p)
1387 self.config_sa_tra(p)
1388 self.config_protect(p)
1390 self.verify_tun_66(p, count=127)
1391 c = p.tun_if.get_rx_stats()
1392 self.assertEqual(c['packets'], 127)
1393 c = p.tun_if.get_tx_stats()
1394 self.assertEqual(c['packets'], 127)
1396 # rekey - create new SAs and update the tunnel protection
1398 np.crypt_key = b'X' + p.crypt_key[1:]
1399 np.scapy_tun_spi += 100
1400 np.scapy_tun_sa_id += 1
1401 np.vpp_tun_spi += 100
1402 np.vpp_tun_sa_id += 1
1403 np.tun_if.local_spi = p.vpp_tun_spi
1404 np.tun_if.remote_spi = p.scapy_tun_spi
1406 self.config_sa_tra(np)
1407 self.config_protect(np)
1410 self.verify_tun_66(np, count=127)
1411 c = p.tun_if.get_rx_stats()
1412 self.assertEqual(c['packets'], 254)
1413 c = p.tun_if.get_tx_stats()
1414 self.assertEqual(c['packets'], 254)
1417 # 1) add two input SAs [old, new]
1418 # 2) swap output SA to [new]
1419 # 3) use only [new] input SA
1421 np3.crypt_key = b'Z' + p.crypt_key[1:]
1422 np3.scapy_tun_spi += 100
1423 np3.scapy_tun_sa_id += 1
1424 np3.vpp_tun_spi += 100
1425 np3.vpp_tun_sa_id += 1
1426 np3.tun_if.local_spi = p.vpp_tun_spi
1427 np3.tun_if.remote_spi = p.scapy_tun_spi
1429 self.config_sa_tra(np3)
1432 p.tun_protect.update_vpp_config(np.tun_sa_out,
1433 [np.tun_sa_in, np3.tun_sa_in])
1434 self.verify_tun_66(np, np, count=127)
1435 self.verify_tun_66(np3, np, count=127)
1438 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1439 [np.tun_sa_in, np3.tun_sa_in])
1440 self.verify_tun_66(np, np3, count=127)
1441 self.verify_tun_66(np3, np3, count=127)
1444 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1446 self.verify_tun_66(np3, np3, count=127)
1447 self.verify_drop_tun_66(np, count=127)
1449 c = p.tun_if.get_rx_stats()
1450 self.assertEqual(c['packets'], 127*7)
1451 c = p.tun_if.get_tx_stats()
1452 self.assertEqual(c['packets'], 127*7)
1453 self.unconfig_sa(np)
1456 self.unconfig_protect(np3)
1457 self.unconfig_sa(np3)
1458 self.unconfig_network(p)
1460 def test_tun_46(self):
1461 """IPSEC tunnel protect"""
1463 p = self.ipv6_params
1465 self.config_network(p)
1466 self.config_sa_tra(p)
1467 self.config_protect(p)
1469 self.verify_tun_46(p, count=127)
1470 c = p.tun_if.get_rx_stats()
1471 self.assertEqual(c['packets'], 127)
1472 c = p.tun_if.get_tx_stats()
1473 self.assertEqual(c['packets'], 127)
1476 self.unconfig_protect(p)
1478 self.unconfig_network(p)
1481 class TestIpsec6TunProtectTun(TemplateIpsec,
1482 TemplateIpsec6TunProtect,
1484 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1486 encryption_type = ESP
1487 tun6_encrypt_node_name = "esp6-encrypt-tun"
1488 tun6_decrypt_node_name = "esp6-decrypt-tun"
1491 super(TestIpsec6TunProtectTun, self).setUp()
1493 self.tun_if = self.pg0
1496 super(TestIpsec6TunProtectTun, self).tearDown()
1498 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1500 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1501 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1502 dst=sw_intf.local_ip6) /
1503 IPv6(src=src, dst=dst) /
1504 UDP(sport=1166, dport=2233) /
1505 Raw(b'X' * payload_size))
1506 for i in range(count)]
1508 def gen_pkts6(self, sw_intf, src, dst, count=1,
1510 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1511 IPv6(src=src, dst=dst) /
1512 UDP(sport=1166, dport=2233) /
1513 Raw(b'X' * payload_size)
1514 for i in range(count)]
1516 def verify_decrypted6(self, p, rxs):
1518 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1519 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1520 self.assert_packet_checksums_valid(rx)
1522 def verify_encrypted6(self, p, sa, rxs):
1525 pkt = sa.decrypt(rx[IPv6])
1526 if not pkt.haslayer(IPv6):
1527 pkt = IPv6(pkt[Raw].load)
1528 self.assert_packet_checksums_valid(pkt)
1529 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1530 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1531 inner = pkt[IPv6].payload
1532 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1534 except (IndexError, AssertionError):
1535 self.logger.debug(ppp("Unexpected packet:", rx))
1537 self.logger.debug(ppp("Decrypted packet:", pkt))
1542 def test_tun_66(self):
1543 """IPSEC tunnel protect """
1545 p = self.ipv6_params
1547 self.config_network(p)
1548 self.config_sa_tun(p)
1549 self.config_protect(p)
1551 self.verify_tun_66(p, count=127)
1553 c = p.tun_if.get_rx_stats()
1554 self.assertEqual(c['packets'], 127)
1555 c = p.tun_if.get_tx_stats()
1556 self.assertEqual(c['packets'], 127)
1558 # rekey - create new SAs and update the tunnel protection
1560 np.crypt_key = b'X' + p.crypt_key[1:]
1561 np.scapy_tun_spi += 100
1562 np.scapy_tun_sa_id += 1
1563 np.vpp_tun_spi += 100
1564 np.vpp_tun_sa_id += 1
1565 np.tun_if.local_spi = p.vpp_tun_spi
1566 np.tun_if.remote_spi = p.scapy_tun_spi
1568 self.config_sa_tun(np)
1569 self.config_protect(np)
1572 self.verify_tun_66(np, count=127)
1573 c = p.tun_if.get_rx_stats()
1574 self.assertEqual(c['packets'], 254)
1575 c = p.tun_if.get_tx_stats()
1576 self.assertEqual(c['packets'], 254)
1579 self.unconfig_protect(np)
1580 self.unconfig_sa(np)
1581 self.unconfig_network(p)
1584 if __name__ == '__main__':
1585 unittest.main(testRunner=VppTestRunner)