3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
32 KEY_PAD = b"Key Pad for IKEv2"
39 # tuple structure is (p, g, key_len)
44 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
45 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
46 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
47 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
48 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
49 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
50 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
51 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
52 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
53 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
54 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
62 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
63 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
64 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
65 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
66 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
67 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
68 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
69 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
70 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
71 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
72 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
73 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
74 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
75 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
76 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
77 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
85 class CryptoAlgo(object):
86 def __init__(self, name, cipher, mode):
90 if self.cipher is not None:
91 self.bs = self.cipher.block_size // 8
93 if self.name == "AES-GCM-16ICV":
94 self.iv_len = GCM_IV_SIZE
98 def encrypt(self, data, key, aad=None):
99 iv = os.urandom(self.iv_len)
102 self.cipher(key), self.mode(iv), default_backend()
104 return iv + encryptor.update(data) + encryptor.finalize()
106 salt = key[-SALT_SIZE:]
109 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
111 encryptor.authenticate_additional_data(aad)
112 data = encryptor.update(data) + encryptor.finalize()
113 data += encryptor.tag[:GCM_ICV_SIZE]
116 def decrypt(self, data, key, aad=None, icv=None):
118 iv = data[: self.iv_len]
119 ct = data[self.iv_len :]
121 algorithms.AES(key), self.mode(iv), default_backend()
123 return decryptor.update(ct) + decryptor.finalize()
125 salt = key[-SALT_SIZE:]
126 nonce = salt + data[:GCM_IV_SIZE]
127 ct = data[GCM_IV_SIZE:]
128 key = key[:-SALT_SIZE]
130 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
132 decryptor.authenticate_additional_data(aad)
133 return decryptor.update(ct) + decryptor.finalize()
136 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
137 data = data + b"\x00" * (pad_len - 1)
138 return data + bytes([pad_len - 1])
141 class AuthAlgo(object):
142 def __init__(self, name, mac, mod, key_len, trunc_len=None):
146 self.key_len = key_len
147 self.trunc_len = trunc_len or key_len
151 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
152 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
153 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
157 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
158 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
159 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
160 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
161 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
165 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
182 class IKEv2ChildSA(object):
183 def __init__(self, local_ts, remote_ts, is_initiator):
191 self.local_ts = local_ts
192 self.remote_ts = remote_ts
195 class IKEv2SA(object):
202 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
208 auth_method="shared-key",
214 self.udp_encap = udp_encap
224 self.dh_params = None
226 self.priv_key = priv_key
227 self.is_initiator = is_initiator
228 nonce = nonce or os.urandom(32)
229 self.auth_data = auth_data
232 if isinstance(id_type, str):
233 self.id_type = IDType.value(id_type)
235 self.id_type = id_type
236 self.auth_method = auth_method
237 if self.is_initiator:
238 self.rspi = 8 * b"\x00"
243 self.ispi = 8 * b"\x00"
245 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
247 def new_msg_id(self):
252 def my_dh_pub_key(self):
253 if self.is_initiator:
254 return self.i_dh_data
255 return self.r_dh_data
258 def peer_dh_pub_key(self):
259 if self.is_initiator:
260 return self.r_dh_data
261 return self.i_dh_data
265 return self.i_natt or self.r_natt
267 def compute_secret(self):
268 priv = self.dh_private_key
269 peer = self.peer_dh_pub_key
270 p, g, l = self.ike_group
272 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
275 def generate_dh_data(self):
277 if self.ike_dh not in DH:
278 raise NotImplementedError("%s not in DH group" % self.ike_dh)
280 if self.dh_params is None:
281 dhg = DH[self.ike_dh]
282 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
283 self.dh_params = pn.parameters(default_backend())
285 priv = self.dh_params.generate_private_key()
286 pub = priv.public_key()
287 x = priv.private_numbers().x
288 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
289 y = pub.public_numbers().y
291 if self.is_initiator:
292 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
294 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
296 def complete_dh_data(self):
297 self.dh_shared_secret = self.compute_secret()
299 def calc_child_keys(self, kex=False):
300 prf = self.ike_prf_alg.mod()
301 s = self.i_nonce + self.r_nonce
303 s = self.dh_shared_secret + s
304 c = self.child_sas[0]
306 encr_key_len = self.esp_crypto_key_len
307 integ_key_len = self.esp_integ_alg.key_len
308 salt_len = 0 if integ_key_len else 4
310 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
311 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
314 c.sk_ei = keymat[pos : pos + encr_key_len]
318 c.sk_ai = keymat[pos : pos + integ_key_len]
321 c.salt_ei = keymat[pos : pos + salt_len]
324 c.sk_er = keymat[pos : pos + encr_key_len]
328 c.sk_ar = keymat[pos : pos + integ_key_len]
331 c.salt_er = keymat[pos : pos + salt_len]
334 def calc_prfplus(self, prf, key, seed, length):
338 while len(r) < length and x < 255:
343 s = s + seed + bytes([x])
344 t = self.calc_prf(prf, key, s)
352 def calc_prf(self, prf, key, data):
353 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
358 prf = self.ike_prf_alg.mod()
359 # SKEYSEED = prf(Ni | Nr, g^ir)
360 s = self.i_nonce + self.r_nonce
361 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
363 # calculate S = Ni | Nr | SPIi SPIr
364 s = s + self.ispi + self.rspi
366 prf_key_trunc = self.ike_prf_alg.trunc_len
367 encr_key_len = self.ike_crypto_key_len
368 tr_prf_key_len = self.ike_prf_alg.key_len
369 integ_key_len = self.ike_integ_alg.key_len
370 if integ_key_len == 0:
382 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
385 self.sk_d = keymat[: pos + prf_key_trunc]
388 self.sk_ai = keymat[pos : pos + integ_key_len]
390 self.sk_ar = keymat[pos : pos + integ_key_len]
393 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
394 pos += encr_key_len + salt_size
395 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
396 pos += encr_key_len + salt_size
398 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
399 pos += tr_prf_key_len
400 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
402 def generate_authmsg(self, prf, packet):
403 if self.is_initiator:
411 data = bytes([self.id_type, 0, 0, 0]) + id
412 id_hash = self.calc_prf(prf, key, data)
413 return packet + nonce + id_hash
416 prf = self.ike_prf_alg.mod()
417 if self.is_initiator:
418 packet = self.init_req_packet
420 packet = self.init_resp_packet
421 authmsg = self.generate_authmsg(prf, raw(packet))
422 if self.auth_method == "shared-key":
423 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
424 self.auth_data = self.calc_prf(prf, psk, authmsg)
425 elif self.auth_method == "rsa-sig":
426 self.auth_data = self.priv_key.sign(
427 authmsg, padding.PKCS1v15(), hashes.SHA1()
430 raise TypeError("unknown auth method type!")
432 def encrypt(self, data, aad=None):
433 data = self.ike_crypto_alg.pad(data)
434 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
437 def peer_authkey(self):
438 if self.is_initiator:
443 def my_authkey(self):
444 if self.is_initiator:
449 def my_cryptokey(self):
450 if self.is_initiator:
455 def peer_cryptokey(self):
456 if self.is_initiator:
460 def concat(self, alg, key_len):
461 return alg + "-" + str(key_len * 8)
464 def vpp_ike_cypto_alg(self):
465 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
468 def vpp_esp_cypto_alg(self):
469 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
471 def verify_hmac(self, ikemsg):
472 integ_trunc = self.ike_integ_alg.trunc_len
473 exp_hmac = ikemsg[-integ_trunc:]
474 data = ikemsg[:-integ_trunc]
475 computed_hmac = self.compute_hmac(
476 self.ike_integ_alg.mod(), self.peer_authkey, data
478 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
480 def compute_hmac(self, integ, key, data):
481 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
485 def decrypt(self, data, aad=None, icv=None):
486 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
488 def hmac_and_decrypt(self, ike):
489 ep = ike[ikev2.IKEv2_payload_Encrypted]
490 if self.ike_crypto == "AES-GCM-16ICV":
491 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
492 ct = ep.load[:-GCM_ICV_SIZE]
493 tag = ep.load[-GCM_ICV_SIZE:]
494 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
496 self.verify_hmac(raw(ike))
497 integ_trunc = self.ike_integ_alg.trunc_len
499 # remove ICV and decrypt payload
500 ct = ep.load[:-integ_trunc]
501 plain = self.decrypt(ct)
504 return plain[: -pad_len - 1]
506 def build_ts_addr(self, ts, version):
508 "starting_address_v" + version: ts["start_addr"],
509 "ending_address_v" + version: ts["end_addr"],
512 def generate_ts(self, is_ip4):
513 c = self.child_sas[0]
514 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
516 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
517 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
518 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
519 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
521 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
522 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
523 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
524 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
526 if self.is_initiator:
527 return ([ts1], [ts2])
528 return ([ts2], [ts1])
530 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
531 if crypto not in CRYPTO_ALGOS:
532 raise TypeError("unsupported encryption algo %r" % crypto)
533 self.ike_crypto = crypto
534 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
535 self.ike_crypto_key_len = crypto_key_len
537 if integ not in AUTH_ALGOS:
538 raise TypeError("unsupported auth algo %r" % integ)
539 self.ike_integ = None if integ == "NULL" else integ
540 self.ike_integ_alg = AUTH_ALGOS[integ]
542 if prf not in PRF_ALGOS:
543 raise TypeError("unsupported prf algo %r" % prf)
545 self.ike_prf_alg = PRF_ALGOS[prf]
547 self.ike_group = DH[self.ike_dh]
549 def set_esp_props(self, crypto, crypto_key_len, integ):
550 self.esp_crypto_key_len = crypto_key_len
551 if crypto not in CRYPTO_ALGOS:
552 raise TypeError("unsupported encryption algo %r" % crypto)
553 self.esp_crypto = crypto
554 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
556 if integ not in AUTH_ALGOS:
557 raise TypeError("unsupported auth algo %r" % integ)
558 self.esp_integ = None if integ == "NULL" else integ
559 self.esp_integ_alg = AUTH_ALGOS[integ]
561 def crypto_attr(self, key_len):
562 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
563 return (0x800E << 16 | key_len << 3, 12)
565 raise Exception("unsupported attribute type")
567 def ike_crypto_attr(self):
568 return self.crypto_attr(self.ike_crypto_key_len)
570 def esp_crypto_attr(self):
571 return self.crypto_attr(self.esp_crypto_key_len)
573 def compute_nat_sha1(self, ip, port, rspi=None):
576 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
577 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
579 return digest.finalize()
582 class IkePeer(VppTestCase):
583 """common class for initiator and responder"""
587 import scapy.contrib.ikev2 as _ikev2
589 globals()["ikev2"] = _ikev2
590 super(IkePeer, cls).setUpClass()
591 cls.create_pg_interfaces(range(2))
592 for i in cls.pg_interfaces:
600 def tearDownClass(cls):
601 super(IkePeer, cls).tearDownClass()
604 super(IkePeer, self).tearDown()
605 if self.del_sa_from_responder:
606 self.initiate_del_sa_from_responder()
608 self.initiate_del_sa_from_initiator()
609 r = self.vapi.ikev2_sa_dump()
610 self.assertEqual(len(r), 0)
611 sas = self.vapi.ipsec_sa_dump()
612 self.assertEqual(len(sas), 0)
613 self.p.remove_vpp_config()
614 self.assertIsNone(self.p.query_vpp_config())
617 super(IkePeer, self).setUp()
619 self.p.add_vpp_config()
620 self.assertIsNotNone(self.p.query_vpp_config())
621 if self.sa.is_initiator:
622 self.sa.generate_dh_data()
623 self.vapi.cli("ikev2 set logging level 4")
624 self.vapi.cli("event-lo clear")
626 def assert_counter(self, count, name, version="ip4"):
627 node_name = "/err/ikev2-%s/" % version + name
628 self.assertEqual(count, self.statistics.get_err_counter(node_name))
630 def create_rekey_request(self, kex=False):
631 sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
632 header = ikev2.IKEv2(
633 init_SPI=self.sa.ispi,
634 resp_SPI=self.sa.rspi,
635 id=self.sa.new_msg_id(),
637 exch_type="CREATE_CHILD_SA",
640 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
641 return self.create_packet(
642 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
645 def create_empty_request(self):
646 header = ikev2.IKEv2(
647 init_SPI=self.sa.ispi,
648 resp_SPI=self.sa.rspi,
649 id=self.sa.new_msg_id(),
651 exch_type="INFORMATIONAL",
652 next_payload="Encrypted",
655 msg = self.encrypt_ike_msg(header, b"", None)
656 return self.create_packet(
657 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
661 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
664 src_ip = src_if.remote_ip6
665 dst_ip = src_if.local_ip6
668 src_ip = src_if.remote_ip4
669 dst_ip = src_if.local_ip4
672 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
673 / ip_layer(src=src_ip, dst=dst_ip)
674 / UDP(sport=sport, dport=dport)
677 # insert non ESP marker
678 res = res / Raw(b"\x00" * 4)
681 def verify_udp(self, udp):
682 self.assertEqual(udp.sport, self.sa.sport)
683 self.assertEqual(udp.dport, self.sa.dport)
685 def get_ike_header(self, packet):
687 ih = packet[ikev2.IKEv2]
688 ih = self.verify_and_remove_non_esp_marker(ih)
689 except IndexError as e:
690 # this is a workaround for getting IKEv2 layer as both ikev2 and
691 # ipsec register for port 4500
693 ih = self.verify_and_remove_non_esp_marker(esp)
694 self.assertEqual(ih.version, 0x20)
695 self.assertNotIn("Version", ih.flags)
698 def verify_and_remove_non_esp_marker(self, packet):
700 # if we are in nat traversal mode check for non esp marker
703 self.assertEqual(data[:4], b"\x00" * 4)
704 return ikev2.IKEv2(data[4:])
708 def encrypt_ike_msg(self, header, plain, first_payload):
709 if self.sa.ike_crypto == "AES-GCM-16ICV":
710 data = self.sa.ike_crypto_alg.pad(raw(plain))
715 + len(ikev2.IKEv2_payload_Encrypted())
717 tlen = plen + len(ikev2.IKEv2())
720 sk_p = ikev2.IKEv2_payload_Encrypted(
721 next_payload=first_payload, length=plen
725 encr = self.sa.encrypt(raw(plain), raw(res))
726 sk_p = ikev2.IKEv2_payload_Encrypted(
727 next_payload=first_payload, length=plen, load=encr
731 encr = self.sa.encrypt(raw(plain))
732 trunc_len = self.sa.ike_integ_alg.trunc_len
733 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
734 tlen = plen + len(ikev2.IKEv2())
736 sk_p = ikev2.IKEv2_payload_Encrypted(
737 next_payload=first_payload, length=plen, load=encr
742 integ_data = raw(res)
743 hmac_data = self.sa.compute_hmac(
744 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
746 res = res / Raw(hmac_data[:trunc_len])
747 assert len(res) == tlen
750 def verify_udp_encap(self, ipsec_sa):
751 e = VppEnum.vl_api_ipsec_sad_flags_t
752 if self.sa.udp_encap or self.sa.natt:
753 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
755 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
757 def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
758 sas = self.vapi.ipsec_sa_dump()
761 # after rekey there is a short period of time in which old
762 # inbound SA is still present
766 self.assertEqual(len(sas), sa_count)
767 if self.sa.is_initiator:
782 c = self.sa.child_sas[0]
784 self.verify_udp_encap(sa0)
785 self.verify_udp_encap(sa1)
786 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
787 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
788 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
790 if self.sa.esp_integ is None:
793 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
794 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
795 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
798 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
799 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
800 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
801 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
805 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
806 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
807 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
808 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
810 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
811 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
813 def verify_keymat(self, api_keys, keys, name):
814 km = getattr(keys, name)
815 api_km = getattr(api_keys, name)
816 api_km_len = getattr(api_keys, name + "_len")
817 self.assertEqual(len(km), api_km_len)
818 self.assertEqual(km, api_km[:api_km_len])
820 def verify_id(self, api_id, exp_id):
821 self.assertEqual(api_id.type, IDType.value(exp_id.type))
822 self.assertEqual(api_id.data_len, exp_id.data_len)
823 self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
825 def verify_ike_sas(self):
826 r = self.vapi.ikev2_sa_dump()
827 self.assertEqual(len(r), 1)
829 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
830 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
832 if self.sa.is_initiator:
833 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
834 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
836 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
837 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
839 if self.sa.is_initiator:
840 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
841 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
843 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
844 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
845 self.verify_keymat(sa.keys, self.sa, "sk_d")
846 self.verify_keymat(sa.keys, self.sa, "sk_ai")
847 self.verify_keymat(sa.keys, self.sa, "sk_ar")
848 self.verify_keymat(sa.keys, self.sa, "sk_ei")
849 self.verify_keymat(sa.keys, self.sa, "sk_er")
850 self.verify_keymat(sa.keys, self.sa, "sk_pi")
851 self.verify_keymat(sa.keys, self.sa, "sk_pr")
853 self.assertEqual(sa.i_id.type, self.sa.id_type)
854 self.assertEqual(sa.r_id.type, self.sa.id_type)
855 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
856 self.assertEqual(sa.r_id.data_len, len(self.idr))
857 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
858 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
860 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
861 self.assertEqual(len(r), 1)
863 self.assertEqual(csa.sa_index, sa.sa_index)
864 c = self.sa.child_sas[0]
865 if hasattr(c, "sk_ai"):
866 self.verify_keymat(csa.keys, c, "sk_ai")
867 self.verify_keymat(csa.keys, c, "sk_ar")
868 self.verify_keymat(csa.keys, c, "sk_ei")
869 self.verify_keymat(csa.keys, c, "sk_er")
870 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
871 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
873 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
876 r = self.vapi.ikev2_traffic_selector_dump(
877 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
879 self.assertEqual(len(r), 1)
881 self.verify_ts(r[0].ts, tsi[0], True)
883 r = self.vapi.ikev2_traffic_selector_dump(
884 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
886 self.assertEqual(len(r), 1)
887 self.verify_ts(r[0].ts, tsr[0], False)
889 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
890 self.verify_nonce(n, self.sa.i_nonce)
891 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
892 self.verify_nonce(n, self.sa.r_nonce)
894 def verify_nonce(self, api_nonce, nonce):
895 self.assertEqual(api_nonce.data_len, len(nonce))
896 self.assertEqual(api_nonce.nonce, nonce)
898 def verify_ts(self, api_ts, ts, is_initiator):
900 self.assertTrue(api_ts.is_local)
902 self.assertFalse(api_ts.is_local)
905 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
906 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
908 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
909 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
910 self.assertEqual(api_ts.start_port, ts.start_port)
911 self.assertEqual(api_ts.end_port, ts.end_port)
912 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
915 class TemplateInitiator(IkePeer):
916 """initiator test template"""
918 def initiate_del_sa_from_initiator(self):
919 ispi = int.from_bytes(self.sa.ispi, "little")
920 self.pg0.enable_capture()
922 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
923 capture = self.pg0.get_capture(1)
924 ih = self.get_ike_header(capture[0])
925 self.assertNotIn("Response", ih.flags)
926 self.assertIn("Initiator", ih.flags)
927 self.assertEqual(ih.init_SPI, self.sa.ispi)
928 self.assertEqual(ih.resp_SPI, self.sa.rspi)
929 plain = self.sa.hmac_and_decrypt(ih)
930 d = ikev2.IKEv2_payload_Delete(plain)
931 self.assertEqual(d.proto, 1) # proto=IKEv2
932 header = ikev2.IKEv2(
933 init_SPI=self.sa.ispi,
934 resp_SPI=self.sa.rspi,
936 exch_type="INFORMATIONAL",
938 next_payload="Encrypted",
940 resp = self.encrypt_ike_msg(header, b"", None)
941 self.send_and_assert_no_replies(self.pg0, resp)
943 def verify_del_sa(self, packet):
944 ih = self.get_ike_header(packet)
945 self.assertEqual(ih.id, self.sa.msg_id)
946 self.assertEqual(ih.exch_type, 37) # exchange informational
947 self.assertIn("Response", ih.flags)
948 self.assertIn("Initiator", ih.flags)
949 plain = self.sa.hmac_and_decrypt(ih)
950 self.assertEqual(plain, b"")
952 def initiate_del_sa_from_responder(self):
953 header = ikev2.IKEv2(
954 init_SPI=self.sa.ispi,
955 resp_SPI=self.sa.rspi,
956 exch_type="INFORMATIONAL",
957 id=self.sa.new_msg_id(),
959 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
960 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
961 packet = self.create_packet(
962 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
964 self.pg0.add_stream(packet)
965 self.pg0.enable_capture()
967 capture = self.pg0.get_capture(1)
968 self.verify_del_sa(capture[0])
971 def find_notify_payload(packet, notify_type):
972 n = packet[ikev2.IKEv2_payload_Notify]
974 if n.type == notify_type:
979 def verify_nat_detection(self, packet):
986 # NAT_DETECTION_SOURCE_IP
987 s = self.find_notify_payload(packet, 16388)
988 self.assertIsNotNone(s)
989 src_sha = self.sa.compute_nat_sha1(
990 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
992 self.assertEqual(s.load, src_sha)
994 # NAT_DETECTION_DESTINATION_IP
995 s = self.find_notify_payload(packet, 16389)
996 self.assertIsNotNone(s)
997 dst_sha = self.sa.compute_nat_sha1(
998 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1000 self.assertEqual(s.load, dst_sha)
1002 def verify_sa_init_request(self, packet):
1004 self.sa.dport = udp.sport
1005 ih = packet[ikev2.IKEv2]
1006 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1007 self.assertEqual(ih.exch_type, 34) # SA_INIT
1008 self.sa.ispi = ih.init_SPI
1009 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1010 self.assertIn("Initiator", ih.flags)
1011 self.assertNotIn("Response", ih.flags)
1012 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1013 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1015 prop = packet[ikev2.IKEv2_payload_Proposal]
1016 self.assertEqual(prop.proto, 1) # proto = ikev2
1017 self.assertEqual(prop.proposal, 1)
1018 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1020 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1022 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1023 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1024 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1025 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1027 self.verify_nat_detection(packet)
1028 self.sa.set_ike_props(
1029 crypto="AES-GCM-16ICV",
1032 prf="PRF_HMAC_SHA2_256",
1035 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1036 self.sa.generate_dh_data()
1037 self.sa.complete_dh_data()
1040 def update_esp_transforms(self, trans, sa):
1042 if trans.transform_type == 1: # ecryption
1043 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1044 elif trans.transform_type == 3: # integrity
1045 sa.esp_integ = INTEG_IDS[trans.transform_id]
1046 trans = trans.payload
1048 def verify_sa_auth_req(self, packet):
1050 self.sa.dport = udp.sport
1051 ih = self.get_ike_header(packet)
1052 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1053 self.assertEqual(ih.init_SPI, self.sa.ispi)
1054 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1055 self.assertIn("Initiator", ih.flags)
1056 self.assertNotIn("Response", ih.flags)
1059 self.verify_udp(udp)
1060 self.assertEqual(ih.id, self.sa.msg_id + 1)
1062 plain = self.sa.hmac_and_decrypt(ih)
1063 idi = ikev2.IKEv2_payload_IDi(plain)
1064 self.assertEqual(idi.load, self.sa.i_id)
1065 if self.no_idr_auth:
1066 self.assertEqual(idi.next_payload, 39) # AUTH
1068 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1069 self.assertEqual(idr.load, self.sa.r_id)
1070 prop = idi[ikev2.IKEv2_payload_Proposal]
1071 c = self.sa.child_sas[0]
1073 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1075 def send_init_response(self):
1076 tr_attr = self.sa.ike_crypto_attr()
1078 ikev2.IKEv2_payload_Transform(
1079 transform_type="Encryption",
1080 transform_id=self.sa.ike_crypto,
1082 key_length=tr_attr[0],
1084 / ikev2.IKEv2_payload_Transform(
1085 transform_type="Integrity", transform_id=self.sa.ike_integ
1087 / ikev2.IKEv2_payload_Transform(
1088 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1090 / ikev2.IKEv2_payload_Transform(
1091 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1094 props = ikev2.IKEv2_payload_Proposal(
1095 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1098 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1100 dst_address = b"\x0a\x0a\x0a\x0a"
1102 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1103 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1104 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1106 self.sa.init_resp_packet = (
1108 init_SPI=self.sa.ispi,
1109 resp_SPI=self.sa.rspi,
1110 exch_type="IKE_SA_INIT",
1113 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1114 / ikev2.IKEv2_payload_KE(
1115 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1117 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1118 / ikev2.IKEv2_payload_Notify(
1119 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1121 / ikev2.IKEv2_payload_Notify(
1122 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1126 ike_msg = self.create_packet(
1128 self.sa.init_resp_packet,
1134 self.pg_send(self.pg0, ike_msg)
1135 capture = self.pg0.get_capture(1)
1136 self.verify_sa_auth_req(capture[0])
1138 def initiate_sa_init(self):
1139 self.pg0.enable_capture()
1141 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1143 capture = self.pg0.get_capture(1)
1144 self.verify_sa_init_request(capture[0])
1145 self.send_init_response()
1147 def send_auth_response(self):
1148 tr_attr = self.sa.esp_crypto_attr()
1150 ikev2.IKEv2_payload_Transform(
1151 transform_type="Encryption",
1152 transform_id=self.sa.esp_crypto,
1154 key_length=tr_attr[0],
1156 / ikev2.IKEv2_payload_Transform(
1157 transform_type="Integrity", transform_id=self.sa.esp_integ
1159 / ikev2.IKEv2_payload_Transform(
1160 transform_type="Extended Sequence Number", transform_id="No ESN"
1162 / ikev2.IKEv2_payload_Transform(
1163 transform_type="Extended Sequence Number", transform_id="ESN"
1167 c = self.sa.child_sas[0]
1168 props = ikev2.IKEv2_payload_Proposal(
1169 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1172 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1174 ikev2.IKEv2_payload_IDi(
1175 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1177 / ikev2.IKEv2_payload_IDr(
1178 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1180 / ikev2.IKEv2_payload_AUTH(
1182 auth_type=AuthMethod.value(self.sa.auth_method),
1183 load=self.sa.auth_data,
1185 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1186 / ikev2.IKEv2_payload_TSi(
1187 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1189 / ikev2.IKEv2_payload_TSr(
1190 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1192 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1195 header = ikev2.IKEv2(
1196 init_SPI=self.sa.ispi,
1197 resp_SPI=self.sa.rspi,
1198 id=self.sa.new_msg_id(),
1200 exch_type="IKE_AUTH",
1203 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1204 packet = self.create_packet(
1205 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1207 self.pg_send(self.pg0, packet)
1209 def test_initiator(self):
1210 self.initiate_sa_init()
1212 self.sa.calc_child_keys()
1213 self.send_auth_response()
1214 self.verify_ike_sas()
1217 class TemplateResponder(IkePeer):
1218 """responder test template"""
1220 def initiate_del_sa_from_responder(self):
1221 self.pg0.enable_capture()
1223 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1224 capture = self.pg0.get_capture(1)
1225 ih = self.get_ike_header(capture[0])
1226 self.assertNotIn("Response", ih.flags)
1227 self.assertNotIn("Initiator", ih.flags)
1228 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1229 plain = self.sa.hmac_and_decrypt(ih)
1230 d = ikev2.IKEv2_payload_Delete(plain)
1231 self.assertEqual(d.proto, 1) # proto=IKEv2
1232 self.assertEqual(ih.init_SPI, self.sa.ispi)
1233 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1234 header = ikev2.IKEv2(
1235 init_SPI=self.sa.ispi,
1236 resp_SPI=self.sa.rspi,
1237 flags="Initiator+Response",
1238 exch_type="INFORMATIONAL",
1240 next_payload="Encrypted",
1242 resp = self.encrypt_ike_msg(header, b"", None)
1243 self.send_and_assert_no_replies(self.pg0, resp)
1245 def verify_del_sa(self, packet):
1246 ih = self.get_ike_header(packet)
1247 self.assertEqual(ih.id, self.sa.msg_id)
1248 self.assertEqual(ih.exch_type, 37) # exchange informational
1249 self.assertIn("Response", ih.flags)
1250 self.assertNotIn("Initiator", ih.flags)
1251 self.assertEqual(ih.next_payload, 46) # Encrypted
1252 self.assertEqual(ih.init_SPI, self.sa.ispi)
1253 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1254 plain = self.sa.hmac_and_decrypt(ih)
1255 self.assertEqual(plain, b"")
1257 def initiate_del_sa_from_initiator(self):
1258 header = ikev2.IKEv2(
1259 init_SPI=self.sa.ispi,
1260 resp_SPI=self.sa.rspi,
1262 exch_type="INFORMATIONAL",
1263 id=self.sa.new_msg_id(),
1265 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1266 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1267 packet = self.create_packet(
1268 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1270 self.pg0.add_stream(packet)
1271 self.pg0.enable_capture()
1273 capture = self.pg0.get_capture(1)
1274 self.verify_del_sa(capture[0])
1276 def send_sa_init_req(self):
1277 tr_attr = self.sa.ike_crypto_attr()
1279 ikev2.IKEv2_payload_Transform(
1280 transform_type="Encryption",
1281 transform_id=self.sa.ike_crypto,
1283 key_length=tr_attr[0],
1285 / ikev2.IKEv2_payload_Transform(
1286 transform_type="Integrity", transform_id=self.sa.ike_integ
1288 / ikev2.IKEv2_payload_Transform(
1289 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1291 / ikev2.IKEv2_payload_Transform(
1292 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1296 props = ikev2.IKEv2_payload_Proposal(
1297 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1300 next_payload = None if self.ip6 else "Notify"
1302 self.sa.init_req_packet = (
1304 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1306 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1307 / ikev2.IKEv2_payload_KE(
1308 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1310 / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1315 src_address = b"\x0a\x0a\x0a\x01"
1317 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1320 dst_address = b"\x0a\x0a\x0a\x0a"
1322 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1324 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1325 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1326 nat_src_detection = ikev2.IKEv2_payload_Notify(
1327 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1329 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1330 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1332 self.sa.init_req_packet = (
1333 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1336 ike_msg = self.create_packet(
1338 self.sa.init_req_packet,
1344 self.pg0.add_stream(ike_msg)
1345 self.pg0.enable_capture()
1347 capture = self.pg0.get_capture(1)
1348 self.verify_sa_init(capture[0])
1350 def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1351 tr_attr = self.sa.esp_crypto_attr()
1352 last_payload = last_payload or "Notify"
1355 ikev2.IKEv2_payload_Transform(
1356 transform_type="Encryption",
1357 transform_id=self.sa.esp_crypto,
1359 key_length=tr_attr[0],
1361 / ikev2.IKEv2_payload_Transform(
1362 transform_type="Integrity", transform_id=self.sa.esp_integ
1364 / ikev2.IKEv2_payload_Transform(
1365 transform_type="Extended Sequence Number", transform_id="No ESN"
1367 / ikev2.IKEv2_payload_Transform(
1368 transform_type="Extended Sequence Number", transform_id="ESN"
1374 trans /= ikev2.IKEv2_payload_Transform(
1375 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1378 c = self.sa.child_sas[0]
1379 props = ikev2.IKEv2_payload_Proposal(
1388 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1390 ikev2.IKEv2_payload_AUTH(
1392 auth_type=AuthMethod.value(self.sa.auth_method),
1393 load=self.sa.auth_data,
1395 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1396 / ikev2.IKEv2_payload_TSi(
1397 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1399 / ikev2.IKEv2_payload_TSr(
1400 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1405 first_payload = "Nonce"
1407 head = ikev2.IKEv2_payload_Nonce(
1408 load=self.sa.i_nonce, next_payload="KE"
1409 ) / ikev2.IKEv2_payload_KE(
1410 group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1413 head = ikev2.IKEv2_payload_Nonce(
1414 load=self.sa.i_nonce, next_payload="SA"
1419 / ikev2.IKEv2_payload_Notify(
1423 length=8 + len(c.ispi),
1424 next_payload="Notify",
1426 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1429 first_payload = "IDi"
1430 if self.no_idr_auth:
1431 ids = ikev2.IKEv2_payload_IDi(
1432 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1435 ids = ikev2.IKEv2_payload_IDi(
1436 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1437 ) / ikev2.IKEv2_payload_IDr(
1438 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1441 return plain, first_payload
1443 def send_sa_auth(self):
1444 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1445 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1446 header = ikev2.IKEv2(
1447 init_SPI=self.sa.ispi,
1448 resp_SPI=self.sa.rspi,
1449 id=self.sa.new_msg_id(),
1451 exch_type="IKE_AUTH",
1454 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1455 packet = self.create_packet(
1456 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1458 self.pg0.add_stream(packet)
1459 self.pg0.enable_capture()
1461 capture = self.pg0.get_capture(1)
1462 self.verify_sa_auth_resp(capture[0])
1464 def verify_sa_init(self, packet):
1465 ih = self.get_ike_header(packet)
1467 self.assertEqual(ih.id, self.sa.msg_id)
1468 self.assertEqual(ih.exch_type, 34)
1469 self.assertIn("Response", ih.flags)
1470 self.assertEqual(ih.init_SPI, self.sa.ispi)
1471 self.assertNotEqual(ih.resp_SPI, 0)
1472 self.sa.rspi = ih.resp_SPI
1474 sa = ih[ikev2.IKEv2_payload_SA]
1475 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1476 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1477 except IndexError as e:
1478 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1479 self.logger.error(ih.show())
1481 self.sa.complete_dh_data()
1485 def verify_sa_auth_resp(self, packet):
1486 ike = self.get_ike_header(packet)
1488 self.verify_udp(udp)
1489 self.assertEqual(ike.id, self.sa.msg_id)
1490 plain = self.sa.hmac_and_decrypt(ike)
1491 idr = ikev2.IKEv2_payload_IDr(plain)
1492 prop = idr[ikev2.IKEv2_payload_Proposal]
1493 self.assertEqual(prop.SPIsize, 4)
1494 self.sa.child_sas[0].rspi = prop.SPI
1495 self.sa.calc_child_keys()
1497 IKE_NODE_SUFFIX = "ip4"
1499 def verify_counters(self):
1500 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1501 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1502 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1504 r = self.vapi.ikev2_sa_dump()
1506 self.assertEqual(1, s.n_sa_auth_req)
1507 self.assertEqual(1, s.n_sa_init_req)
1509 def test_responder(self):
1510 self.send_sa_init_req()
1512 self.verify_ipsec_sas()
1513 self.verify_ike_sas()
1514 self.verify_counters()
1517 class Ikev2Params(object):
1518 def config_params(self, params={}):
1519 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1520 ei = VppEnum.vl_api_ipsec_integ_alg_t
1522 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1523 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1524 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1525 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1526 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1527 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1528 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1529 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1530 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1531 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1534 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1536 self.vapi.cli("ikev2 dpd disable")
1537 self.del_sa_from_responder = (
1539 if "del_sa_from_responder" not in params
1540 else params["del_sa_from_responder"]
1542 i_natt = False if "i_natt" not in params else params["i_natt"]
1543 r_natt = False if "r_natt" not in params else params["r_natt"]
1544 self.p = Profile(self, "pr1")
1545 self.ip6 = False if "ip6" not in params else params["ip6"]
1547 if "auth" in params and params["auth"] == "rsa-sig":
1548 auth_method = "rsa-sig"
1549 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1550 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1552 client_file = work_dir + params["client-cert"]
1553 server_pem = open(work_dir + params["server-cert"]).read()
1554 client_priv = open(work_dir + params["client-key"]).read()
1555 client_priv = load_pem_private_key(
1556 str.encode(client_priv), None, default_backend()
1558 self.peer_cert = x509.load_pem_x509_certificate(
1559 str.encode(server_pem), default_backend()
1561 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1564 auth_data = b"$3cr3tpa$$w0rd"
1565 self.p.add_auth(method="shared-key", data=auth_data)
1566 auth_method = "shared-key"
1569 is_init = True if "is_initiator" not in params else params["is_initiator"]
1570 self.no_idr_auth = params.get("no_idr_in_auth", False)
1572 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1573 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1574 r_id = self.idr = idr["data"]
1575 i_id = self.idi = idi["data"]
1577 # scapy is initiator, VPP is responder
1578 self.p.add_local_id(**idr)
1579 self.p.add_remote_id(**idi)
1580 if self.no_idr_auth:
1583 # VPP is initiator, scapy is responder
1584 self.p.add_local_id(**idi)
1585 if not self.no_idr_auth:
1586 self.p.add_remote_id(**idr)
1589 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1590 if "loc_ts" not in params
1591 else params["loc_ts"]
1594 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1595 if "rem_ts" not in params
1596 else params["rem_ts"]
1598 self.p.add_local_ts(**loc_ts)
1599 self.p.add_remote_ts(**rem_ts)
1600 if "responder" in params:
1601 self.p.add_responder(params["responder"])
1602 if "ike_transforms" in params:
1603 self.p.add_ike_transforms(params["ike_transforms"])
1604 if "esp_transforms" in params:
1605 self.p.add_esp_transforms(params["esp_transforms"])
1607 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1609 self.p.set_udp_encap(True)
1611 if "responder_hostname" in params:
1612 hn = params["responder_hostname"]
1613 self.p.add_responder_hostname(hn)
1615 # configure static dns record
1616 self.vapi.dns_name_server_add_del(
1617 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1619 self.vapi.dns_enable_disable(enable=1)
1621 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1628 is_initiator=is_init,
1629 id_type=self.p.local_id["id_type"],
1632 priv_key=client_priv,
1633 auth_method=auth_method,
1634 nonce=params.get("nonce"),
1635 auth_data=auth_data,
1636 udp_encap=udp_encap,
1637 local_ts=self.p.remote_ts,
1638 remote_ts=self.p.local_ts,
1643 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1646 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1648 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1651 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1654 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1657 self.sa.set_ike_props(
1658 crypto=ike_crypto[0],
1659 crypto_key_len=ike_crypto[1],
1661 prf="PRF_HMAC_SHA2_256",
1664 self.sa.set_esp_props(
1665 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1669 class TestApi(VppTestCase):
1670 """Test IKEV2 API"""
1673 def setUpClass(cls):
1674 super(TestApi, cls).setUpClass()
1677 def tearDownClass(cls):
1678 super(TestApi, cls).tearDownClass()
1681 super(TestApi, self).tearDown()
1682 self.p1.remove_vpp_config()
1683 self.p2.remove_vpp_config()
1684 r = self.vapi.ikev2_profile_dump()
1685 self.assertEqual(len(r), 0)
1687 def configure_profile(self, cfg):
1688 p = Profile(self, cfg["name"])
1689 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1690 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1691 p.add_local_ts(**cfg["loc_ts"])
1692 p.add_remote_ts(**cfg["rem_ts"])
1693 p.add_responder(cfg["responder"])
1694 p.add_ike_transforms(cfg["ike_ts"])
1695 p.add_esp_transforms(cfg["esp_ts"])
1696 p.add_auth(**cfg["auth"])
1697 p.set_udp_encap(cfg["udp_encap"])
1698 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1699 if "lifetime_data" in cfg:
1700 p.set_lifetime_data(cfg["lifetime_data"])
1701 if "tun_itf" in cfg:
1702 p.set_tunnel_interface(cfg["tun_itf"])
1703 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1708 def test_profile_api(self):
1709 """test profile dump API"""
1714 "start_addr": "3.3.3.2",
1715 "end_addr": "3.3.3.3",
1721 "start_addr": "4.5.76.80",
1722 "end_addr": "2.3.4.6",
1729 "start_addr": "ab::1",
1730 "end_addr": "ab::4",
1736 "start_addr": "cd::12",
1737 "end_addr": "cd::13",
1743 "natt_disabled": True,
1744 "loc_id": ("fqdn", b"vpp.home"),
1745 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1748 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1751 "crypto_key_size": 32,
1755 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1756 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1758 "ipsec_over_udp_port": 4501,
1761 "lifetime_maxdata": 20192,
1762 "lifetime_jitter": 9,
1768 "loc_id": ("ip4-addr", b"192.168.2.1"),
1769 "rem_id": ("ip6-addr", b"abcd::1"),
1772 "responder": {"sw_if_index": 4, "addr": "def::10"},
1775 "crypto_key_size": 16,
1779 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1780 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1782 "ipsec_over_udp_port": 4600,
1786 self.p1 = self.configure_profile(conf["p1"])
1787 self.p2 = self.configure_profile(conf["p2"])
1789 r = self.vapi.ikev2_profile_dump()
1790 self.assertEqual(len(r), 2)
1791 self.verify_profile(r[0].profile, conf["p1"])
1792 self.verify_profile(r[1].profile, conf["p2"])
1794 def verify_id(self, api_id, cfg_id):
1795 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1796 self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1798 def verify_ts(self, api_ts, cfg_ts):
1799 self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1800 self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1801 self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1802 self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1803 self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1805 def verify_responder(self, api_r, cfg_r):
1806 self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1807 self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1809 def verify_transforms(self, api_ts, cfg_ts):
1810 self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1811 self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1812 self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1814 def verify_ike_transforms(self, api_ts, cfg_ts):
1815 self.verify_transforms(api_ts, cfg_ts)
1816 self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1818 def verify_esp_transforms(self, api_ts, cfg_ts):
1819 self.verify_transforms(api_ts, cfg_ts)
1821 def verify_auth(self, api_auth, cfg_auth):
1822 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1823 self.assertEqual(api_auth.data, cfg_auth["data"])
1824 self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1826 def verify_lifetime_data(self, p, ld):
1827 self.assertEqual(p.lifetime, ld["lifetime"])
1828 self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1829 self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1830 self.assertEqual(p.handover, ld["handover"])
1832 def verify_profile(self, ap, cp):
1833 self.assertEqual(ap.name, cp["name"])
1834 self.assertEqual(ap.udp_encap, cp["udp_encap"])
1835 self.verify_id(ap.loc_id, cp["loc_id"])
1836 self.verify_id(ap.rem_id, cp["rem_id"])
1837 self.verify_ts(ap.loc_ts, cp["loc_ts"])
1838 self.verify_ts(ap.rem_ts, cp["rem_ts"])
1839 self.verify_responder(ap.responder, cp["responder"])
1840 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1841 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1842 self.verify_auth(ap.auth, cp["auth"])
1843 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1844 self.assertTrue(natt_dis == ap.natt_disabled)
1846 if "lifetime_data" in cp:
1847 self.verify_lifetime_data(ap, cp["lifetime_data"])
1848 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1850 self.assertEqual(ap.tun_itf, cp["tun_itf"])
1852 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1855 @tag_fixme_vpp_workers
1856 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1857 """test responder - responder behind NAT"""
1859 IKE_NODE_SUFFIX = "ip4-natt"
1861 def config_tc(self):
1862 self.config_params({"r_natt": True})
1865 @tag_fixme_vpp_workers
1866 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1867 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1869 def config_tc(self):
1873 "is_initiator": False, # seen from test case perspective
1874 # thus vpp is initiator
1876 "sw_if_index": self.pg0.sw_if_index,
1877 "addr": self.pg0.remote_ip4,
1879 "ike-crypto": ("AES-GCM-16ICV", 32),
1880 "ike-integ": "NULL",
1881 "ike-dh": "3072MODPgr",
1883 "crypto_alg": 20, # "aes-gcm-16"
1884 "crypto_key_size": 256,
1885 "dh_group": 15, # "modp-3072"
1888 "crypto_alg": 12, # "aes-cbc"
1889 "crypto_key_size": 256,
1890 # "hmac-sha2-256-128"
1897 @tag_fixme_vpp_workers
1898 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1899 """test ikev2 initiator - pre shared key auth"""
1901 def config_tc(self):
1904 "is_initiator": False, # seen from test case perspective
1905 # thus vpp is initiator
1906 "ike-crypto": ("AES-GCM-16ICV", 32),
1907 "ike-integ": "NULL",
1908 "ike-dh": "3072MODPgr",
1910 "crypto_alg": 20, # "aes-gcm-16"
1911 "crypto_key_size": 256,
1912 "dh_group": 15, # "modp-3072"
1915 "crypto_alg": 12, # "aes-cbc"
1916 "crypto_key_size": 256,
1917 # "hmac-sha2-256-128"
1920 "responder_hostname": {
1921 "hostname": "vpp.responder.org",
1922 "sw_if_index": self.pg0.sw_if_index,
1928 @tag_fixme_vpp_workers
1929 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1930 """test initiator - request window size (1)"""
1932 def rekey_respond(self, req, update_child_sa_data):
1933 ih = self.get_ike_header(req)
1934 plain = self.sa.hmac_and_decrypt(ih)
1935 sa = ikev2.IKEv2_payload_SA(plain)
1936 if update_child_sa_data:
1937 prop = sa[ikev2.IKEv2_payload_Proposal]
1938 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1939 self.sa.r_nonce = self.sa.i_nonce
1940 self.sa.child_sas[0].ispi = prop.SPI
1941 self.sa.child_sas[0].rspi = prop.SPI
1942 self.sa.calc_child_keys()
1944 header = ikev2.IKEv2(
1945 init_SPI=self.sa.ispi,
1946 resp_SPI=self.sa.rspi,
1950 next_payload="Encrypted",
1952 resp = self.encrypt_ike_msg(header, sa, "SA")
1953 packet = self.create_packet(
1954 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1956 self.send_and_assert_no_replies(self.pg0, packet)
1958 def test_initiator(self):
1959 super(TestInitiatorRequestWindowSize, self).test_initiator()
1960 self.pg0.enable_capture()
1962 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1963 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1964 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1965 capture = self.pg0.get_capture(2)
1967 # reply in reverse order
1968 self.rekey_respond(capture[1], True)
1969 self.rekey_respond(capture[0], False)
1971 # verify that only the second request was accepted
1972 self.verify_ike_sas()
1973 self.verify_ipsec_sas(is_rekey=True)
1976 @tag_fixme_vpp_workers
1977 class TestInitiatorRekey(TestInitiatorPsk):
1978 """test ikev2 initiator - rekey"""
1980 def rekey_from_initiator(self):
1981 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1982 self.pg0.enable_capture()
1984 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1985 capture = self.pg0.get_capture(1)
1986 ih = self.get_ike_header(capture[0])
1987 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1988 self.assertNotIn("Response", ih.flags)
1989 self.assertIn("Initiator", ih.flags)
1990 plain = self.sa.hmac_and_decrypt(ih)
1991 sa = ikev2.IKEv2_payload_SA(plain)
1992 prop = sa[ikev2.IKEv2_payload_Proposal]
1993 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1994 self.sa.r_nonce = self.sa.i_nonce
1995 # update new responder SPI
1996 self.sa.child_sas[0].ispi = prop.SPI
1997 self.sa.child_sas[0].rspi = prop.SPI
1998 self.sa.calc_child_keys()
1999 header = ikev2.IKEv2(
2000 init_SPI=self.sa.ispi,
2001 resp_SPI=self.sa.rspi,
2005 next_payload="Encrypted",
2007 resp = self.encrypt_ike_msg(header, sa, "SA")
2008 packet = self.create_packet(
2009 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2011 self.send_and_assert_no_replies(self.pg0, packet)
2013 def test_initiator(self):
2014 super(TestInitiatorRekey, self).test_initiator()
2015 self.rekey_from_initiator()
2016 self.verify_ike_sas()
2017 self.verify_ipsec_sas(is_rekey=True)
2020 @tag_fixme_vpp_workers
2021 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2022 """test ikev2 initiator - delete IKE SA from responder"""
2024 def config_tc(self):
2027 "del_sa_from_responder": True,
2028 "is_initiator": False, # seen from test case perspective
2029 # thus vpp is initiator
2031 "sw_if_index": self.pg0.sw_if_index,
2032 "addr": self.pg0.remote_ip4,
2034 "ike-crypto": ("AES-GCM-16ICV", 32),
2035 "ike-integ": "NULL",
2036 "ike-dh": "3072MODPgr",
2038 "crypto_alg": 20, # "aes-gcm-16"
2039 "crypto_key_size": 256,
2040 "dh_group": 15, # "modp-3072"
2043 "crypto_alg": 12, # "aes-cbc"
2044 "crypto_key_size": 256,
2045 # "hmac-sha2-256-128"
2048 "no_idr_in_auth": True,
2053 @tag_fixme_vpp_workers
2054 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2055 """test ikev2 responder - initiator behind NAT"""
2057 IKE_NODE_SUFFIX = "ip4-natt"
2059 def config_tc(self):
2060 self.config_params({"i_natt": True})
2063 @tag_fixme_vpp_workers
2064 class TestResponderPsk(TemplateResponder, Ikev2Params):
2065 """test ikev2 responder - pre shared key auth"""
2067 def config_tc(self):
2068 self.config_params()
2071 @tag_fixme_vpp_workers
2072 class TestResponderDpd(TestResponderPsk):
2074 Dead peer detection test
2077 def config_tc(self):
2078 self.config_params({"dpd_disabled": False})
2083 def test_responder(self):
2084 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2085 super(TestResponderDpd, self).test_responder()
2086 self.pg0.enable_capture()
2088 # capture empty request but don't reply
2089 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2090 ih = self.get_ike_header(capture[0])
2091 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2092 plain = self.sa.hmac_and_decrypt(ih)
2093 self.assertEqual(plain, b"")
2094 # wait for SA expiration
2096 ike_sas = self.vapi.ikev2_sa_dump()
2097 self.assertEqual(len(ike_sas), 0)
2098 ipsec_sas = self.vapi.ipsec_sa_dump()
2099 self.assertEqual(len(ipsec_sas), 0)
2102 @tag_fixme_vpp_workers
2103 class TestResponderRekey(TestResponderPsk):
2104 """test ikev2 responder - rekey"""
2108 def send_rekey_from_initiator(self):
2110 self.sa.generate_dh_data()
2111 packet = self.create_rekey_request(kex=self.WITH_KEX)
2112 self.pg0.add_stream(packet)
2113 self.pg0.enable_capture()
2115 capture = self.pg0.get_capture(1)
2118 def process_rekey_response(self, capture):
2119 ih = self.get_ike_header(capture[0])
2120 plain = self.sa.hmac_and_decrypt(ih)
2121 sa = ikev2.IKEv2_payload_SA(plain)
2122 prop = sa[ikev2.IKEv2_payload_Proposal]
2123 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2124 # update new responder SPI
2125 self.sa.child_sas[0].rspi = prop.SPI
2127 self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2128 self.sa.complete_dh_data()
2129 self.sa.calc_child_keys(kex=self.WITH_KEX)
2131 def test_responder(self):
2132 super(TestResponderRekey, self).test_responder()
2133 self.process_rekey_response(self.send_rekey_from_initiator())
2134 self.verify_ike_sas()
2135 self.verify_ipsec_sas(is_rekey=True)
2136 self.assert_counter(1, "rekey_req", "ip4")
2137 r = self.vapi.ikev2_sa_dump()
2138 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2141 @tag_fixme_vpp_workers
2142 class TestResponderRekeyRepeat(TestResponderRekey):
2143 """test ikev2 responder - rekey repeat"""
2145 def test_responder(self):
2146 super(TestResponderRekeyRepeat, self).test_responder()
2147 # rekey request is not accepted until old IPsec SA is expired
2148 capture = self.send_rekey_from_initiator()
2149 ih = self.get_ike_header(capture[0])
2150 plain = self.sa.hmac_and_decrypt(ih)
2151 notify = ikev2.IKEv2_payload_Notify(plain)
2152 self.assertEqual(notify.type, 43)
2153 self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2154 # rekey request is accepted after old IPsec SA was expired
2156 if len(self.vapi.ipsec_sa_dump()) != 3:
2160 self.fail("old IPsec SA not expired")
2161 self.process_rekey_response(self.send_rekey_from_initiator())
2162 self.verify_ike_sas()
2163 self.verify_ipsec_sas(sa_count=3)
2166 @tag_fixme_vpp_workers
2167 class TestResponderRekeyKEX(TestResponderRekey):
2168 """test ikev2 responder - rekey with key exchange"""
2173 @tag_fixme_vpp_workers
2174 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2175 """test ikev2 responder - rekey repeat with key exchange"""
2180 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2181 """test ikev2 responder - non-default table id"""
2184 def setUpClass(cls):
2185 import scapy.contrib.ikev2 as _ikev2
2187 globals()["ikev2"] = _ikev2
2188 super(IkePeer, cls).setUpClass()
2189 cls.create_pg_interfaces(range(1))
2190 cls.vapi.cli("ip table add 1")
2191 cls.vapi.cli("set interface ip table pg0 1")
2192 for i in cls.pg_interfaces:
2199 def config_tc(self):
2200 self.config_params({"dpd_disabled": False})
2202 def test_responder(self):
2203 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2204 super(TestResponderVrf, self).test_responder()
2205 self.pg0.enable_capture()
2207 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2208 ih = self.get_ike_header(capture[0])
2209 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2210 plain = self.sa.hmac_and_decrypt(ih)
2211 self.assertEqual(plain, b"")
2214 @tag_fixme_vpp_workers
2215 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2216 """test ikev2 responder - cert based auth"""
2218 def config_tc(self):
2223 "server-key": "server-key.pem",
2224 "client-key": "client-key.pem",
2225 "client-cert": "client-cert.pem",
2226 "server-cert": "server-cert.pem",
2231 @tag_fixme_vpp_workers
2232 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2233 TemplateResponder, Ikev2Params
2236 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2239 def config_tc(self):
2242 "ike-crypto": ("AES-CBC", 16),
2243 "ike-integ": "SHA2-256-128",
2244 "esp-crypto": ("AES-CBC", 24),
2245 "esp-integ": "SHA2-384-192",
2246 "ike-dh": "2048MODPgr",
2247 "nonce": os.urandom(256),
2248 "no_idr_in_auth": True,
2253 @tag_fixme_vpp_workers
2254 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2255 TemplateResponder, Ikev2Params
2259 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2262 def config_tc(self):
2265 "ike-crypto": ("AES-CBC", 32),
2266 "ike-integ": "SHA2-256-128",
2267 "esp-crypto": ("AES-GCM-16ICV", 32),
2268 "esp-integ": "NULL",
2269 "ike-dh": "3072MODPgr",
2274 @tag_fixme_vpp_workers
2275 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2280 IKE_NODE_SUFFIX = "ip6"
2282 def config_tc(self):
2285 "del_sa_from_responder": True,
2288 "ike-crypto": ("AES-GCM-16ICV", 32),
2289 "ike-integ": "NULL",
2290 "ike-dh": "2048MODPgr",
2291 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2292 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2297 @tag_fixme_vpp_workers
2298 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2300 Test for keep alive messages
2303 def send_empty_req_from_responder(self):
2304 packet = self.create_empty_request()
2305 self.pg0.add_stream(packet)
2306 self.pg0.enable_capture()
2308 capture = self.pg0.get_capture(1)
2309 ih = self.get_ike_header(capture[0])
2310 self.assertEqual(ih.id, self.sa.msg_id)
2311 plain = self.sa.hmac_and_decrypt(ih)
2312 self.assertEqual(plain, b"")
2313 self.assert_counter(1, "keepalive", "ip4")
2314 r = self.vapi.ikev2_sa_dump()
2315 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2317 def test_initiator(self):
2318 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2319 self.send_empty_req_from_responder()
2322 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2323 """malformed packet test"""
2328 def config_tc(self):
2329 self.config_params()
2331 def create_ike_init_msg(self, length=None, payload=None):
2334 init_SPI="\x11" * 8,
2336 exch_type="IKE_SA_INIT",
2338 if payload is not None:
2340 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2342 def verify_bad_packet_length(self):
2343 ike_msg = self.create_ike_init_msg(length=0xDEAD)
2344 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2345 self.assert_counter(self.pkt_count, "bad_length")
2347 def verify_bad_sa_payload_length(self):
2348 p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2349 ike_msg = self.create_ike_init_msg(payload=p)
2350 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2351 self.assert_counter(self.pkt_count, "malformed_packet")
2353 def test_responder(self):
2354 self.pkt_count = 254
2355 self.verify_bad_packet_length()
2356 self.verify_bad_sa_payload_length()
2359 if __name__ == "__main__":
2360 unittest.main(testRunner=VppTestRunner)