3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import VppTestCase
23 from asfframework import (
24 tag_fixme_vpp_workers,
31 from vpp_ikev2 import Profile, IDType, AuthMethod
32 from vpp_papi import VppEnum
39 KEY_PAD = b"Key Pad for IKEv2"
46 # tuple structure is (p, g, key_len)
51 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
69 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
70 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
71 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
72 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
73 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
74 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
75 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
76 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
77 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
78 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
79 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
80 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
81 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
82 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
83 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
84 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
92 class CryptoAlgo(object):
93 def __init__(self, name, cipher, mode):
97 if self.cipher is not None:
98 self.bs = self.cipher.block_size // 8
100 if self.name == "AES-GCM-16ICV":
101 self.iv_len = GCM_IV_SIZE
103 self.iv_len = self.bs
105 def encrypt(self, data, key, aad=None):
106 iv = os.urandom(self.iv_len)
109 self.cipher(key), self.mode(iv), default_backend()
111 return iv + encryptor.update(data) + encryptor.finalize()
113 salt = key[-SALT_SIZE:]
116 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
118 encryptor.authenticate_additional_data(aad)
119 data = encryptor.update(data) + encryptor.finalize()
120 data += encryptor.tag[:GCM_ICV_SIZE]
123 def decrypt(self, data, key, aad=None, icv=None):
125 iv = data[: self.iv_len]
126 ct = data[self.iv_len :]
128 algorithms.AES(key), self.mode(iv), default_backend()
130 return decryptor.update(ct) + decryptor.finalize()
132 salt = key[-SALT_SIZE:]
133 nonce = salt + data[:GCM_IV_SIZE]
134 ct = data[GCM_IV_SIZE:]
135 key = key[:-SALT_SIZE]
137 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
139 decryptor.authenticate_additional_data(aad)
140 return decryptor.update(ct) + decryptor.finalize()
143 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
144 data = data + b"\x00" * (pad_len - 1)
145 return data + bytes([pad_len - 1])
148 class AuthAlgo(object):
149 def __init__(self, name, mac, mod, key_len, trunc_len=None):
153 self.key_len = key_len
154 self.trunc_len = trunc_len or key_len
158 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
159 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
160 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
# Integrity (authentication) algorithms, keyed by the name used when
# configuring an SA.  Positional AuthAlgo arguments are
# (name, mac, mod, key_len, trunc_len).
#
# Each SHA-2 variant must be paired with its own digest: HMAC-SHA-384-192 is
# defined over SHA-384 and HMAC-SHA-512-256 over SHA-512 (RFC 4868).  Using
# SHA-256 for them would yield a MAC that no conforming peer can verify.
AUTH_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
    "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
    "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA384, 48, 24),
    "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA512, 64, 32),
}
172 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
173 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
189 class IKEv2ChildSA(object):
190 def __init__(self, local_ts, remote_ts, is_initiator):
198 self.local_ts = local_ts
199 self.remote_ts = remote_ts
202 class IKEv2SA(object):
209 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
215 auth_method="shared-key",
221 self.udp_encap = udp_encap
231 self.dh_params = None
233 self.priv_key = priv_key
234 self.is_initiator = is_initiator
235 nonce = nonce or os.urandom(32)
236 self.auth_data = auth_data
239 if isinstance(id_type, str):
240 self.id_type = IDType.value(id_type)
242 self.id_type = id_type
243 self.auth_method = auth_method
244 if self.is_initiator:
245 self.rspi = 8 * b"\x00"
250 self.ispi = 8 * b"\x00"
252 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
254 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return the DH public value belonging to this side of the exchange.

    Yields ``i_dh_data`` when ``is_initiator`` is truthy and ``r_dh_data``
    otherwise; only the selected attribute is read.
    """
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the DH public value belonging to the remote side.

    Mirror image of ``my_dh_pub_key``: ``r_dh_data`` when ``is_initiator`` is
    truthy, ``i_dh_data`` otherwise.
    """
    return self.r_dh_data if self.is_initiator else self.i_dh_data
272 return self.i_natt or self.r_natt
274 def compute_secret(self):
275 priv = self.dh_private_key
276 peer = self.peer_dh_pub_key
277 p, g, l = self.ike_group
279 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
282 def generate_dh_data(self):
284 if self.ike_dh not in DH:
285 raise NotImplementedError("%s not in DH group" % self.ike_dh)
287 if self.dh_params is None:
288 dhg = DH[self.ike_dh]
289 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
290 self.dh_params = pn.parameters(default_backend())
292 priv = self.dh_params.generate_private_key()
293 pub = priv.public_key()
294 x = priv.private_numbers().x
295 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
296 y = pub.public_numbers().y
298 if self.is_initiator:
299 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
301 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
def complete_dh_data(self):
    """Run ``compute_secret()`` and keep its result in ``dh_shared_secret``."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
306 def calc_child_keys(self, kex=False):
307 prf = self.ike_prf_alg.mod()
308 s = self.i_nonce + self.r_nonce
310 s = self.dh_shared_secret + s
311 c = self.child_sas[0]
313 encr_key_len = self.esp_crypto_key_len
314 integ_key_len = self.esp_integ_alg.key_len
315 salt_len = 0 if integ_key_len else 4
317 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
318 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
321 c.sk_ei = keymat[pos : pos + encr_key_len]
325 c.sk_ai = keymat[pos : pos + integ_key_len]
328 c.salt_ei = keymat[pos : pos + salt_len]
331 c.sk_er = keymat[pos : pos + encr_key_len]
335 c.sk_ar = keymat[pos : pos + integ_key_len]
338 c.salt_er = keymat[pos : pos + salt_len]
341 def calc_prfplus(self, prf, key, seed, length):
345 while len(r) < length and x < 255:
350 s = s + seed + bytes([x])
351 t = self.calc_prf(prf, key, s)
359 def calc_prf(self, prf, key, data):
360 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
364 def calc_keys(self, sk_d=None):
365 prf = self.ike_prf_alg.mod()
367 # SKEYSEED = prf(Ni | Nr, g^ir)
368 self.skeyseed = self.calc_prf(
369 prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
372 # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
373 self.skeyseed = self.calc_prf(
374 prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
377 # calculate S = Ni | Nr | SPIi SPIr
378 s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
380 prf_key_trunc = self.ike_prf_alg.trunc_len
381 encr_key_len = self.ike_crypto_key_len
382 tr_prf_key_len = self.ike_prf_alg.key_len
383 integ_key_len = self.ike_integ_alg.key_len
384 if integ_key_len == 0:
396 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
399 self.sk_d = keymat[: pos + prf_key_trunc]
402 self.sk_ai = keymat[pos : pos + integ_key_len]
404 self.sk_ar = keymat[pos : pos + integ_key_len]
407 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
408 pos += encr_key_len + salt_size
409 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
410 pos += encr_key_len + salt_size
412 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
413 pos += tr_prf_key_len
414 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
416 def generate_authmsg(self, prf, packet):
417 if self.is_initiator:
425 data = bytes([self.id_type, 0, 0, 0]) + id
426 id_hash = self.calc_prf(prf, key, data)
427 return packet + nonce + id_hash
430 prf = self.ike_prf_alg.mod()
431 if self.is_initiator:
432 packet = self.init_req_packet
434 packet = self.init_resp_packet
435 authmsg = self.generate_authmsg(prf, raw(packet))
436 if self.auth_method == "shared-key":
437 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
438 self.auth_data = self.calc_prf(prf, psk, authmsg)
439 elif self.auth_method == "rsa-sig":
440 self.auth_data = self.priv_key.sign(
441 authmsg, padding.PKCS1v15(), hashes.SHA1()
444 raise TypeError("unknown auth method type!")
def encrypt(self, data, aad=None):
    """Pad and encrypt *data* with the IKE cipher.

    The payload is first passed through ``ike_crypto_alg.pad()`` and the
    padded result is handed to ``ike_crypto_alg.encrypt()`` together with
    ``my_cryptokey`` and the optional *aad*.
    """
    padded = self.ike_crypto_alg.pad(data)
    key = self.my_cryptokey
    return self.ike_crypto_alg.encrypt(padded, key, aad)
451 def peer_authkey(self):
452 if self.is_initiator:
457 def my_authkey(self):
458 if self.is_initiator:
463 def my_cryptokey(self):
464 if self.is_initiator:
469 def peer_cryptokey(self):
470 if self.is_initiator:
def concat(self, alg, key_len):
    """Build the label ``"<alg>-<key_len * 8>"`` for an algorithm name.

    *key_len* is multiplied by 8 before being appended (presumably a byte
    count converted to bits).
    """
    bits = str(key_len * 8)
    return "-".join((alg, bits))
def vpp_ike_cypto_alg(self):
    """Return the IKE cipher label produced by ``concat()``.

    Combines ``ike_crypto`` with ``ike_crypto_key_len``.
    """
    name, key_len = self.ike_crypto, self.ike_crypto_key_len
    return self.concat(name, key_len)
def vpp_esp_cypto_alg(self):
    """Return the ESP cipher label produced by ``concat()``.

    Combines ``esp_crypto`` with ``esp_crypto_key_len``.
    """
    name, key_len = self.esp_crypto, self.esp_crypto_key_len
    return self.concat(name, key_len)
def verify_hmac(self, ikemsg):
    """Check the integrity checksum carried at the end of *ikemsg*.

    The last ``ike_integ_alg.trunc_len`` bytes are taken as the received
    checksum.  A fresh HMAC is computed over everything before them with
    ``peer_authkey`` and, truncated to the same length, asserted equal to
    the received value through ``self.test.assertEqual``.
    """
    icv_len = self.ike_integ_alg.trunc_len
    received = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    digest = self.compute_hmac(self.ike_integ_alg.mod(), self.peer_authkey, covered)
    self.test.assertEqual(digest[:icv_len], received)
494 def compute_hmac(self, integ, key, data):
495 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt *data* with the IKE cipher using ``peer_cryptokey``.

    *aad* and *icv* are forwarded unchanged to ``ike_crypto_alg.decrypt()``.
    """
    cipher = self.ike_crypto_alg
    return cipher.decrypt(data, self.peer_cryptokey, aad, icv)
502 def hmac_and_decrypt(self, ike):
503 ep = ike[ikev2.IKEv2_payload_Encrypted]
504 if self.ike_crypto == "AES-GCM-16ICV":
505 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
506 ct = ep.load[:-GCM_ICV_SIZE]
507 tag = ep.load[-GCM_ICV_SIZE:]
508 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
510 self.verify_hmac(raw(ike))
511 integ_trunc = self.ike_integ_alg.trunc_len
513 # remove ICV and decrypt payload
514 ct = ep.load[:-integ_trunc]
515 plain = self.decrypt(ct)
518 return plain[: -pad_len - 1]
def build_ts_addr(self, ts, version):
    """Translate a traffic-selector address range into keyword fields.

    Returns a dict mapping ``"starting_address_v<version>"`` to
    ``ts["start_addr"]`` and ``"ending_address_v<version>"`` to
    ``ts["end_addr"]``; *version* is the string suffix (callers pass "4" or
    "6").
    """
    fields = {}
    fields["starting_address_v" + version] = ts["start_addr"]
    fields["ending_address_v" + version] = ts["end_addr"]
    return fields
