4 from socket import inet_pton
5 from cryptography import x509
6 from cryptography.hazmat.backends import default_backend
7 from cryptography.hazmat.primitives import hashes, hmac
8 from cryptography.hazmat.primitives.asymmetric import dh, padding
9 from cryptography.hazmat.primitives.serialization import load_pem_private_key
10 from cryptography.hazmat.primitives.ciphers import (
15 from ipaddress import IPv4Address, IPv6Address, ip_address
17 from config import config
18 from scapy.layers.ipsec import ESP
19 from scapy.layers.inet import IP, UDP, Ether
20 from scapy.layers.inet6 import IPv6
21 from scapy.packet import raw, Raw
22 from scapy.utils import long_converter
23 from framework import VppTestCase
24 from asfframework import (
25 tag_fixme_vpp_workers,
32 from vpp_ikev2 import Profile, IDType, AuthMethod
33 from vpp_papi import VppEnum
40 KEY_PAD = b"Key Pad for IKEv2"
47 # tuple structure is (p, g, key_len)
52 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
53 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
54 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
55 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
56 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
57 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
58 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
59 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
60 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
61 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
62 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
70 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
71 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
72 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
73 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
74 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
75 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
76 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
77 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
78 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
79 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
80 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
81 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
82 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
83 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
84 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
85 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
93 class CryptoAlgo(object):
94 def __init__(self, name, cipher, mode):
98 if self.cipher is not None:
99 self.bs = self.cipher.block_size // 8
101 if self.name == "AES-GCM-16ICV":
102 self.iv_len = GCM_IV_SIZE
104 self.iv_len = self.bs
106 def encrypt(self, data, key, aad=None):
107 iv = os.urandom(self.iv_len)
110 self.cipher(key), self.mode(iv), default_backend()
112 return iv + encryptor.update(data) + encryptor.finalize()
114 salt = key[-SALT_SIZE:]
117 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
119 encryptor.authenticate_additional_data(aad)
120 data = encryptor.update(data) + encryptor.finalize()
121 data += encryptor.tag[:GCM_ICV_SIZE]
124 def decrypt(self, data, key, aad=None, icv=None):
126 iv = data[: self.iv_len]
127 ct = data[self.iv_len :]
129 algorithms.AES(key), self.mode(iv), default_backend()
131 return decryptor.update(ct) + decryptor.finalize()
133 salt = key[-SALT_SIZE:]
134 nonce = salt + data[:GCM_IV_SIZE]
135 ct = data[GCM_IV_SIZE:]
136 key = key[:-SALT_SIZE]
138 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
140 decryptor.authenticate_additional_data(aad)
141 return decryptor.update(ct) + decryptor.finalize()
144 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
145 data = data + b"\x00" * (pad_len - 1)
146 return data + bytes([pad_len - 1])
149 class AuthAlgo(object):
150 def __init__(self, name, mac, mod, key_len, trunc_len=None):
154 self.key_len = key_len
155 self.trunc_len = trunc_len or key_len
159 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
160 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
161 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
165 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
167 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
168 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
169 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
173 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
174 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
190 class IKEv2ChildSA(object):
191 def __init__(self, local_ts, remote_ts, is_initiator):
199 self.local_ts = local_ts
200 self.remote_ts = remote_ts
203 class IKEv2SA(object):
210 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
216 auth_method="shared-key",
222 self.udp_encap = udp_encap
232 self.dh_params = None
234 self.priv_key = priv_key
235 self.is_initiator = is_initiator
236 nonce = nonce or os.urandom(32)
237 self.auth_data = auth_data
240 if isinstance(id_type, str):
241 self.id_type = IDType.value(id_type)
243 self.id_type = id_type
244 self.auth_method = auth_method
245 if self.is_initiator:
246 self.rspi = 8 * b"\x00"
251 self.ispi = 8 * b"\x00"
253 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
255 def new_msg_id(self):
260 def my_dh_pub_key(self):
261 if self.is_initiator:
262 return self.i_dh_data
263 return self.r_dh_data
266 def peer_dh_pub_key(self):
267 if self.is_initiator:
268 return self.r_dh_data
269 return self.i_dh_data
273 return self.i_natt or self.r_natt
275 def compute_secret(self):
276 priv = self.dh_private_key
277 peer = self.peer_dh_pub_key
278 p, g, l = self.ike_group
280 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
283 def generate_dh_data(self):
285 if self.ike_dh not in DH:
286 raise NotImplementedError("%s not in DH group" % self.ike_dh)
288 if self.dh_params is None:
289 dhg = DH[self.ike_dh]
290 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
291 self.dh_params = pn.parameters(default_backend())
293 priv = self.dh_params.generate_private_key()
294 pub = priv.public_key()
295 x = priv.private_numbers().x
296 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
297 y = pub.public_numbers().y
299 if self.is_initiator:
300 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
302 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
304 def complete_dh_data(self):
305 self.dh_shared_secret = self.compute_secret()
307 def calc_child_keys(self, kex=False):
308 prf = self.ike_prf_alg.mod()
309 s = self.i_nonce + self.r_nonce
311 s = self.dh_shared_secret + s
312 c = self.child_sas[0]
314 encr_key_len = self.esp_crypto_key_len
315 integ_key_len = self.esp_integ_alg.key_len
316 salt_len = 0 if integ_key_len else 4
318 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
319 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
322 c.sk_ei = keymat[pos : pos + encr_key_len]
326 c.sk_ai = keymat[pos : pos + integ_key_len]
329 c.salt_ei = keymat[pos : pos + salt_len]
332 c.sk_er = keymat[pos : pos + encr_key_len]
336 c.sk_ar = keymat[pos : pos + integ_key_len]
339 c.salt_er = keymat[pos : pos + salt_len]
342 def calc_prfplus(self, prf, key, seed, length):
346 while len(r) < length and x < 255:
351 s = s + seed + bytes([x])
352 t = self.calc_prf(prf, key, s)
360 def calc_prf(self, prf, key, data):
361 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
365 def calc_keys(self, sk_d=None):
366 prf = self.ike_prf_alg.mod()
368 # SKEYSEED = prf(Ni | Nr, g^ir)
369 self.skeyseed = self.calc_prf(
370 prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
373 # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
374 self.skeyseed = self.calc_prf(
375 prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
378 # calculate S = Ni | Nr | SPIi SPIr
379 s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
381 prf_key_trunc = self.ike_prf_alg.trunc_len
382 encr_key_len = self.ike_crypto_key_len
383 tr_prf_key_len = self.ike_prf_alg.key_len
384 integ_key_len = self.ike_integ_alg.key_len
385 if integ_key_len == 0:
397 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
400 self.sk_d = keymat[: pos + prf_key_trunc]
403 self.sk_ai = keymat[pos : pos + integ_key_len]
405 self.sk_ar = keymat[pos : pos + integ_key_len]
408 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
409 pos += encr_key_len + salt_size
410 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
411 pos += encr_key_len + salt_size
413 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
414 pos += tr_prf_key_len
415 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
417 def generate_authmsg(self, prf, packet):
418 if self.is_initiator:
426 data = bytes([self.id_type, 0, 0, 0]) + id
427 id_hash = self.calc_prf(prf, key, data)
428 return packet + nonce + id_hash
431 prf = self.ike_prf_alg.mod()
432 if self.is_initiator:
433 packet = self.init_req_packet
435 packet = self.init_resp_packet
436 authmsg = self.generate_authmsg(prf, raw(packet))
437 if self.auth_method == "shared-key":
438 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
439 self.auth_data = self.calc_prf(prf, psk, authmsg)
440 elif self.auth_method == "rsa-sig":
441 self.auth_data = self.priv_key.sign(
442 authmsg, padding.PKCS1v15(), hashes.SHA1()
445 raise TypeError("unknown auth method type!")
447 def encrypt(self, data, aad=None):
448 data = self.ike_crypto_alg.pad(data)
449 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
452 def peer_authkey(self):
453 if self.is_initiator:
458 def my_authkey(self):
459 if self.is_initiator:
464 def my_cryptokey(self):
465 if self.is_initiator:
470 def peer_cryptokey(self):
471 if self.is_initiator:
475 def concat(self, alg, key_len):
476 return alg + "-" + str(key_len * 8)
479 def vpp_ike_cypto_alg(self):
480 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
483 def vpp_esp_cypto_alg(self):
484 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
486 def verify_hmac(self, ikemsg):
487 integ_trunc = self.ike_integ_alg.trunc_len
488 exp_hmac = ikemsg[-integ_trunc:]
489 data = ikemsg[:-integ_trunc]
490 computed_hmac = self.compute_hmac(
491 self.ike_integ_alg.mod(), self.peer_authkey, data
493 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
495 def compute_hmac(self, integ, key, data):
496 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
500 def decrypt(self, data, aad=None, icv=None):
501 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
503 def hmac_and_decrypt(self, ike):
504 ep = ike[ikev2.IKEv2_payload_Encrypted]
505 if self.ike_crypto == "AES-GCM-16ICV":
506 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
507 ct = ep.load[:-GCM_ICV_SIZE]
508 tag = ep.load[-GCM_ICV_SIZE:]
509 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
511 self.verify_hmac(raw(ike))
512 integ_trunc = self.ike_integ_alg.trunc_len
514 # remove ICV and decrypt payload
515 ct = ep.load[:-integ_trunc]
516 plain = self.decrypt(ct)
519 return plain[: -pad_len - 1]
521 def build_ts_addr(self, ts, version):
523 "starting_address_v" + version: ts["start_addr"],
524 "ending_address_v" + version: ts["end_addr"],
527 def generate_ts(self, is_ip4):
528 c = self.child_sas[0]
529 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
531 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
532 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
533 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
534 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
536 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
537 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
538 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
539 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
541 if self.is_initiator:
542 return ([ts1], [ts2])
543 return ([ts2], [ts1])
545 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
546 if crypto not in CRYPTO_ALGOS:
547 raise TypeError("unsupported encryption algo %r" % crypto)
548 self.ike_crypto = crypto
549 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
550 self.ike_crypto_key_len = crypto_key_len
552 if integ not in AUTH_ALGOS:
553 raise TypeError("unsupported auth algo %r" % integ)
554 self.ike_integ = None if integ == "NULL" else integ
555 self.ike_integ_alg = AUTH_ALGOS[integ]
557 if prf not in PRF_ALGOS:
558 raise TypeError("unsupported prf algo %r" % prf)
560 self.ike_prf_alg = PRF_ALGOS[prf]
562 self.ike_group = DH[self.ike_dh]
564 def set_esp_props(self, crypto, crypto_key_len, integ):
565 self.esp_crypto_key_len = crypto_key_len
566 if crypto not in CRYPTO_ALGOS:
567 raise TypeError("unsupported encryption algo %r" % crypto)
568 self.esp_crypto = crypto
569 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
571 if integ not in AUTH_ALGOS:
572 raise TypeError("unsupported auth algo %r" % integ)
573 self.esp_integ = None if integ == "NULL" else integ
574 self.esp_integ_alg = AUTH_ALGOS[integ]
576 def crypto_attr(self, key_len):
577 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
578 return (0x800E << 16 | key_len << 3, 12)
580 raise Exception("unsupported attribute type")
582 def ike_crypto_attr(self):
583 return self.crypto_attr(self.ike_crypto_key_len)
585 def esp_crypto_attr(self):
586 return self.crypto_attr(self.esp_crypto_key_len)
588 def compute_nat_sha1(self, ip, port, rspi=None):
591 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
592 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
594 return digest.finalize()
596 def clone(self, test, **kwargs):
597 if "spi" not in kwargs:
598 kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
599 if "nonce" not in kwargs:
600 kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
602 if "local_ts" not in kwargs:
603 kwargs["local_ts"] = self.child_sas[0].local_ts
604 if "remote_ts" not in kwargs:
605 kwargs["remote_ts"] = self.child_sas[0].remote_ts
608 is_initiator=self.is_initiator,
611 id_type=self.id_type,
612 auth_data=self.auth_data,
613 auth_method=self.auth_method,
614 priv_key=self.priv_key,
617 udp_encap=self.udp_encap,
622 crypto=self.ike_crypto,
623 crypto_key_len=self.ike_crypto_key_len,
624 integ=self.ike_integ,
629 crypto=self.esp_crypto,
630 crypto_key_len=self.esp_crypto_key_len,
631 integ=self.esp_integ,
636 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
637 class IkePeer(VppTestCase):
638 """common class for initiator and responder"""
642 import scapy.contrib.ikev2 as _ikev2
644 globals()["ikev2"] = _ikev2
645 super(IkePeer, cls).setUpClass()
646 cls.create_pg_interfaces(range(2))
647 for i in cls.pg_interfaces:
655 def tearDownClass(cls):
656 super(IkePeer, cls).tearDownClass()
659 super(IkePeer, self).tearDown()
660 if self.del_sa_from_responder:
661 self.initiate_del_sa_from_responder()
663 self.initiate_del_sa_from_initiator()
664 r = self.vapi.ikev2_sa_dump()
665 self.assertEqual(len(r), 0)
666 r = self.vapi.ikev2_sa_v2_dump()
667 self.assertEqual(len(r), 0)
668 sas = self.vapi.ipsec_sa_dump()
669 self.assertEqual(len(sas), 0)
670 self.p.remove_vpp_config()
671 self.assertIsNone(self.p.query_vpp_config())
674 super(IkePeer, self).setUp()
676 self.p.add_vpp_config()
677 self.assertIsNotNone(self.p.query_vpp_config())
678 if self.sa.is_initiator:
679 self.sa.generate_dh_data()
680 self.vapi.cli("ikev2 set logging level 4")
681 self.vapi.cli("event-lo clear")
683 def assert_counter(self, count, name, version="ip4"):
684 node_name = "/err/ikev2-%s/" % version + name
685 self.assertEqual(count, self.statistics.get_err_counter(node_name))
687 def create_rekey_request(self, kex=False):
688 sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
689 header = ikev2.IKEv2(
690 init_SPI=self.sa.ispi,
691 resp_SPI=self.sa.rspi,
692 id=self.sa.new_msg_id(),
694 exch_type="CREATE_CHILD_SA",
697 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
698 return self.create_packet(
699 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
702 def create_sa_rekey_request(self, **kwargs):
703 sa = self.generate_sa_init_payload(**kwargs)
704 header = ikev2.IKEv2(
705 init_SPI=self.sa.ispi,
706 resp_SPI=self.sa.rspi,
707 id=self.sa.new_msg_id(),
709 exch_type="CREATE_CHILD_SA",
711 ike_msg = self.encrypt_ike_msg(header, sa, "SA")
712 return self.create_packet(
713 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
716 def create_empty_request(self):
717 header = ikev2.IKEv2(
718 init_SPI=self.sa.ispi,
719 resp_SPI=self.sa.rspi,
720 id=self.sa.new_msg_id(),
722 exch_type="INFORMATIONAL",
723 next_payload="Encrypted",
726 msg = self.encrypt_ike_msg(header, b"", None)
727 return self.create_packet(
728 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
732 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
735 src_ip = src_if.remote_ip6
736 dst_ip = src_if.local_ip6
739 src_ip = src_if.remote_ip4
740 dst_ip = src_if.local_ip4
743 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
744 / ip_layer(src=src_ip, dst=dst_ip)
745 / UDP(sport=sport, dport=dport)
748 # insert non ESP marker
749 res = res / Raw(b"\x00" * 4)
752 def verify_udp(self, udp):
753 self.assertEqual(udp.sport, self.sa.sport)
754 self.assertEqual(udp.dport, self.sa.dport)
756 def get_ike_header(self, packet):
758 ih = packet[ikev2.IKEv2]
759 ih = self.verify_and_remove_non_esp_marker(ih)
760 except IndexError as e:
761 # this is a workaround for getting IKEv2 layer as both ikev2 and
762 # ipsec register for port 4500
764 ih = self.verify_and_remove_non_esp_marker(esp)
765 self.assertEqual(ih.version, 0x20)
766 self.assertNotIn("Version", ih.flags)
769 def verify_and_remove_non_esp_marker(self, packet):
771 # if we are in nat traversal mode check for non esp marker
774 self.assertEqual(data[:4], b"\x00" * 4)
775 return ikev2.IKEv2(data[4:])
779 def encrypt_ike_msg(self, header, plain, first_payload):
780 if self.sa.ike_crypto == "AES-GCM-16ICV":
781 data = self.sa.ike_crypto_alg.pad(raw(plain))
786 + len(ikev2.IKEv2_payload_Encrypted())
788 tlen = plen + len(ikev2.IKEv2())
791 sk_p = ikev2.IKEv2_payload_Encrypted(
792 next_payload=first_payload, length=plen
796 encr = self.sa.encrypt(raw(plain), raw(res))
797 sk_p = ikev2.IKEv2_payload_Encrypted(
798 next_payload=first_payload, length=plen, load=encr
802 encr = self.sa.encrypt(raw(plain))
803 trunc_len = self.sa.ike_integ_alg.trunc_len
804 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
805 tlen = plen + len(ikev2.IKEv2())
807 sk_p = ikev2.IKEv2_payload_Encrypted(
808 next_payload=first_payload, length=plen, load=encr
813 integ_data = raw(res)
814 hmac_data = self.sa.compute_hmac(
815 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
817 res = res / Raw(hmac_data[:trunc_len])
818 assert len(res) == tlen
821 def verify_udp_encap(self, ipsec_sa):
822 e = VppEnum.vl_api_ipsec_sad_flags_t
823 if self.sa.udp_encap or self.sa.natt:
824 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
826 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
828 def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
829 sas = self.vapi.ipsec_sa_dump()
832 # after rekey there is a short period of time in which old
833 # inbound SA is still present
837 self.assertEqual(len(sas), sa_count)
838 if self.sa.is_initiator:
853 c = self.sa.child_sas[0]
855 self.verify_udp_encap(sa0)
856 self.verify_udp_encap(sa1)
857 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
858 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
859 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
861 if self.sa.esp_integ is None:
864 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
865 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
866 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
869 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
870 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
871 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
872 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
876 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
877 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
878 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
879 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
881 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
882 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
884 def verify_keymat(self, api_keys, keys, name):
885 km = getattr(keys, name)
886 api_km = getattr(api_keys, name)
887 api_km_len = getattr(api_keys, name + "_len")
888 self.assertEqual(len(km), api_km_len)
889 self.assertEqual(km, api_km[:api_km_len])
891 def verify_id(self, api_id, exp_id):
892 self.assertEqual(api_id.type, IDType.value(exp_id.type))
893 self.assertEqual(api_id.data_len, exp_id.data_len)
894 self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
896 def verify_ike_sas(self, is_rekey=False):
897 r = self.vapi.ikev2_sa_dump()
904 self.assertEqual(len(r), sa_count)
905 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
906 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
908 if self.sa.is_initiator:
909 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
910 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
912 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
913 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
915 if self.sa.is_initiator:
916 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
917 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
919 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
920 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
921 self.verify_keymat(sa.keys, self.sa, "sk_d")
922 self.verify_keymat(sa.keys, self.sa, "sk_ai")
923 self.verify_keymat(sa.keys, self.sa, "sk_ar")
924 self.verify_keymat(sa.keys, self.sa, "sk_ei")
925 self.verify_keymat(sa.keys, self.sa, "sk_er")
926 self.verify_keymat(sa.keys, self.sa, "sk_pi")
927 self.verify_keymat(sa.keys, self.sa, "sk_pr")
929 self.assertEqual(sa.i_id.type, self.sa.id_type)
930 self.assertEqual(sa.r_id.type, self.sa.id_type)
931 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
932 self.assertEqual(sa.r_id.data_len, len(self.idr))
933 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
934 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
936 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
937 self.verify_nonce(n, self.sa.i_nonce)
938 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
939 self.verify_nonce(n, self.sa.r_nonce)
941 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
943 self.assertEqual(len(r), 0)
946 self.assertEqual(len(r), 1)
948 self.assertEqual(csa.sa_index, sa.sa_index)
949 c = self.sa.child_sas[0]
950 if hasattr(c, "sk_ai"):
951 self.verify_keymat(csa.keys, c, "sk_ai")
952 self.verify_keymat(csa.keys, c, "sk_ar")
953 self.verify_keymat(csa.keys, c, "sk_ei")
954 self.verify_keymat(csa.keys, c, "sk_er")
955 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
956 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
958 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
961 r = self.vapi.ikev2_traffic_selector_dump(
962 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
964 self.assertEqual(len(r), 1)
966 self.verify_ts(r[0].ts, tsi[0], True)
968 r = self.vapi.ikev2_traffic_selector_dump(
969 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
971 self.assertEqual(len(r), 1)
972 self.verify_ts(r[0].ts, tsr[0], False)
974 def verify_ike_sas_v2(self):
975 r = self.vapi.ikev2_sa_v2_dump()
976 self.assertEqual(len(r), 1)
978 self.assertEqual(self.p.profile_name, sa.profile_name)
979 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
980 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
982 if self.sa.is_initiator:
983 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
984 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
986 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
987 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
989 if self.sa.is_initiator:
990 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
991 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
993 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
994 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
995 self.verify_keymat(sa.keys, self.sa, "sk_d")
996 self.verify_keymat(sa.keys, self.sa, "sk_ai")
997 self.verify_keymat(sa.keys, self.sa, "sk_ar")
998 self.verify_keymat(sa.keys, self.sa, "sk_ei")
999 self.verify_keymat(sa.keys, self.sa, "sk_er")
1000 self.verify_keymat(sa.keys, self.sa, "sk_pi")
1001 self.verify_keymat(sa.keys, self.sa, "sk_pr")
1003 self.assertEqual(sa.i_id.type, self.sa.id_type)
1004 self.assertEqual(sa.r_id.type, self.sa.id_type)
1005 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
1006 self.assertEqual(sa.r_id.data_len, len(self.idr))
1007 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
1008 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
1010 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
1011 self.assertEqual(len(r), 1)
1013 self.assertEqual(csa.sa_index, sa.sa_index)
1014 c = self.sa.child_sas[0]
1015 if hasattr(c, "sk_ai"):
1016 self.verify_keymat(csa.keys, c, "sk_ai")
1017 self.verify_keymat(csa.keys, c, "sk_ar")
1018 self.verify_keymat(csa.keys, c, "sk_ei")
1019 self.verify_keymat(csa.keys, c, "sk_er")
1020 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
1021 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
1023 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1026 r = self.vapi.ikev2_traffic_selector_dump(
1027 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
1029 self.assertEqual(len(r), 1)
1031 self.verify_ts(r[0].ts, tsi[0], True)
1033 r = self.vapi.ikev2_traffic_selector_dump(
1034 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
1036 self.assertEqual(len(r), 1)
1037 self.verify_ts(r[0].ts, tsr[0], False)
1039 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
1040 self.verify_nonce(n, self.sa.i_nonce)
1041 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
1042 self.verify_nonce(n, self.sa.r_nonce)
1044 def verify_nonce(self, api_nonce, nonce):
1045 self.assertEqual(api_nonce.data_len, len(nonce))
1046 self.assertEqual(api_nonce.nonce, nonce)
1048 def verify_ts(self, api_ts, ts, is_initiator):
1050 self.assertTrue(api_ts.is_local)
1052 self.assertFalse(api_ts.is_local)
1054 if self.p.ts_is_ip4:
1055 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
1056 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
1058 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
1059 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
1060 self.assertEqual(api_ts.start_port, ts.start_port)
1061 self.assertEqual(api_ts.end_port, ts.end_port)
1062 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
1065 class TemplateInitiator(IkePeer):
1066 """initiator test template"""
1068 def initiate_del_sa_from_initiator(self):
1069 ispi = int.from_bytes(self.sa.ispi, "little")
1070 self.pg0.enable_capture()
1072 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
1073 capture = self.pg0.get_capture(1)
1074 ih = self.get_ike_header(capture[0])
1075 self.assertNotIn("Response", ih.flags)
1076 self.assertIn("Initiator", ih.flags)
1077 self.assertEqual(ih.init_SPI, self.sa.ispi)
1078 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1079 plain = self.sa.hmac_and_decrypt(ih)
1080 d = ikev2.IKEv2_payload_Delete(plain)
1081 self.assertEqual(d.proto, 1) # proto=IKEv2
1082 header = ikev2.IKEv2(
1083 init_SPI=self.sa.ispi,
1084 resp_SPI=self.sa.rspi,
1086 exch_type="INFORMATIONAL",
1088 next_payload="Encrypted",
1090 resp = self.encrypt_ike_msg(header, b"", None)
1091 self.send_and_assert_no_replies(self.pg0, resp)
1093 def verify_del_sa(self, packet):
1094 ih = self.get_ike_header(packet)
1095 self.assertEqual(ih.id, self.sa.msg_id)
1096 self.assertEqual(ih.exch_type, 37) # exchange informational
1097 self.assertIn("Response", ih.flags)
1098 self.assertIn("Initiator", ih.flags)
1099 plain = self.sa.hmac_and_decrypt(ih)
1100 self.assertEqual(plain, b"")
1102 def initiate_del_sa_from_responder(self):
1103 header = ikev2.IKEv2(
1104 init_SPI=self.sa.ispi,
1105 resp_SPI=self.sa.rspi,
1106 exch_type="INFORMATIONAL",
1107 id=self.sa.new_msg_id(),
1109 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1110 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1111 packet = self.create_packet(
1112 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1114 self.pg0.add_stream(packet)
1115 self.pg0.enable_capture()
1117 capture = self.pg0.get_capture(1)
1118 self.verify_del_sa(capture[0])
1121 def find_notify_payload(packet, notify_type):
1122 n = packet[ikev2.IKEv2_payload_Notify]
1123 while n is not None:
1124 if n.type == notify_type:
1129 def verify_nat_detection(self, packet):
1136 # NAT_DETECTION_SOURCE_IP
1137 s = self.find_notify_payload(packet, 16388)
1138 self.assertIsNotNone(s)
1139 src_sha = self.sa.compute_nat_sha1(
1140 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
1142 self.assertEqual(s.load, src_sha)
1144 # NAT_DETECTION_DESTINATION_IP
1145 s = self.find_notify_payload(packet, 16389)
1146 self.assertIsNotNone(s)
1147 dst_sha = self.sa.compute_nat_sha1(
1148 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1150 self.assertEqual(s.load, dst_sha)
1152 def verify_sa_init_request(self, packet):
1154 self.sa.dport = udp.sport
1155 ih = packet[ikev2.IKEv2]
1156 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1157 self.assertEqual(ih.exch_type, 34) # SA_INIT
1158 self.sa.ispi = ih.init_SPI
1159 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1160 self.assertIn("Initiator", ih.flags)
1161 self.assertNotIn("Response", ih.flags)
1162 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1163 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1165 prop = packet[ikev2.IKEv2_payload_Proposal]
1166 self.assertEqual(prop.proto, 1) # proto = ikev2
1167 self.assertEqual(prop.proposal, 1)
1168 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1170 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1172 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1173 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1174 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1175 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1177 self.verify_nat_detection(packet)
1178 self.sa.set_ike_props(
1179 crypto="AES-GCM-16ICV",
1182 prf="PRF_HMAC_SHA2_256",
1185 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1186 self.sa.generate_dh_data()
1187 self.sa.complete_dh_data()
1190 def update_esp_transforms(self, trans, sa):
1192 if trans.transform_type == 1: # ecryption
1193 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1194 elif trans.transform_type == 3: # integrity
1195 sa.esp_integ = INTEG_IDS[trans.transform_id]
1196 trans = trans.payload
1198 def verify_sa_auth_req(self, packet):
1200 self.sa.dport = udp.sport
1201 ih = self.get_ike_header(packet)
1202 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1203 self.assertEqual(ih.init_SPI, self.sa.ispi)
1204 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1205 self.assertIn("Initiator", ih.flags)
1206 self.assertNotIn("Response", ih.flags)
1209 self.verify_udp(udp)
1210 self.assertEqual(ih.id, self.sa.msg_id + 1)
1212 plain = self.sa.hmac_and_decrypt(ih)
1213 idi = ikev2.IKEv2_payload_IDi(plain)
1214 self.assertEqual(idi.load, self.sa.i_id)
1215 if self.no_idr_auth:
1216 self.assertEqual(idi.next_payload, 39) # AUTH
1218 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1219 self.assertEqual(idr.load, self.sa.r_id)
1220 prop = idi[ikev2.IKEv2_payload_Proposal]
1221 c = self.sa.child_sas[0]
1223 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1225 def send_init_response(self):
1226 tr_attr = self.sa.ike_crypto_attr()
1228 ikev2.IKEv2_payload_Transform(
1229 transform_type="Encryption",
1230 transform_id=self.sa.ike_crypto,
1232 key_length=tr_attr[0],
1234 / ikev2.IKEv2_payload_Transform(
1235 transform_type="Integrity", transform_id=self.sa.ike_integ
1237 / ikev2.IKEv2_payload_Transform(
1238 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1240 / ikev2.IKEv2_payload_Transform(
1241 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1244 props = ikev2.IKEv2_payload_Proposal(
1245 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1248 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1250 dst_address = b"\x0a\x0a\x0a\x0a"
1252 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1253 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1254 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1256 self.sa.init_resp_packet = (
1258 init_SPI=self.sa.ispi,
1259 resp_SPI=self.sa.rspi,
1260 exch_type="IKE_SA_INIT",
1263 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1264 / ikev2.IKEv2_payload_KE(
1265 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1267 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1268 / ikev2.IKEv2_payload_Notify(
1269 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1271 / ikev2.IKEv2_payload_Notify(
1272 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1276 ike_msg = self.create_packet(
1278 self.sa.init_resp_packet,
1284 self.pg_send(self.pg0, ike_msg)
1285 capture = self.pg0.get_capture(1)
1286 self.verify_sa_auth_req(capture[0])
1288 def initiate_sa_init(self):
1289 self.pg0.enable_capture()
1291 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1293 capture = self.pg0.get_capture(1)
1294 self.verify_sa_init_request(capture[0])
1295 self.send_init_response()
1297 def send_auth_response(self):
1298 tr_attr = self.sa.esp_crypto_attr()
1300 ikev2.IKEv2_payload_Transform(
1301 transform_type="Encryption",
1302 transform_id=self.sa.esp_crypto,
1304 key_length=tr_attr[0],
1306 / ikev2.IKEv2_payload_Transform(
1307 transform_type="Integrity", transform_id=self.sa.esp_integ
1309 / ikev2.IKEv2_payload_Transform(
1310 transform_type="Extended Sequence Number", transform_id="No ESN"
1312 / ikev2.IKEv2_payload_Transform(
1313 transform_type="Extended Sequence Number", transform_id="ESN"
1317 c = self.sa.child_sas[0]
1318 props = ikev2.IKEv2_payload_Proposal(
1319 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1322 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1324 ikev2.IKEv2_payload_IDi(
1325 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1327 / ikev2.IKEv2_payload_IDr(
1328 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1330 / ikev2.IKEv2_payload_AUTH(
1332 auth_type=AuthMethod.value(self.sa.auth_method),
1333 load=self.sa.auth_data,
1335 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1336 / ikev2.IKEv2_payload_TSi(
1337 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1339 / ikev2.IKEv2_payload_TSr(
1340 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1342 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1345 header = ikev2.IKEv2(
1346 init_SPI=self.sa.ispi,
1347 resp_SPI=self.sa.rspi,
1348 id=self.sa.new_msg_id(),
1350 exch_type="IKE_AUTH",
1353 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1354 packet = self.create_packet(
1355 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1357 self.pg_send(self.pg0, packet)
1359 def test_initiator(self):
1360 self.initiate_sa_init()
1362 self.sa.calc_child_keys()
1363 self.send_auth_response()
1364 self.verify_ike_sas()
1365 self.verify_ike_sas_v2()
1368 class TemplateResponder(IkePeer):
1369 """responder test template"""
1371 def initiate_del_sa_from_responder(self):
1372 self.pg0.enable_capture()
1374 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1375 capture = self.pg0.get_capture(1)
1376 ih = self.get_ike_header(capture[0])
1377 self.assertNotIn("Response", ih.flags)
1378 self.assertNotIn("Initiator", ih.flags)
1379 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1380 plain = self.sa.hmac_and_decrypt(ih)
1381 d = ikev2.IKEv2_payload_Delete(plain)
1382 self.assertEqual(d.proto, 1) # proto=IKEv2
1383 self.assertEqual(ih.init_SPI, self.sa.ispi)
1384 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1385 header = ikev2.IKEv2(
1386 init_SPI=self.sa.ispi,
1387 resp_SPI=self.sa.rspi,
1388 flags="Initiator+Response",
1389 exch_type="INFORMATIONAL",
1391 next_payload="Encrypted",
1393 resp = self.encrypt_ike_msg(header, b"", None)
1394 self.send_and_assert_no_replies(self.pg0, resp)
1396 def verify_del_sa(self, packet):
1397 ih = self.get_ike_header(packet)
1398 self.assertEqual(ih.id, self.sa.msg_id)
1399 self.assertEqual(ih.exch_type, 37) # exchange informational
1400 self.assertIn("Response", ih.flags)
1401 self.assertNotIn("Initiator", ih.flags)
1402 self.assertEqual(ih.next_payload, 46) # Encrypted
1403 self.assertEqual(ih.init_SPI, self.sa.ispi)
1404 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1405 plain = self.sa.hmac_and_decrypt(ih)
1406 self.assertEqual(plain, b"")
1408 def initiate_del_sa_from_initiator(self):
1409 header = ikev2.IKEv2(
1410 init_SPI=self.sa.ispi,
1411 resp_SPI=self.sa.rspi,
1413 exch_type="INFORMATIONAL",
1414 id=self.sa.new_msg_id(),
1416 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1417 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1418 packet = self.create_packet(
1419 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1421 self.pg0.add_stream(packet)
1422 self.pg0.enable_capture()
1424 capture = self.pg0.get_capture(1)
1425 self.verify_del_sa(capture[0])
1427 def generate_sa_init_payload(
1428 self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
1430 tr_attr = self.sa.ike_crypto_attr()
1432 ikev2.IKEv2_payload_Transform(
1433 transform_type="Encryption",
1434 transform_id=self.sa.ike_crypto,
1436 key_length=tr_attr[0],
1438 / ikev2.IKEv2_payload_Transform(
1439 transform_type="Integrity", transform_id=self.sa.ike_integ
1441 / ikev2.IKEv2_payload_Transform(
1442 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1444 / ikev2.IKEv2_payload_Transform(
1445 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1452 pargs = {"SPI": spi, "SPIsize": len(spi)}
1453 props = ikev2.IKEv2_payload_Proposal(
1462 ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1463 / ikev2.IKEv2_payload_KE(
1464 next_payload="Nonce",
1465 group=self.sa.ike_dh,
1466 load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
1468 / ikev2.IKEv2_payload_Nonce(
1469 next_payload=next_payload,
1470 load=self.sa.i_nonce if nonce is None else nonce,
1474 def send_sa_init_req(self):
1475 self.sa.init_req_packet = ikev2.IKEv2(
1476 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1477 ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
1481 src_address = b"\x0a\x0a\x0a\x01"
1483 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1486 dst_address = b"\x0a\x0a\x0a\x0a"
1488 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1490 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1491 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1492 nat_src_detection = ikev2.IKEv2_payload_Notify(
1493 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1495 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1496 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1498 self.sa.init_req_packet = (
1499 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1502 ike_msg = self.create_packet(
1504 self.sa.init_req_packet,
1510 self.pg0.add_stream(ike_msg)
1511 self.pg0.enable_capture()
1513 capture = self.pg0.get_capture(1)
1514 self.verify_sa_init(capture[0])
1516 def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1517 tr_attr = self.sa.esp_crypto_attr()
1518 last_payload = last_payload or "Notify"
1521 ikev2.IKEv2_payload_Transform(
1522 transform_type="Encryption",
1523 transform_id=self.sa.esp_crypto,
1525 key_length=tr_attr[0],
1527 / ikev2.IKEv2_payload_Transform(
1528 transform_type="Integrity", transform_id=self.sa.esp_integ
1530 / ikev2.IKEv2_payload_Transform(
1531 transform_type="Extended Sequence Number", transform_id="No ESN"
1533 / ikev2.IKEv2_payload_Transform(
1534 transform_type="Extended Sequence Number", transform_id="ESN"
1540 trans /= ikev2.IKEv2_payload_Transform(
1541 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1544 c = self.sa.child_sas[0]
1545 props = ikev2.IKEv2_payload_Proposal(
1554 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1556 ikev2.IKEv2_payload_AUTH(
1558 auth_type=AuthMethod.value(self.sa.auth_method),
1559 load=self.sa.auth_data,
1561 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1562 / ikev2.IKEv2_payload_TSi(
1563 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1565 / ikev2.IKEv2_payload_TSr(
1566 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1571 first_payload = "Nonce"
1573 head = ikev2.IKEv2_payload_Nonce(
1574 load=self.sa.i_nonce, next_payload="KE"
1575 ) / ikev2.IKEv2_payload_KE(
1576 group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1579 head = ikev2.IKEv2_payload_Nonce(
1580 load=self.sa.i_nonce, next_payload="SA"
1585 / ikev2.IKEv2_payload_Notify(
1589 length=8 + len(c.ispi),
1590 next_payload="Notify",
1592 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1595 first_payload = "IDi"
1596 if self.no_idr_auth:
1597 ids = ikev2.IKEv2_payload_IDi(
1598 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1601 ids = ikev2.IKEv2_payload_IDi(
1602 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1603 ) / ikev2.IKEv2_payload_IDr(
1604 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1607 return plain, first_payload
1609 def send_sa_auth(self):
1610 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1611 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1612 header = ikev2.IKEv2(
1613 init_SPI=self.sa.ispi,
1614 resp_SPI=self.sa.rspi,
1615 id=self.sa.new_msg_id(),
1617 exch_type="IKE_AUTH",
1620 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1621 packet = self.create_packet(
1622 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1624 self.pg0.add_stream(packet)
1625 self.pg0.enable_capture()
1627 capture = self.pg0.get_capture(1)
1628 self.verify_sa_auth_resp(capture[0])
1630 def verify_sa_init(self, packet):
1631 ih = self.get_ike_header(packet)
1633 self.assertEqual(ih.id, self.sa.msg_id)
1634 self.assertEqual(ih.exch_type, 34)
1635 self.assertIn("Response", ih.flags)
1636 self.assertEqual(ih.init_SPI, self.sa.ispi)
1637 self.assertNotEqual(ih.resp_SPI, 0)
1638 self.sa.rspi = ih.resp_SPI
1640 sa = ih[ikev2.IKEv2_payload_SA]
1641 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1642 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1643 except IndexError as e:
1644 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1645 self.logger.error(ih.show())
1647 self.sa.complete_dh_data()
1651 def verify_sa_auth_resp(self, packet):
1652 ike = self.get_ike_header(packet)
1654 self.verify_udp(udp)
1655 self.assertEqual(ike.id, self.sa.msg_id)
1656 plain = self.sa.hmac_and_decrypt(ike)
1657 idr = ikev2.IKEv2_payload_IDr(plain)
1658 prop = idr[ikev2.IKEv2_payload_Proposal]
1659 self.assertEqual(prop.SPIsize, 4)
1660 self.sa.child_sas[0].rspi = prop.SPI
1661 self.sa.calc_child_keys()
1663 IKE_NODE_SUFFIX = "ip4"
1665 def verify_counters(self):
1666 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1667 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1668 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1670 r = self.vapi.ikev2_sa_dump()
1672 self.assertEqual(1, s.n_sa_auth_req)
1673 self.assertEqual(1, s.n_sa_init_req)
1675 r = self.vapi.ikev2_sa_v2_dump()
1677 self.assertEqual(1, s.n_sa_auth_req)
1678 self.assertEqual(1, s.n_sa_init_req)
1680 def test_responder(self):
1681 self.send_sa_init_req()
1683 self.verify_ipsec_sas()
1684 self.verify_ike_sas()
1685 self.verify_ike_sas_v2()
1686 self.verify_counters()
1689 class Ikev2Params(object):
1690 def config_params(self, params={}):
1691 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1692 ei = VppEnum.vl_api_ipsec_integ_alg_t
1694 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1695 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1696 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1697 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1698 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1699 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1700 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1701 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1702 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1703 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1706 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1708 self.vapi.cli("ikev2 dpd disable")
1709 self.del_sa_from_responder = (
1711 if "del_sa_from_responder" not in params
1712 else params["del_sa_from_responder"]
1714 i_natt = False if "i_natt" not in params else params["i_natt"]
1715 r_natt = False if "r_natt" not in params else params["r_natt"]
1716 self.p = Profile(self, "pr1")
1717 self.ip6 = False if "ip6" not in params else params["ip6"]
1719 if "auth" in params and params["auth"] == "rsa-sig":
1720 auth_method = "rsa-sig"
1721 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1722 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1724 client_file = work_dir + params["client-cert"]
1725 server_pem = open(work_dir + params["server-cert"]).read()
1726 client_priv = open(work_dir + params["client-key"]).read()
1727 client_priv = load_pem_private_key(
1728 str.encode(client_priv), None, default_backend()
1730 self.peer_cert = x509.load_pem_x509_certificate(
1731 str.encode(server_pem), default_backend()
1733 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1736 auth_data = b"$3cr3tpa$$w0rd"
1737 self.p.add_auth(method="shared-key", data=auth_data)
1738 auth_method = "shared-key"
1741 is_init = True if "is_initiator" not in params else params["is_initiator"]
1742 self.no_idr_auth = params.get("no_idr_in_auth", False)
1744 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1745 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1746 r_id = self.idr = idr["data"]
1747 i_id = self.idi = idi["data"]
1749 # scapy is initiator, VPP is responder
1750 self.p.add_local_id(**idr)
1751 self.p.add_remote_id(**idi)
1752 if self.no_idr_auth:
1755 # VPP is initiator, scapy is responder
1756 self.p.add_local_id(**idi)
1757 if not self.no_idr_auth:
1758 self.p.add_remote_id(**idr)
1761 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1762 if "loc_ts" not in params
1763 else params["loc_ts"]
1766 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1767 if "rem_ts" not in params
1768 else params["rem_ts"]
1770 self.p.add_local_ts(**loc_ts)
1771 self.p.add_remote_ts(**rem_ts)
1772 if "responder" in params:
1773 self.p.add_responder(params["responder"])
1774 if "ike_transforms" in params:
1775 self.p.add_ike_transforms(params["ike_transforms"])
1776 if "esp_transforms" in params:
1777 self.p.add_esp_transforms(params["esp_transforms"])
1779 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1781 self.p.set_udp_encap(True)
1783 if "responder_hostname" in params:
1784 hn = params["responder_hostname"]
1785 self.p.add_responder_hostname(hn)
1787 # configure static dns record
1788 self.vapi.dns_name_server_add_del(
1789 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1791 self.vapi.dns_enable_disable(enable=1)
1793 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1800 is_initiator=is_init,
1801 id_type=self.p.local_id["id_type"],
1804 priv_key=client_priv,
1805 auth_method=auth_method,
1806 nonce=params.get("nonce"),
1807 auth_data=auth_data,
1808 udp_encap=udp_encap,
1809 local_ts=self.p.remote_ts,
1810 remote_ts=self.p.local_ts,
1815 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1818 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1820 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1823 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1826 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1829 self.sa.set_ike_props(
1830 crypto=ike_crypto[0],
1831 crypto_key_len=ike_crypto[1],
1833 prf="PRF_HMAC_SHA2_256",
1836 self.sa.set_esp_props(
1837 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1841 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
1842 class TestApi(VppTestCase):
1843 """Test IKEV2 API"""
1846 def setUpClass(cls):
1847 super(TestApi, cls).setUpClass()
1850 def tearDownClass(cls):
1851 super(TestApi, cls).tearDownClass()
1854 super(TestApi, self).tearDown()
1855 self.p1.remove_vpp_config()
1856 self.p2.remove_vpp_config()
1857 r = self.vapi.ikev2_profile_dump()
1858 self.assertEqual(len(r), 0)
1860 def configure_profile(self, cfg):
1861 p = Profile(self, cfg["name"])
1862 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1863 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1864 p.add_local_ts(**cfg["loc_ts"])
1865 p.add_remote_ts(**cfg["rem_ts"])
1866 p.add_responder(cfg["responder"])
1867 p.add_ike_transforms(cfg["ike_ts"])
1868 p.add_esp_transforms(cfg["esp_ts"])
1869 p.add_auth(**cfg["auth"])
1870 p.set_udp_encap(cfg["udp_encap"])
1871 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1872 if "lifetime_data" in cfg:
1873 p.set_lifetime_data(cfg["lifetime_data"])
1874 if "tun_itf" in cfg:
1875 p.set_tunnel_interface(cfg["tun_itf"])
1876 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1881 def test_profile_api(self):
1882 """test profile dump API"""
1887 "start_addr": "3.3.3.2",
1888 "end_addr": "3.3.3.3",
1894 "start_addr": "4.5.76.80",
1895 "end_addr": "2.3.4.6",
1902 "start_addr": "ab::1",
1903 "end_addr": "ab::4",
1909 "start_addr": "cd::12",
1910 "end_addr": "cd::13",
1916 "natt_disabled": True,
1917 "loc_id": ("fqdn", b"vpp.home"),
1918 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1921 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1924 "crypto_key_size": 32,
1928 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1929 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1931 "ipsec_over_udp_port": 4501,
1934 "lifetime_maxdata": 20192,
1935 "lifetime_jitter": 9,
1941 "loc_id": ("ip4-addr", b"192.168.2.1"),
1942 "rem_id": ("ip6-addr", b"abcd::1"),
1945 "responder": {"sw_if_index": 4, "addr": "def::10"},
1948 "crypto_key_size": 16,
1952 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1953 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1955 "ipsec_over_udp_port": 4600,
1959 self.p1 = self.configure_profile(conf["p1"])
1960 self.p2 = self.configure_profile(conf["p2"])
1962 r = self.vapi.ikev2_profile_dump()
1963 self.assertEqual(len(r), 2)
1964 self.verify_profile(r[0].profile, conf["p1"])
1965 self.verify_profile(r[1].profile, conf["p2"])
1967 def verify_id(self, api_id, cfg_id):
1968 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1969 self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1971 def verify_ts(self, api_ts, cfg_ts):
1972 self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1973 self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1974 self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1975 self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1976 self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1978 def verify_responder(self, api_r, cfg_r):
1979 self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1980 self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1982 def verify_transforms(self, api_ts, cfg_ts):
1983 self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1984 self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1985 self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1987 def verify_ike_transforms(self, api_ts, cfg_ts):
1988 self.verify_transforms(api_ts, cfg_ts)
1989 self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1991 def verify_esp_transforms(self, api_ts, cfg_ts):
1992 self.verify_transforms(api_ts, cfg_ts)
1994 def verify_auth(self, api_auth, cfg_auth):
1995 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1996 self.assertEqual(api_auth.data, cfg_auth["data"])
1997 self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1999 def verify_lifetime_data(self, p, ld):
2000 self.assertEqual(p.lifetime, ld["lifetime"])
2001 self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
2002 self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
2003 self.assertEqual(p.handover, ld["handover"])
2005 def verify_profile(self, ap, cp):
2006 self.assertEqual(ap.name, cp["name"])
2007 self.assertEqual(ap.udp_encap, cp["udp_encap"])
2008 self.verify_id(ap.loc_id, cp["loc_id"])
2009 self.verify_id(ap.rem_id, cp["rem_id"])
2010 self.verify_ts(ap.loc_ts, cp["loc_ts"])
2011 self.verify_ts(ap.rem_ts, cp["rem_ts"])
2012 self.verify_responder(ap.responder, cp["responder"])
2013 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
2014 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
2015 self.verify_auth(ap.auth, cp["auth"])
2016 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
2017 self.assertTrue(natt_dis == ap.natt_disabled)
2019 if "lifetime_data" in cp:
2020 self.verify_lifetime_data(ap, cp["lifetime_data"])
2021 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
2023 self.assertEqual(ap.tun_itf, cp["tun_itf"])
2025 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
2028 @tag_fixme_vpp_workers
2029 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
2030 """test responder - responder behind NAT"""
2032 IKE_NODE_SUFFIX = "ip4-natt"
2034 def config_tc(self):
2035 self.config_params({"r_natt": True})
2038 @tag_fixme_vpp_workers
2039 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
2040 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
2042 def config_tc(self):
2046 "is_initiator": False, # seen from test case perspective
2047 # thus vpp is initiator
2049 "sw_if_index": self.pg0.sw_if_index,
2050 "addr": self.pg0.remote_ip4,
2052 "ike-crypto": ("AES-GCM-16ICV", 32),
2053 "ike-integ": "NULL",
2054 "ike-dh": "3072MODPgr",
2056 "crypto_alg": 20, # "aes-gcm-16"
2057 "crypto_key_size": 256,
2058 "dh_group": 15, # "modp-3072"
2061 "crypto_alg": 12, # "aes-cbc"
2062 "crypto_key_size": 256,
2063 # "hmac-sha2-256-128"
2070 @tag_fixme_vpp_workers
2071 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
2072 """test ikev2 initiator - pre shared key auth"""
2074 def config_tc(self):
2077 "is_initiator": False, # seen from test case perspective
2078 # thus vpp is initiator
2079 "ike-crypto": ("AES-GCM-16ICV", 32),
2080 "ike-integ": "NULL",
2081 "ike-dh": "3072MODPgr",
2083 "crypto_alg": 20, # "aes-gcm-16"
2084 "crypto_key_size": 256,
2085 "dh_group": 15, # "modp-3072"
2088 "crypto_alg": 12, # "aes-cbc"
2089 "crypto_key_size": 256,
2090 # "hmac-sha2-256-128"
2093 "responder_hostname": {
2094 "hostname": "vpp.responder.org",
2095 "sw_if_index": self.pg0.sw_if_index,
2101 @tag_fixme_vpp_workers
2102 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
2103 """test initiator - request window size (1)"""
2105 def rekey_respond(self, req, update_child_sa_data):
2106 ih = self.get_ike_header(req)
2107 plain = self.sa.hmac_and_decrypt(ih)
2108 sa = ikev2.IKEv2_payload_SA(plain)
2109 if update_child_sa_data:
2110 prop = sa[ikev2.IKEv2_payload_Proposal]
2111 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2112 self.sa.r_nonce = self.sa.i_nonce
2113 self.sa.child_sas[0].ispi = prop.SPI
2114 self.sa.child_sas[0].rspi = prop.SPI
2115 self.sa.calc_child_keys()
2117 header = ikev2.IKEv2(
2118 init_SPI=self.sa.ispi,
2119 resp_SPI=self.sa.rspi,
2123 next_payload="Encrypted",
2125 resp = self.encrypt_ike_msg(header, sa, "SA")
2126 packet = self.create_packet(
2127 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2129 self.send_and_assert_no_replies(self.pg0, packet)
2131 def test_initiator(self):
2132 super(TestInitiatorRequestWindowSize, self).test_initiator()
2133 self.pg0.enable_capture()
2135 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
2136 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2137 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2138 capture = self.pg0.get_capture(2)
2140 # reply in reverse order
2141 self.rekey_respond(capture[1], True)
2142 self.rekey_respond(capture[0], False)
2144 # verify that only the second request was accepted
2145 self.verify_ike_sas()
2146 self.verify_ike_sas_v2()
2147 self.verify_ipsec_sas(is_rekey=True)
2150 @tag_fixme_vpp_workers
2151 class TestInitiatorRekey(TestInitiatorPsk):
2152 """test ikev2 initiator - rekey"""
2154 def rekey_from_initiator(self):
2155 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
2156 self.pg0.enable_capture()
2158 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2159 capture = self.pg0.get_capture(1)
2160 ih = self.get_ike_header(capture[0])
2161 self.assertEqual(ih.exch_type, 36) # CHILD_SA
2162 self.assertNotIn("Response", ih.flags)
2163 self.assertIn("Initiator", ih.flags)
2164 plain = self.sa.hmac_and_decrypt(ih)
2165 sa = ikev2.IKEv2_payload_SA(plain)
2166 prop = sa[ikev2.IKEv2_payload_Proposal]
2167 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2168 self.sa.r_nonce = self.sa.i_nonce
2169 # update new responder SPI
2170 self.sa.child_sas[0].ispi = prop.SPI
2171 self.sa.child_sas[0].rspi = prop.SPI
2172 self.sa.calc_child_keys()
2173 header = ikev2.IKEv2(
2174 init_SPI=self.sa.ispi,
2175 resp_SPI=self.sa.rspi,
2179 next_payload="Encrypted",
2181 resp = self.encrypt_ike_msg(header, sa, "SA")
2182 packet = self.create_packet(
2183 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2185 self.send_and_assert_no_replies(self.pg0, packet)
2187 def test_initiator(self):
2188 super(TestInitiatorRekey, self).test_initiator()
2189 self.rekey_from_initiator()
2190 self.verify_ike_sas()
2191 self.verify_ike_sas_v2()
2192 self.verify_ipsec_sas(is_rekey=True)
2195 @tag_fixme_vpp_workers
2196 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2197 """test ikev2 initiator - delete IKE SA from responder"""
2199 def config_tc(self):
2202 "del_sa_from_responder": True,
2203 "is_initiator": False, # seen from test case perspective
2204 # thus vpp is initiator
2206 "sw_if_index": self.pg0.sw_if_index,
2207 "addr": self.pg0.remote_ip4,
2209 "ike-crypto": ("AES-GCM-16ICV", 32),
2210 "ike-integ": "NULL",
2211 "ike-dh": "3072MODPgr",
2213 "crypto_alg": 20, # "aes-gcm-16"
2214 "crypto_key_size": 256,
2215 "dh_group": 15, # "modp-3072"
2218 "crypto_alg": 12, # "aes-cbc"
2219 "crypto_key_size": 256,
2220 # "hmac-sha2-256-128"
2223 "no_idr_in_auth": True,
2228 @tag_fixme_vpp_workers
2229 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2230 """test ikev2 responder - initiator behind NAT"""
2232 IKE_NODE_SUFFIX = "ip4-natt"
2234 def config_tc(self):
2235 self.config_params({"i_natt": True})
2238 @tag_fixme_vpp_workers
2239 class TestResponderPsk(TemplateResponder, Ikev2Params):
2240 """test ikev2 responder - pre shared key auth"""
2242 def config_tc(self):
2243 self.config_params()
2246 @tag_fixme_vpp_workers
2247 class TestResponderDpd(TestResponderPsk):
2249 Dead peer detection test
2252 def config_tc(self):
2253 self.config_params({"dpd_disabled": False})
2258 def test_responder(self):
2259 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2260 super(TestResponderDpd, self).test_responder()
2261 self.pg0.enable_capture()
2263 # capture empty request but don't reply
2264 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2265 ih = self.get_ike_header(capture[0])
2266 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2267 plain = self.sa.hmac_and_decrypt(ih)
2268 self.assertEqual(plain, b"")
2269 # wait for SA expiration
2271 ike_sas = self.vapi.ikev2_sa_dump()
2272 self.assertEqual(len(ike_sas), 0)
2273 ike_sas = self.vapi.ikev2_sa_v2_dump()
2274 self.assertEqual(len(ike_sas), 0)
2275 ipsec_sas = self.vapi.ipsec_sa_dump()
2276 self.assertEqual(len(ipsec_sas), 0)
2279 @tag_fixme_vpp_workers
2280 class TestResponderRekey(TestResponderPsk):
2281 """test ikev2 responder - rekey"""
2285 def send_rekey_from_initiator(self):
2287 self.sa.generate_dh_data()
2288 packet = self.create_rekey_request(kex=self.WITH_KEX)
2289 self.pg0.add_stream(packet)
2290 self.pg0.enable_capture()
2292 capture = self.pg0.get_capture(1)
2295 def process_rekey_response(self, capture):
2296 ih = self.get_ike_header(capture[0])
2297 plain = self.sa.hmac_and_decrypt(ih)
2298 sa = ikev2.IKEv2_payload_SA(plain)
2299 prop = sa[ikev2.IKEv2_payload_Proposal]
2300 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2301 # update new responder SPI
2302 self.sa.child_sas[0].rspi = prop.SPI
2304 self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2305 self.sa.complete_dh_data()
2306 self.sa.calc_child_keys(kex=self.WITH_KEX)
2308 def test_responder(self):
2309 super(TestResponderRekey, self).test_responder()
2310 self.process_rekey_response(self.send_rekey_from_initiator())
2311 self.verify_ike_sas()
2312 self.verify_ike_sas_v2()
2313 self.verify_ipsec_sas(is_rekey=True)
2314 self.assert_counter(1, "rekey_req", "ip4")
2315 r = self.vapi.ikev2_sa_dump()
2316 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2317 r = self.vapi.ikev2_sa_v2_dump()
2318 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2321 @tag_fixme_vpp_workers
2322 class TestResponderRekeyRepeat(TestResponderRekey):
2323 """test ikev2 responder - rekey repeat"""
2325 def test_responder(self):
2326 super(TestResponderRekeyRepeat, self).test_responder()
2327 # rekey request is not accepted until old IPsec SA is expired
2328 capture = self.send_rekey_from_initiator()
2329 ih = self.get_ike_header(capture[0])
2330 plain = self.sa.hmac_and_decrypt(ih)
2331 notify = ikev2.IKEv2_payload_Notify(plain)
2332 self.assertEqual(notify.type, 43)
2333 self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2334 # rekey request is accepted after old IPsec SA was expired
2336 if len(self.vapi.ipsec_sa_dump()) != 3:
2340 self.fail("old IPsec SA not expired")
2341 self.process_rekey_response(self.send_rekey_from_initiator())
2342 self.verify_ike_sas()
2343 self.verify_ike_sas_v2()
2344 self.verify_ipsec_sas(sa_count=3)
2347 @tag_fixme_vpp_workers
2348 class TestResponderRekeyKEX(TestResponderRekey):
2349 """test ikev2 responder - rekey with key exchange"""
2354 @tag_fixme_vpp_workers
2355 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2356 """test ikev2 responder - rekey repeat with key exchange"""
2361 @tag_fixme_vpp_workers
2362 class TestResponderRekeySA(TestResponderPsk):
2363 """test ikev2 responder - rekey IKE SA"""
2365 def send_rekey_from_initiator(self, newsa):
2366 packet = self.create_sa_rekey_request(
2368 dh_pub_key=newsa.my_dh_pub_key,
2369 nonce=newsa.i_nonce,
2371 self.pg0.add_stream(packet)
2372 self.pg0.enable_capture()
2374 capture = self.pg0.get_capture(1)
2377 def process_rekey_response(self, newsa, capture):
2378 ih = self.get_ike_header(capture[0])
2379 plain = self.sa.hmac_and_decrypt(ih)
2380 sa = ikev2.IKEv2_payload_SA(plain)
2381 prop = sa[ikev2.IKEv2_payload_Proposal]
2382 newsa.rspi = prop.SPI
2383 newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2384 newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2385 newsa.complete_dh_data()
2386 newsa.calc_keys(sk_d=self.sa.sk_d)
2387 newsa.child_sas = self.sa.child_sas
2388 self.sa.child_sas = []
2390 def test_responder(self):
2391 super(TestResponderRekeySA, self).test_responder()
2392 newsa = self.sa.clone(self, spi=os.urandom(8))
2393 newsa.generate_dh_data()
2394 capture = self.send_rekey_from_initiator(newsa)
2395 self.process_rekey_response(newsa, capture)
2396 self.verify_ike_sas(is_rekey=True)
2397 self.assert_counter(1, "rekey_req", "ip4")
2398 r = self.vapi.ikev2_sa_dump()
2399 self.assertEqual(r[1].sa.stats.n_rekey_req, 1)
2400 self.initiate_del_sa_from_initiator()
2402 self.verify_ike_sas()
2405 @tag_fixme_ubuntu2204
2407 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2408 """test ikev2 responder - non-default table id"""
2411 def setUpClass(cls):
2412 import scapy.contrib.ikev2 as _ikev2
2414 globals()["ikev2"] = _ikev2
2415 super(IkePeer, cls).setUpClass()
2416 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
2420 cls.create_pg_interfaces(range(1))
2421 cls.vapi.cli("ip table add 1")
2422 cls.vapi.cli("set interface ip table pg0 1")
2423 for i in cls.pg_interfaces:
2430 def config_tc(self):
2431 self.config_params({"dpd_disabled": False})
2433 def test_responder(self):
2434 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2435 super(TestResponderVrf, self).test_responder()
2436 self.pg0.enable_capture()
2438 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2439 ih = self.get_ike_header(capture[0])
2440 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2441 plain = self.sa.hmac_and_decrypt(ih)
2442 self.assertEqual(plain, b"")
2445 @tag_fixme_vpp_workers
2446 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2447 """test ikev2 responder - cert based auth"""
2449 def config_tc(self):
2454 "server-key": "server-key.pem",
2455 "client-key": "client-key.pem",
2456 "client-cert": "client-cert.pem",
2457 "server-cert": "server-cert.pem",
2462 @tag_fixme_vpp_workers
2463 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2464 TemplateResponder, Ikev2Params
2467 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2470 def config_tc(self):
2473 "ike-crypto": ("AES-CBC", 16),
2474 "ike-integ": "SHA2-256-128",
2475 "esp-crypto": ("AES-CBC", 24),
2476 "esp-integ": "SHA2-384-192",
2477 "ike-dh": "2048MODPgr",
2478 "nonce": os.urandom(256),
2479 "no_idr_in_auth": True,
2484 @tag_fixme_vpp_workers
2485 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2486 TemplateResponder, Ikev2Params
2490 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2493 def config_tc(self):
2496 "ike-crypto": ("AES-CBC", 32),
2497 "ike-integ": "SHA2-256-128",
2498 "esp-crypto": ("AES-GCM-16ICV", 32),
2499 "esp-integ": "NULL",
2500 "ike-dh": "3072MODPgr",
2505 @tag_fixme_vpp_workers
2506 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2511 IKE_NODE_SUFFIX = "ip6"
2513 def config_tc(self):
2516 "del_sa_from_responder": True,
2519 "ike-crypto": ("AES-GCM-16ICV", 32),
2520 "ike-integ": "NULL",
2521 "ike-dh": "2048MODPgr",
2522 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2523 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2528 @tag_fixme_vpp_workers
2529 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2531 Test for keep alive messages
2534 def send_empty_req_from_responder(self):
2535 packet = self.create_empty_request()
2536 self.pg0.add_stream(packet)
2537 self.pg0.enable_capture()
2539 capture = self.pg0.get_capture(1)
2540 ih = self.get_ike_header(capture[0])
2541 self.assertEqual(ih.id, self.sa.msg_id)
2542 plain = self.sa.hmac_and_decrypt(ih)
2543 self.assertEqual(plain, b"")
2544 self.assert_counter(1, "keepalive", "ip4")
2545 r = self.vapi.ikev2_sa_dump()
2546 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2547 r = self.vapi.ikev2_sa_v2_dump()
2548 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2550 def test_initiator(self):
2551 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2552 self.send_empty_req_from_responder()
2555 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2556 """malformed packet test"""
2561 def config_tc(self):
2562 self.config_params()
2564 def create_ike_init_msg(self, length=None, payload=None):
2567 init_SPI="\x11" * 8,
2569 exch_type="IKE_SA_INIT",
2571 if payload is not None:
2573 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2575 def verify_bad_packet_length(self):
2576 ike_msg = self.create_ike_init_msg(length=0xDEAD)
2577 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2578 self.assert_counter(self.pkt_count, "bad_length")
2580 def verify_bad_sa_payload_length(self):
2581 p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2582 ike_msg = self.create_ike_init_msg(payload=p)
2583 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2584 self.assert_counter(self.pkt_count, "malformed_packet")
2586 def test_responder(self):
2587 self.pkt_count = 254
2588 self.verify_bad_packet_length()
2589 self.verify_bad_sa_payload_length()
2592 if __name__ == "__main__":
2593 unittest.main(testRunner=VppTestRunner)