3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
23 from framework import is_distro_ubuntu2204, is_distro_debian11
24 from framework import VppTestCase, VppTestRunner
25 from vpp_ikev2 import Profile, IDType, AuthMethod
26 from vpp_papi import VppEnum
33 KEY_PAD = b"Key Pad for IKEv2"
40 # tuple structure is (p, g, key_len)
45 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
46 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
47 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
48 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
49 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
50 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
51 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
52 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
53 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
54 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
55 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
63 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
64 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
65 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
66 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
67 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
68 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
69 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
70 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
71 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
72 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
73 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
74 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
75 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
76 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
77 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
78 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
86 class CryptoAlgo(object):
87 def __init__(self, name, cipher, mode):
91 if self.cipher is not None:
92 self.bs = self.cipher.block_size // 8
94 if self.name == "AES-GCM-16ICV":
95 self.iv_len = GCM_IV_SIZE
99 def encrypt(self, data, key, aad=None):
100 iv = os.urandom(self.iv_len)
103 self.cipher(key), self.mode(iv), default_backend()
105 return iv + encryptor.update(data) + encryptor.finalize()
107 salt = key[-SALT_SIZE:]
110 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
112 encryptor.authenticate_additional_data(aad)
113 data = encryptor.update(data) + encryptor.finalize()
114 data += encryptor.tag[:GCM_ICV_SIZE]
117 def decrypt(self, data, key, aad=None, icv=None):
119 iv = data[: self.iv_len]
120 ct = data[self.iv_len :]
122 algorithms.AES(key), self.mode(iv), default_backend()
124 return decryptor.update(ct) + decryptor.finalize()
126 salt = key[-SALT_SIZE:]
127 nonce = salt + data[:GCM_IV_SIZE]
128 ct = data[GCM_IV_SIZE:]
129 key = key[:-SALT_SIZE]
131 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
133 decryptor.authenticate_additional_data(aad)
134 return decryptor.update(ct) + decryptor.finalize()
137 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
138 data = data + b"\x00" * (pad_len - 1)
139 return data + bytes([pad_len - 1])
142 class AuthAlgo(object):
143 def __init__(self, name, mac, mod, key_len, trunc_len=None):
147 self.key_len = key_len
148 self.trunc_len = trunc_len or key_len
152 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
153 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
154 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
158 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
159 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
160 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
161 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
162 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
166 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
167 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
183 class IKEv2ChildSA(object):
184 def __init__(self, local_ts, remote_ts, is_initiator):
192 self.local_ts = local_ts
193 self.remote_ts = remote_ts
196 class IKEv2SA(object):
203 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
209 auth_method="shared-key",
215 self.udp_encap = udp_encap
225 self.dh_params = None
227 self.priv_key = priv_key
228 self.is_initiator = is_initiator
229 nonce = nonce or os.urandom(32)
230 self.auth_data = auth_data
233 if isinstance(id_type, str):
234 self.id_type = IDType.value(id_type)
236 self.id_type = id_type
237 self.auth_method = auth_method
238 if self.is_initiator:
239 self.rspi = 8 * b"\x00"
244 self.ispi = 8 * b"\x00"
246 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
248 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return this side's Diffie-Hellman public value.

    ``is_initiator`` selects between ``i_dh_data`` (we are the initiator)
    and ``r_dh_data`` (we are the responder); only the selected attribute
    is read.
    """
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the remote side's Diffie-Hellman public value.

    Mirror image of the local accessor: an initiator's peer value is
    ``r_dh_data``, a responder's peer value is ``i_dh_data``.
    """
    return self.r_dh_data if self.is_initiator else self.i_dh_data
266 return self.i_natt or self.r_natt
268 def compute_secret(self):
269 priv = self.dh_private_key
270 peer = self.peer_dh_pub_key
271 p, g, l = self.ike_group
273 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
276 def generate_dh_data(self):
278 if self.ike_dh not in DH:
279 raise NotImplementedError("%s not in DH group" % self.ike_dh)
281 if self.dh_params is None:
282 dhg = DH[self.ike_dh]
283 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
284 self.dh_params = pn.parameters(default_backend())
286 priv = self.dh_params.generate_private_key()
287 pub = priv.public_key()
288 x = priv.private_numbers().x
289 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
290 y = pub.public_numbers().y
292 if self.is_initiator:
293 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
295 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
def complete_dh_data(self):
    """Compute the DH shared secret and store it on ``dh_shared_secret``."""
    shared = self.compute_secret()
    self.dh_shared_secret = shared