526 def generate_ts(self, is_ip4):
527 c = self.child_sas[0]
528 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
530 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
531 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
532 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
533 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
535 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
536 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
537 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
538 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
540 if self.is_initiator:
541 return ([ts1], [ts2])
542 return ([ts2], [ts1])
544 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
545 if crypto not in CRYPTO_ALGOS:
546 raise TypeError("unsupported encryption algo %r" % crypto)
547 self.ike_crypto = crypto
548 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
549 self.ike_crypto_key_len = crypto_key_len
551 if integ not in AUTH_ALGOS:
552 raise TypeError("unsupported auth algo %r" % integ)
553 self.ike_integ = None if integ == "NULL" else integ
554 self.ike_integ_alg = AUTH_ALGOS[integ]
556 if prf not in PRF_ALGOS:
557 raise TypeError("unsupported prf algo %r" % prf)
559 self.ike_prf_alg = PRF_ALGOS[prf]
561 self.ike_group = DH[self.ike_dh]
def set_esp_props(self, crypto, crypto_key_len, integ):
    """Store the ESP cipher and integrity selection on this SA.

    The key length is recorded first, then *crypto* and *integ* are
    validated in that order against ``CRYPTO_ALGOS`` and ``AUTH_ALGOS``.
    ``esp_integ`` is stored as ``None`` when *integ* is ``"NULL"``.

    Raises:
        TypeError: if *crypto* or *integ* is not a known algorithm name.
    """
    self.esp_crypto_key_len = crypto_key_len

    if crypto not in CRYPTO_ALGOS:
        raise TypeError("unsupported encryption algo %r" % crypto)
    self.esp_crypto, self.esp_crypto_alg = crypto, CRYPTO_ALGOS[crypto]

    if integ not in AUTH_ALGOS:
        raise TypeError("unsupported auth algo %r" % integ)
    self.esp_integ = integ if integ != "NULL" else None
    self.esp_integ_alg = AUTH_ALGOS[integ]
def crypto_attr(self, key_len):
    """Return the ``(attribute, length)`` pair describing a cipher key length.

    The attribute word is ``0x800E`` in the upper 16 bits with
    ``key_len << 3`` in the lower bits; the second element is always 12.
    NOTE(review): 0x800E looks like the IKEv2 "Key Length" transform
    attribute and ``<< 3`` a bytes-to-bits conversion - confirm.

    Raises:
        Exception: if ``ike_crypto`` is neither "AES-CBC" nor "AES-GCM-16ICV".
    """
    if self.ike_crypto not in ("AES-CBC", "AES-GCM-16ICV"):
        raise Exception("unsupported attribute type")
    attribute = (0x800E << 16) | (key_len << 3)
    return (attribute, 12)
def ike_crypto_attr(self):
    """Return ``crypto_attr()`` evaluated for ``ike_crypto_key_len``."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr()`` evaluated for ``esp_crypto_key_len``."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
587 def compute_nat_sha1(self, ip, port, rspi=None):
590 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
591 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
593 return digest.finalize()
595 def clone(self, test, **kwargs):
596 if "spi" not in kwargs:
597 kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
598 if "nonce" not in kwargs:
599 kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
601 if "local_ts" not in kwargs:
602 kwargs["local_ts"] = self.child_sas[0].local_ts
603 if "remote_ts" not in kwargs:
604 kwargs["remote_ts"] = self.child_sas[0].remote_ts
607 is_initiator=self.is_initiator,
610 id_type=self.id_type,
611 auth_data=self.auth_data,
612 auth_method=self.auth_method,
613 priv_key=self.priv_key,
616 udp_encap=self.udp_encap,
621 crypto=self.ike_crypto,
622 crypto_key_len=self.ike_crypto_key_len,
623 integ=self.ike_integ,
628 crypto=self.esp_crypto,
629 crypto_key_len=self.esp_crypto_key_len,
630 integ=self.esp_integ,
635 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
636 class IkePeer(VppTestCase):
637 """common class for initiator and responder"""
641 import scapy.contrib.ikev2 as _ikev2
643 globals()["ikev2"] = _ikev2
644 super(IkePeer, cls).setUpClass()
645 cls.create_pg_interfaces(range(2))
646 for i in cls.pg_interfaces:
def tearDownClass(cls):
    """Class-level teardown; delegates entirely to the parent class."""
    parent = super(IkePeer, cls)
    parent.tearDownClass()
658 super(IkePeer, self).tearDown()
659 if self.del_sa_from_responder:
660 self.initiate_del_sa_from_responder()
662 self.initiate_del_sa_from_initiator()
663 r = self.vapi.ikev2_sa_dump()
664 self.assertEqual(len(r), 0)
665 r = self.vapi.ikev2_sa_v2_dump()
666 self.assertEqual(len(r), 0)
667 sas = self.vapi.ipsec_sa_dump()
668 self.assertEqual(len(sas), 0)
669 self.p.remove_vpp_config()
670 self.assertIsNone(self.p.query_vpp_config())
673 super(IkePeer, self).setUp()
675 self.p.add_vpp_config()
676 self.assertIsNotNone(self.p.query_vpp_config())
677 if self.sa.is_initiator:
678 self.sa.generate_dh_data()
679 self.vapi.cli("ikev2 set logging level 4")
680 self.vapi.cli("event-lo clear")
def assert_counter(self, count, name, version="ip4"):
    """Assert that an ikev2 error counter has the expected value.

    The counter path is ``/err/ikev2-<version>/<name>`` and its current
    value is read through ``self.statistics.get_err_counter``.
    """
    prefix = "/err/ikev2-%s/" % version
    actual = self.statistics.get_err_counter(prefix + name)
    self.assertEqual(count, actual)
686 def create_rekey_request(self, kex=False):
687 sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
688 header = ikev2.IKEv2(
689 init_SPI=self.sa.ispi,
690 resp_SPI=self.sa.rspi,
691 id=self.sa.new_msg_id(),
693 exch_type="CREATE_CHILD_SA",
696 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
697 return self.create_packet(
698 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
701 def create_sa_rekey_request(self, **kwargs):
702 sa = self.generate_sa_init_payload(**kwargs)
703 header = ikev2.IKEv2(
704 init_SPI=self.sa.ispi,
705 resp_SPI=self.sa.rspi,
706 id=self.sa.new_msg_id(),
708 exch_type="CREATE_CHILD_SA",
710 ike_msg = self.encrypt_ike_msg(header, sa, "SA")
711 return self.create_packet(
712 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
715 def create_empty_request(self):
716 header = ikev2.IKEv2(
717 init_SPI=self.sa.ispi,
718 resp_SPI=self.sa.rspi,
719 id=self.sa.new_msg_id(),
721 exch_type="INFORMATIONAL",
722 next_payload="Encrypted",
725 msg = self.encrypt_ike_msg(header, b"", None)
726 return self.create_packet(
727 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
731 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
734 src_ip = src_if.remote_ip6
735 dst_ip = src_if.local_ip6
738 src_ip = src_if.remote_ip4
739 dst_ip = src_if.local_ip4
742 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
743 / ip_layer(src=src_ip, dst=dst_ip)
744 / UDP(sport=sport, dport=dport)
747 # insert non ESP marker
748 res = res / Raw(b"\x00" * 4)
def verify_udp(self, udp):
    """Assert that *udp* carries the source and destination ports stored on the SA."""
    for field in ("sport", "dport"):
        self.assertEqual(getattr(udp, field), getattr(self.sa, field))
755 def get_ike_header(self, packet):
757 ih = packet[ikev2.IKEv2]
758 ih = self.verify_and_remove_non_esp_marker(ih)
759 except IndexError as e:
760 # this is a workaround for getting IKEv2 layer as both ikev2 and
761 # ipsec register for port 4500
763 ih = self.verify_and_remove_non_esp_marker(esp)
764 self.assertEqual(ih.version, 0x20)
765 self.assertNotIn("Version", ih.flags)
768 def verify_and_remove_non_esp_marker(self, packet):
770 # if we are in nat traversal mode check for non esp marker
773 self.assertEqual(data[:4], b"\x00" * 4)
774 return ikev2.IKEv2(data[4:])
778 def encrypt_ike_msg(self, header, plain, first_payload):
779 if self.sa.ike_crypto == "AES-GCM-16ICV":
780 data = self.sa.ike_crypto_alg.pad(raw(plain))
785 + len(ikev2.IKEv2_payload_Encrypted())
787 tlen = plen + len(ikev2.IKEv2())
790 sk_p = ikev2.IKEv2_payload_Encrypted(
791 next_payload=first_payload, length=plen
795 encr = self.sa.encrypt(raw(plain), raw(res))
796 sk_p = ikev2.IKEv2_payload_Encrypted(
797 next_payload=first_payload, length=plen, load=encr
801 encr = self.sa.encrypt(raw(plain))
802 trunc_len = self.sa.ike_integ_alg.trunc_len
803 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
804 tlen = plen + len(ikev2.IKEv2())
806 sk_p = ikev2.IKEv2_payload_Encrypted(
807 next_payload=first_payload, length=plen, load=encr
812 integ_data = raw(res)
813 hmac_data = self.sa.compute_hmac(
814 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
816 res = res / Raw(hmac_data[:trunc_len])
817 assert len(res) == tlen
def verify_udp_encap(self, ipsec_sa):
    """Check the UDP-encapsulation flag on an IPsec SA.

    The ``IPSEC_API_SAD_FLAG_UDP_ENCAP`` flag must be present in
    ``ipsec_sa.flags`` when the IKE SA has ``udp_encap`` or ``natt`` set,
    and absent otherwise.
    """
    expect_encap = self.sa.udp_encap or self.sa.natt
    flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
    check = self.assertIn if expect_encap else self.assertNotIn
    check(flag, ipsec_sa.flags)
827 def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
828 sas = self.vapi.ipsec_sa_dump()
831 # after rekey there is a short period of time in which old
832 # inbound SA is still present
836 self.assertEqual(len(sas), sa_count)
837 if self.sa.is_initiator:
852 c = self.sa.child_sas[0]
854 self.verify_udp_encap(sa0)
855 self.verify_udp_encap(sa1)
856 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
857 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
858 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
860 if self.sa.esp_integ is None:
863 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
864 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
865 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
868 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
869 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
870 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
871 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
875 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
876 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
877 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
878 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
880 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
881 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one piece of key material against what the API reported.

    *name* selects the attribute on both objects; *api_keys* additionally
    supplies ``<name>_len``.  The local key must have that length and equal
    the first ``<name>_len`` items of the API buffer.
    """
    expected = getattr(keys, name)
    reported, reported_len = (
        getattr(api_keys, attr) for attr in (name, name + "_len")
    )
    self.assertEqual(len(expected), reported_len)
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Assert that an identity reported by the API matches the expected one.

    Checks, in order: the numeric type (``exp_id.type`` is translated with
    ``IDType.value``), the data length, and the identity data itself.
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The data must be compared with the expected data, not with the type
    # name: ASCII-encoded identity bytes can never equal the type label.
    # NOTE(review): assumes exp_id exposes the raw identity as bytes in
    # ``.data`` next to ``.type`` / ``.data_len`` - confirm against callers.
    self.assertEqual(bytes(api_id.data, "ascii"), exp_id.data)
