3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
23 from framework import is_distro_ubuntu2204, is_distro_debian11
24 from framework import VppTestCase, VppTestRunner
25 from vpp_ikev2 import Profile, IDType, AuthMethod
26 from vpp_papi import VppEnum
33 KEY_PAD = b"Key Pad for IKEv2"
40 # tuple structure is (p, g, key_len)
45 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
46 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
47 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
48 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
49 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
50 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
51 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
52 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
53 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
54 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
55 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
63 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
64 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
65 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
66 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
67 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
68 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
69 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
70 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
71 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
72 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
73 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
74 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
75 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
76 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
77 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
78 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
86 class CryptoAlgo(object):
87 def __init__(self, name, cipher, mode):
91 if self.cipher is not None:
92 self.bs = self.cipher.block_size // 8
94 if self.name == "AES-GCM-16ICV":
95 self.iv_len = GCM_IV_SIZE
99 def encrypt(self, data, key, aad=None):
100 iv = os.urandom(self.iv_len)
103 self.cipher(key), self.mode(iv), default_backend()
105 return iv + encryptor.update(data) + encryptor.finalize()
107 salt = key[-SALT_SIZE:]
110 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
112 encryptor.authenticate_additional_data(aad)
113 data = encryptor.update(data) + encryptor.finalize()
114 data += encryptor.tag[:GCM_ICV_SIZE]
117 def decrypt(self, data, key, aad=None, icv=None):
119 iv = data[: self.iv_len]
120 ct = data[self.iv_len :]
122 algorithms.AES(key), self.mode(iv), default_backend()
124 return decryptor.update(ct) + decryptor.finalize()
126 salt = key[-SALT_SIZE:]
127 nonce = salt + data[:GCM_IV_SIZE]
128 ct = data[GCM_IV_SIZE:]
129 key = key[:-SALT_SIZE]
131 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
133 decryptor.authenticate_additional_data(aad)
134 return decryptor.update(ct) + decryptor.finalize()
137 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
138 data = data + b"\x00" * (pad_len - 1)
139 return data + bytes([pad_len - 1])
142 class AuthAlgo(object):
143 def __init__(self, name, mac, mod, key_len, trunc_len=None):
147 self.key_len = key_len
148 self.trunc_len = trunc_len or key_len
152 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
153 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
154 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
158 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
159 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
160 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
161 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
162 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
166 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
167 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
183 class IKEv2ChildSA(object):
184 def __init__(self, local_ts, remote_ts, is_initiator):
192 self.local_ts = local_ts
193 self.remote_ts = remote_ts
196 class IKEv2SA(object):
203 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
209 auth_method="shared-key",
215 self.udp_encap = udp_encap
225 self.dh_params = None
227 self.priv_key = priv_key
228 self.is_initiator = is_initiator
229 nonce = nonce or os.urandom(32)
230 self.auth_data = auth_data
233 if isinstance(id_type, str):
234 self.id_type = IDType.value(id_type)
236 self.id_type = id_type
237 self.auth_method = auth_method
238 if self.is_initiator:
239 self.rspi = 8 * b"\x00"
244 self.ispi = 8 * b"\x00"
246 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
248 def new_msg_id(self):
253 def my_dh_pub_key(self):
254 if self.is_initiator:
255 return self.i_dh_data
256 return self.r_dh_data
259 def peer_dh_pub_key(self):
260 if self.is_initiator:
261 return self.r_dh_data
262 return self.i_dh_data
266 return self.i_natt or self.r_natt
268 def compute_secret(self):
269 priv = self.dh_private_key
270 peer = self.peer_dh_pub_key
271 p, g, l = self.ike_group
273 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
276 def generate_dh_data(self):
278 if self.ike_dh not in DH:
279 raise NotImplementedError("%s not in DH group" % self.ike_dh)
281 if self.dh_params is None:
282 dhg = DH[self.ike_dh]
283 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
284 self.dh_params = pn.parameters(default_backend())
286 priv = self.dh_params.generate_private_key()
287 pub = priv.public_key()
288 x = priv.private_numbers().x
289 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
290 y = pub.public_numbers().y
292 if self.is_initiator:
293 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
295 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
297 def complete_dh_data(self):
298 self.dh_shared_secret = self.compute_secret()
300 def calc_child_keys(self, kex=False):
301 prf = self.ike_prf_alg.mod()
302 s = self.i_nonce + self.r_nonce
304 s = self.dh_shared_secret + s
305 c = self.child_sas[0]
307 encr_key_len = self.esp_crypto_key_len
308 integ_key_len = self.esp_integ_alg.key_len
309 salt_len = 0 if integ_key_len else 4
311 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
312 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
315 c.sk_ei = keymat[pos : pos + encr_key_len]
319 c.sk_ai = keymat[pos : pos + integ_key_len]
322 c.salt_ei = keymat[pos : pos + salt_len]
325 c.sk_er = keymat[pos : pos + encr_key_len]
329 c.sk_ar = keymat[pos : pos + integ_key_len]
332 c.salt_er = keymat[pos : pos + salt_len]
335 def calc_prfplus(self, prf, key, seed, length):
339 while len(r) < length and x < 255:
344 s = s + seed + bytes([x])
345 t = self.calc_prf(prf, key, s)
353 def calc_prf(self, prf, key, data):
354 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
359 prf = self.ike_prf_alg.mod()
360 # SKEYSEED = prf(Ni | Nr, g^ir)
361 s = self.i_nonce + self.r_nonce
362 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
364 # calculate S = Ni | Nr | SPIi SPIr
365 s = s + self.ispi + self.rspi
367 prf_key_trunc = self.ike_prf_alg.trunc_len
368 encr_key_len = self.ike_crypto_key_len
369 tr_prf_key_len = self.ike_prf_alg.key_len
370 integ_key_len = self.ike_integ_alg.key_len
371 if integ_key_len == 0:
383 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
386 self.sk_d = keymat[: pos + prf_key_trunc]
389 self.sk_ai = keymat[pos : pos + integ_key_len]
391 self.sk_ar = keymat[pos : pos + integ_key_len]
394 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
395 pos += encr_key_len + salt_size
396 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
397 pos += encr_key_len + salt_size
399 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
400 pos += tr_prf_key_len
401 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
403 def generate_authmsg(self, prf, packet):
404 if self.is_initiator:
412 data = bytes([self.id_type, 0, 0, 0]) + id
413 id_hash = self.calc_prf(prf, key, data)
414 return packet + nonce + id_hash
417 prf = self.ike_prf_alg.mod()
418 if self.is_initiator:
419 packet = self.init_req_packet
421 packet = self.init_resp_packet
422 authmsg = self.generate_authmsg(prf, raw(packet))
423 if self.auth_method == "shared-key":
424 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
425 self.auth_data = self.calc_prf(prf, psk, authmsg)
426 elif self.auth_method == "rsa-sig":
427 self.auth_data = self.priv_key.sign(
428 authmsg, padding.PKCS1v15(), hashes.SHA1()
431 raise TypeError("unknown auth method type!")
433 def encrypt(self, data, aad=None):
434 data = self.ike_crypto_alg.pad(data)
435 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
438 def peer_authkey(self):
439 if self.is_initiator:
444 def my_authkey(self):
445 if self.is_initiator:
450 def my_cryptokey(self):
451 if self.is_initiator:
456 def peer_cryptokey(self):
457 if self.is_initiator:
461 def concat(self, alg, key_len):
462 return alg + "-" + str(key_len * 8)
465 def vpp_ike_cypto_alg(self):
466 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
469 def vpp_esp_cypto_alg(self):
470 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
472 def verify_hmac(self, ikemsg):
473 integ_trunc = self.ike_integ_alg.trunc_len
474 exp_hmac = ikemsg[-integ_trunc:]
475 data = ikemsg[:-integ_trunc]
476 computed_hmac = self.compute_hmac(
477 self.ike_integ_alg.mod(), self.peer_authkey, data
479 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
481 def compute_hmac(self, integ, key, data):
482 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
486 def decrypt(self, data, aad=None, icv=None):
487 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
489 def hmac_and_decrypt(self, ike):
490 ep = ike[ikev2.IKEv2_payload_Encrypted]
491 if self.ike_crypto == "AES-GCM-16ICV":
492 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
493 ct = ep.load[:-GCM_ICV_SIZE]
494 tag = ep.load[-GCM_ICV_SIZE:]
495 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
497 self.verify_hmac(raw(ike))
498 integ_trunc = self.ike_integ_alg.trunc_len
500 # remove ICV and decrypt payload
501 ct = ep.load[:-integ_trunc]
502 plain = self.decrypt(ct)
505 return plain[: -pad_len - 1]
507 def build_ts_addr(self, ts, version):
509 "starting_address_v" + version: ts["start_addr"],
510 "ending_address_v" + version: ts["end_addr"],
513 def generate_ts(self, is_ip4):
514 c = self.child_sas[0]
515 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
517 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
518 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
519 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
520 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
522 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
523 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
524 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
525 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
527 if self.is_initiator:
528 return ([ts1], [ts2])
529 return ([ts2], [ts1])
531 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
532 if crypto not in CRYPTO_ALGOS:
533 raise TypeError("unsupported encryption algo %r" % crypto)
534 self.ike_crypto = crypto
535 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
536 self.ike_crypto_key_len = crypto_key_len
538 if integ not in AUTH_ALGOS:
539 raise TypeError("unsupported auth algo %r" % integ)
540 self.ike_integ = None if integ == "NULL" else integ
541 self.ike_integ_alg = AUTH_ALGOS[integ]
543 if prf not in PRF_ALGOS:
544 raise TypeError("unsupported prf algo %r" % prf)
546 self.ike_prf_alg = PRF_ALGOS[prf]
548 self.ike_group = DH[self.ike_dh]
550 def set_esp_props(self, crypto, crypto_key_len, integ):
551 self.esp_crypto_key_len = crypto_key_len
552 if crypto not in CRYPTO_ALGOS:
553 raise TypeError("unsupported encryption algo %r" % crypto)
554 self.esp_crypto = crypto
555 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
557 if integ not in AUTH_ALGOS:
558 raise TypeError("unsupported auth algo %r" % integ)
559 self.esp_integ = None if integ == "NULL" else integ
560 self.esp_integ_alg = AUTH_ALGOS[integ]
562 def crypto_attr(self, key_len):
563 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
564 return (0x800E << 16 | key_len << 3, 12)
566 raise Exception("unsupported attribute type")
568 def ike_crypto_attr(self):
569 return self.crypto_attr(self.ike_crypto_key_len)
571 def esp_crypto_attr(self):
572 return self.crypto_attr(self.esp_crypto_key_len)
574 def compute_nat_sha1(self, ip, port, rspi=None):
577 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
578 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
580 return digest.finalize()
583 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
584 class IkePeer(VppTestCase):
585 """common class for initiator and responder"""
589 import scapy.contrib.ikev2 as _ikev2
591 globals()["ikev2"] = _ikev2
592 super(IkePeer, cls).setUpClass()
593 cls.create_pg_interfaces(range(2))
594 for i in cls.pg_interfaces:
602 def tearDownClass(cls):
603 super(IkePeer, cls).tearDownClass()
606 super(IkePeer, self).tearDown()
607 if self.del_sa_from_responder:
608 self.initiate_del_sa_from_responder()
610 self.initiate_del_sa_from_initiator()
611 r = self.vapi.ikev2_sa_dump()
612 self.assertEqual(len(r), 0)
613 sas = self.vapi.ipsec_sa_dump()
614 self.assertEqual(len(sas), 0)
615 self.p.remove_vpp_config()
616 self.assertIsNone(self.p.query_vpp_config())
619 super(IkePeer, self).setUp()
621 self.p.add_vpp_config()
622 self.assertIsNotNone(self.p.query_vpp_config())
623 if self.sa.is_initiator:
624 self.sa.generate_dh_data()
625 self.vapi.cli("ikev2 set logging level 4")
626 self.vapi.cli("event-lo clear")
628 def assert_counter(self, count, name, version="ip4"):
629 node_name = "/err/ikev2-%s/" % version + name
630 self.assertEqual(count, self.statistics.get_err_counter(node_name))
632 def create_rekey_request(self, kex=False):
633 sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
634 header = ikev2.IKEv2(
635 init_SPI=self.sa.ispi,
636 resp_SPI=self.sa.rspi,
637 id=self.sa.new_msg_id(),
639 exch_type="CREATE_CHILD_SA",
642 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
643 return self.create_packet(
644 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
647 def create_empty_request(self):
648 header = ikev2.IKEv2(
649 init_SPI=self.sa.ispi,
650 resp_SPI=self.sa.rspi,
651 id=self.sa.new_msg_id(),
653 exch_type="INFORMATIONAL",
654 next_payload="Encrypted",
657 msg = self.encrypt_ike_msg(header, b"", None)
658 return self.create_packet(
659 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
663 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
666 src_ip = src_if.remote_ip6
667 dst_ip = src_if.local_ip6
670 src_ip = src_if.remote_ip4
671 dst_ip = src_if.local_ip4
674 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
675 / ip_layer(src=src_ip, dst=dst_ip)
676 / UDP(sport=sport, dport=dport)
679 # insert non ESP marker
680 res = res / Raw(b"\x00" * 4)
683 def verify_udp(self, udp):
684 self.assertEqual(udp.sport, self.sa.sport)
685 self.assertEqual(udp.dport, self.sa.dport)
687 def get_ike_header(self, packet):
689 ih = packet[ikev2.IKEv2]
690 ih = self.verify_and_remove_non_esp_marker(ih)
691 except IndexError as e:
692 # this is a workaround for getting IKEv2 layer as both ikev2 and
693 # ipsec register for port 4500
695 ih = self.verify_and_remove_non_esp_marker(esp)
696 self.assertEqual(ih.version, 0x20)
697 self.assertNotIn("Version", ih.flags)
700 def verify_and_remove_non_esp_marker(self, packet):
702 # if we are in nat traversal mode check for non esp marker
705 self.assertEqual(data[:4], b"\x00" * 4)
706 return ikev2.IKEv2(data[4:])
710 def encrypt_ike_msg(self, header, plain, first_payload):
711 if self.sa.ike_crypto == "AES-GCM-16ICV":
712 data = self.sa.ike_crypto_alg.pad(raw(plain))
717 + len(ikev2.IKEv2_payload_Encrypted())
719 tlen = plen + len(ikev2.IKEv2())
722 sk_p = ikev2.IKEv2_payload_Encrypted(
723 next_payload=first_payload, length=plen
727 encr = self.sa.encrypt(raw(plain), raw(res))
728 sk_p = ikev2.IKEv2_payload_Encrypted(
729 next_payload=first_payload, length=plen, load=encr
733 encr = self.sa.encrypt(raw(plain))
734 trunc_len = self.sa.ike_integ_alg.trunc_len
735 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
736 tlen = plen + len(ikev2.IKEv2())
738 sk_p = ikev2.IKEv2_payload_Encrypted(
739 next_payload=first_payload, length=plen, load=encr
744 integ_data = raw(res)
745 hmac_data = self.sa.compute_hmac(
746 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
748 res = res / Raw(hmac_data[:trunc_len])
749 assert len(res) == tlen
752 def verify_udp_encap(self, ipsec_sa):
753 e = VppEnum.vl_api_ipsec_sad_flags_t
754 if self.sa.udp_encap or self.sa.natt:
755 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
757 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
759 def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
760 sas = self.vapi.ipsec_sa_dump()
763 # after rekey there is a short period of time in which old
764 # inbound SA is still present
768 self.assertEqual(len(sas), sa_count)
769 if self.sa.is_initiator:
784 c = self.sa.child_sas[0]
786 self.verify_udp_encap(sa0)
787 self.verify_udp_encap(sa1)
788 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
789 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
790 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
792 if self.sa.esp_integ is None:
795 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
796 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
797 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
800 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
801 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
802 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
803 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
807 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
808 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
809 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
810 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
812 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
813 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
815 def verify_keymat(self, api_keys, keys, name):
816 km = getattr(keys, name)
817 api_km = getattr(api_keys, name)
818 api_km_len = getattr(api_keys, name + "_len")
819 self.assertEqual(len(km), api_km_len)
820 self.assertEqual(km, api_km[:api_km_len])
822 def verify_id(self, api_id, exp_id):
823 self.assertEqual(api_id.type, IDType.value(exp_id.type))
824 self.assertEqual(api_id.data_len, exp_id.data_len)
825 self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
827 def verify_ike_sas(self):
828 r = self.vapi.ikev2_sa_dump()
829 self.assertEqual(len(r), 1)
831 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
832 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
834 if self.sa.is_initiator:
835 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
836 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
838 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
839 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
841 if self.sa.is_initiator:
842 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
843 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
845 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
846 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
847 self.verify_keymat(sa.keys, self.sa, "sk_d")
848 self.verify_keymat(sa.keys, self.sa, "sk_ai")
849 self.verify_keymat(sa.keys, self.sa, "sk_ar")
850 self.verify_keymat(sa.keys, self.sa, "sk_ei")
851 self.verify_keymat(sa.keys, self.sa, "sk_er")
852 self.verify_keymat(sa.keys, self.sa, "sk_pi")
853 self.verify_keymat(sa.keys, self.sa, "sk_pr")
855 self.assertEqual(sa.i_id.type, self.sa.id_type)
856 self.assertEqual(sa.r_id.type, self.sa.id_type)
857 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
858 self.assertEqual(sa.r_id.data_len, len(self.idr))
859 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
860 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
862 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
863 self.assertEqual(len(r), 1)
865 self.assertEqual(csa.sa_index, sa.sa_index)
866 c = self.sa.child_sas[0]
867 if hasattr(c, "sk_ai"):
868 self.verify_keymat(csa.keys, c, "sk_ai")
869 self.verify_keymat(csa.keys, c, "sk_ar")
870 self.verify_keymat(csa.keys, c, "sk_ei")
871 self.verify_keymat(csa.keys, c, "sk_er")
872 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
873 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
875 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
878 r = self.vapi.ikev2_traffic_selector_dump(
879 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
881 self.assertEqual(len(r), 1)
883 self.verify_ts(r[0].ts, tsi[0], True)
885 r = self.vapi.ikev2_traffic_selector_dump(
886 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
888 self.assertEqual(len(r), 1)
889 self.verify_ts(r[0].ts, tsr[0], False)
891 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
892 self.verify_nonce(n, self.sa.i_nonce)
893 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
894 self.verify_nonce(n, self.sa.r_nonce)
896 def verify_nonce(self, api_nonce, nonce):
897 self.assertEqual(api_nonce.data_len, len(nonce))
898 self.assertEqual(api_nonce.nonce, nonce)
900 def verify_ts(self, api_ts, ts, is_initiator):
902 self.assertTrue(api_ts.is_local)
904 self.assertFalse(api_ts.is_local)
907 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
908 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
910 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
911 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
912 self.assertEqual(api_ts.start_port, ts.start_port)
913 self.assertEqual(api_ts.end_port, ts.end_port)
914 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
917 class TemplateInitiator(IkePeer):
918 """initiator test template"""
920 def initiate_del_sa_from_initiator(self):
921 ispi = int.from_bytes(self.sa.ispi, "little")
922 self.pg0.enable_capture()
924 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
925 capture = self.pg0.get_capture(1)
926 ih = self.get_ike_header(capture[0])
927 self.assertNotIn("Response", ih.flags)
928 self.assertIn("Initiator", ih.flags)
929 self.assertEqual(ih.init_SPI, self.sa.ispi)
930 self.assertEqual(ih.resp_SPI, self.sa.rspi)
931 plain = self.sa.hmac_and_decrypt(ih)
932 d = ikev2.IKEv2_payload_Delete(plain)
933 self.assertEqual(d.proto, 1) # proto=IKEv2
934 header = ikev2.IKEv2(
935 init_SPI=self.sa.ispi,
936 resp_SPI=self.sa.rspi,
938 exch_type="INFORMATIONAL",
940 next_payload="Encrypted",
942 resp = self.encrypt_ike_msg(header, b"", None)
943 self.send_and_assert_no_replies(self.pg0, resp)
945 def verify_del_sa(self, packet):
946 ih = self.get_ike_header(packet)
947 self.assertEqual(ih.id, self.sa.msg_id)
948 self.assertEqual(ih.exch_type, 37) # exchange informational
949 self.assertIn("Response", ih.flags)
950 self.assertIn("Initiator", ih.flags)
951 plain = self.sa.hmac_and_decrypt(ih)
952 self.assertEqual(plain, b"")
954 def initiate_del_sa_from_responder(self):
955 header = ikev2.IKEv2(
956 init_SPI=self.sa.ispi,
957 resp_SPI=self.sa.rspi,
958 exch_type="INFORMATIONAL",
959 id=self.sa.new_msg_id(),
961 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
962 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
963 packet = self.create_packet(
964 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
966 self.pg0.add_stream(packet)
967 self.pg0.enable_capture()
969 capture = self.pg0.get_capture(1)
970 self.verify_del_sa(capture[0])
973 def find_notify_payload(packet, notify_type):
974 n = packet[ikev2.IKEv2_payload_Notify]
976 if n.type == notify_type:
981 def verify_nat_detection(self, packet):
988 # NAT_DETECTION_SOURCE_IP
989 s = self.find_notify_payload(packet, 16388)
990 self.assertIsNotNone(s)
991 src_sha = self.sa.compute_nat_sha1(
992 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
994 self.assertEqual(s.load, src_sha)
996 # NAT_DETECTION_DESTINATION_IP
997 s = self.find_notify_payload(packet, 16389)
998 self.assertIsNotNone(s)
999 dst_sha = self.sa.compute_nat_sha1(
1000 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1002 self.assertEqual(s.load, dst_sha)
1004 def verify_sa_init_request(self, packet):
1006 self.sa.dport = udp.sport
1007 ih = packet[ikev2.IKEv2]
1008 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1009 self.assertEqual(ih.exch_type, 34) # SA_INIT
1010 self.sa.ispi = ih.init_SPI
1011 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1012 self.assertIn("Initiator", ih.flags)
1013 self.assertNotIn("Response", ih.flags)
1014 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1015 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1017 prop = packet[ikev2.IKEv2_payload_Proposal]
1018 self.assertEqual(prop.proto, 1) # proto = ikev2
1019 self.assertEqual(prop.proposal, 1)
1020 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1022 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1024 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1025 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1026 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1027 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1029 self.verify_nat_detection(packet)
1030 self.sa.set_ike_props(
1031 crypto="AES-GCM-16ICV",
1034 prf="PRF_HMAC_SHA2_256",
1037 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1038 self.sa.generate_dh_data()
1039 self.sa.complete_dh_data()
1042 def update_esp_transforms(self, trans, sa):
1044 if trans.transform_type == 1: # ecryption
1045 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1046 elif trans.transform_type == 3: # integrity
1047 sa.esp_integ = INTEG_IDS[trans.transform_id]
1048 trans = trans.payload
1050 def verify_sa_auth_req(self, packet):
1052 self.sa.dport = udp.sport
1053 ih = self.get_ike_header(packet)
1054 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1055 self.assertEqual(ih.init_SPI, self.sa.ispi)
1056 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1057 self.assertIn("Initiator", ih.flags)
1058 self.assertNotIn("Response", ih.flags)
1061 self.verify_udp(udp)
1062 self.assertEqual(ih.id, self.sa.msg_id + 1)
1064 plain = self.sa.hmac_and_decrypt(ih)
1065 idi = ikev2.IKEv2_payload_IDi(plain)
1066 self.assertEqual(idi.load, self.sa.i_id)
1067 if self.no_idr_auth:
1068 self.assertEqual(idi.next_payload, 39) # AUTH
1070 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1071 self.assertEqual(idr.load, self.sa.r_id)
1072 prop = idi[ikev2.IKEv2_payload_Proposal]
1073 c = self.sa.child_sas[0]
1075 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1077 def send_init_response(self):
1078 tr_attr = self.sa.ike_crypto_attr()
1080 ikev2.IKEv2_payload_Transform(
1081 transform_type="Encryption",
1082 transform_id=self.sa.ike_crypto,
1084 key_length=tr_attr[0],
1086 / ikev2.IKEv2_payload_Transform(
1087 transform_type="Integrity", transform_id=self.sa.ike_integ
1089 / ikev2.IKEv2_payload_Transform(
1090 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1092 / ikev2.IKEv2_payload_Transform(
1093 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1096 props = ikev2.IKEv2_payload_Proposal(
1097 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1100 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1102 dst_address = b"\x0a\x0a\x0a\x0a"
1104 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1105 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1106 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1108 self.sa.init_resp_packet = (
1110 init_SPI=self.sa.ispi,
1111 resp_SPI=self.sa.rspi,
1112 exch_type="IKE_SA_INIT",
1115 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1116 / ikev2.IKEv2_payload_KE(
1117 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1119 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1120 / ikev2.IKEv2_payload_Notify(
1121 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1123 / ikev2.IKEv2_payload_Notify(
1124 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1128 ike_msg = self.create_packet(
1130 self.sa.init_resp_packet,
1136 self.pg_send(self.pg0, ike_msg)
1137 capture = self.pg0.get_capture(1)
1138 self.verify_sa_auth_req(capture[0])
1140 def initiate_sa_init(self):
1141 self.pg0.enable_capture()
1143 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1145 capture = self.pg0.get_capture(1)
1146 self.verify_sa_init_request(capture[0])
1147 self.send_init_response()
1149 def send_auth_response(self):
1150 tr_attr = self.sa.esp_crypto_attr()
1152 ikev2.IKEv2_payload_Transform(
1153 transform_type="Encryption",
1154 transform_id=self.sa.esp_crypto,
1156 key_length=tr_attr[0],
1158 / ikev2.IKEv2_payload_Transform(
1159 transform_type="Integrity", transform_id=self.sa.esp_integ
1161 / ikev2.IKEv2_payload_Transform(
1162 transform_type="Extended Sequence Number", transform_id="No ESN"
1164 / ikev2.IKEv2_payload_Transform(
1165 transform_type="Extended Sequence Number", transform_id="ESN"
1169 c = self.sa.child_sas[0]
1170 props = ikev2.IKEv2_payload_Proposal(
1171 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1174 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1176 ikev2.IKEv2_payload_IDi(
1177 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1179 / ikev2.IKEv2_payload_IDr(
1180 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1182 / ikev2.IKEv2_payload_AUTH(
1184 auth_type=AuthMethod.value(self.sa.auth_method),
1185 load=self.sa.auth_data,
1187 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1188 / ikev2.IKEv2_payload_TSi(
1189 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1191 / ikev2.IKEv2_payload_TSr(
1192 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1194 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1197 header = ikev2.IKEv2(
1198 init_SPI=self.sa.ispi,
1199 resp_SPI=self.sa.rspi,
1200 id=self.sa.new_msg_id(),
1202 exch_type="IKE_AUTH",
1205 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1206 packet = self.create_packet(
1207 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1209 self.pg_send(self.pg0, packet)
1211 def test_initiator(self):
1212 self.initiate_sa_init()
1214 self.sa.calc_child_keys()
1215 self.send_auth_response()
1216 self.verify_ike_sas()
1219 class TemplateResponder(IkePeer):
1220 """responder test template"""
1222 def initiate_del_sa_from_responder(self):
1223 self.pg0.enable_capture()
1225 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1226 capture = self.pg0.get_capture(1)
1227 ih = self.get_ike_header(capture[0])
1228 self.assertNotIn("Response", ih.flags)
1229 self.assertNotIn("Initiator", ih.flags)
1230 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1231 plain = self.sa.hmac_and_decrypt(ih)
1232 d = ikev2.IKEv2_payload_Delete(plain)
1233 self.assertEqual(d.proto, 1) # proto=IKEv2
1234 self.assertEqual(ih.init_SPI, self.sa.ispi)
1235 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1236 header = ikev2.IKEv2(
1237 init_SPI=self.sa.ispi,
1238 resp_SPI=self.sa.rspi,
1239 flags="Initiator+Response",
1240 exch_type="INFORMATIONAL",
1242 next_payload="Encrypted",
1244 resp = self.encrypt_ike_msg(header, b"", None)
1245 self.send_and_assert_no_replies(self.pg0, resp)
1247 def verify_del_sa(self, packet):
1248 ih = self.get_ike_header(packet)
1249 self.assertEqual(ih.id, self.sa.msg_id)
1250 self.assertEqual(ih.exch_type, 37) # exchange informational
1251 self.assertIn("Response", ih.flags)
1252 self.assertNotIn("Initiator", ih.flags)
1253 self.assertEqual(ih.next_payload, 46) # Encrypted
1254 self.assertEqual(ih.init_SPI, self.sa.ispi)
1255 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1256 plain = self.sa.hmac_and_decrypt(ih)
1257 self.assertEqual(plain, b"")
1259 def initiate_del_sa_from_initiator(self):
1260 header = ikev2.IKEv2(
1261 init_SPI=self.sa.ispi,
1262 resp_SPI=self.sa.rspi,
1264 exch_type="INFORMATIONAL",
1265 id=self.sa.new_msg_id(),
1267 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1268 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1269 packet = self.create_packet(
1270 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1272 self.pg0.add_stream(packet)
1273 self.pg0.enable_capture()
1275 capture = self.pg0.get_capture(1)
1276 self.verify_del_sa(capture[0])
1278 def send_sa_init_req(self):
1279 tr_attr = self.sa.ike_crypto_attr()
1281 ikev2.IKEv2_payload_Transform(
1282 transform_type="Encryption",
1283 transform_id=self.sa.ike_crypto,
1285 key_length=tr_attr[0],
1287 / ikev2.IKEv2_payload_Transform(
1288 transform_type="Integrity", transform_id=self.sa.ike_integ
1290 / ikev2.IKEv2_payload_Transform(
1291 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1293 / ikev2.IKEv2_payload_Transform(
1294 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1298 props = ikev2.IKEv2_payload_Proposal(
1299 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1302 next_payload = None if self.ip6 else "Notify"
1304 self.sa.init_req_packet = (
1306 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1308 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1309 / ikev2.IKEv2_payload_KE(
1310 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1312 / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1317 src_address = b"\x0a\x0a\x0a\x01"
1319 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1322 dst_address = b"\x0a\x0a\x0a\x0a"
1324 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1326 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1327 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1328 nat_src_detection = ikev2.IKEv2_payload_Notify(
1329 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1331 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1332 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1334 self.sa.init_req_packet = (
1335 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1338 ike_msg = self.create_packet(
1340 self.sa.init_req_packet,
1346 self.pg0.add_stream(ike_msg)
1347 self.pg0.enable_capture()
1349 capture = self.pg0.get_capture(1)
1350 self.verify_sa_init(capture[0])
1352 def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1353 tr_attr = self.sa.esp_crypto_attr()
1354 last_payload = last_payload or "Notify"
1357 ikev2.IKEv2_payload_Transform(
1358 transform_type="Encryption",
1359 transform_id=self.sa.esp_crypto,
1361 key_length=tr_attr[0],
1363 / ikev2.IKEv2_payload_Transform(
1364 transform_type="Integrity", transform_id=self.sa.esp_integ
1366 / ikev2.IKEv2_payload_Transform(
1367 transform_type="Extended Sequence Number", transform_id="No ESN"
1369 / ikev2.IKEv2_payload_Transform(
1370 transform_type="Extended Sequence Number", transform_id="ESN"
1376 trans /= ikev2.IKEv2_payload_Transform(
1377 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1380 c = self.sa.child_sas[0]
1381 props = ikev2.IKEv2_payload_Proposal(
1390 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1392 ikev2.IKEv2_payload_AUTH(
1394 auth_type=AuthMethod.value(self.sa.auth_method),
1395 load=self.sa.auth_data,
1397 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1398 / ikev2.IKEv2_payload_TSi(
1399 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1401 / ikev2.IKEv2_payload_TSr(
1402 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1407 first_payload = "Nonce"
1409 head = ikev2.IKEv2_payload_Nonce(
1410 load=self.sa.i_nonce, next_payload="KE"
1411 ) / ikev2.IKEv2_payload_KE(
1412 group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1415 head = ikev2.IKEv2_payload_Nonce(
1416 load=self.sa.i_nonce, next_payload="SA"
1421 / ikev2.IKEv2_payload_Notify(
1425 length=8 + len(c.ispi),
1426 next_payload="Notify",
1428 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1431 first_payload = "IDi"
1432 if self.no_idr_auth:
1433 ids = ikev2.IKEv2_payload_IDi(
1434 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1437 ids = ikev2.IKEv2_payload_IDi(
1438 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1439 ) / ikev2.IKEv2_payload_IDr(
1440 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1443 return plain, first_payload
1445 def send_sa_auth(self):
1446 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1447 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1448 header = ikev2.IKEv2(
1449 init_SPI=self.sa.ispi,
1450 resp_SPI=self.sa.rspi,
1451 id=self.sa.new_msg_id(),
1453 exch_type="IKE_AUTH",
1456 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1457 packet = self.create_packet(
1458 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1460 self.pg0.add_stream(packet)
1461 self.pg0.enable_capture()
1463 capture = self.pg0.get_capture(1)
1464 self.verify_sa_auth_resp(capture[0])
1466 def verify_sa_init(self, packet):
1467 ih = self.get_ike_header(packet)
1469 self.assertEqual(ih.id, self.sa.msg_id)
1470 self.assertEqual(ih.exch_type, 34)
1471 self.assertIn("Response", ih.flags)
1472 self.assertEqual(ih.init_SPI, self.sa.ispi)
1473 self.assertNotEqual(ih.resp_SPI, 0)
1474 self.sa.rspi = ih.resp_SPI
1476 sa = ih[ikev2.IKEv2_payload_SA]
1477 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1478 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1479 except IndexError as e:
1480 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1481 self.logger.error(ih.show())
1483 self.sa.complete_dh_data()
1487 def verify_sa_auth_resp(self, packet):
1488 ike = self.get_ike_header(packet)
1490 self.verify_udp(udp)
1491 self.assertEqual(ike.id, self.sa.msg_id)
1492 plain = self.sa.hmac_and_decrypt(ike)
1493 idr = ikev2.IKEv2_payload_IDr(plain)
1494 prop = idr[ikev2.IKEv2_payload_Proposal]
1495 self.assertEqual(prop.SPIsize, 4)
1496 self.sa.child_sas[0].rspi = prop.SPI
1497 self.sa.calc_child_keys()
1499 IKE_NODE_SUFFIX = "ip4"
1501 def verify_counters(self):
1502 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1503 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1504 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1506 r = self.vapi.ikev2_sa_dump()
1508 self.assertEqual(1, s.n_sa_auth_req)
1509 self.assertEqual(1, s.n_sa_init_req)
1511 def test_responder(self):
1512 self.send_sa_init_req()
1514 self.verify_ipsec_sas()
1515 self.verify_ike_sas()
1516 self.verify_counters()
1519 class Ikev2Params(object):
1520 def config_params(self, params={}):
1521 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1522 ei = VppEnum.vl_api_ipsec_integ_alg_t
1524 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1525 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1526 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1527 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1528 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1529 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1530 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1531 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1532 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1533 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1536 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1538 self.vapi.cli("ikev2 dpd disable")
1539 self.del_sa_from_responder = (
1541 if "del_sa_from_responder" not in params
1542 else params["del_sa_from_responder"]
1544 i_natt = False if "i_natt" not in params else params["i_natt"]
1545 r_natt = False if "r_natt" not in params else params["r_natt"]
1546 self.p = Profile(self, "pr1")
1547 self.ip6 = False if "ip6" not in params else params["ip6"]
1549 if "auth" in params and params["auth"] == "rsa-sig":
1550 auth_method = "rsa-sig"
1551 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1552 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1554 client_file = work_dir + params["client-cert"]
1555 server_pem = open(work_dir + params["server-cert"]).read()
1556 client_priv = open(work_dir + params["client-key"]).read()
1557 client_priv = load_pem_private_key(
1558 str.encode(client_priv), None, default_backend()
1560 self.peer_cert = x509.load_pem_x509_certificate(
1561 str.encode(server_pem), default_backend()
1563 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1566 auth_data = b"$3cr3tpa$$w0rd"
1567 self.p.add_auth(method="shared-key", data=auth_data)
1568 auth_method = "shared-key"
1571 is_init = True if "is_initiator" not in params else params["is_initiator"]
1572 self.no_idr_auth = params.get("no_idr_in_auth", False)
1574 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1575 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1576 r_id = self.idr = idr["data"]
1577 i_id = self.idi = idi["data"]
1579 # scapy is initiator, VPP is responder
1580 self.p.add_local_id(**idr)
1581 self.p.add_remote_id(**idi)
1582 if self.no_idr_auth:
1585 # VPP is initiator, scapy is responder
1586 self.p.add_local_id(**idi)
1587 if not self.no_idr_auth:
1588 self.p.add_remote_id(**idr)
1591 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1592 if "loc_ts" not in params
1593 else params["loc_ts"]
1596 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1597 if "rem_ts" not in params
1598 else params["rem_ts"]
1600 self.p.add_local_ts(**loc_ts)
1601 self.p.add_remote_ts(**rem_ts)
1602 if "responder" in params:
1603 self.p.add_responder(params["responder"])
1604 if "ike_transforms" in params:
1605 self.p.add_ike_transforms(params["ike_transforms"])
1606 if "esp_transforms" in params:
1607 self.p.add_esp_transforms(params["esp_transforms"])
1609 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1611 self.p.set_udp_encap(True)
1613 if "responder_hostname" in params:
1614 hn = params["responder_hostname"]
1615 self.p.add_responder_hostname(hn)
1617 # configure static dns record
1618 self.vapi.dns_name_server_add_del(
1619 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1621 self.vapi.dns_enable_disable(enable=1)
1623 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1630 is_initiator=is_init,
1631 id_type=self.p.local_id["id_type"],
1634 priv_key=client_priv,
1635 auth_method=auth_method,
1636 nonce=params.get("nonce"),
1637 auth_data=auth_data,
1638 udp_encap=udp_encap,
1639 local_ts=self.p.remote_ts,
1640 remote_ts=self.p.local_ts,
1645 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1648 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1650 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1653 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1656 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1659 self.sa.set_ike_props(
1660 crypto=ike_crypto[0],
1661 crypto_key_len=ike_crypto[1],
1663 prf="PRF_HMAC_SHA2_256",
1666 self.sa.set_esp_props(
1667 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1671 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
1672 class TestApi(VppTestCase):
1673 """Test IKEV2 API"""
1676 def setUpClass(cls):
1677 super(TestApi, cls).setUpClass()
1680 def tearDownClass(cls):
1681 super(TestApi, cls).tearDownClass()
1684 super(TestApi, self).tearDown()
1685 self.p1.remove_vpp_config()
1686 self.p2.remove_vpp_config()
1687 r = self.vapi.ikev2_profile_dump()
1688 self.assertEqual(len(r), 0)
1690 def configure_profile(self, cfg):
1691 p = Profile(self, cfg["name"])
1692 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1693 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1694 p.add_local_ts(**cfg["loc_ts"])
1695 p.add_remote_ts(**cfg["rem_ts"])
1696 p.add_responder(cfg["responder"])
1697 p.add_ike_transforms(cfg["ike_ts"])
1698 p.add_esp_transforms(cfg["esp_ts"])
1699 p.add_auth(**cfg["auth"])
1700 p.set_udp_encap(cfg["udp_encap"])
1701 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1702 if "lifetime_data" in cfg:
1703 p.set_lifetime_data(cfg["lifetime_data"])
1704 if "tun_itf" in cfg:
1705 p.set_tunnel_interface(cfg["tun_itf"])
1706 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1711 def test_profile_api(self):
1712 """test profile dump API"""
1717 "start_addr": "3.3.3.2",
1718 "end_addr": "3.3.3.3",
1724 "start_addr": "4.5.76.80",
1725 "end_addr": "2.3.4.6",
1732 "start_addr": "ab::1",
1733 "end_addr": "ab::4",
1739 "start_addr": "cd::12",
1740 "end_addr": "cd::13",
1746 "natt_disabled": True,
1747 "loc_id": ("fqdn", b"vpp.home"),
1748 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1751 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1754 "crypto_key_size": 32,
1758 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1759 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1761 "ipsec_over_udp_port": 4501,
1764 "lifetime_maxdata": 20192,
1765 "lifetime_jitter": 9,
1771 "loc_id": ("ip4-addr", b"192.168.2.1"),
1772 "rem_id": ("ip6-addr", b"abcd::1"),
1775 "responder": {"sw_if_index": 4, "addr": "def::10"},
1778 "crypto_key_size": 16,
1782 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1783 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1785 "ipsec_over_udp_port": 4600,
1789 self.p1 = self.configure_profile(conf["p1"])
1790 self.p2 = self.configure_profile(conf["p2"])
1792 r = self.vapi.ikev2_profile_dump()
1793 self.assertEqual(len(r), 2)
1794 self.verify_profile(r[0].profile, conf["p1"])
1795 self.verify_profile(r[1].profile, conf["p2"])
1797 def verify_id(self, api_id, cfg_id):
1798 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1799 self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1801 def verify_ts(self, api_ts, cfg_ts):
1802 self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1803 self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1804 self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1805 self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1806 self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1808 def verify_responder(self, api_r, cfg_r):
1809 self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1810 self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1812 def verify_transforms(self, api_ts, cfg_ts):
1813 self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1814 self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1815 self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1817 def verify_ike_transforms(self, api_ts, cfg_ts):
1818 self.verify_transforms(api_ts, cfg_ts)
1819 self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1821 def verify_esp_transforms(self, api_ts, cfg_ts):
1822 self.verify_transforms(api_ts, cfg_ts)
1824 def verify_auth(self, api_auth, cfg_auth):
1825 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1826 self.assertEqual(api_auth.data, cfg_auth["data"])
1827 self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1829 def verify_lifetime_data(self, p, ld):
1830 self.assertEqual(p.lifetime, ld["lifetime"])
1831 self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1832 self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1833 self.assertEqual(p.handover, ld["handover"])
1835 def verify_profile(self, ap, cp):
1836 self.assertEqual(ap.name, cp["name"])
1837 self.assertEqual(ap.udp_encap, cp["udp_encap"])
1838 self.verify_id(ap.loc_id, cp["loc_id"])
1839 self.verify_id(ap.rem_id, cp["rem_id"])
1840 self.verify_ts(ap.loc_ts, cp["loc_ts"])
1841 self.verify_ts(ap.rem_ts, cp["rem_ts"])
1842 self.verify_responder(ap.responder, cp["responder"])
1843 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1844 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1845 self.verify_auth(ap.auth, cp["auth"])
1846 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1847 self.assertTrue(natt_dis == ap.natt_disabled)
1849 if "lifetime_data" in cp:
1850 self.verify_lifetime_data(ap, cp["lifetime_data"])
1851 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1853 self.assertEqual(ap.tun_itf, cp["tun_itf"])
1855 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1858 @tag_fixme_vpp_workers
1859 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1860 """test responder - responder behind NAT"""
1862 IKE_NODE_SUFFIX = "ip4-natt"
1864 def config_tc(self):
1865 self.config_params({"r_natt": True})
1868 @tag_fixme_vpp_workers
1869 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1870 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1872 def config_tc(self):
1876 "is_initiator": False, # seen from test case perspective
1877 # thus vpp is initiator
1879 "sw_if_index": self.pg0.sw_if_index,
1880 "addr": self.pg0.remote_ip4,
1882 "ike-crypto": ("AES-GCM-16ICV", 32),
1883 "ike-integ": "NULL",
1884 "ike-dh": "3072MODPgr",
1886 "crypto_alg": 20, # "aes-gcm-16"
1887 "crypto_key_size": 256,
1888 "dh_group": 15, # "modp-3072"
1891 "crypto_alg": 12, # "aes-cbc"
1892 "crypto_key_size": 256,
1893 # "hmac-sha2-256-128"
1900 @tag_fixme_vpp_workers
1901 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1902 """test ikev2 initiator - pre shared key auth"""
1904 def config_tc(self):
1907 "is_initiator": False, # seen from test case perspective
1908 # thus vpp is initiator
1909 "ike-crypto": ("AES-GCM-16ICV", 32),
1910 "ike-integ": "NULL",
1911 "ike-dh": "3072MODPgr",
1913 "crypto_alg": 20, # "aes-gcm-16"
1914 "crypto_key_size": 256,
1915 "dh_group": 15, # "modp-3072"
1918 "crypto_alg": 12, # "aes-cbc"
1919 "crypto_key_size": 256,
1920 # "hmac-sha2-256-128"
1923 "responder_hostname": {
1924 "hostname": "vpp.responder.org",
1925 "sw_if_index": self.pg0.sw_if_index,
1931 @tag_fixme_vpp_workers
1932 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1933 """test initiator - request window size (1)"""
1935 def rekey_respond(self, req, update_child_sa_data):
1936 ih = self.get_ike_header(req)
1937 plain = self.sa.hmac_and_decrypt(ih)
1938 sa = ikev2.IKEv2_payload_SA(plain)
1939 if update_child_sa_data:
1940 prop = sa[ikev2.IKEv2_payload_Proposal]
1941 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1942 self.sa.r_nonce = self.sa.i_nonce
1943 self.sa.child_sas[0].ispi = prop.SPI
1944 self.sa.child_sas[0].rspi = prop.SPI
1945 self.sa.calc_child_keys()
1947 header = ikev2.IKEv2(
1948 init_SPI=self.sa.ispi,
1949 resp_SPI=self.sa.rspi,
1953 next_payload="Encrypted",
1955 resp = self.encrypt_ike_msg(header, sa, "SA")
1956 packet = self.create_packet(
1957 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1959 self.send_and_assert_no_replies(self.pg0, packet)
1961 def test_initiator(self):
1962 super(TestInitiatorRequestWindowSize, self).test_initiator()
1963 self.pg0.enable_capture()
1965 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1966 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1967 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1968 capture = self.pg0.get_capture(2)
1970 # reply in reverse order
1971 self.rekey_respond(capture[1], True)
1972 self.rekey_respond(capture[0], False)
1974 # verify that only the second request was accepted
1975 self.verify_ike_sas()
1976 self.verify_ipsec_sas(is_rekey=True)
1979 @tag_fixme_vpp_workers
1980 class TestInitiatorRekey(TestInitiatorPsk):
1981 """test ikev2 initiator - rekey"""
1983 def rekey_from_initiator(self):
1984 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1985 self.pg0.enable_capture()
1987 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1988 capture = self.pg0.get_capture(1)
1989 ih = self.get_ike_header(capture[0])
1990 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1991 self.assertNotIn("Response", ih.flags)
1992 self.assertIn("Initiator", ih.flags)
1993 plain = self.sa.hmac_and_decrypt(ih)
1994 sa = ikev2.IKEv2_payload_SA(plain)
1995 prop = sa[ikev2.IKEv2_payload_Proposal]
1996 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1997 self.sa.r_nonce = self.sa.i_nonce
1998 # update new responder SPI
1999 self.sa.child_sas[0].ispi = prop.SPI
2000 self.sa.child_sas[0].rspi = prop.SPI
2001 self.sa.calc_child_keys()
2002 header = ikev2.IKEv2(
2003 init_SPI=self.sa.ispi,
2004 resp_SPI=self.sa.rspi,
2008 next_payload="Encrypted",
2010 resp = self.encrypt_ike_msg(header, sa, "SA")
2011 packet = self.create_packet(
2012 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2014 self.send_and_assert_no_replies(self.pg0, packet)
2016 def test_initiator(self):
2017 super(TestInitiatorRekey, self).test_initiator()
2018 self.rekey_from_initiator()
2019 self.verify_ike_sas()
2020 self.verify_ipsec_sas(is_rekey=True)
2023 @tag_fixme_vpp_workers
2024 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2025 """test ikev2 initiator - delete IKE SA from responder"""
2027 def config_tc(self):
2030 "del_sa_from_responder": True,
2031 "is_initiator": False, # seen from test case perspective
2032 # thus vpp is initiator
2034 "sw_if_index": self.pg0.sw_if_index,
2035 "addr": self.pg0.remote_ip4,
2037 "ike-crypto": ("AES-GCM-16ICV", 32),
2038 "ike-integ": "NULL",
2039 "ike-dh": "3072MODPgr",
2041 "crypto_alg": 20, # "aes-gcm-16"
2042 "crypto_key_size": 256,
2043 "dh_group": 15, # "modp-3072"
2046 "crypto_alg": 12, # "aes-cbc"
2047 "crypto_key_size": 256,
2048 # "hmac-sha2-256-128"
2051 "no_idr_in_auth": True,
2056 @tag_fixme_vpp_workers
2057 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2058 """test ikev2 responder - initiator behind NAT"""
2060 IKE_NODE_SUFFIX = "ip4-natt"
2062 def config_tc(self):
2063 self.config_params({"i_natt": True})
2066 @tag_fixme_vpp_workers
2067 class TestResponderPsk(TemplateResponder, Ikev2Params):
2068 """test ikev2 responder - pre shared key auth"""
2070 def config_tc(self):
2071 self.config_params()
2074 @tag_fixme_vpp_workers
2075 class TestResponderDpd(TestResponderPsk):
2077 Dead peer detection test
2080 def config_tc(self):
2081 self.config_params({"dpd_disabled": False})
2086 def test_responder(self):
2087 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2088 super(TestResponderDpd, self).test_responder()
2089 self.pg0.enable_capture()
2091 # capture empty request but don't reply
2092 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2093 ih = self.get_ike_header(capture[0])
2094 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2095 plain = self.sa.hmac_and_decrypt(ih)
2096 self.assertEqual(plain, b"")
2097 # wait for SA expiration
2099 ike_sas = self.vapi.ikev2_sa_dump()
2100 self.assertEqual(len(ike_sas), 0)
2101 ipsec_sas = self.vapi.ipsec_sa_dump()
2102 self.assertEqual(len(ipsec_sas), 0)
2105 @tag_fixme_vpp_workers
2106 class TestResponderRekey(TestResponderPsk):
2107 """test ikev2 responder - rekey"""
2111 def send_rekey_from_initiator(self):
2113 self.sa.generate_dh_data()
2114 packet = self.create_rekey_request(kex=self.WITH_KEX)
2115 self.pg0.add_stream(packet)
2116 self.pg0.enable_capture()
2118 capture = self.pg0.get_capture(1)
2121 def process_rekey_response(self, capture):
2122 ih = self.get_ike_header(capture[0])
2123 plain = self.sa.hmac_and_decrypt(ih)
2124 sa = ikev2.IKEv2_payload_SA(plain)
2125 prop = sa[ikev2.IKEv2_payload_Proposal]
2126 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2127 # update new responder SPI
2128 self.sa.child_sas[0].rspi = prop.SPI
2130 self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2131 self.sa.complete_dh_data()
2132 self.sa.calc_child_keys(kex=self.WITH_KEX)
2134 def test_responder(self):
2135 super(TestResponderRekey, self).test_responder()
2136 self.process_rekey_response(self.send_rekey_from_initiator())
2137 self.verify_ike_sas()
2138 self.verify_ipsec_sas(is_rekey=True)
2139 self.assert_counter(1, "rekey_req", "ip4")
2140 r = self.vapi.ikev2_sa_dump()
2141 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2144 @tag_fixme_vpp_workers
2145 class TestResponderRekeyRepeat(TestResponderRekey):
2146 """test ikev2 responder - rekey repeat"""
2148 def test_responder(self):
2149 super(TestResponderRekeyRepeat, self).test_responder()
2150 # rekey request is not accepted until old IPsec SA is expired
2151 capture = self.send_rekey_from_initiator()
2152 ih = self.get_ike_header(capture[0])
2153 plain = self.sa.hmac_and_decrypt(ih)
2154 notify = ikev2.IKEv2_payload_Notify(plain)
2155 self.assertEqual(notify.type, 43)
2156 self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2157 # rekey request is accepted after old IPsec SA was expired
2159 if len(self.vapi.ipsec_sa_dump()) != 3:
2163 self.fail("old IPsec SA not expired")
2164 self.process_rekey_response(self.send_rekey_from_initiator())
2165 self.verify_ike_sas()
2166 self.verify_ipsec_sas(sa_count=3)
2169 @tag_fixme_vpp_workers
2170 class TestResponderRekeyKEX(TestResponderRekey):
2171 """test ikev2 responder - rekey with key exchange"""
2176 @tag_fixme_vpp_workers
2177 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2178 """test ikev2 responder - rekey repeat with key exchange"""
2183 @tag_fixme_ubuntu2204
2185 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2186 """test ikev2 responder - non-default table id"""
2189 def setUpClass(cls):
2190 import scapy.contrib.ikev2 as _ikev2
2192 globals()["ikev2"] = _ikev2
2193 super(IkePeer, cls).setUpClass()
2194 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
2198 cls.create_pg_interfaces(range(1))
2199 cls.vapi.cli("ip table add 1")
2200 cls.vapi.cli("set interface ip table pg0 1")
2201 for i in cls.pg_interfaces:
2208 def config_tc(self):
2209 self.config_params({"dpd_disabled": False})
2211 def test_responder(self):
2212 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2213 super(TestResponderVrf, self).test_responder()
2214 self.pg0.enable_capture()
2216 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2217 ih = self.get_ike_header(capture[0])
2218 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2219 plain = self.sa.hmac_and_decrypt(ih)
2220 self.assertEqual(plain, b"")
2223 @tag_fixme_vpp_workers
2224 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2225 """test ikev2 responder - cert based auth"""
2227 def config_tc(self):
2232 "server-key": "server-key.pem",
2233 "client-key": "client-key.pem",
2234 "client-cert": "client-cert.pem",
2235 "server-cert": "server-cert.pem",
2240 @tag_fixme_vpp_workers
2241 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2242 TemplateResponder, Ikev2Params
2245 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2248 def config_tc(self):
2251 "ike-crypto": ("AES-CBC", 16),
2252 "ike-integ": "SHA2-256-128",
2253 "esp-crypto": ("AES-CBC", 24),
2254 "esp-integ": "SHA2-384-192",
2255 "ike-dh": "2048MODPgr",
2256 "nonce": os.urandom(256),
2257 "no_idr_in_auth": True,
2262 @tag_fixme_vpp_workers
2263 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2264 TemplateResponder, Ikev2Params
2268 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2271 def config_tc(self):
2274 "ike-crypto": ("AES-CBC", 32),
2275 "ike-integ": "SHA2-256-128",
2276 "esp-crypto": ("AES-GCM-16ICV", 32),
2277 "esp-integ": "NULL",
2278 "ike-dh": "3072MODPgr",
2283 @tag_fixme_vpp_workers
2284 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2289 IKE_NODE_SUFFIX = "ip6"
2291 def config_tc(self):
2294 "del_sa_from_responder": True,
2297 "ike-crypto": ("AES-GCM-16ICV", 32),
2298 "ike-integ": "NULL",
2299 "ike-dh": "2048MODPgr",
2300 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2301 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2306 @tag_fixme_vpp_workers
2307 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2309 Test for keep alive messages
2312 def send_empty_req_from_responder(self):
2313 packet = self.create_empty_request()
2314 self.pg0.add_stream(packet)
2315 self.pg0.enable_capture()
2317 capture = self.pg0.get_capture(1)
2318 ih = self.get_ike_header(capture[0])
2319 self.assertEqual(ih.id, self.sa.msg_id)
2320 plain = self.sa.hmac_and_decrypt(ih)
2321 self.assertEqual(plain, b"")
2322 self.assert_counter(1, "keepalive", "ip4")
2323 r = self.vapi.ikev2_sa_dump()
2324 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2326 def test_initiator(self):
2327 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2328 self.send_empty_req_from_responder()
2331 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2332 """malformed packet test"""
2337 def config_tc(self):
2338 self.config_params()
2340 def create_ike_init_msg(self, length=None, payload=None):
2343 init_SPI="\x11" * 8,
2345 exch_type="IKE_SA_INIT",
2347 if payload is not None:
2349 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2351 def verify_bad_packet_length(self):
2352 ike_msg = self.create_ike_init_msg(length=0xDEAD)
2353 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2354 self.assert_counter(self.pkt_count, "bad_length")
2356 def verify_bad_sa_payload_length(self):
2357 p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2358 ike_msg = self.create_ike_init_msg(payload=p)
2359 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2360 self.assert_counter(self.pkt_count, "malformed_packet")
2362 def test_responder(self):
2363 self.pkt_count = 254
2364 self.verify_bad_packet_length()
2365 self.verify_bad_sa_payload_length()
2368 if __name__ == "__main__":
2369 unittest.main(testRunner=VppTestRunner)