300 def calc_child_keys(self, kex=False):
301 prf = self.ike_prf_alg.mod()
302 s = self.i_nonce + self.r_nonce
304 s = self.dh_shared_secret + s
305 c = self.child_sas[0]
307 encr_key_len = self.esp_crypto_key_len
308 integ_key_len = self.esp_integ_alg.key_len
309 salt_len = 0 if integ_key_len else 4
311 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
312 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
315 c.sk_ei = keymat[pos : pos + encr_key_len]
319 c.sk_ai = keymat[pos : pos + integ_key_len]
322 c.salt_ei = keymat[pos : pos + salt_len]
325 c.sk_er = keymat[pos : pos + encr_key_len]
329 c.sk_ar = keymat[pos : pos + integ_key_len]
332 c.salt_er = keymat[pos : pos + salt_len]
335 def calc_prfplus(self, prf, key, seed, length):
339 while len(r) < length and x < 255:
344 s = s + seed + bytes([x])
345 t = self.calc_prf(prf, key, s)
353 def calc_prf(self, prf, key, data):
354 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
359 prf = self.ike_prf_alg.mod()
360 # SKEYSEED = prf(Ni | Nr, g^ir)
361 s = self.i_nonce + self.r_nonce
362 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
364 # calculate S = Ni | Nr | SPIi SPIr
365 s = s + self.ispi + self.rspi
367 prf_key_trunc = self.ike_prf_alg.trunc_len
368 encr_key_len = self.ike_crypto_key_len
369 tr_prf_key_len = self.ike_prf_alg.key_len
370 integ_key_len = self.ike_integ_alg.key_len
371 if integ_key_len == 0:
383 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
386 self.sk_d = keymat[: pos + prf_key_trunc]
389 self.sk_ai = keymat[pos : pos + integ_key_len]
391 self.sk_ar = keymat[pos : pos + integ_key_len]
394 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
395 pos += encr_key_len + salt_size
396 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
397 pos += encr_key_len + salt_size
399 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
400 pos += tr_prf_key_len
401 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
403 def generate_authmsg(self, prf, packet):
404 if self.is_initiator:
412 data = bytes([self.id_type, 0, 0, 0]) + id
413 id_hash = self.calc_prf(prf, key, data)
414 return packet + nonce + id_hash
417 prf = self.ike_prf_alg.mod()
418 if self.is_initiator:
419 packet = self.init_req_packet
421 packet = self.init_resp_packet
422 authmsg = self.generate_authmsg(prf, raw(packet))
423 if self.auth_method == "shared-key":
424 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
425 self.auth_data = self.calc_prf(prf, psk, authmsg)
426 elif self.auth_method == "rsa-sig":
427 self.auth_data = self.priv_key.sign(
428 authmsg, padding.PKCS1v15(), hashes.SHA1()
431 raise TypeError("unknown auth method type!")
def encrypt(self, data, aad=None):
    """Pad ``data`` and encrypt it with the IKE cipher under our own key.

    The padded plaintext, ``my_cryptokey`` and ``aad`` are handed to the
    ``encrypt`` method of ``ike_crypto_alg``; its result is returned as-is.
    """
    algo = self.ike_crypto_alg
    padded = algo.pad(data)
    return algo.encrypt(padded, self.my_cryptokey, aad)
438 def peer_authkey(self):
439 if self.is_initiator:
444 def my_authkey(self):
445 if self.is_initiator:
450 def my_cryptokey(self):
451 if self.is_initiator:
456 def peer_cryptokey(self):
457 if self.is_initiator:
def concat(self, alg, key_len):
    """Build an algorithm label of the form ``<alg>-<key_len * 8>``.

    ``key_len`` is multiplied by 8 before being appended, so a 32-byte key
    yields the suffix ``-256``.
    """
    suffix = "-" + str(key_len * 8)
    return alg + suffix
def vpp_ike_cypto_alg(self):
    """Return the label ``concat`` builds from the IKE cipher name and key length."""
    cipher_name = self.ike_crypto
    key_len = self.ike_crypto_key_len
    return self.concat(cipher_name, key_len)
def vpp_esp_cypto_alg(self):
    """Return the label ``concat`` builds from the ESP cipher name and key length."""
    cipher_name = self.esp_crypto
    key_len = self.esp_crypto_key_len
    return self.concat(cipher_name, key_len)
472 def verify_hmac(self, ikemsg):
473 integ_trunc = self.ike_integ_alg.trunc_len
474 exp_hmac = ikemsg[-integ_trunc:]
475 data = ikemsg[:-integ_trunc]
476 computed_hmac = self.compute_hmac(
477 self.ike_integ_alg.mod(), self.peer_authkey, data
479 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
481 def compute_hmac(self, integ, key, data):
482 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the IKE cipher under the peer's key.

    ``aad`` and ``icv`` are forwarded unchanged to the ``decrypt`` method of
    ``ike_crypto_alg``; its result is returned as-is.
    """
    algo = self.ike_crypto_alg
    peer_key = self.peer_cryptokey
    return algo.decrypt(data, peer_key, aad, icv)
489 def hmac_and_decrypt(self, ike):
490 ep = ike[ikev2.IKEv2_payload_Encrypted]
491 if self.ike_crypto == "AES-GCM-16ICV":
492 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
493 ct = ep.load[:-GCM_ICV_SIZE]
494 tag = ep.load[-GCM_ICV_SIZE:]
495 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
497 self.verify_hmac(raw(ike))
498 integ_trunc = self.ike_integ_alg.trunc_len
500 # remove ICV and decrypt payload
501 ct = ep.load[:-integ_trunc]
502 plain = self.decrypt(ct)
505 return plain[: -pad_len - 1]
507 def build_ts_addr(self, ts, version):
509 "starting_address_v" + version: ts["start_addr"],
510 "ending_address_v" + version: ts["end_addr"],
513 def generate_ts(self, is_ip4):
514 c = self.child_sas[0]
515 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
517 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
518 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
519 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
520 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
522 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
523 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
524 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
525 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
527 if self.is_initiator:
528 return ([ts1], [ts2])
529 return ([ts2], [ts1])
531 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
532 if crypto not in CRYPTO_ALGOS:
533 raise TypeError("unsupported encryption algo %r" % crypto)
534 self.ike_crypto = crypto
535 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
536 self.ike_crypto_key_len = crypto_key_len
538 if integ not in AUTH_ALGOS:
539 raise TypeError("unsupported auth algo %r" % integ)
540 self.ike_integ = None if integ == "NULL" else integ
541 self.ike_integ_alg = AUTH_ALGOS[integ]
543 if prf not in PRF_ALGOS:
544 raise TypeError("unsupported prf algo %r" % prf)
546 self.ike_prf_alg = PRF_ALGOS[prf]
548 self.ike_group = DH[self.ike_dh]
550 def set_esp_props(self, crypto, crypto_key_len, integ):
551 self.esp_crypto_key_len = crypto_key_len
552 if crypto not in CRYPTO_ALGOS:
553 raise TypeError("unsupported encryption algo %r" % crypto)
554 self.esp_crypto = crypto
555 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
557 if integ not in AUTH_ALGOS:
558 raise TypeError("unsupported auth algo %r" % integ)
559 self.esp_integ = None if integ == "NULL" else integ
560 self.esp_integ_alg = AUTH_ALGOS[integ]
562 def crypto_attr(self, key_len):
563 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
564 return (0x800E << 16 | key_len << 3, 12)
566 raise Exception("unsupported attribute type")
def ike_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the IKE cipher key length."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the ESP cipher key length."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
574 def compute_nat_sha1(self, ip, port, rspi=None):
577 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
578 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
580 return digest.finalize()
583 class IkePeer(VppTestCase):
584 """common class for initiator and responder"""
588 import scapy.contrib.ikev2 as _ikev2
590 globals()["ikev2"] = _ikev2
591 super(IkePeer, cls).setUpClass()
592 cls.create_pg_interfaces(range(2))
593 for i in cls.pg_interfaces:
def tearDownClass(cls):
    """Run the class-level teardown of the class that follows ``IkePeer`` in the MRO."""
    parent = super(IkePeer, cls)
    parent.tearDownClass()
605 super(IkePeer, self).tearDown()
606 if self.del_sa_from_responder:
607 self.initiate_del_sa_from_responder()
609 self.initiate_del_sa_from_initiator()
610 r = self.vapi.ikev2_sa_dump()
611 self.assertEqual(len(r), 0)
612 sas = self.vapi.ipsec_sa_dump()
613 self.assertEqual(len(sas), 0)
614 self.p.remove_vpp_config()
615 self.assertIsNone(self.p.query_vpp_config())
618 super(IkePeer, self).setUp()
620 self.p.add_vpp_config()
621 self.assertIsNotNone(self.p.query_vpp_config())
622 if self.sa.is_initiator:
623 self.sa.generate_dh_data()
624 self.vapi.cli("ikev2 set logging level 4")
625 self.vapi.cli("event-lo clear")
def assert_counter(self, count, name, version="ip4"):
    """Assert that an ikev2 error counter holds the expected value.

    The counter path is ``/err/ikev2-<version>/<name>`` and is read through
    ``self.statistics.get_err_counter``.
    """
    prefix = "/err/ikev2-%s/" % version
    actual = self.statistics.get_err_counter(prefix + name)
    self.assertEqual(count, actual)
631 def create_rekey_request(self, kex=False):
632 sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
633 header = ikev2.IKEv2(
634 init_SPI=self.sa.ispi,
635 resp_SPI=self.sa.rspi,
636 id=self.sa.new_msg_id(),
638 exch_type="CREATE_CHILD_SA",
641 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
642 return self.create_packet(
643 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
646 def create_empty_request(self):
647 header = ikev2.IKEv2(
648 init_SPI=self.sa.ispi,
649 resp_SPI=self.sa.rspi,
650 id=self.sa.new_msg_id(),
652 exch_type="INFORMATIONAL",
653 next_payload="Encrypted",
656 msg = self.encrypt_ike_msg(header, b"", None)
657 return self.create_packet(
658 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
662 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
665 src_ip = src_if.remote_ip6
666 dst_ip = src_if.local_ip6
669 src_ip = src_if.remote_ip4
670 dst_ip = src_if.local_ip4
673 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
674 / ip_layer(src=src_ip, dst=dst_ip)
675 / UDP(sport=sport, dport=dport)
678 # insert non ESP marker
679 res = res / Raw(b"\x00" * 4)
def verify_udp(self, udp):
    """Assert that the UDP header's ports equal the ports stored on ``self.sa``."""
    for field in ("sport", "dport"):
        self.assertEqual(getattr(udp, field), getattr(self.sa, field))
686 def get_ike_header(self, packet):
688 ih = packet[ikev2.IKEv2]
689 ih = self.verify_and_remove_non_esp_marker(ih)
690 except IndexError as e:
691 # this is a workaround for getting IKEv2 layer as both ikev2 and
692 # ipsec register for port 4500
694 ih = self.verify_and_remove_non_esp_marker(esp)
695 self.assertEqual(ih.version, 0x20)
696 self.assertNotIn("Version", ih.flags)
699 def verify_and_remove_non_esp_marker(self, packet):
701 # if we are in nat traversal mode check for non esp marker
704 self.assertEqual(data[:4], b"\x00" * 4)
705 return ikev2.IKEv2(data[4:])
709 def encrypt_ike_msg(self, header, plain, first_payload):
710 if self.sa.ike_crypto == "AES-GCM-16ICV":
711 data = self.sa.ike_crypto_alg.pad(raw(plain))
716 + len(ikev2.IKEv2_payload_Encrypted())
718 tlen = plen + len(ikev2.IKEv2())
721 sk_p = ikev2.IKEv2_payload_Encrypted(
722 next_payload=first_payload, length=plen
726 encr = self.sa.encrypt(raw(plain), raw(res))
727 sk_p = ikev2.IKEv2_payload_Encrypted(
728 next_payload=first_payload, length=plen, load=encr
732 encr = self.sa.encrypt(raw(plain))
733 trunc_len = self.sa.ike_integ_alg.trunc_len
734 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
735 tlen = plen + len(ikev2.IKEv2())
737 sk_p = ikev2.IKEv2_payload_Encrypted(
738 next_payload=first_payload, length=plen, load=encr
743 integ_data = raw(res)
744 hmac_data = self.sa.compute_hmac(
745 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
747 res = res / Raw(hmac_data[:trunc_len])
748 assert len(res) == tlen
751 def verify_udp_encap(self, ipsec_sa):
752 e = VppEnum.vl_api_ipsec_sad_flags_t
753 if self.sa.udp_encap or self.sa.natt:
754 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
756 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
758 def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
759 sas = self.vapi.ipsec_sa_dump()
762 # after rekey there is a short period of time in which old
763 # inbound SA is still present
767 self.assertEqual(len(sas), sa_count)
768 if self.sa.is_initiator:
783 c = self.sa.child_sas[0]
785 self.verify_udp_encap(sa0)
786 self.verify_udp_encap(sa1)
787 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
788 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
789 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
791 if self.sa.esp_integ is None:
794 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
795 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
796 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
799 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
800 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
801 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
802 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
806 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
807 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
808 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
809 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
811 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
812 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one piece of key material reported by the API with ours.

    ``name`` selects the attribute on both objects; the API side also
    exposes ``<name>_len``, and only that many leading bytes of the API
    value are compared.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + "_len")
    self.assertEqual(len(expected), reported_len)
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Compare an identity reported by the API against the expected identity."""
    expected_type = IDType.value(exp_id.type)
    self.assertEqual(api_id.type, expected_type)
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # NOTE(review): the identity bytes are compared against exp_id.type, the
    # same field used for the type check above. An expected-data field looks
    # like the intended operand -- confirm before changing.
    reported = bytes(api_id.data, "ascii")
    self.assertEqual(reported, exp_id.type)
826 def verify_ike_sas(self):
827 r = self.vapi.ikev2_sa_dump()
828 self.assertEqual(len(r), 1)
830 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
831 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
833 if self.sa.is_initiator:
834 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
835 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
837 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
838 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
840 if self.sa.is_initiator:
841 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
842 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
844 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
845 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
846 self.verify_keymat(sa.keys, self.sa, "sk_d")
847 self.verify_keymat(sa.keys, self.sa, "sk_ai")
848 self.verify_keymat(sa.keys, self.sa, "sk_ar")
849 self.verify_keymat(sa.keys, self.sa, "sk_ei")
850 self.verify_keymat(sa.keys, self.sa, "sk_er")
851 self.verify_keymat(sa.keys, self.sa, "sk_pi")
852 self.verify_keymat(sa.keys, self.sa, "sk_pr")
854 self.assertEqual(sa.i_id.type, self.sa.id_type)
855 self.assertEqual(sa.r_id.type, self.sa.id_type)
856 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
857 self.assertEqual(sa.r_id.data_len, len(self.idr))
858 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
859 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
861 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
862 self.assertEqual(len(r), 1)
864 self.assertEqual(csa.sa_index, sa.sa_index)
865 c = self.sa.child_sas[0]
866 if hasattr(c, "sk_ai"):
867 self.verify_keymat(csa.keys, c, "sk_ai")
868 self.verify_keymat(csa.keys, c, "sk_ar")
869 self.verify_keymat(csa.keys, c, "sk_ei")
870 self.verify_keymat(csa.keys, c, "sk_er")
871 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
872 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
874 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
877 r = self.vapi.ikev2_traffic_selector_dump(
878 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
880 self.assertEqual(len(r), 1)
882 self.verify_ts(r[0].ts, tsi[0], True)
884 r = self.vapi.ikev2_traffic_selector_dump(
885 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
887 self.assertEqual(len(r), 1)
888 self.verify_ts(r[0].ts, tsr[0], False)
890 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
891 self.verify_nonce(n, self.sa.i_nonce)
892 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
893 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API-reported nonce has the expected length and value."""
    expect_equal = self.assertEqual
    expect_equal(api_nonce.data_len, len(nonce))
    expect_equal(api_nonce.nonce, nonce)
899 def verify_ts(self, api_ts, ts, is_initiator):
901 self.assertTrue(api_ts.is_local)
903 self.assertFalse(api_ts.is_local)
906 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
907 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
909 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
910 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
911 self.assertEqual(api_ts.start_port, ts.start_port)
912 self.assertEqual(api_ts.end_port, ts.end_port)
913 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
916 class TemplateInitiator(IkePeer):
917 """initiator test template"""
919 def initiate_del_sa_from_initiator(self):
920 ispi = int.from_bytes(self.sa.ispi, "little")
921 self.pg0.enable_capture()
923 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
924 capture = self.pg0.get_capture(1)
925 ih = self.get_ike_header(capture[0])
926 self.assertNotIn("Response", ih.flags)
927 self.assertIn("Initiator", ih.flags)
928 self.assertEqual(ih.init_SPI, self.sa.ispi)
929 self.assertEqual(ih.resp_SPI, self.sa.rspi)
930 plain = self.sa.hmac_and_decrypt(ih)
931 d = ikev2.IKEv2_payload_Delete(plain)
932 self.assertEqual(d.proto, 1) # proto=IKEv2
933 header = ikev2.IKEv2(
934 init_SPI=self.sa.ispi,
935 resp_SPI=self.sa.rspi,
937 exch_type="INFORMATIONAL",
939 next_payload="Encrypted",
941 resp = self.encrypt_ike_msg(header, b"", None)
942 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Check the reply to a delete request.

    The IKE header must carry the SA's current message id, have exchange
    type 37 (informational) and have both the "Response" and "Initiator"
    flags set; the decrypted payload must be empty.
    """
    hdr = self.get_ike_header(packet)
    self.assertEqual(hdr.id, self.sa.msg_id)
    self.assertEqual(hdr.exch_type, 37)  # exchange informational
    for flag in ("Response", "Initiator"):
        self.assertIn(flag, hdr.flags)
    payload = self.sa.hmac_and_decrypt(hdr)
    self.assertEqual(payload, b"")
953 def initiate_del_sa_from_responder(self):
954 header = ikev2.IKEv2(
955 init_SPI=self.sa.ispi,
956 resp_SPI=self.sa.rspi,
957 exch_type="INFORMATIONAL",
958 id=self.sa.new_msg_id(),
960 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
961 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
962 packet = self.create_packet(
963 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
965 self.pg0.add_stream(packet)
966 self.pg0.enable_capture()
968 capture = self.pg0.get_capture(1)
969 self.verify_del_sa(capture[0])
972 def find_notify_payload(packet, notify_type):
973 n = packet[ikev2.IKEv2_payload_Notify]
975 if n.type == notify_type:
980 def verify_nat_detection(self, packet):
987 # NAT_DETECTION_SOURCE_IP
988 s = self.find_notify_payload(packet, 16388)
989 self.assertIsNotNone(s)
990 src_sha = self.sa.compute_nat_sha1(
991 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
993 self.assertEqual(s.load, src_sha)
995 # NAT_DETECTION_DESTINATION_IP
996 s = self.find_notify_payload(packet, 16389)
997 self.assertIsNotNone(s)
998 dst_sha = self.sa.compute_nat_sha1(
999 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1001 self.assertEqual(s.load, dst_sha)
1003 def verify_sa_init_request(self, packet):
1005 self.sa.dport = udp.sport
1006 ih = packet[ikev2.IKEv2]
1007 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1008 self.assertEqual(ih.exch_type, 34) # SA_INIT
1009 self.sa.ispi = ih.init_SPI
1010 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1011 self.assertIn("Initiator", ih.flags)
1012 self.assertNotIn("Response", ih.flags)
1013 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1014 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1016 prop = packet[ikev2.IKEv2_payload_Proposal]
1017 self.assertEqual(prop.proto, 1) # proto = ikev2
1018 self.assertEqual(prop.proposal, 1)
1019 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1021 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1023 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1024 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1025 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1026 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1028 self.verify_nat_detection(packet)
1029 self.sa.set_ike_props(
1030 crypto="AES-GCM-16ICV",
1033 prf="PRF_HMAC_SHA2_256",
1036 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1037 self.sa.generate_dh_data()
1038 self.sa.complete_dh_data()
1041 def update_esp_transforms(self, trans, sa):
1043 if trans.transform_type == 1: # ecryption
1044 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1045 elif trans.transform_type == 3: # integrity
1046 sa.esp_integ = INTEG_IDS[trans.transform_id]
1047 trans = trans.payload
1049 def verify_sa_auth_req(self, packet):
1051 self.sa.dport = udp.sport
1052 ih = self.get_ike_header(packet)
1053 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1054 self.assertEqual(ih.init_SPI, self.sa.ispi)
1055 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1056 self.assertIn("Initiator", ih.flags)
1057 self.assertNotIn("Response", ih.flags)
1060 self.verify_udp(udp)
1061 self.assertEqual(ih.id, self.sa.msg_id + 1)
1063 plain = self.sa.hmac_and_decrypt(ih)
1064 idi = ikev2.IKEv2_payload_IDi(plain)
1065 self.assertEqual(idi.load, self.sa.i_id)
1066 if self.no_idr_auth:
1067 self.assertEqual(idi.next_payload, 39) # AUTH
1069 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1070 self.assertEqual(idr.load, self.sa.r_id)
1071 prop = idi[ikev2.IKEv2_payload_Proposal]
1072 c = self.sa.child_sas[0]
1074 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1076 def send_init_response(self):
1077 tr_attr = self.sa.ike_crypto_attr()
1079 ikev2.IKEv2_payload_Transform(
1080 transform_type="Encryption",
1081 transform_id=self.sa.ike_crypto,
1083 key_length=tr_attr[0],
1085 / ikev2.IKEv2_payload_Transform(
1086 transform_type="Integrity", transform_id=self.sa.ike_integ
1088 / ikev2.IKEv2_payload_Transform(
1089 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1091 / ikev2.IKEv2_payload_Transform(
1092 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1095 props = ikev2.IKEv2_payload_Proposal(
1096 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1099 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1101 dst_address = b"\x0a\x0a\x0a\x0a"
1103 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1104 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1105 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1107 self.sa.init_resp_packet = (
1109 init_SPI=self.sa.ispi,
1110 resp_SPI=self.sa.rspi,
1111 exch_type="IKE_SA_INIT",
1114 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1115 / ikev2.IKEv2_payload_KE(
1116 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1118 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1119 / ikev2.IKEv2_payload_Notify(
1120 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1122 / ikev2.IKEv2_payload_Notify(
1123 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1127 ike_msg = self.create_packet(
1129 self.sa.init_resp_packet,
1135 self.pg_send(self.pg0, ike_msg)
1136 capture = self.pg0.get_capture(1)
1137 self.verify_sa_auth_req(capture[0])
1139 def initiate_sa_init(self):
1140 self.pg0.enable_capture()
1142 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1144 capture = self.pg0.get_capture(1)
1145 self.verify_sa_init_request(capture[0])
1146 self.send_init_response()
1148 def send_auth_response(self):
1149 tr_attr = self.sa.esp_crypto_attr()
1151 ikev2.IKEv2_payload_Transform(
1152 transform_type="Encryption",
1153 transform_id=self.sa.esp_crypto,
1155 key_length=tr_attr[0],
1157 / ikev2.IKEv2_payload_Transform(
1158 transform_type="Integrity", transform_id=self.sa.esp_integ
1160 / ikev2.IKEv2_payload_Transform(
1161 transform_type="Extended Sequence Number", transform_id="No ESN"
1163 / ikev2.IKEv2_payload_Transform(
1164 transform_type="Extended Sequence Number", transform_id="ESN"
1168 c = self.sa.child_sas[0]
1169 props = ikev2.IKEv2_payload_Proposal(
1170 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1173 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1175 ikev2.IKEv2_payload_IDi(
1176 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1178 / ikev2.IKEv2_payload_IDr(
1179 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1181 / ikev2.IKEv2_payload_AUTH(
1183 auth_type=AuthMethod.value(self.sa.auth_method),
1184 load=self.sa.auth_data,
1186 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1187 / ikev2.IKEv2_payload_TSi(
1188 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1190 / ikev2.IKEv2_payload_TSr(
1191 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1193 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1196 header = ikev2.IKEv2(
1197 init_SPI=self.sa.ispi,
1198 resp_SPI=self.sa.rspi,
1199 id=self.sa.new_msg_id(),
1201 exch_type="IKE_AUTH",
1204 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1205 packet = self.create_packet(
1206 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1208 self.pg_send(self.pg0, packet)
1210 def test_initiator(self):
1211 self.initiate_sa_init()
1213 self.sa.calc_child_keys()
1214 self.send_auth_response()
1215 self.verify_ike_sas()
1218 class TemplateResponder(IkePeer):
1219 """responder test template"""
1221 def initiate_del_sa_from_responder(self):
1222 self.pg0.enable_capture()
1224 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1225 capture = self.pg0.get_capture(1)
1226 ih = self.get_ike_header(capture[0])
1227 self.assertNotIn("Response", ih.flags)
1228 self.assertNotIn("Initiator", ih.flags)
1229 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1230 plain = self.sa.hmac_and_decrypt(ih)
1231 d = ikev2.IKEv2_payload_Delete(plain)
1232 self.assertEqual(d.proto, 1) # proto=IKEv2
1233 self.assertEqual(ih.init_SPI, self.sa.ispi)
1234 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1235 header = ikev2.IKEv2(
1236 init_SPI=self.sa.ispi,
1237 resp_SPI=self.sa.rspi,
1238 flags="Initiator+Response",
1239 exch_type="INFORMATIONAL",
1241 next_payload="Encrypted",
1243 resp = self.encrypt_ike_msg(header, b"", None)
1244 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Validate the reply captured after asking the peer to delete the IKE SA.

    The IKE header extracted from *packet* must carry the current message
    id, exchange type 37 (INFORMATIONAL), the "Response" flag without the
    "Initiator" flag, next payload 46 (Encrypted) and the SPIs stored in
    ``self.sa``.  The bytes returned by ``self.sa.hmac_and_decrypt`` must
    be empty.
    """
    reply = self.get_ike_header(packet)

    # header checks, kept in their original order
    self.assertEqual(reply.id, self.sa.msg_id)
    self.assertEqual(reply.exch_type, 37)  # INFORMATIONAL exchange
    self.assertIn("Response", reply.flags)
    self.assertNotIn("Initiator", reply.flags)
    self.assertEqual(reply.next_payload, 46)  # Encrypted payload
    self.assertEqual(reply.init_SPI, self.sa.ispi)
    self.assertEqual(reply.resp_SPI, self.sa.rspi)

    # the acknowledgement is expected to contain no inner payloads
    decrypted = self.sa.hmac_and_decrypt(reply)
    self.assertEqual(decrypted, b"")
1258 def initiate_del_sa_from_initiator(self):
1259 header = ikev2.IKEv2(
1260 init_SPI=self.sa.ispi,
1261 resp_SPI=self.sa.rspi,
1263 exch_type="INFORMATIONAL",
1264 id=self.sa.new_msg_id(),
1266 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1267 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1268 packet = self.create_packet(
1269 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1271 self.pg0.add_stream(packet)
1272 self.pg0.enable_capture()
1274 capture = self.pg0.get_capture(1)
1275 self.verify_del_sa(capture[0])
1277 def send_sa_init_req(self):
1278 tr_attr = self.sa.ike_crypto_attr()
1280 ikev2.IKEv2_payload_Transform(
1281 transform_type="Encryption",
1282 transform_id=self.sa.ike_crypto,
1284 key_length=tr_attr[0],
1286 / ikev2.IKEv2_payload_Transform(
1287 transform_type="Integrity", transform_id=self.sa.ike_integ
1289 / ikev2.IKEv2_payload_Transform(
1290 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1292 / ikev2.IKEv2_payload_Transform(
1293 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1297 props = ikev2.IKEv2_payload_Proposal(
1298 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1301 next_payload = None if self.ip6 else "Notify"
1303 self.sa.init_req_packet = (
1305 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1307 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1308 / ikev2.IKEv2_payload_KE(
1309 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1311 / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1316 src_address = b"\x0a\x0a\x0a\x01"
1318 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1321 dst_address = b"\x0a\x0a\x0a\x0a"
1323 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1325 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1326 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1327 nat_src_detection = ikev2.IKEv2_payload_Notify(
1328 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1330 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1331 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1333 self.sa.init_req_packet = (
1334 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1337 ike_msg = self.create_packet(
1339 self.sa.init_req_packet,
1345 self.pg0.add_stream(ike_msg)
1346 self.pg0.enable_capture()
1348 capture = self.pg0.get_capture(1)
1349 self.verify_sa_init(capture[0])
1351 def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1352 tr_attr = self.sa.esp_crypto_attr()
1353 last_payload = last_payload or "Notify"
1356 ikev2.IKEv2_payload_Transform(
1357 transform_type="Encryption",
1358 transform_id=self.sa.esp_crypto,
1360 key_length=tr_attr[0],
1362 / ikev2.IKEv2_payload_Transform(
1363 transform_type="Integrity", transform_id=self.sa.esp_integ
1365 / ikev2.IKEv2_payload_Transform(
1366 transform_type="Extended Sequence Number", transform_id="No ESN"
1368 / ikev2.IKEv2_payload_Transform(
1369 transform_type="Extended Sequence Number", transform_id="ESN"
1375 trans /= ikev2.IKEv2_payload_Transform(
1376 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1379 c = self.sa.child_sas[0]
1380 props = ikev2.IKEv2_payload_Proposal(
1389 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1391 ikev2.IKEv2_payload_AUTH(
1393 auth_type=AuthMethod.value(self.sa.auth_method),
1394 load=self.sa.auth_data,
1396 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1397 / ikev2.IKEv2_payload_TSi(
1398 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1400 / ikev2.IKEv2_payload_TSr(
1401 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1406 first_payload = "Nonce"
1408 head = ikev2.IKEv2_payload_Nonce(
1409 load=self.sa.i_nonce, next_payload="KE"
1410 ) / ikev2.IKEv2_payload_KE(
1411 group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1414 head = ikev2.IKEv2_payload_Nonce(
1415 load=self.sa.i_nonce, next_payload="SA"
1420 / ikev2.IKEv2_payload_Notify(
1424 length=8 + len(c.ispi),
1425 next_payload="Notify",
1427 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1430 first_payload = "IDi"
1431 if self.no_idr_auth:
1432 ids = ikev2.IKEv2_payload_IDi(
1433 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1436 ids = ikev2.IKEv2_payload_IDi(
1437 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1438 ) / ikev2.IKEv2_payload_IDr(
1439 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1442 return plain, first_payload
1444 def send_sa_auth(self):
1445 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1446 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1447 header = ikev2.IKEv2(
1448 init_SPI=self.sa.ispi,
1449 resp_SPI=self.sa.rspi,
1450 id=self.sa.new_msg_id(),
1452 exch_type="IKE_AUTH",
1455 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1456 packet = self.create_packet(
1457 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1459 self.pg0.add_stream(packet)
1460 self.pg0.enable_capture()
1462 capture = self.pg0.get_capture(1)
1463 self.verify_sa_auth_resp(capture[0])
1465 def verify_sa_init(self, packet):
1466 ih = self.get_ike_header(packet)
1468 self.assertEqual(ih.id, self.sa.msg_id)
1469 self.assertEqual(ih.exch_type, 34)
1470 self.assertIn("Response", ih.flags)
1471 self.assertEqual(ih.init_SPI, self.sa.ispi)
1472 self.assertNotEqual(ih.resp_SPI, 0)
1473 self.sa.rspi = ih.resp_SPI
1475 sa = ih[ikev2.IKEv2_payload_SA]
1476 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1477 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1478 except IndexError as e:
1479 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1480 self.logger.error(ih.show())
1482 self.sa.complete_dh_data()
1486 def verify_sa_auth_resp(self, packet):
1487 ike = self.get_ike_header(packet)
1489 self.verify_udp(udp)
1490 self.assertEqual(ike.id, self.sa.msg_id)
1491 plain = self.sa.hmac_and_decrypt(ike)
1492 idr = ikev2.IKEv2_payload_IDr(plain)
1493 prop = idr[ikev2.IKEv2_payload_Proposal]
1494 self.assertEqual(prop.SPIsize, 4)
1495 self.sa.child_sas[0].rspi = prop.SPI
1496 self.sa.calc_child_keys()
1498 IKE_NODE_SUFFIX = "ip4"
1500 def verify_counters(self):
1501 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1502 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1503 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1505 r = self.vapi.ikev2_sa_dump()
1507 self.assertEqual(1, s.n_sa_auth_req)
1508 self.assertEqual(1, s.n_sa_init_req)
1510 def test_responder(self):
1511 self.send_sa_init_req()
1513 self.verify_ipsec_sas()
1514 self.verify_ike_sas()
1515 self.verify_counters()
1518 class Ikev2Params(object):
1519 def config_params(self, params={}):
1520 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1521 ei = VppEnum.vl_api_ipsec_integ_alg_t
1523 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1524 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1525 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1526 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1527 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1528 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1529 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1530 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1531 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1532 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1535 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1537 self.vapi.cli("ikev2 dpd disable")
1538 self.del_sa_from_responder = (
1540 if "del_sa_from_responder" not in params
1541 else params["del_sa_from_responder"]
1543 i_natt = False if "i_natt" not in params else params["i_natt"]
1544 r_natt = False if "r_natt" not in params else params["r_natt"]
1545 self.p = Profile(self, "pr1")
1546 self.ip6 = False if "ip6" not in params else params["ip6"]
1548 if "auth" in params and params["auth"] == "rsa-sig":
1549 auth_method = "rsa-sig"
1550 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1551 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1553 client_file = work_dir + params["client-cert"]
1554 server_pem = open(work_dir + params["server-cert"]).read()
1555 client_priv = open(work_dir + params["client-key"]).read()
1556 client_priv = load_pem_private_key(
1557 str.encode(client_priv), None, default_backend()
1559 self.peer_cert = x509.load_pem_x509_certificate(
1560 str.encode(server_pem), default_backend()
1562 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1565 auth_data = b"$3cr3tpa$$w0rd"
1566 self.p.add_auth(method="shared-key", data=auth_data)
1567 auth_method = "shared-key"
1570 is_init = True if "is_initiator" not in params else params["is_initiator"]
1571 self.no_idr_auth = params.get("no_idr_in_auth", False)
1573 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1574 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1575 r_id = self.idr = idr["data"]
1576 i_id = self.idi = idi["data"]
1578 # scapy is initiator, VPP is responder
1579 self.p.add_local_id(**idr)
1580 self.p.add_remote_id(**idi)
1581 if self.no_idr_auth:
1584 # VPP is initiator, scapy is responder
1585 self.p.add_local_id(**idi)
1586 if not self.no_idr_auth:
1587 self.p.add_remote_id(**idr)
1590 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1591 if "loc_ts" not in params
1592 else params["loc_ts"]
1595 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1596 if "rem_ts" not in params
1597 else params["rem_ts"]
1599 self.p.add_local_ts(**loc_ts)
1600 self.p.add_remote_ts(**rem_ts)
1601 if "responder" in params:
1602 self.p.add_responder(params["responder"])
1603 if "ike_transforms" in params:
1604 self.p.add_ike_transforms(params["ike_transforms"])
1605 if "esp_transforms" in params:
1606 self.p.add_esp_transforms(params["esp_transforms"])
1608 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1610 self.p.set_udp_encap(True)
1612 if "responder_hostname" in params:
1613 hn = params["responder_hostname"]
1614 self.p.add_responder_hostname(hn)
1616 # configure static dns record
1617 self.vapi.dns_name_server_add_del(
1618 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1620 self.vapi.dns_enable_disable(enable=1)
1622 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1629 is_initiator=is_init,
1630 id_type=self.p.local_id["id_type"],
1633 priv_key=client_priv,
1634 auth_method=auth_method,
1635 nonce=params.get("nonce"),
1636 auth_data=auth_data,
1637 udp_encap=udp_encap,
1638 local_ts=self.p.remote_ts,
1639 remote_ts=self.p.local_ts,
1644 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1647 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1649 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1652 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1655 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1658 self.sa.set_ike_props(
1659 crypto=ike_crypto[0],
1660 crypto_key_len=ike_crypto[1],
1662 prf="PRF_HMAC_SHA2_256",
1665 self.sa.set_esp_props(
1666 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1670 class TestApi(VppTestCase):
1671 """Test IKEV2 API"""
1674 def setUpClass(cls):
1675 super(TestApi, cls).setUpClass()
1678 def tearDownClass(cls):
1679 super(TestApi, cls).tearDownClass()
1682 super(TestApi, self).tearDown()
1683 self.p1.remove_vpp_config()
1684 self.p2.remove_vpp_config()
1685 r = self.vapi.ikev2_profile_dump()
1686 self.assertEqual(len(r), 0)
1688 def configure_profile(self, cfg):
1689 p = Profile(self, cfg["name"])
1690 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1691 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1692 p.add_local_ts(**cfg["loc_ts"])
1693 p.add_remote_ts(**cfg["rem_ts"])
1694 p.add_responder(cfg["responder"])
1695 p.add_ike_transforms(cfg["ike_ts"])
1696 p.add_esp_transforms(cfg["esp_ts"])
1697 p.add_auth(**cfg["auth"])
1698 p.set_udp_encap(cfg["udp_encap"])
1699 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1700 if "lifetime_data" in cfg:
1701 p.set_lifetime_data(cfg["lifetime_data"])
1702 if "tun_itf" in cfg:
1703 p.set_tunnel_interface(cfg["tun_itf"])
1704 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1709 def test_profile_api(self):
1710 """test profile dump API"""
1715 "start_addr": "3.3.3.2",
1716 "end_addr": "3.3.3.3",
1722 "start_addr": "4.5.76.80",
1723 "end_addr": "2.3.4.6",
1730 "start_addr": "ab::1",
1731 "end_addr": "ab::4",
1737 "start_addr": "cd::12",
1738 "end_addr": "cd::13",
1744 "natt_disabled": True,
1745 "loc_id": ("fqdn", b"vpp.home"),
1746 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1749 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1752 "crypto_key_size": 32,
1756 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1757 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1759 "ipsec_over_udp_port": 4501,
1762 "lifetime_maxdata": 20192,
1763 "lifetime_jitter": 9,
1769 "loc_id": ("ip4-addr", b"192.168.2.1"),
1770 "rem_id": ("ip6-addr", b"abcd::1"),
1773 "responder": {"sw_if_index": 4, "addr": "def::10"},
1776 "crypto_key_size": 16,
1780 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1781 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1783 "ipsec_over_udp_port": 4600,
1787 self.p1 = self.configure_profile(conf["p1"])
1788 self.p2 = self.configure_profile(conf["p2"])
1790 r = self.vapi.ikev2_profile_dump()
1791 self.assertEqual(len(r), 2)
1792 self.verify_profile(r[0].profile, conf["p1"])
1793 self.verify_profile(r[1].profile, conf["p2"])
def verify_id(self, api_id, cfg_id):
    """Compare an identity reported by the API with its configured value.

    *cfg_id* is indexed as ``(id_type_name, data_bytes)``; *api_id* must
    expose ``type`` and a text ``data`` attribute.
    """
    reported_type = api_id.type
    wanted_type = IDType.value(cfg_id[0])
    self.assertEqual(reported_type, wanted_type)

    # the API hands the data back as text, the configuration holds bytes
    reported_data = bytes(api_id.data, "ascii")
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare a traffic selector reported by the API with its configuration.

    The protocol id and both ports are compared as-is; the configured
    start/end addresses are converted with ``ip_address`` before being
    compared with the values reported by the API.
    """
    self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])

    for port_field in ("start_port", "end_port"):
        self.assertEqual(getattr(api_ts, port_field), cfg_ts[port_field])

    for addr_field in ("start_addr", "end_addr"):
        reported = getattr(api_ts, addr_field)
        configured = ip_address(text_type(cfg_ts[addr_field]))
        self.assertEqual(reported, configured)
def verify_responder(self, api_r, cfg_r):
    """Compare the responder reported by the API with its configuration.

    Checks the interface index first, then the address; the configured
    address is converted with ``ip_address`` before the comparison.
    """
    self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])

    reported_addr = api_r.addr
    configured_addr = ip_address(cfg_r["addr"])
    self.assertEqual(reported_addr, configured_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Check the transform fields shared by IKE and ESP transform sets.

    Each attribute of *api_ts* must equal the entry of the same name in
    the *cfg_ts* mapping.
    """
    for field in ("crypto_alg", "crypto_key_size", "integ_alg"):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Check an IKE transform set: the common fields plus the DH group."""
    self.verify_transforms(api_ts, cfg_ts)

    reported_group = api_ts.dh_group
    configured_group = cfg_ts["dh_group"]
    self.assertEqual(reported_group, configured_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check an ESP transform set.

    Only the fields handled by ``verify_transforms`` are compared; no
    ESP-specific field is checked here.
    """
    self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Compare the authentication settings reported by the API with *cfg_auth*.

    Checks, in order, the method (translated with ``AuthMethod.value``),
    the authentication data and the reported data length.
    """
    self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))

    self.assertEqual(api_auth.data, cfg_auth["data"])
    expected_len = len(cfg_auth["data"])
    self.assertEqual(api_auth.data_len, expected_len)
def verify_lifetime_data(self, p, ld):
    """Check the lifetime attributes of profile *p* against the mapping *ld*.

    Every listed attribute of *p* must equal the entry of the same name
    in *ld*.
    """
    lifetime_fields = (
        "lifetime",
        "lifetime_maxdata",
        "lifetime_jitter",
        "handover",
    )
    for field in lifetime_fields:
        self.assertEqual(getattr(p, field), ld[field])
1833 def verify_profile(self, ap, cp):
1834 self.assertEqual(ap.name, cp["name"])
1835 self.assertEqual(ap.udp_encap, cp["udp_encap"])
1836 self.verify_id(ap.loc_id, cp["loc_id"])
1837 self.verify_id(ap.rem_id, cp["rem_id"])
1838 self.verify_ts(ap.loc_ts, cp["loc_ts"])
1839 self.verify_ts(ap.rem_ts, cp["rem_ts"])
1840 self.verify_responder(ap.responder, cp["responder"])
1841 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1842 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1843 self.verify_auth(ap.auth, cp["auth"])
1844 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1845 self.assertTrue(natt_dis == ap.natt_disabled)
1847 if "lifetime_data" in cp:
1848 self.verify_lifetime_data(ap, cp["lifetime_data"])
1849 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1851 self.assertEqual(ap.tun_itf, cp["tun_itf"])
1853 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """test responder - responder behind NAT"""

    # suffix passed to assert_counter() when the IKE counters are verified
    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        # only the responder side is flagged as sitting behind NAT
        nat_params = {"r_natt": True}
        self.config_params(nat_params)
1866 @tag_fixme_vpp_workers
1867 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1868 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1870 def config_tc(self):
1874 "is_initiator": False, # seen from test case perspective
1875 # thus vpp is initiator
1877 "sw_if_index": self.pg0.sw_if_index,
1878 "addr": self.pg0.remote_ip4,
1880 "ike-crypto": ("AES-GCM-16ICV", 32),
1881 "ike-integ": "NULL",
1882 "ike-dh": "3072MODPgr",
1884 "crypto_alg": 20, # "aes-gcm-16"
1885 "crypto_key_size": 256,
1886 "dh_group": 15, # "modp-3072"
1889 "crypto_alg": 12, # "aes-cbc"
1890 "crypto_key_size": 256,
1891 # "hmac-sha2-256-128"
1898 @tag_fixme_vpp_workers
1899 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1900 """test ikev2 initiator - pre shared key auth"""
1902 def config_tc(self):
1905 "is_initiator": False, # seen from test case perspective
1906 # thus vpp is initiator
1907 "ike-crypto": ("AES-GCM-16ICV", 32),
1908 "ike-integ": "NULL",
1909 "ike-dh": "3072MODPgr",
1911 "crypto_alg": 20, # "aes-gcm-16"
1912 "crypto_key_size": 256,
1913 "dh_group": 15, # "modp-3072"
1916 "crypto_alg": 12, # "aes-cbc"
1917 "crypto_key_size": 256,
1918 # "hmac-sha2-256-128"
1921 "responder_hostname": {
1922 "hostname": "vpp.responder.org",
1923 "sw_if_index": self.pg0.sw_if_index,
1929 @tag_fixme_vpp_workers
1930 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1931 """test initiator - request window size (1)"""
1933 def rekey_respond(self, req, update_child_sa_data):
1934 ih = self.get_ike_header(req)
1935 plain = self.sa.hmac_and_decrypt(ih)
1936 sa = ikev2.IKEv2_payload_SA(plain)
1937 if update_child_sa_data:
1938 prop = sa[ikev2.IKEv2_payload_Proposal]
1939 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1940 self.sa.r_nonce = self.sa.i_nonce
1941 self.sa.child_sas[0].ispi = prop.SPI
1942 self.sa.child_sas[0].rspi = prop.SPI
1943 self.sa.calc_child_keys()
1945 header = ikev2.IKEv2(
1946 init_SPI=self.sa.ispi,
1947 resp_SPI=self.sa.rspi,
1951 next_payload="Encrypted",
1953 resp = self.encrypt_ike_msg(header, sa, "SA")
1954 packet = self.create_packet(
1955 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1957 self.send_and_assert_no_replies(self.pg0, packet)
1959 def test_initiator(self):
1960 super(TestInitiatorRequestWindowSize, self).test_initiator()
1961 self.pg0.enable_capture()
1963 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1964 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1965 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1966 capture = self.pg0.get_capture(2)
1968 # reply in reverse order
1969 self.rekey_respond(capture[1], True)
1970 self.rekey_respond(capture[0], False)
1972 # verify that only the second request was accepted
1973 self.verify_ike_sas()
1974 self.verify_ipsec_sas(is_rekey=True)
1977 @tag_fixme_vpp_workers
1978 class TestInitiatorRekey(TestInitiatorPsk):
1979 """test ikev2 initiator - rekey"""
1981 def rekey_from_initiator(self):
1982 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1983 self.pg0.enable_capture()
1985 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1986 capture = self.pg0.get_capture(1)
1987 ih = self.get_ike_header(capture[0])
1988 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1989 self.assertNotIn("Response", ih.flags)
1990 self.assertIn("Initiator", ih.flags)
1991 plain = self.sa.hmac_and_decrypt(ih)
1992 sa = ikev2.IKEv2_payload_SA(plain)
1993 prop = sa[ikev2.IKEv2_payload_Proposal]
1994 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1995 self.sa.r_nonce = self.sa.i_nonce
1996 # update new responder SPI
1997 self.sa.child_sas[0].ispi = prop.SPI
1998 self.sa.child_sas[0].rspi = prop.SPI
1999 self.sa.calc_child_keys()
2000 header = ikev2.IKEv2(
2001 init_SPI=self.sa.ispi,
2002 resp_SPI=self.sa.rspi,
2006 next_payload="Encrypted",
2008 resp = self.encrypt_ike_msg(header, sa, "SA")
2009 packet = self.create_packet(
2010 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2012 self.send_and_assert_no_replies(self.pg0, packet)
def test_initiator(self):
    # run the inherited initiator scenario first, then trigger a child SA
    # rekey and check that both the IKE SA and the rekeyed IPsec SAs verify
    super().test_initiator()
    self.rekey_from_initiator()
    self.verify_ike_sas()
    self.verify_ipsec_sas(is_rekey=True)
2021 @tag_fixme_vpp_workers
2022 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2023 """test ikev2 initiator - delete IKE SA from responder"""
2025 def config_tc(self):
2028 "del_sa_from_responder": True,
2029 "is_initiator": False, # seen from test case perspective
2030 # thus vpp is initiator
2032 "sw_if_index": self.pg0.sw_if_index,
2033 "addr": self.pg0.remote_ip4,
2035 "ike-crypto": ("AES-GCM-16ICV", 32),
2036 "ike-integ": "NULL",
2037 "ike-dh": "3072MODPgr",
2039 "crypto_alg": 20, # "aes-gcm-16"
2040 "crypto_key_size": 256,
2041 "dh_group": 15, # "modp-3072"
2044 "crypto_alg": 12, # "aes-cbc"
2045 "crypto_key_size": 256,
2046 # "hmac-sha2-256-128"
2049 "no_idr_in_auth": True,
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """test ikev2 responder - initiator behind NAT"""

    # suffix passed to assert_counter() when the IKE counters are verified
    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        # only the initiator side is flagged as sitting behind NAT
        nat_params = {"i_natt": True}
        self.config_params(nat_params)
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """test ikev2 responder - pre shared key auth"""

    def config_tc(self):
        # no overrides: rely entirely on the defaults of config_params()
        self.config_params()
2072 @tag_fixme_vpp_workers
2073 class TestResponderDpd(TestResponderPsk):
2075 Dead peer detection test
2078 def config_tc(self):
2079 self.config_params({"dpd_disabled": False})
2084 def test_responder(self):
2085 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2086 super(TestResponderDpd, self).test_responder()
2087 self.pg0.enable_capture()
2089 # capture empty request but don't reply
2090 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2091 ih = self.get_ike_header(capture[0])
2092 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2093 plain = self.sa.hmac_and_decrypt(ih)
2094 self.assertEqual(plain, b"")
2095 # wait for SA expiration
2097 ike_sas = self.vapi.ikev2_sa_dump()
2098 self.assertEqual(len(ike_sas), 0)
2099 ipsec_sas = self.vapi.ipsec_sa_dump()
2100 self.assertEqual(len(ipsec_sas), 0)
2103 @tag_fixme_vpp_workers
2104 class TestResponderRekey(TestResponderPsk):
2105 """test ikev2 responder - rekey"""
2109 def send_rekey_from_initiator(self):
2111 self.sa.generate_dh_data()
2112 packet = self.create_rekey_request(kex=self.WITH_KEX)
2113 self.pg0.add_stream(packet)
2114 self.pg0.enable_capture()
2116 capture = self.pg0.get_capture(1)
2119 def process_rekey_response(self, capture):
2120 ih = self.get_ike_header(capture[0])
2121 plain = self.sa.hmac_and_decrypt(ih)
2122 sa = ikev2.IKEv2_payload_SA(plain)
2123 prop = sa[ikev2.IKEv2_payload_Proposal]
2124 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2125 # update new responder SPI
2126 self.sa.child_sas[0].rspi = prop.SPI
2128 self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2129 self.sa.complete_dh_data()
2130 self.sa.calc_child_keys(kex=self.WITH_KEX)
def test_responder(self):
    # establish the SAs through the inherited responder scenario
    super().test_responder()

    # send one rekey request and process the captured answer
    rekey_capture = self.send_rekey_from_initiator()
    self.process_rekey_response(rekey_capture)

    self.verify_ike_sas()
    self.verify_ipsec_sas(is_rekey=True)

    # exactly one rekey request must be counted, both by the node counter
    # and in the statistics of the first dumped SA
    self.assert_counter(1, "rekey_req", "ip4")
    sa_dump = self.vapi.ikev2_sa_dump()
    sa_stats = sa_dump[0].sa.stats
    self.assertEqual(sa_stats.n_rekey_req, 1)
2142 @tag_fixme_vpp_workers
2143 class TestResponderRekeyRepeat(TestResponderRekey):
2144 """test ikev2 responder - rekey repeat"""
2146 def test_responder(self):
2147 super(TestResponderRekeyRepeat, self).test_responder()
2148 # rekey request is not accepted until old IPsec SA is expired
2149 capture = self.send_rekey_from_initiator()
2150 ih = self.get_ike_header(capture[0])
2151 plain = self.sa.hmac_and_decrypt(ih)
2152 notify = ikev2.IKEv2_payload_Notify(plain)
2153 self.assertEqual(notify.type, 43)
2154 self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2155 # rekey request is accepted after old IPsec SA was expired
2157 if len(self.vapi.ipsec_sa_dump()) != 3:
2161 self.fail("old IPsec SA not expired")
2162 self.process_rekey_response(self.send_rekey_from_initiator())
2163 self.verify_ike_sas()
2164 self.verify_ipsec_sas(sa_count=3)
2167 @tag_fixme_vpp_workers
2168 class TestResponderRekeyKEX(TestResponderRekey):
2169 """test ikev2 responder - rekey with key exchange"""
2174 @tag_fixme_vpp_workers
2175 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2176 """test ikev2 responder - rekey repeat with key exchange"""
2181 @tag_fixme_ubuntu2204
2183 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2184 """test ikev2 responder - non-default table id"""
2187 def setUpClass(cls):
2188 import scapy.contrib.ikev2 as _ikev2
2190 globals()["ikev2"] = _ikev2
2191 super(IkePeer, cls).setUpClass()
2192 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
2196 cls.create_pg_interfaces(range(1))
2197 cls.vapi.cli("ip table add 1")
2198 cls.vapi.cli("set interface ip table pg0 1")
2199 for i in cls.pg_interfaces:
def config_tc(self):
    # NOTE(review): presumably keeps dead peer detection enabled for this
    # test case - confirm against config_params()
    dpd_params = {"dpd_disabled": False}
    self.config_params(dpd_params)
2209 def test_responder(self):
2210 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2211 super(TestResponderVrf, self).test_responder()
2212 self.pg0.enable_capture()
2214 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2215 ih = self.get_ike_header(capture[0])
2216 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2217 plain = self.sa.hmac_and_decrypt(ih)
2218 self.assertEqual(plain, b"")
2221 @tag_fixme_vpp_workers
2222 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2223 """test ikev2 responder - cert based auth"""
2225 def config_tc(self):
2230 "server-key": "server-key.pem",
2231 "client-key": "client-key.pem",
2232 "client-cert": "client-cert.pem",
2233 "server-cert": "server-cert.pem",
2238 @tag_fixme_vpp_workers
2239 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2240 TemplateResponder, Ikev2Params
2243 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2246 def config_tc(self):
2249 "ike-crypto": ("AES-CBC", 16),
2250 "ike-integ": "SHA2-256-128",
2251 "esp-crypto": ("AES-CBC", 24),
2252 "esp-integ": "SHA2-384-192",
2253 "ike-dh": "2048MODPgr",
2254 "nonce": os.urandom(256),
2255 "no_idr_in_auth": True,
2260 @tag_fixme_vpp_workers
2261 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2262 TemplateResponder, Ikev2Params
2266 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2269 def config_tc(self):
2272 "ike-crypto": ("AES-CBC", 32),
2273 "ike-integ": "SHA2-256-128",
2274 "esp-crypto": ("AES-GCM-16ICV", 32),
2275 "esp-integ": "NULL",
2276 "ike-dh": "3072MODPgr",
2281 @tag_fixme_vpp_workers
2282 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2287 IKE_NODE_SUFFIX = "ip6"
2289 def config_tc(self):
2292 "del_sa_from_responder": True,
2295 "ike-crypto": ("AES-GCM-16ICV", 32),
2296 "ike-integ": "NULL",
2297 "ike-dh": "2048MODPgr",
2298 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2299 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2304 @tag_fixme_vpp_workers
2305 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2307 Test for keep alive messages
2310 def send_empty_req_from_responder(self):
2311 packet = self.create_empty_request()
2312 self.pg0.add_stream(packet)
2313 self.pg0.enable_capture()
2315 capture = self.pg0.get_capture(1)
2316 ih = self.get_ike_header(capture[0])
2317 self.assertEqual(ih.id, self.sa.msg_id)
2318 plain = self.sa.hmac_and_decrypt(ih)
2319 self.assertEqual(plain, b"")
2320 self.assert_counter(1, "keepalive", "ip4")
2321 r = self.vapi.ikev2_sa_dump()
2322 self.assertEqual(1, r[0].sa.stats.n_keepalives)
def test_initiator(self):
    # run the inherited initiator scenario, then send an empty request
    # and verify how it is answered and counted
    super().test_initiator()
    self.send_empty_req_from_responder()
2329 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2330 """malformed packet test"""
def config_tc(self):
    # no overrides: rely entirely on the defaults of config_params()
    self.config_params()
2338 def create_ike_init_msg(self, length=None, payload=None):
2341 init_SPI="\x11" * 8,
2343 exch_type="IKE_SA_INIT",
2345 if payload is not None:
2347 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
def verify_bad_packet_length(self):
    """Send IKE_SA_INIT messages built with length 0xDEAD.

    ``self.pkt_count`` copies are sent; no reply is expected and the
    "bad_length" counter must equal the number of packets sent.
    """
    bogus_msg = self.create_ike_init_msg(length=0xDEAD)
    burst = bogus_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, "bad_length")
def verify_bad_sa_payload_length(self):
    """Send IKE_SA_INIT messages whose SA payload is built with length 0xDEAD.

    ``self.pkt_count`` copies are sent; no reply is expected and the
    "malformed_packet" counter must equal the number of packets sent.
    """
    bogus_sa = ikev2.IKEv2_payload_SA(length=0xDEAD)
    bogus_msg = self.create_ike_init_msg(payload=bogus_sa)
    burst = bogus_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, "malformed_packet")
def test_responder(self):
    # number of copies of each malformed message to send
    self.pkt_count = 254

    # run the malformed-message checks one after the other
    checks = (
        self.verify_bad_packet_length,
        self.verify_bad_sa_payload_length,
    )
    for check in checks:
        check()
if __name__ == "__main__":
    # executed as a script: hand the module's tests to unittest, using the
    # runner imported from the framework module
    unittest.main(testRunner=VppTestRunner)