895 def verify_ike_sas(self, is_rekey=False):
896 r = self.vapi.ikev2_sa_dump()
903 self.assertEqual(len(r), sa_count)
904 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
905 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
907 if self.sa.is_initiator:
908 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
909 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
911 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
912 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
914 if self.sa.is_initiator:
915 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
916 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
918 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
919 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
920 self.verify_keymat(sa.keys, self.sa, "sk_d")
921 self.verify_keymat(sa.keys, self.sa, "sk_ai")
922 self.verify_keymat(sa.keys, self.sa, "sk_ar")
923 self.verify_keymat(sa.keys, self.sa, "sk_ei")
924 self.verify_keymat(sa.keys, self.sa, "sk_er")
925 self.verify_keymat(sa.keys, self.sa, "sk_pi")
926 self.verify_keymat(sa.keys, self.sa, "sk_pr")
928 self.assertEqual(sa.i_id.type, self.sa.id_type)
929 self.assertEqual(sa.r_id.type, self.sa.id_type)
930 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
931 self.assertEqual(sa.r_id.data_len, len(self.idr))
932 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
933 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
935 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
936 self.verify_nonce(n, self.sa.i_nonce)
937 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
938 self.verify_nonce(n, self.sa.r_nonce)
940 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
942 self.assertEqual(len(r), 0)
945 self.assertEqual(len(r), 1)
947 self.assertEqual(csa.sa_index, sa.sa_index)
948 c = self.sa.child_sas[0]
949 if hasattr(c, "sk_ai"):
950 self.verify_keymat(csa.keys, c, "sk_ai")
951 self.verify_keymat(csa.keys, c, "sk_ar")
952 self.verify_keymat(csa.keys, c, "sk_ei")
953 self.verify_keymat(csa.keys, c, "sk_er")
954 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
955 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
957 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
960 r = self.vapi.ikev2_traffic_selector_dump(
961 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
963 self.assertEqual(len(r), 1)
965 self.verify_ts(r[0].ts, tsi[0], True)
967 r = self.vapi.ikev2_traffic_selector_dump(
968 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
970 self.assertEqual(len(r), 1)
971 self.verify_ts(r[0].ts, tsr[0], False)
973 def verify_ike_sas_v2(self):
974 r = self.vapi.ikev2_sa_v2_dump()
975 self.assertEqual(len(r), 1)
977 self.assertEqual(self.p.profile_name, sa.profile_name)
978 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
979 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
981 if self.sa.is_initiator:
982 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
983 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
985 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
986 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
988 if self.sa.is_initiator:
989 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
990 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
992 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
993 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
994 self.verify_keymat(sa.keys, self.sa, "sk_d")
995 self.verify_keymat(sa.keys, self.sa, "sk_ai")
996 self.verify_keymat(sa.keys, self.sa, "sk_ar")
997 self.verify_keymat(sa.keys, self.sa, "sk_ei")
998 self.verify_keymat(sa.keys, self.sa, "sk_er")
999 self.verify_keymat(sa.keys, self.sa, "sk_pi")
1000 self.verify_keymat(sa.keys, self.sa, "sk_pr")
1002 self.assertEqual(sa.i_id.type, self.sa.id_type)
1003 self.assertEqual(sa.r_id.type, self.sa.id_type)
1004 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
1005 self.assertEqual(sa.r_id.data_len, len(self.idr))
1006 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
1007 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
1009 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
1010 self.assertEqual(len(r), 1)
1012 self.assertEqual(csa.sa_index, sa.sa_index)
1013 c = self.sa.child_sas[0]
1014 if hasattr(c, "sk_ai"):
1015 self.verify_keymat(csa.keys, c, "sk_ai")
1016 self.verify_keymat(csa.keys, c, "sk_ar")
1017 self.verify_keymat(csa.keys, c, "sk_ei")
1018 self.verify_keymat(csa.keys, c, "sk_er")
1019 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
1020 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
1022 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1025 r = self.vapi.ikev2_traffic_selector_dump(
1026 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
1028 self.assertEqual(len(r), 1)
1030 self.verify_ts(r[0].ts, tsi[0], True)
1032 r = self.vapi.ikev2_traffic_selector_dump(
1033 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
1035 self.assertEqual(len(r), 1)
1036 self.verify_ts(r[0].ts, tsr[0], False)
1038 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
1039 self.verify_nonce(n, self.sa.i_nonce)
1040 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
1041 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API nonce record matches *nonce* in length and content."""
    expected_len = len(nonce)
    self.assertEqual(api_nonce.data_len, expected_len)
    self.assertEqual(api_nonce.nonce, nonce)
1047 def verify_ts(self, api_ts, ts, is_initiator):
1049 self.assertTrue(api_ts.is_local)
1051 self.assertFalse(api_ts.is_local)
1053 if self.p.ts_is_ip4:
1054 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
1055 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
1057 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
1058 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
1059 self.assertEqual(api_ts.start_port, ts.start_port)
1060 self.assertEqual(api_ts.end_port, ts.end_port)
1061 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
1064 class TemplateInitiator(IkePeer):
1065 """initiator test template"""
1067 def initiate_del_sa_from_initiator(self):
1068 ispi = int.from_bytes(self.sa.ispi, "little")
1069 self.pg0.enable_capture()
1071 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
1072 capture = self.pg0.get_capture(1)
1073 ih = self.get_ike_header(capture[0])
1074 self.assertNotIn("Response", ih.flags)
1075 self.assertIn("Initiator", ih.flags)
1076 self.assertEqual(ih.init_SPI, self.sa.ispi)
1077 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1078 plain = self.sa.hmac_and_decrypt(ih)
1079 d = ikev2.IKEv2_payload_Delete(plain)
1080 self.assertEqual(d.proto, 1) # proto=IKEv2
1081 header = ikev2.IKEv2(
1082 init_SPI=self.sa.ispi,
1083 resp_SPI=self.sa.rspi,
1085 exch_type="INFORMATIONAL",
1087 next_payload="Encrypted",
1089 resp = self.encrypt_ike_msg(header, b"", None)
1090 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Validate the reply to a delete-SA request.

    The IKE header extracted from *packet* must carry the SA's current
    message id, exchange type 37 and both the "Response" and "Initiator"
    flags; after integrity check and decryption the payload must be empty.
    """
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, 37)  # exchange informational
    for flag in ("Response", "Initiator"):
        self.assertIn(flag, header.flags)
    payload = self.sa.hmac_and_decrypt(header)
    self.assertEqual(payload, b"")
1101 def initiate_del_sa_from_responder(self):
1102 header = ikev2.IKEv2(
1103 init_SPI=self.sa.ispi,
1104 resp_SPI=self.sa.rspi,
1105 exch_type="INFORMATIONAL",
1106 id=self.sa.new_msg_id(),
1108 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1109 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1110 packet = self.create_packet(
1111 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1113 self.pg0.add_stream(packet)
1114 self.pg0.enable_capture()
1116 capture = self.pg0.get_capture(1)
1117 self.verify_del_sa(capture[0])
1120 def find_notify_payload(packet, notify_type):
1121 n = packet[ikev2.IKEv2_payload_Notify]
1122 while n is not None:
1123 if n.type == notify_type:
1128 def verify_nat_detection(self, packet):
1135 # NAT_DETECTION_SOURCE_IP
1136 s = self.find_notify_payload(packet, 16388)
1137 self.assertIsNotNone(s)
1138 src_sha = self.sa.compute_nat_sha1(
1139 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
1141 self.assertEqual(s.load, src_sha)
1143 # NAT_DETECTION_DESTINATION_IP
1144 s = self.find_notify_payload(packet, 16389)
1145 self.assertIsNotNone(s)
1146 dst_sha = self.sa.compute_nat_sha1(
1147 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1149 self.assertEqual(s.load, dst_sha)
1151 def verify_sa_init_request(self, packet):
1153 self.sa.dport = udp.sport
1154 ih = packet[ikev2.IKEv2]
1155 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1156 self.assertEqual(ih.exch_type, 34) # SA_INIT
1157 self.sa.ispi = ih.init_SPI
1158 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1159 self.assertIn("Initiator", ih.flags)
1160 self.assertNotIn("Response", ih.flags)
1161 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1162 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1164 prop = packet[ikev2.IKEv2_payload_Proposal]
1165 self.assertEqual(prop.proto, 1) # proto = ikev2
1166 self.assertEqual(prop.proposal, 1)
1167 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1169 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1171 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1172 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1173 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1174 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1176 self.verify_nat_detection(packet)
1177 self.sa.set_ike_props(
1178 crypto="AES-GCM-16ICV",
1181 prf="PRF_HMAC_SHA2_256",
1184 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1185 self.sa.generate_dh_data()
1186 self.sa.complete_dh_data()
1189 def update_esp_transforms(self, trans, sa):
1191 if trans.transform_type == 1: # encryption
1192 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1193 elif trans.transform_type == 3: # integrity
1194 sa.esp_integ = INTEG_IDS[trans.transform_id]
1195 trans = trans.payload
# Validate a captured IKE_AUTH request and pick up the ESP transforms it
# proposes.
#   packet: the captured frame carrying the IKE message.
1197 def verify_sa_auth_req(self, packet):
# Replies are sent back to the UDP source port the request came from.
1199 self.sa.dport = udp.sport
1200 ih = self.get_ike_header(packet)
# Header checks: SPIs of the SA under test, a request (not a response)
# flagged as coming from the initiator.
1201 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1202 self.assertEqual(ih.init_SPI, self.sa.ispi)
1203 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1204 self.assertIn("Initiator", ih.flags)
1205 self.assertNotIn("Response", ih.flags)
1208 self.verify_udp(udp)
# The request must carry the message id following the one recorded on the SA.
1209 self.assertEqual(ih.id, self.sa.msg_id + 1)
# Decrypt the body and parse it starting at the IDi payload.
1211 plain = self.sa.hmac_and_decrypt(ih)
1212 idi = ikev2.IKEv2_payload_IDi(plain)
1213 self.assertEqual(idi.load, self.sa.i_id)
# When the profile sends no IDr, AUTH has to follow IDi directly.
1214 if self.no_idr_auth:
1215 self.assertEqual(idi.next_payload, 39) # AUTH
1217 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1218 self.assertEqual(idr.load, self.sa.r_id)
1219 prop = idi[ikev2.IKEv2_payload_Proposal]
1220 c = self.sa.child_sas[0]
# Remember the ESP encryption/integrity algorithms offered in the proposal.
1222 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
# Build the IKE_SA_INIT response, send it on pg0 and verify that the single
# packet captured afterwards is a valid IKE_AUTH request.
1224 def send_init_response(self):
1225 tr_attr = self.sa.ike_crypto_attr()
# IKE proposal: encryption, integrity, PRF and DH group taken from self.sa.
1227 ikev2.IKEv2_payload_Transform(
1228 transform_type="Encryption",
1229 transform_id=self.sa.ike_crypto,
1231 key_length=tr_attr[0],
1233 / ikev2.IKEv2_payload_Transform(
1234 transform_type="Integrity", transform_id=self.sa.ike_integ
1236 / ikev2.IKEv2_payload_Transform(
1237 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1239 / ikev2.IKEv2_payload_Transform(
1240 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1243 props = ikev2.IKEv2_payload_Proposal(
1244 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
# Addresses hashed into the NAT detection notifies.
1247 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
# The destination is either the literal 10.10.10.10 or pg0's local address.
1249 dst_address = b"\x0a\x0a\x0a\x0a"
1251 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1252 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1253 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
# Payload chain: SA -> KE -> Nonce -> two NAT detection notifies. The packet
# is kept on the SA as init_resp_packet.
1255 self.sa.init_resp_packet = (
1257 init_SPI=self.sa.ispi,
1258 resp_SPI=self.sa.rspi,
1259 exch_type="IKE_SA_INIT",
1262 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1263 / ikev2.IKEv2_payload_KE(
1264 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1266 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1267 / ikev2.IKEv2_payload_Notify(
1268 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1270 / ikev2.IKEv2_payload_Notify(
1271 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1275 ike_msg = self.create_packet(
1277 self.sa.init_resp_packet,
# Send the response and check the request captured in return.
1283 self.pg_send(self.pg0, ike_msg)
1284 capture = self.pg0.get_capture(1)
1285 self.verify_sa_auth_req(capture[0])
# Trigger SA_INIT for the configured profile through the API, verify the one
# request captured on pg0 and answer it.
1287 def initiate_sa_init(self):
1288 self.pg0.enable_capture()
1290 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1292 capture = self.pg0.get_capture(1)
1293 self.verify_sa_init_request(capture[0])
1294 self.send_init_response()
# Build the IKE_AUTH response, encrypt it and send it on pg0.
1296 def send_auth_response(self):
1297 tr_attr = self.sa.esp_crypto_attr()
# ESP proposal: encryption and integrity from self.sa, followed by both
# "No ESN" and "ESN" sequence-number transforms.
1299 ikev2.IKEv2_payload_Transform(
1300 transform_type="Encryption",
1301 transform_id=self.sa.esp_crypto,
1303 key_length=tr_attr[0],
1305 / ikev2.IKEv2_payload_Transform(
1306 transform_type="Integrity", transform_id=self.sa.esp_integ
1308 / ikev2.IKEv2_payload_Transform(
1309 transform_type="Extended Sequence Number", transform_id="No ESN"
1311 / ikev2.IKEv2_payload_Transform(
1312 transform_type="Extended Sequence Number", transform_id="ESN"
# The proposal carries the rspi of the first child SA as its 4-byte SPI.
1316 c = self.sa.child_sas[0]
1317 props = ikev2.IKEv2_payload_Proposal(
1318 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1321 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
# Plaintext chain: IDi -> IDr -> AUTH -> SA -> TSi -> TSr -> Notify.
1323 ikev2.IKEv2_payload_IDi(
1324 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1326 / ikev2.IKEv2_payload_IDr(
1327 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1329 / ikev2.IKEv2_payload_AUTH(
1331 auth_type=AuthMethod.value(self.sa.auth_method),
1332 load=self.sa.auth_data,
1334 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1335 / ikev2.IKEv2_payload_TSi(
1336 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1338 / ikev2.IKEv2_payload_TSr(
1339 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1341 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
# The header takes a fresh message id from the SA.
1344 header = ikev2.IKEv2(
1345 init_SPI=self.sa.ispi,
1346 resp_SPI=self.sa.rspi,
1347 id=self.sa.new_msg_id(),
1349 exch_type="IKE_AUTH",
# Encrypt with IDi named as the first inner payload, wrap and send.
1352 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1353 packet = self.create_packet(
1354 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1356 self.pg_send(self.pg0, packet)
# Initiator scenario: run the SA_INIT exchange, derive the child keys, send
# the IKE_AUTH response and check the resulting IKE SAs through both dump
# variants.
1358 def test_initiator(self):
1359 self.initiate_sa_init()
1361 self.sa.calc_child_keys()
1362 self.send_auth_response()
1363 self.verify_ike_sas()
1364 self.verify_ike_sas_v2()
1367 class TemplateResponder(IkePeer):
1368 """responder test template"""
# Ask for deletion of the IKE SA through the API, check the INFORMATIONAL
# delete request captured on pg0 and acknowledge it with an empty response.
1370 def initiate_del_sa_from_responder(self):
1371 self.pg0.enable_capture()
# The API takes the initiator SPI as an integer; the stored bytes are
# interpreted as little-endian.
1373 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1374 capture = self.pg0.get_capture(1)
1375 ih = self.get_ike_header(capture[0])
# The captured message must be a request and must not carry the Initiator flag.
1376 self.assertNotIn("Response", ih.flags)
1377 self.assertNotIn("Initiator", ih.flags)
1378 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1379 plain = self.sa.hmac_and_decrypt(ih)
1380 d = ikev2.IKEv2_payload_Delete(plain)
1381 self.assertEqual(d.proto, 1) # proto=IKEv2
1382 self.assertEqual(ih.init_SPI, self.sa.ispi)
1383 self.assertEqual(ih.resp_SPI, self.sa.rspi)
# Acknowledge with an encrypted message whose plaintext is empty.
1384 header = ikev2.IKEv2(
1385 init_SPI=self.sa.ispi,
1386 resp_SPI=self.sa.rspi,
1387 flags="Initiator+Response",
1388 exch_type="INFORMATIONAL",
1390 next_payload="Encrypted",
1392 resp = self.encrypt_ike_msg(header, b"", None)
# Nothing is expected back after the acknowledgement.
1393 self.send_and_assert_no_replies(self.pg0, resp)
# Check that `packet` is the response to a delete request: an INFORMATIONAL
# response with the SA's current message id and SPIs, whose encrypted payload
# decrypts to nothing.
1395 def verify_del_sa(self, packet):
1396 ih = self.get_ike_header(packet)
1397 self.assertEqual(ih.id, self.sa.msg_id)
1398 self.assertEqual(ih.exch_type, 37) # exchange informational
1399 self.assertIn("Response", ih.flags)
1400 self.assertNotIn("Initiator", ih.flags)
1401 self.assertEqual(ih.next_payload, 46) # Encrypted
1402 self.assertEqual(ih.init_SPI, self.sa.ispi)
1403 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1404 plain = self.sa.hmac_and_decrypt(ih)
1405 self.assertEqual(plain, b"")
# Send an INFORMATIONAL request carrying a Delete payload for the IKE SA and
# verify the single reply captured on pg0.
1407 def initiate_del_sa_from_initiator(self):
1408 header = ikev2.IKEv2(
1409 init_SPI=self.sa.ispi,
1410 resp_SPI=self.sa.rspi,
1412 exch_type="INFORMATIONAL",
1413 id=self.sa.new_msg_id(),
1415 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1416 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1417 packet = self.create_packet(
1418 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
# Queue the request on pg0 and arm the capture before collecting the reply.
1420 self.pg0.add_stream(packet)
1421 self.pg0.enable_capture()
1423 capture = self.pg0.get_capture(1)
1424 self.verify_del_sa(capture[0])
# Build the SA -> KE -> Nonce payload chain used in SA_INIT-style requests.
#   spi:          when given, it is placed in the proposal along with its length.
#   dh_pub_key:   KE data; self.sa.my_dh_pub_key is used when None.
#   nonce:        nonce data; self.sa.i_nonce is used when None.
#   next_payload: value of the Nonce payload's next_payload field.
1426 def generate_sa_init_payload(
1427 self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
1429 tr_attr = self.sa.ike_crypto_attr()
# IKE proposal transforms: encryption, integrity, PRF and DH group from self.sa.
1431 ikev2.IKEv2_payload_Transform(
1432 transform_type="Encryption",
1433 transform_id=self.sa.ike_crypto,
1435 key_length=tr_attr[0],
1437 / ikev2.IKEv2_payload_Transform(
1438 transform_type="Integrity", transform_id=self.sa.ike_integ
1440 / ikev2.IKEv2_payload_Transform(
1441 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1443 / ikev2.IKEv2_payload_Transform(
1444 transform_type="GroupDesc", transform_id=self.sa.ike_dh
# Extra proposal fields that carry the SPI and its size.
1451 pargs = {"SPI": spi, "SPIsize": len(spi)}
1452 props = ikev2.IKEv2_payload_Proposal(
1461 ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1462 / ikev2.IKEv2_payload_KE(
1463 next_payload="Nonce",
1464 group=self.sa.ike_dh,
1465 load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
1467 / ikev2.IKEv2_payload_Nonce(
1468 next_payload=next_payload,
1469 load=self.sa.i_nonce if nonce is None else nonce,
# Build the IKE_SA_INIT request, send it on pg0 and verify the single reply.
# The request is kept on the SA as init_req_packet.
1473 def send_sa_init_req(self):
# For IPv4 the Nonce payload announces a following Notify; for IPv6 it
# announces nothing.
1474 self.sa.init_req_packet = ikev2.IKEv2(
1475 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1476 ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
# Addresses hashed into the NAT detection notifies: either a fixed literal
# (10.10.10.1 / 10.10.10.10) or the real pg0 remote/local address.
1480 src_address = b"\x0a\x0a\x0a\x01"
1482 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1485 dst_address = b"\x0a\x0a\x0a\x0a"
1487 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1489 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1490 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1491 nat_src_detection = ikev2.IKEv2_payload_Notify(
1492 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1494 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1495 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
# Append both notifies to the stored request.
1497 self.sa.init_req_packet = (
1498 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1501 ike_msg = self.create_packet(
1503 self.sa.init_req_packet,
# Send the request and check the response.
1509 self.pg0.add_stream(ike_msg)
1510 self.pg0.enable_capture()
1512 capture = self.pg0.get_capture(1)
1513 self.verify_sa_init(capture[0])
# Build the plaintext payload chain for an IKE_AUTH request or a rekey request.
#   last_payload: next_payload value written into the TSr payload; "Notify"
#                 is used when it is falsy.
#   is_rekey, kex: presumably select between the "Nonce"-led and the
#                 "IDi"-led variants built below — verify.
# Returns (plain, first_payload), where first_payload is "Nonce" or "IDi".
1515 def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1516 tr_attr = self.sa.esp_crypto_attr()
1517 last_payload = last_payload or "Notify"
# ESP proposal transforms: encryption, integrity and both ESN variants.
1520 ikev2.IKEv2_payload_Transform(
1521 transform_type="Encryption",
1522 transform_id=self.sa.esp_crypto,
1524 key_length=tr_attr[0],
1526 / ikev2.IKEv2_payload_Transform(
1527 transform_type="Integrity", transform_id=self.sa.esp_integ
1529 / ikev2.IKEv2_payload_Transform(
1530 transform_type="Extended Sequence Number", transform_id="No ESN"
1532 / ikev2.IKEv2_payload_Transform(
1533 transform_type="Extended Sequence Number", transform_id="ESN"
# Additional DH group transform appended to the ESP transforms.
1539 trans /= ikev2.IKEv2_payload_Transform(
1540 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1543 c = self.sa.child_sas[0]
1544 props = ikev2.IKEv2_payload_Proposal(
1553 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
# Common tail: AUTH -> SA -> TSi -> TSr.
1555 ikev2.IKEv2_payload_AUTH(
1557 auth_type=AuthMethod.value(self.sa.auth_method),
1558 load=self.sa.auth_data,
1560 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1561 / ikev2.IKEv2_payload_TSi(
1562 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1564 / ikev2.IKEv2_payload_TSr(
1565 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
# Variant that starts with a Nonce payload, with or without a KE payload
# between the Nonce and the SA.
1570 first_payload = "Nonce"
1572 head = ikev2.IKEv2_payload_Nonce(
1573 load=self.sa.i_nonce, next_payload="KE"
1574 ) / ikev2.IKEv2_payload_KE(
1575 group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1578 head = ikev2.IKEv2_payload_Nonce(
1579 load=self.sa.i_nonce, next_payload="SA"
1584 / ikev2.IKEv2_payload_Notify(
1588 length=8 + len(c.ispi),
1589 next_payload="Notify",
1591 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
# Variant that starts with the identity payloads; IDr is left out when
# self.no_idr_auth is set.
1594 first_payload = "IDi"
1595 if self.no_idr_auth:
1596 ids = ikev2.IKEv2_payload_IDi(
1597 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1600 ids = ikev2.IKEv2_payload_IDi(
1601 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1602 ) / ikev2.IKEv2_payload_IDr(
1603 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1606 return plain, first_payload
# Send the IKE_AUTH request (auth payload chain plus an INITIAL_CONTACT
# notify) on pg0 and verify the single response.
1608 def send_sa_auth(self):
1609 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1610 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
# The header takes a fresh message id from the SA.
1611 header = ikev2.IKEv2(
1612 init_SPI=self.sa.ispi,
1613 resp_SPI=self.sa.rspi,
1614 id=self.sa.new_msg_id(),
1616 exch_type="IKE_AUTH",
1619 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1620 packet = self.create_packet(
1621 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1623 self.pg0.add_stream(packet)
1624 self.pg0.enable_capture()
1626 capture = self.pg0.get_capture(1)
1627 self.verify_sa_auth_resp(capture[0])
# Validate the IKE_SA_INIT response in `packet` and store the responder SPI,
# nonce and DH data on self.sa.
1629 def verify_sa_init(self, packet):
1630 ih = self.get_ike_header(packet)
1632 self.assertEqual(ih.id, self.sa.msg_id)
1633 self.assertEqual(ih.exch_type, 34)
1634 self.assertIn("Response", ih.flags)
1635 self.assertEqual(ih.init_SPI, self.sa.ispi)
# NOTE(review): SPIs are handled as bytes elsewhere in this file (compared
# against 8 * b"\x00", fed to int.from_bytes), so a comparison with the
# integer 0 presumably can never fail — confirm and tighten if so.
1636 self.assertNotEqual(ih.resp_SPI, 0)
1637 self.sa.rspi = ih.resp_SPI
# Indexing the header by payload class raises IndexError when that payload
# is absent from the reply.
1639 sa = ih[ikev2.IKEv2_payload_SA]
1640 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1641 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1642 except IndexError as e:
# NOTE(review): this branch runs when a payload is missing, so the message
# presumably means "not found" — confirm before changing the log text.
1643 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1644 self.logger.error(ih.show())
# Finish the DH exchange using the responder's KE data stored above.
1646 self.sa.complete_dh_data()
# Validate the IKE_AUTH response in `packet`, store the responder's child SA
# SPI and derive the child keys.
1650 def verify_sa_auth_resp(self, packet):
1651 ike = self.get_ike_header(packet)
1653 self.verify_udp(udp)
1654 self.assertEqual(ike.id, self.sa.msg_id)
# Decrypt and parse the body starting at the IDr payload.
1655 plain = self.sa.hmac_and_decrypt(ike)
1656 idr = ikev2.IKEv2_payload_IDr(plain)
1657 prop = idr[ikev2.IKEv2_payload_Proposal]
1658 self.assertEqual(prop.SPIsize, 4)
# The SPI in the proposal becomes the rspi of the first child SA.
1659 self.sa.child_sas[0].rspi = prop.SPI
1660 self.sa.calc_child_keys()
# Suffix passed to assert_counter by verify_counters; the NAT-T test classes
# in this file override it with "ip4-natt".
1662 IKE_NODE_SUFFIX = "ip4"
# Check the node counters and per-SA statistics after one complete exchange:
# two packets processed, one SA_INIT request and one IKE_AUTH request.
1664 def verify_counters(self):
1665 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1666 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1667 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
# Same expectations on the statistics returned by the SA dump API ...
1669 r = self.vapi.ikev2_sa_dump()
1671 self.assertEqual(1, s.n_sa_auth_req)
1672 self.assertEqual(1, s.n_sa_init_req)
# ... and by its v2 variant.
1674 r = self.vapi.ikev2_sa_v2_dump()
1676 self.assertEqual(1, s.n_sa_auth_req)
1677 self.assertEqual(1, s.n_sa_init_req)
# Responder scenario: send the SA_INIT request, then verify the IPsec SAs,
# the IKE SAs (both dump variants) and the counters.
1679 def test_responder(self):
1680 self.send_sa_init_req()
1682 self.verify_ipsec_sas()
1683 self.verify_ike_sas()
1684 self.verify_ike_sas_v2()
1685 self.verify_counters()
1688 class Ikev2Params(object):
# Configure the profile "pr1" and the matching peer-side SA state (self.sa)
# from the `params` dictionary. Keys read below, with their defaults:
#   dpd_disabled (True), del_sa_from_responder, i_natt (False),
#   r_natt (False), ip6 (False), auth ("rsa-sig" selects certificate auth and
#   needs server-key/client-cert/server-cert/client-key; otherwise a shared
#   key is used), is_initiator (True), no_idr_in_auth (False),
#   loc_ts (10.10.10.0-10.10.10.255), rem_ts (10.0.0.0-10.0.0.255),
#   responder, ike_transforms, esp_transforms, udp_encap (False),
#   responder_hostname, nonce, ike-crypto (("AES-CBC", 32)),
#   ike-integ ("HMAC-SHA1-96"), ike-dh ("2048MODPgr"),
#   esp-crypto (("AES-CBC", 32)), esp-integ ("HMAC-SHA1-96").
# NOTE(review): `params={}` is a mutable default argument; the lines below
# only read from it, so it looks harmless here — confirm, or default to None.
1689 def config_params(self, params={}):
1690 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1691 ei = VppEnum.vl_api_ipsec_integ_alg_t
# Maps algorithm names (crypto names include the key size in bits) to the
# IPsec API enum values.
1693 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1694 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1695 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1696 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1697 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1698 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1699 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1700 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1701 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1702 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
# Dead peer detection is switched off unless the test asks for it.
1705 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1707 self.vapi.cli("ikev2 dpd disable")
1708 self.del_sa_from_responder = (
1710 if "del_sa_from_responder" not in params
1711 else params["del_sa_from_responder"]
1713 i_natt = False if "i_natt" not in params else params["i_natt"]
1714 r_natt = False if "r_natt" not in params else params["r_natt"]
1715 self.p = Profile(self, "pr1")
1716 self.ip6 = False if "ip6" not in params else params["ip6"]
# Certificate based authentication: key and certificate file names come from
# params and are resolved under the plugin's test certificate directory.
1718 if "auth" in params and params["auth"] == "rsa-sig":
1719 auth_method = "rsa-sig"
1720 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1721 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1723 client_file = work_dir + params["client-cert"]
# NOTE(review): the two files opened below are never closed explicitly.
1724 server_pem = open(work_dir + params["server-cert"]).read()
1725 client_priv = open(work_dir + params["client-key"]).read()
1726 client_priv = load_pem_private_key(
1727 str.encode(client_priv), None, default_backend()
1729 self.peer_cert = x509.load_pem_x509_certificate(
1730 str.encode(server_pem), default_backend()
1732 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
# Pre-shared key authentication with a fixed secret.
1735 auth_data = b"$3cr3tpa$$w0rd"
1736 self.p.add_auth(method="shared-key", data=auth_data)
1737 auth_method = "shared-key"
1740 is_init = True if "is_initiator" not in params else params["is_initiator"]
1741 self.no_idr_auth = params.get("no_idr_in_auth", False)
# Both identities are FQDNs; which one is local to the profile depends on the
# role, as the two branches below show.
1743 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1744 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1745 r_id = self.idr = idr["data"]
1746 i_id = self.idi = idi["data"]
1748 # scapy is initiator, VPP is responder
1749 self.p.add_local_id(**idr)
1750 self.p.add_remote_id(**idi)
1751 if self.no_idr_auth:
1754 # VPP is initiator, scapy is responder
1755 self.p.add_local_id(**idi)
1756 if not self.no_idr_auth:
1757 self.p.add_remote_id(**idr)
# Traffic selectors, with defaults applied when the test supplies none.
1760 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1761 if "loc_ts" not in params
1762 else params["loc_ts"]
1765 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1766 if "rem_ts" not in params
1767 else params["rem_ts"]
1769 self.p.add_local_ts(**loc_ts)
1770 self.p.add_remote_ts(**rem_ts)
# Optional profile settings, applied only when the key is present.
1771 if "responder" in params:
1772 self.p.add_responder(params["responder"])
1773 if "ike_transforms" in params:
1774 self.p.add_ike_transforms(params["ike_transforms"])
1775 if "esp_transforms" in params:
1776 self.p.add_esp_transforms(params["esp_transforms"])
1778 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1780 self.p.set_udp_encap(True)
1782 if "responder_hostname" in params:
1783 hn = params["responder_hostname"]
1784 self.p.add_responder_hostname(hn)
1786 # configure static dns record
1787 self.vapi.dns_name_server_add_del(
1788 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1790 self.vapi.dns_enable_disable(enable=1)
# Point the configured hostname at pg0's remote address.
1792 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
# Peer-side SA state. The traffic selectors are swapped relative to the
# profile: the profile's remote TS is passed as local_ts and vice versa.
1799 is_initiator=is_init,
1800 id_type=self.p.local_id["id_type"],
1803 priv_key=client_priv,
1804 auth_method=auth_method,
1805 nonce=params.get("nonce"),
1806 auth_data=auth_data,
1807 udp_encap=udp_encap,
1808 local_ts=self.p.remote_ts,
1809 remote_ts=self.p.local_ts,
# Algorithm choices for the peer side, with defaults.
1814 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1817 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1819 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1822 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1825 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
# The PRF is always PRF_HMAC_SHA2_256; it is not taken from params.
1828 self.sa.set_ike_props(
1829 crypto=ike_crypto[0],
1830 crypto_key_len=ike_crypto[1],
1832 prf="PRF_HMAC_SHA2_256",
1835 self.sa.set_esp_props(
1836 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1840 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
1841 class TestApi(VppTestCase):
1842 """Test IKEV2 API"""
# Class-level setup and teardown only delegate to the parent class.
1845 def setUpClass(cls):
1846 super(TestApi, cls).setUpClass()
1849 def tearDownClass(cls):
1850 super(TestApi, cls).tearDownClass()
# Per-test teardown: remove both profiles created by the test and check that
# the profile dump is empty afterwards.
1853 super(TestApi, self).tearDown()
1854 self.p1.remove_vpp_config()
1855 self.p2.remove_vpp_config()
1856 r = self.vapi.ikev2_profile_dump()
1857 self.assertEqual(len(r), 0)
# Build a Profile named cfg["name"] from the configuration dictionary `cfg`.
# Required keys: loc_id, rem_id (each an (id_type, data) pair), loc_ts,
# rem_ts, responder, ike_ts, esp_ts, auth, udp_encap, ipsec_over_udp_port.
# Optional keys: lifetime_data, tun_itf, natt_disabled.
1859 def configure_profile(self, cfg):
1860 p = Profile(self, cfg["name"])
1861 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1862 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1863 p.add_local_ts(**cfg["loc_ts"])
1864 p.add_remote_ts(**cfg["rem_ts"])
1865 p.add_responder(cfg["responder"])
1866 p.add_ike_transforms(cfg["ike_ts"])
1867 p.add_esp_transforms(cfg["esp_ts"])
1868 p.add_auth(**cfg["auth"])
1869 p.set_udp_encap(cfg["udp_encap"])
1870 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
# Optional settings are applied only when present in cfg.
1871 if "lifetime_data" in cfg:
1872 p.set_lifetime_data(cfg["lifetime_data"])
1873 if "tun_itf" in cfg:
1874 p.set_tunnel_interface(cfg["tun_itf"])
1875 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1880 def test_profile_api(self):
1881 """test profile dump API"""
# Two profile configurations, "p1" and "p2", are defined, applied, and then
# compared against what the profile dump API returns.
1886 "start_addr": "3.3.3.2",
1887 "end_addr": "3.3.3.3",
1893 "start_addr": "4.5.76.80",
1894 "end_addr": "2.3.4.6",
1901 "start_addr": "ab::1",
1902 "end_addr": "ab::4",
1908 "start_addr": "cd::12",
1909 "end_addr": "cd::13",
1915 "natt_disabled": True,
1916 "loc_id": ("fqdn", b"vpp.home"),
1917 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1920 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1923 "crypto_key_size": 32,
1927 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1928 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1930 "ipsec_over_udp_port": 4501,
1933 "lifetime_maxdata": 20192,
1934 "lifetime_jitter": 9,
1940 "loc_id": ("ip4-addr", b"192.168.2.1"),
1941 "rem_id": ("ip6-addr", b"abcd::1"),
1944 "responder": {"sw_if_index": 4, "addr": "def::10"},
1947 "crypto_key_size": 16,
1951 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1952 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1954 "ipsec_over_udp_port": 4600,
# Kept on self so that tearDown can remove them.
1958 self.p1 = self.configure_profile(conf["p1"])
1959 self.p2 = self.configure_profile(conf["p2"])
# The dump is expected to list the profiles in creation order.
1961 r = self.vapi.ikev2_profile_dump()
1962 self.assertEqual(len(r), 2)
1963 self.verify_profile(r[0].profile, conf["p1"])
1964 self.verify_profile(r[1].profile, conf["p2"])
# Compare an identity from the API with a configured (id_type, data) pair.
# The API returns the data as text, so it is encoded as ASCII before being
# compared with the configured bytes.
1966 def verify_id(self, api_id, cfg_id):
1967 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1968 self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
# Compare a traffic selector from the API with its configuration dictionary
# (keys: proto, start_port, end_port, start_addr, end_addr). The configured
# address strings are converted with ip_address() before comparison.
1970 def verify_ts(self, api_ts, cfg_ts):
1971 self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1972 self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1973 self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1974 self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1975 self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
# Compare the responder from the API with the configured interface index and
# address; the configured address string is converted with ip_address().
1977 def verify_responder(self, api_r, cfg_r):
1978 self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1979 self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
# Compare the fields shared by the IKE and ESP transform sets: crypto
# algorithm, crypto key size and integrity algorithm.
1981 def verify_transforms(self, api_ts, cfg_ts):
1982 self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1983 self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1984 self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
# IKE transforms: the common fields plus the DH group.
1986 def verify_ike_transforms(self, api_ts, cfg_ts):
1987 self.verify_transforms(api_ts, cfg_ts)
1988 self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
# ESP transforms: only the common fields are checked.
1990 def verify_esp_transforms(self, api_ts, cfg_ts):
1991 self.verify_transforms(api_ts, cfg_ts)
# Compare the authentication settings from the API with the configured
# method name and data, including the reported data length.
1993 def verify_auth(self, api_auth, cfg_auth):
1994 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1995 self.assertEqual(api_auth.data, cfg_auth["data"])
1996 self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
# Compare the lifetime fields of the API profile `p` with the configured
# dictionary `ld` (keys: lifetime, lifetime_maxdata, lifetime_jitter, handover).
1998 def verify_lifetime_data(self, p, ld):
1999 self.assertEqual(p.lifetime, ld["lifetime"])
2000 self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
2001 self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
2002 self.assertEqual(p.handover, ld["handover"])
# Compare a profile returned by the dump API (`ap`) with the configuration
# dictionary it was created from (`cp`), field group by field group.
2004 def verify_profile(self, ap, cp):
2005 self.assertEqual(ap.name, cp["name"])
2006 self.assertEqual(ap.udp_encap, cp["udp_encap"])
2007 self.verify_id(ap.loc_id, cp["loc_id"])
2008 self.verify_id(ap.rem_id, cp["rem_id"])
2009 self.verify_ts(ap.loc_ts, cp["loc_ts"])
2010 self.verify_ts(ap.rem_ts, cp["rem_ts"])
2011 self.verify_responder(ap.responder, cp["responder"])
2012 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
2013 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
2014 self.verify_auth(ap.auth, cp["auth"])
# A configuration without "natt_disabled" is treated as NAT-T enabled.
2015 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
2016 self.assertTrue(natt_dis == ap.natt_disabled)
2018 if "lifetime_data" in cp:
2019 self.verify_lifetime_data(ap, cp["lifetime_data"])
2020 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
# The tunnel interface either matches the configured one or is 0xFFFFFFFF.
2022 self.assertEqual(ap.tun_itf, cp["tun_itf"])
2024 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
2027 @tag_fixme_vpp_workers
2028 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
2029 """test responder - responder behind NAT"""
# Counters are checked under the "ip4-natt" suffix for this scenario.
2031 IKE_NODE_SUFFIX = "ip4-natt"
# Only r_natt is set; every other parameter keeps its default.
2033 def config_tc(self):
2034 self.config_params({"r_natt": True})
2037 @tag_fixme_vpp_workers
2038 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
2039 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
# Parameters handed to config_params: the responder is reachable through pg0
# at pg0's remote IPv4 address, IKE uses AES-GCM-16ICV with a 32-byte key,
# no separate integrity algorithm and the 3072-bit MODP group.
2041 def config_tc(self):
2045 "is_initiator": False, # seen from test case perspective
2046 # thus vpp is initiator
2048 "sw_if_index": self.pg0.sw_if_index,
2049 "addr": self.pg0.remote_ip4,
2051 "ike-crypto": ("AES-GCM-16ICV", 32),
2052 "ike-integ": "NULL",
2053 "ike-dh": "3072MODPgr",
# Numeric transform ids for the profile; key sizes are given in bits.
2055 "crypto_alg": 20, # "aes-gcm-16"
2056 "crypto_key_size": 256,
2057 "dh_group": 15, # "modp-3072"
2060 "crypto_alg": 12, # "aes-cbc"
2061 "crypto_key_size": 256,
2062 # "hmac-sha2-256-128"
2069 @tag_fixme_vpp_workers
2070 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
2071 """test ikev2 initiator - pre shared key auth"""
# Parameters handed to config_params: IKE uses AES-GCM-16ICV with a 32-byte
# key, no separate integrity algorithm and the 3072-bit MODP group; the
# responder is given by hostname instead of by address.
2073 def config_tc(self):
2076 "is_initiator": False, # seen from test case perspective
2077 # thus vpp is initiator
2078 "ike-crypto": ("AES-GCM-16ICV", 32),
2079 "ike-integ": "NULL",
2080 "ike-dh": "3072MODPgr",
# Numeric transform ids for the profile; key sizes are given in bits.
2082 "crypto_alg": 20, # "aes-gcm-16"
2083 "crypto_key_size": 256,
2084 "dh_group": 15, # "modp-3072"
2087 "crypto_alg": 12, # "aes-cbc"
2088 "crypto_key_size": 256,
2089 # "hmac-sha2-256-128"
# config_params adds a DNS cache entry resolving this hostname to pg0's
# remote address.
2092 "responder_hostname": {
2093 "hostname": "vpp.responder.org",
2094 "sw_if_index": self.pg0.sw_if_index,
2100 @tag_fixme_vpp_workers
2101 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
2102 """test initiator - request window size (1)"""
# Answer a captured rekey request by echoing its SA payload back.
#   req:                  the captured request packet.
#   update_child_sa_data: when true, the nonce and SPI from the request are
#                         adopted on the local child SA and the child keys are
#                         recomputed before replying.
2104 def rekey_respond(self, req, update_child_sa_data):
2105 ih = self.get_ike_header(req)
2106 plain = self.sa.hmac_and_decrypt(ih)
2107 sa = ikev2.IKEv2_payload_SA(plain)
2108 if update_child_sa_data:
2109 prop = sa[ikev2.IKEv2_payload_Proposal]
# The reply reuses the request's nonce and SPI for both directions.
2110 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2111 self.sa.r_nonce = self.sa.i_nonce
2112 self.sa.child_sas[0].ispi = prop.SPI
2113 self.sa.child_sas[0].rspi = prop.SPI
2114 self.sa.calc_child_keys()
2116 header = ikev2.IKEv2(
2117 init_SPI=self.sa.ispi,
2118 resp_SPI=self.sa.rspi,
2122 next_payload="Encrypted",
2124 resp = self.encrypt_ike_msg(header, sa, "SA")
2125 packet = self.create_packet(
2126 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2128 self.send_and_assert_no_replies(self.pg0, packet)
# After the base initiator test, trigger two child SA rekeys back to back,
# answer them out of order and check the final SA state.
2130 def test_initiator(self):
2131 super(TestInitiatorRequestWindowSize, self).test_initiator()
2132 self.pg0.enable_capture()
2134 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
2135 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2136 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2137 capture = self.pg0.get_capture(2)
2139 # reply in reverse order
2140 self.rekey_respond(capture[1], True)
2141 self.rekey_respond(capture[0], False)
2143 # verify that only the second request was accepted
2144 self.verify_ike_sas()
2145 self.verify_ike_sas_v2()
2146 self.verify_ipsec_sas(is_rekey=True)
2149 @tag_fixme_vpp_workers
2150 class TestInitiatorRekey(TestInitiatorPsk):
2151 """test ikev2 initiator - rekey"""
# Trigger a child SA rekey through the API, validate the captured request
# and answer it by echoing its SA payload back.
2153 def rekey_from_initiator(self):
# The API takes the child SA's initiator SPI as an integer; the stored bytes
# are interpreted as little-endian.
2154 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
2155 self.pg0.enable_capture()
2157 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2158 capture = self.pg0.get_capture(1)
2159 ih = self.get_ike_header(capture[0])
2160 self.assertEqual(ih.exch_type, 36) # CHILD_SA
2161 self.assertNotIn("Response", ih.flags)
2162 self.assertIn("Initiator", ih.flags)
2163 plain = self.sa.hmac_and_decrypt(ih)
2164 sa = ikev2.IKEv2_payload_SA(plain)
2165 prop = sa[ikev2.IKEv2_payload_Proposal]
# The reply reuses the request's nonce for both directions.
2166 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2167 self.sa.r_nonce = self.sa.i_nonce
2168 # update new responder SPI
2169 self.sa.child_sas[0].ispi = prop.SPI
2170 self.sa.child_sas[0].rspi = prop.SPI
2171 self.sa.calc_child_keys()
2172 header = ikev2.IKEv2(
2173 init_SPI=self.sa.ispi,
2174 resp_SPI=self.sa.rspi,
2178 next_payload="Encrypted",
2180 resp = self.encrypt_ike_msg(header, sa, "SA")
2181 packet = self.create_packet(
2182 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2184 self.send_and_assert_no_replies(self.pg0, packet)
# Base initiator test followed by one rekey and a check of the SA state.
2186 def test_initiator(self):
2187 super(TestInitiatorRekey, self).test_initiator()
2188 self.rekey_from_initiator()
2189 self.verify_ike_sas()
2190 self.verify_ike_sas_v2()
2191 self.verify_ipsec_sas(is_rekey=True)
2194 @tag_fixme_vpp_workers
2195 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2196 """test ikev2 initiator - delete IKE SA from responder"""
# Parameters handed to config_params: same algorithms and responder address
# as the NAT-T initiator test, plus deletion driven from the responder side
# and an IKE_AUTH exchange without the IDr payload.
2198 def config_tc(self):
2201 "del_sa_from_responder": True,
2202 "is_initiator": False, # seen from test case perspective
2203 # thus vpp is initiator
2205 "sw_if_index": self.pg0.sw_if_index,
2206 "addr": self.pg0.remote_ip4,
2208 "ike-crypto": ("AES-GCM-16ICV", 32),
2209 "ike-integ": "NULL",
2210 "ike-dh": "3072MODPgr",
# Numeric transform ids for the profile; key sizes are given in bits.
2212 "crypto_alg": 20, # "aes-gcm-16"
2213 "crypto_key_size": 256,
2214 "dh_group": 15, # "modp-3072"
2217 "crypto_alg": 12, # "aes-cbc"
2218 "crypto_key_size": 256,
2219 # "hmac-sha2-256-128"
2222 "no_idr_in_auth": True,
2227 @tag_fixme_vpp_workers
2228 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2229 """test ikev2 responder - initiator behind NAT"""
# Counters are checked under the "ip4-natt" suffix for this scenario.
2231 IKE_NODE_SUFFIX = "ip4-natt"
# Only i_natt is set; every other parameter keeps its default.
2233 def config_tc(self):
2234 self.config_params({"i_natt": True})
2237 @tag_fixme_vpp_workers
2238 class TestResponderPsk(TemplateResponder, Ikev2Params):
2239 """test ikev2 responder - pre shared key auth"""
# Runs the responder template with all default parameters.
2241 def config_tc(self):
2242 self.config_params()
2245 @tag_fixme_vpp_workers
2246 class TestResponderDpd(TestResponderPsk):
2248 Dead peer detection test
# Dead peer detection stays enabled for this test.
2251 def config_tc(self):
2252 self.config_params({"dpd_disabled": False})
# Establish the SA, then leave the liveness probe unanswered and check that
# every IKE and IPsec SA is removed.
2257 def test_responder(self):
# Liveness period of 2 with a single retry.
2258 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2259 super(TestResponderDpd, self).test_responder()
2260 self.pg0.enable_capture()
2262 # capture empty request but don't reply
2263 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2264 ih = self.get_ike_header(capture[0])
2265 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2266 plain = self.sa.hmac_and_decrypt(ih)
2267 self.assertEqual(plain, b"")
2268 # wait for SA expiration
# Both IKE SA dump variants and the IPsec SA dump must come back empty.
2270 ike_sas = self.vapi.ikev2_sa_dump()
2271 self.assertEqual(len(ike_sas), 0)
2272 ike_sas = self.vapi.ikev2_sa_v2_dump()
2273 self.assertEqual(len(ike_sas), 0)
2274 ipsec_sas = self.vapi.ipsec_sa_dump()
2275 self.assertEqual(len(ipsec_sas), 0)
2278 @tag_fixme_vpp_workers
2279 class TestResponderRekey(TestResponderPsk):
2280 """test ikev2 responder - rekey"""
# Send a child SA rekey request on pg0 and collect the single reply.
# Whether the request includes a key exchange is controlled by self.WITH_KEX.
2284 def send_rekey_from_initiator(self):
2286 self.sa.generate_dh_data()
2287 packet = self.create_rekey_request(kex=self.WITH_KEX)
2288 self.pg0.add_stream(packet)
2289 self.pg0.enable_capture()
2291 capture = self.pg0.get_capture(1)
# Take the responder's nonce, new child SPI and (where present) KE data from
# the rekey response in capture[0], then derive the new child keys.
2294 def process_rekey_response(self, capture):
2295 ih = self.get_ike_header(capture[0])
2296 plain = self.sa.hmac_and_decrypt(ih)
2297 sa = ikev2.IKEv2_payload_SA(plain)
2298 prop = sa[ikev2.IKEv2_payload_Proposal]
2299 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2300 # update new responder SPI
2301 self.sa.child_sas[0].rspi = prop.SPI
2303 self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2304 self.sa.complete_dh_data()
2305 self.sa.calc_child_keys(kex=self.WITH_KEX)
# Base responder test followed by one rekey; checks the SA state, the
# "rekey_req" counter and the per-SA rekey statistic in both dump variants.
2307 def test_responder(self):
2308 super(TestResponderRekey, self).test_responder()
2309 self.process_rekey_response(self.send_rekey_from_initiator())
2310 self.verify_ike_sas()
2311 self.verify_ike_sas_v2()
2312 self.verify_ipsec_sas(is_rekey=True)
2313 self.assert_counter(1, "rekey_req", "ip4")
2314 r = self.vapi.ikev2_sa_dump()
2315 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2316 r = self.vapi.ikev2_sa_v2_dump()
2317 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2320 @tag_fixme_vpp_workers
2321 class TestResponderRekeyRepeat(TestResponderRekey):
2322 """test ikev2 responder - rekey repeat"""
# After the parent's rekey test, a second rekey is first refused with a
# notify and later accepted once the old IPsec SA is gone.
2324 def test_responder(self):
2325 super(TestResponderRekeyRepeat, self).test_responder()
2326 # rekey request is not accepted until old IPsec SA is expired
2327 capture = self.send_rekey_from_initiator()
2328 ih = self.get_ike_header(capture[0])
2329 plain = self.sa.hmac_and_decrypt(ih)
2330 notify = ikev2.IKEv2_payload_Notify(plain)
# Notify type 43 is presumably TEMPORARY_FAILURE (RFC 7296) — confirm.
2331 self.assertEqual(notify.type, 43)
2332 self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2333 # rekey request is accepted after old IPsec SA was expired
2335 if len(self.vapi.ipsec_sa_dump()) != 3:
2339 self.fail("old IPsec SA not expired")
2340 self.process_rekey_response(self.send_rekey_from_initiator())
2341 self.verify_ike_sas()
2342 self.verify_ike_sas_v2()
2343 self.verify_ipsec_sas(sa_count=3)
# Variant of TestResponderRekey; per its docstring the rekey includes a key
# exchange.
2346 @tag_fixme_vpp_workers
2347 class TestResponderRekeyKEX(TestResponderRekey):
2348 """test ikev2 responder - rekey with key exchange"""
# Variant of TestResponderRekeyRepeat; per its docstring the rekey includes a
# key exchange.
2353 @tag_fixme_vpp_workers
2354 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2355 """test ikev2 responder - rekey repeat with key exchange"""
2360 @tag_fixme_vpp_workers
2361 class TestResponderRekeySA(TestResponderPsk):
2362 """test ikev2 responder - rekey IKE SA"""
# Send an IKE SA rekey request carrying the DH public key and nonce of
# `newsa`, and collect the single reply.
2364 def send_rekey_from_initiator(self, newsa):
2365 packet = self.create_sa_rekey_request(
2367 dh_pub_key=newsa.my_dh_pub_key,
2368 nonce=newsa.i_nonce,
2370 self.pg0.add_stream(packet)
2371 self.pg0.enable_capture()
2373 capture = self.pg0.get_capture(1)
# Complete `newsa` from the rekey response in capture[0]. The response is
# decrypted with the keys of the old SA (self.sa).
2376 def process_rekey_response(self, newsa, capture):
2377 ih = self.get_ike_header(capture[0])
2378 plain = self.sa.hmac_and_decrypt(ih)
2379 sa = ikev2.IKEv2_payload_SA(plain)
2380 prop = sa[ikev2.IKEv2_payload_Proposal]
2381 newsa.rspi = prop.SPI
2382 newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2383 newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2384 newsa.complete_dh_data()
# The new SA's keys are derived using the old SA's sk_d.
2385 newsa.calc_keys(sk_d=self.sa.sk_d)
# The child SAs move from the old SA to the new one.
2386 newsa.child_sas = self.sa.child_sas
2387 self.sa.child_sas = []
# Base responder test, then rekey the IKE SA with a clone that has a random
# 8-byte SPI, check the counters and delete the SA from the initiator side.
2389 def test_responder(self):
2390 super(TestResponderRekeySA, self).test_responder()
2391 newsa = self.sa.clone(self, spi=os.urandom(8))
2392 newsa.generate_dh_data()
2393 capture = self.send_rekey_from_initiator(newsa)
2394 self.process_rekey_response(newsa, capture)
2395 self.verify_ike_sas(is_rekey=True)
2396 self.assert_counter(1, "rekey_req", "ip4")
2397 r = self.vapi.ikev2_sa_dump()
# The rekey statistic is read from the second entry of the dump.
2398 self.assertEqual(r[1].sa.stats.n_rekey_req, 1)
2399 self.initiate_del_sa_from_initiator()
2401 self.verify_ike_sas()
# Responder test with pg0 placed in IP table 1; test_responder additionally
# configures liveness checking and expects one INFORMATIONAL packet afterwards.
# NOTE(review): original lines 2405, 2408-2409, 2412, 2416-2418, 2423-2428,
# 2431 and 2436 are absent from this listing, so a possible second decorator,
# the decorator of setUpClass, the rest of the `if ... hasattr(` condition and
# the body of the `for` loop are not visible here.
2404 @tag_fixme_ubuntu2204
2406 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2407 """test ikev2 responder - non-default table id"""
2410 def setUpClass(cls):
# Publishes scapy.contrib.ikev2 as the module-level name `ikev2`, then calls
# setUpClass on whatever follows IkePeer in cls's MRO (super(IkePeer, cls)
# starts the lookup after IkePeer, so IkePeer's own setUpClass is not used).
# Afterwards one pg interface is created, IP table 1 is added through the CLI
# and pg0 is assigned to it.
# NOTE(review): the `== True` comparisons on is_distro_ubuntu2204 /
# is_distro_debian11 could be plain truthiness tests if these are booleans -
# confirm their type before changing.
2411 import scapy.contrib.ikev2 as _ikev2
2413 globals()["ikev2"] = _ikev2
2414 super(IkePeer, cls).setUpClass()
2415 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
2419 cls.create_pg_interfaces(range(1))
2420 cls.vapi.cli("ip table add 1")
2421 cls.vapi.cli("set interface ip table pg0 1")
2422 for i in cls.pg_interfaces:
2429 def config_tc(self):
# Passes dpd_disabled=False to config_params.
2430 self.config_params({"dpd_disabled": False})
2432 def test_responder(self):
# Liveness is configured before the parent scenario runs.
# NOTE(review): the unit of `period` is not visible here.
2433 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2434 super(TestResponderVrf, self).test_responder()
2435 self.pg0.enable_capture()
# Exactly one packet is expected on pg0 (timeout=5).
2437 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2438 ih = self.get_ike_header(capture[0])
2439 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
# The decrypted body must be empty.
# NOTE(review): presumably this is the liveness probe - confirm.
2440 plain = self.sa.hmac_and_decrypt(ih)
2441 self.assertEqual(plain, b"")
# Responder test using certificate-based authentication (per the docstring).
# NOTE(review): only four dict entries of config_tc are visible; original
# lines 2449-2452 and 2457-2460 (the call that receives them and any further
# entries) are absent from this listing.
2444 @tag_fixme_vpp_workers
2445 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2446 """test ikev2 responder - cert based auth"""
2448 def config_tc(self):
# The visible entries name PEM files for the server/client keys and
# certificates; where those files are resolved from is not shown here.
2453 "server-key": "server-key.pem",
2454 "client-key": "client-key.pem",
2455 "client-cert": "client-cert.pem",
2456 "server-cert": "server-cert.pem",
# Responder test for the algorithm combination spelled out in the class name.
# NOTE(review): the docstring delimiters and the call wrapping the dict entries
# (original lines 2464-2465, 2467-2468, 2470-2471, 2479-2482) are absent from
# this listing; only the docstring text and the entries themselves are visible.
2461 @tag_fixme_vpp_workers
2462 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2463 TemplateResponder, Ikev2Params
2466 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2469 def config_tc(self):
# The second element of each crypto tuple appears to be the key length in
# bytes (16 and 24 here against 128 and 192 in the class name) - confirm.
# The nonce entry is 256 freshly generated random bytes.
2472 "ike-crypto": ("AES-CBC", 16),
2473 "ike-integ": "SHA2-256-128",
2474 "esp-crypto": ("AES-CBC", 24),
2475 "esp-integ": "SHA2-384-192",
2476 "ike-dh": "2048MODPgr",
2477 "nonce": os.urandom(256),
2478 "no_idr_in_auth": True,
# Responder test for AES-CBC/SHA2-256-128 IKE with a 3072-bit MODP group and
# AES-GCM ESP, per the visible config entries.
# NOTE(review): the class name and docstring text say AES_CBC_128, but the
# "ike-crypto" entry below uses 32; if that number is a key length in bytes it
# means a 256-bit key - confirm which of the two is intended.
# NOTE(review): the docstring delimiters and the call wrapping the dict entries
# (original lines 2486-2488, 2490-2491, 2493-2494, 2500-2503) are absent from
# this listing.
2483 @tag_fixme_vpp_workers
2484 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2485 TemplateResponder, Ikev2Params
2489 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2492 def config_tc(self):
# ESP integrity is "NULL" alongside the AES-GCM-16ICV ESP cipher.
2495 "ike-crypto": ("AES-CBC", 32),
2496 "ike-integ": "SHA2-256-128",
2497 "esp-crypto": ("AES-GCM-16ICV", 32),
2498 "esp-integ": "NULL",
2499 "ike-dh": "3072MODPgr",
# Responder test using AES-GCM-16ICV for IKE with IPv6 traffic selectors; the
# class sets IKE_NODE_SUFFIX to "ip6".
# NOTE(review): the class docstring (original lines 2506-2509) and the call
# wrapping the dict entries (2513-2514, 2516-2517, 2523-2526) are absent from
# this listing.
2504 @tag_fixme_vpp_workers
2505 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2510 IKE_NODE_SUFFIX = "ip6"
2512 def config_tc(self):
# IKE integrity is "NULL" alongside the AES-GCM-16ICV cipher; local and remote
# traffic selectors are IPv6 address ranges.
2515 "del_sa_from_responder": True,
2518 "ike-crypto": ("AES-GCM-16ICV", 32),
2519 "ike-integ": "NULL",
2520 "ike-dh": "2048MODPgr",
2521 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2522 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
# Initiator test that, after the parent PSK scenario, sends an empty request
# from the responder side and checks the reply and the keepalive statistics.
# NOTE(review): the docstring delimiters (original lines 2529, 2531) and line
# 2537 between enable_capture() and get_capture() are absent from this listing.
2527 @tag_fixme_vpp_workers
2528 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2530 Test for keep alive messages
2533 def send_empty_req_from_responder(self):
# Sends the packet built by create_empty_request() on pg0 and inspects the
# single captured reply: its message id must equal self.sa.msg_id and its
# decrypted body must be empty.  The "keepalive" counter and the per-SA
# n_keepalives statistic (via both dump APIs) must each read 1.
2534 packet = self.create_empty_request()
2535 self.pg0.add_stream(packet)
2536 self.pg0.enable_capture()
2538 capture = self.pg0.get_capture(1)
2539 ih = self.get_ike_header(capture[0])
2540 self.assertEqual(ih.id, self.sa.msg_id)
2541 plain = self.sa.hmac_and_decrypt(ih)
2542 self.assertEqual(plain, b"")
2543 self.assert_counter(1, "keepalive", "ip4")
2544 r = self.vapi.ikev2_sa_dump()
2545 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2546 r = self.vapi.ikev2_sa_v2_dump()
2547 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2549 def test_initiator(self):
# Parent scenario first, then the keepalive exchange.
2550 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2551 self.send_empty_req_from_responder()
# Sends IKE_SA_INIT messages with bogus length fields and checks that no reply
# is produced and that the matching error counter advances once per packet.
# NOTE(review): original lines 2556-2559, 2562, 2564-2565, 2567, 2569, 2571
# and 2573 are absent from this listing; most of create_ike_init_msg (how
# `msg` is built and how `length` / `payload` are applied) is not visible.
2554 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2555 """malformed packet test"""
2560 def config_tc(self):
# Calls config_params with no arguments.
2561 self.config_params()
2563 def create_ike_init_msg(self, length=None, payload=None):
# Returns a packet for pg0 wrapping `msg`, using the SA's source and
# destination ports.  The visible keyword arguments set the initiator SPI to
# eight "\x11" characters and the exchange type to "IKE_SA_INIT".
2566 init_SPI="\x11" * 8,
2568 exch_type="IKE_SA_INIT",
2570 if payload is not None:
2572 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2574 def verify_bad_packet_length(self):
# Message built with length=0xDEAD; pkt_count copies are sent, no replies are
# expected, and "bad_length" must equal pkt_count.
2575 ike_msg = self.create_ike_init_msg(length=0xDEAD)
2576 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2577 self.assert_counter(self.pkt_count, "bad_length")
2579 def verify_bad_sa_payload_length(self):
# Same pattern, but the bogus length (0xDEAD) is on the SA payload and the
# counter checked is "malformed_packet".
2580 p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2581 ike_msg = self.create_ike_init_msg(payload=p)
2582 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2583 self.assert_counter(self.pkt_count, "malformed_packet")
2585 def test_responder(self):
# pkt_count is read by both verify_* helpers, so it is set before they run.
2586 self.pkt_count = 254
2587 self.verify_bad_packet_length()
2588 self.verify_bad_sa_payload_length()
if __name__ == "__main__":
    # Executed directly as a script: hand the tests in this module to
    # unittest, driven by the project's VppTestRunner.
    unittest.main(testRunner=VppTestRunner)