3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import VppTestCase
23 from asfframework import (
24 tag_fixme_vpp_workers,
31 from vpp_ikev2 import Profile, IDType, AuthMethod
32 from vpp_papi import VppEnum
39 KEY_PAD = b"Key Pad for IKEv2"
46 # tuple structure is (p, g, key_len)
51 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
69 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
70 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
71 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
72 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
73 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
74 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
75 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
76 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
77 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
78 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
79 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
80 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
81 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
82 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
83 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
84 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
92 class CryptoAlgo(object):
93 def __init__(self, name, cipher, mode):
97 if self.cipher is not None:
98 self.bs = self.cipher.block_size // 8
100 if self.name == "AES-GCM-16ICV":
101 self.iv_len = GCM_IV_SIZE
103 self.iv_len = self.bs
105 def encrypt(self, data, key, aad=None):
106 iv = os.urandom(self.iv_len)
109 self.cipher(key), self.mode(iv), default_backend()
111 return iv + encryptor.update(data) + encryptor.finalize()
113 salt = key[-SALT_SIZE:]
116 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
118 encryptor.authenticate_additional_data(aad)
119 data = encryptor.update(data) + encryptor.finalize()
120 data += encryptor.tag[:GCM_ICV_SIZE]
123 def decrypt(self, data, key, aad=None, icv=None):
125 iv = data[: self.iv_len]
126 ct = data[self.iv_len :]
128 algorithms.AES(key), self.mode(iv), default_backend()
130 return decryptor.update(ct) + decryptor.finalize()
132 salt = key[-SALT_SIZE:]
133 nonce = salt + data[:GCM_IV_SIZE]
134 ct = data[GCM_IV_SIZE:]
135 key = key[:-SALT_SIZE]
137 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
139 decryptor.authenticate_additional_data(aad)
140 return decryptor.update(ct) + decryptor.finalize()
143 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
144 data = data + b"\x00" * (pad_len - 1)
145 return data + bytes([pad_len - 1])
148 class AuthAlgo(object):
149 def __init__(self, name, mac, mod, key_len, trunc_len=None):
153 self.key_len = key_len
154 self.trunc_len = trunc_len or key_len
158 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
159 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
160 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
164 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
165 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
166 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
167 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
168 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
172 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
173 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
189 class IKEv2ChildSA(object):
190 def __init__(self, local_ts, remote_ts, is_initiator):
198 self.local_ts = local_ts
199 self.remote_ts = remote_ts
202 class IKEv2SA(object):
209 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
215 auth_method="shared-key",
221 self.udp_encap = udp_encap
231 self.dh_params = None
233 self.priv_key = priv_key
234 self.is_initiator = is_initiator
235 nonce = nonce or os.urandom(32)
236 self.auth_data = auth_data
239 if isinstance(id_type, str):
240 self.id_type = IDType.value(id_type)
242 self.id_type = id_type
243 self.auth_method = auth_method
244 if self.is_initiator:
245 self.rspi = 8 * b"\x00"
250 self.ispi = 8 * b"\x00"
252 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
254 def new_msg_id(self):
259 def my_dh_pub_key(self):
260 if self.is_initiator:
261 return self.i_dh_data
262 return self.r_dh_data
265 def peer_dh_pub_key(self):
266 if self.is_initiator:
267 return self.r_dh_data
268 return self.i_dh_data
272 return self.i_natt or self.r_natt
274 def compute_secret(self):
275 priv = self.dh_private_key
276 peer = self.peer_dh_pub_key
277 p, g, l = self.ike_group
279 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
282 def generate_dh_data(self):
284 if self.ike_dh not in DH:
285 raise NotImplementedError("%s not in DH group" % self.ike_dh)
287 if self.dh_params is None:
288 dhg = DH[self.ike_dh]
289 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
290 self.dh_params = pn.parameters(default_backend())
292 priv = self.dh_params.generate_private_key()
293 pub = priv.public_key()
294 x = priv.private_numbers().x
295 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
296 y = pub.public_numbers().y
298 if self.is_initiator:
299 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
301 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
303 def complete_dh_data(self):
304 self.dh_shared_secret = self.compute_secret()
306 def calc_child_keys(self, kex=False):
307 prf = self.ike_prf_alg.mod()
308 s = self.i_nonce + self.r_nonce
310 s = self.dh_shared_secret + s
311 c = self.child_sas[0]
313 encr_key_len = self.esp_crypto_key_len
314 integ_key_len = self.esp_integ_alg.key_len
315 salt_len = 0 if integ_key_len else 4
317 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
318 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
321 c.sk_ei = keymat[pos : pos + encr_key_len]
325 c.sk_ai = keymat[pos : pos + integ_key_len]
328 c.salt_ei = keymat[pos : pos + salt_len]
331 c.sk_er = keymat[pos : pos + encr_key_len]
335 c.sk_ar = keymat[pos : pos + integ_key_len]
338 c.salt_er = keymat[pos : pos + salt_len]
341 def calc_prfplus(self, prf, key, seed, length):
345 while len(r) < length and x < 255:
350 s = s + seed + bytes([x])
351 t = self.calc_prf(prf, key, s)
359 def calc_prf(self, prf, key, data):
360 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
364 def calc_keys(self, sk_d=None):
365 prf = self.ike_prf_alg.mod()
367 # SKEYSEED = prf(Ni | Nr, g^ir)
368 self.skeyseed = self.calc_prf(
369 prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
372 # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
373 self.skeyseed = self.calc_prf(
374 prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
377 # calculate S = Ni | Nr | SPIi SPIr
378 s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
380 prf_key_trunc = self.ike_prf_alg.trunc_len
381 encr_key_len = self.ike_crypto_key_len
382 tr_prf_key_len = self.ike_prf_alg.key_len
383 integ_key_len = self.ike_integ_alg.key_len
384 if integ_key_len == 0:
396 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
399 self.sk_d = keymat[: pos + prf_key_trunc]
402 self.sk_ai = keymat[pos : pos + integ_key_len]
404 self.sk_ar = keymat[pos : pos + integ_key_len]
407 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
408 pos += encr_key_len + salt_size
409 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
410 pos += encr_key_len + salt_size
412 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
413 pos += tr_prf_key_len
414 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
416 def generate_authmsg(self, prf, packet):
417 if self.is_initiator:
425 data = bytes([self.id_type, 0, 0, 0]) + id
426 id_hash = self.calc_prf(prf, key, data)
427 return packet + nonce + id_hash
430 prf = self.ike_prf_alg.mod()
431 if self.is_initiator:
432 packet = self.init_req_packet
434 packet = self.init_resp_packet
435 authmsg = self.generate_authmsg(prf, raw(packet))
436 if self.auth_method == "shared-key":
437 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
438 self.auth_data = self.calc_prf(prf, psk, authmsg)
439 elif self.auth_method == "rsa-sig":
440 self.auth_data = self.priv_key.sign(
441 authmsg, padding.PKCS1v15(), hashes.SHA1()
444 raise TypeError("unknown auth method type!")
446 def encrypt(self, data, aad=None):
447 data = self.ike_crypto_alg.pad(data)
448 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
451 def peer_authkey(self):
452 if self.is_initiator:
457 def my_authkey(self):
458 if self.is_initiator:
463 def my_cryptokey(self):
464 if self.is_initiator:
469 def peer_cryptokey(self):
470 if self.is_initiator:
474 def concat(self, alg, key_len):
475 return alg + "-" + str(key_len * 8)
478 def vpp_ike_cypto_alg(self):
479 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
482 def vpp_esp_cypto_alg(self):
483 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
485 def verify_hmac(self, ikemsg):
486 integ_trunc = self.ike_integ_alg.trunc_len
487 exp_hmac = ikemsg[-integ_trunc:]
488 data = ikemsg[:-integ_trunc]
489 computed_hmac = self.compute_hmac(
490 self.ike_integ_alg.mod(), self.peer_authkey, data
492 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
494 def compute_hmac(self, integ, key, data):
495 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
499 def decrypt(self, data, aad=None, icv=None):
500 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
502 def hmac_and_decrypt(self, ike):
503 ep = ike[ikev2.IKEv2_payload_Encrypted]
504 if self.ike_crypto == "AES-GCM-16ICV":
505 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
506 ct = ep.load[:-GCM_ICV_SIZE]
507 tag = ep.load[-GCM_ICV_SIZE:]
508 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
510 self.verify_hmac(raw(ike))
511 integ_trunc = self.ike_integ_alg.trunc_len
513 # remove ICV and decrypt payload
514 ct = ep.load[:-integ_trunc]
515 plain = self.decrypt(ct)
518 return plain[: -pad_len - 1]
520 def build_ts_addr(self, ts, version):
522 "starting_address_v" + version: ts["start_addr"],
523 "ending_address_v" + version: ts["end_addr"],
526 def generate_ts(self, is_ip4):
527 c = self.child_sas[0]
528 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
530 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
531 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
532 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
533 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
535 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
536 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
537 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
538 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
540 if self.is_initiator:
541 return ([ts1], [ts2])
542 return ([ts2], [ts1])
544 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
545 if crypto not in CRYPTO_ALGOS:
546 raise TypeError("unsupported encryption algo %r" % crypto)
547 self.ike_crypto = crypto
548 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
549 self.ike_crypto_key_len = crypto_key_len
551 if integ not in AUTH_ALGOS:
552 raise TypeError("unsupported auth algo %r" % integ)
553 self.ike_integ = None if integ == "NULL" else integ
554 self.ike_integ_alg = AUTH_ALGOS[integ]
556 if prf not in PRF_ALGOS:
557 raise TypeError("unsupported prf algo %r" % prf)
559 self.ike_prf_alg = PRF_ALGOS[prf]
561 self.ike_group = DH[self.ike_dh]
563 def set_esp_props(self, crypto, crypto_key_len, integ):
564 self.esp_crypto_key_len = crypto_key_len
565 if crypto not in CRYPTO_ALGOS:
566 raise TypeError("unsupported encryption algo %r" % crypto)
567 self.esp_crypto = crypto
568 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
570 if integ not in AUTH_ALGOS:
571 raise TypeError("unsupported auth algo %r" % integ)
572 self.esp_integ = None if integ == "NULL" else integ
573 self.esp_integ_alg = AUTH_ALGOS[integ]
575 def crypto_attr(self, key_len):
576 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
577 return (0x800E << 16 | key_len << 3, 12)
579 raise Exception("unsupported attribute type")
581 def ike_crypto_attr(self):
582 return self.crypto_attr(self.ike_crypto_key_len)
584 def esp_crypto_attr(self):
585 return self.crypto_attr(self.esp_crypto_key_len)
587 def compute_nat_sha1(self, ip, port, rspi=None):
590 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
591 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
593 return digest.finalize()
595 def clone(self, test, **kwargs):
596 if "spi" not in kwargs:
597 kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
598 if "nonce" not in kwargs:
599 kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
601 if "local_ts" not in kwargs:
602 kwargs["local_ts"] = self.child_sas[0].local_ts
603 if "remote_ts" not in kwargs:
604 kwargs["remote_ts"] = self.child_sas[0].remote_ts
607 is_initiator=self.is_initiator,
610 id_type=self.id_type,
611 auth_data=self.auth_data,
612 auth_method=self.auth_method,
613 priv_key=self.priv_key,
616 udp_encap=self.udp_encap,
621 crypto=self.ike_crypto,
622 crypto_key_len=self.ike_crypto_key_len,
623 integ=self.ike_integ,
628 crypto=self.esp_crypto,
629 crypto_key_len=self.esp_crypto_key_len,
630 integ=self.esp_integ,
635 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
636 class IkePeer(VppTestCase):
637 """common class for initiator and responder"""
641 import scapy.contrib.ikev2 as _ikev2
643 globals()["ikev2"] = _ikev2
644 super(IkePeer, cls).setUpClass()
645 cls.create_pg_interfaces(range(2))
646 for i in cls.pg_interfaces:
654 def tearDownClass(cls):
655 super(IkePeer, cls).tearDownClass()
658 super(IkePeer, self).tearDown()
659 if self.del_sa_from_responder:
660 self.initiate_del_sa_from_responder()
662 self.initiate_del_sa_from_initiator()
663 r = self.vapi.ikev2_sa_dump()
664 self.assertEqual(len(r), 0)
665 sas = self.vapi.ipsec_sa_dump()
666 self.assertEqual(len(sas), 0)
667 self.p.remove_vpp_config()
668 self.assertIsNone(self.p.query_vpp_config())
671 super(IkePeer, self).setUp()
673 self.p.add_vpp_config()
674 self.assertIsNotNone(self.p.query_vpp_config())
675 if self.sa.is_initiator:
676 self.sa.generate_dh_data()
677 self.vapi.cli("ikev2 set logging level 4")
678 self.vapi.cli("event-lo clear")
680 def assert_counter(self, count, name, version="ip4"):
681 node_name = "/err/ikev2-%s/" % version + name
682 self.assertEqual(count, self.statistics.get_err_counter(node_name))
684 def create_rekey_request(self, kex=False):
685 sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
686 header = ikev2.IKEv2(
687 init_SPI=self.sa.ispi,
688 resp_SPI=self.sa.rspi,
689 id=self.sa.new_msg_id(),
691 exch_type="CREATE_CHILD_SA",
694 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
695 return self.create_packet(
696 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
699 def create_sa_rekey_request(self, **kwargs):
700 sa = self.generate_sa_init_payload(**kwargs)
701 header = ikev2.IKEv2(
702 init_SPI=self.sa.ispi,
703 resp_SPI=self.sa.rspi,
704 id=self.sa.new_msg_id(),
706 exch_type="CREATE_CHILD_SA",
708 ike_msg = self.encrypt_ike_msg(header, sa, "SA")
709 return self.create_packet(
710 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
713 def create_empty_request(self):
714 header = ikev2.IKEv2(
715 init_SPI=self.sa.ispi,
716 resp_SPI=self.sa.rspi,
717 id=self.sa.new_msg_id(),
719 exch_type="INFORMATIONAL",
720 next_payload="Encrypted",
723 msg = self.encrypt_ike_msg(header, b"", None)
724 return self.create_packet(
725 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
729 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
732 src_ip = src_if.remote_ip6
733 dst_ip = src_if.local_ip6
736 src_ip = src_if.remote_ip4
737 dst_ip = src_if.local_ip4
740 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
741 / ip_layer(src=src_ip, dst=dst_ip)
742 / UDP(sport=sport, dport=dport)
745 # insert non ESP marker
746 res = res / Raw(b"\x00" * 4)
749 def verify_udp(self, udp):
750 self.assertEqual(udp.sport, self.sa.sport)
751 self.assertEqual(udp.dport, self.sa.dport)
753 def get_ike_header(self, packet):
755 ih = packet[ikev2.IKEv2]
756 ih = self.verify_and_remove_non_esp_marker(ih)
757 except IndexError as e:
758 # this is a workaround for getting IKEv2 layer as both ikev2 and
759 # ipsec register for port 4500
761 ih = self.verify_and_remove_non_esp_marker(esp)
762 self.assertEqual(ih.version, 0x20)
763 self.assertNotIn("Version", ih.flags)
766 def verify_and_remove_non_esp_marker(self, packet):
768 # if we are in nat traversal mode check for non esp marker
771 self.assertEqual(data[:4], b"\x00" * 4)
772 return ikev2.IKEv2(data[4:])
776 def encrypt_ike_msg(self, header, plain, first_payload):
777 if self.sa.ike_crypto == "AES-GCM-16ICV":
778 data = self.sa.ike_crypto_alg.pad(raw(plain))
783 + len(ikev2.IKEv2_payload_Encrypted())
785 tlen = plen + len(ikev2.IKEv2())
788 sk_p = ikev2.IKEv2_payload_Encrypted(
789 next_payload=first_payload, length=plen
793 encr = self.sa.encrypt(raw(plain), raw(res))
794 sk_p = ikev2.IKEv2_payload_Encrypted(
795 next_payload=first_payload, length=plen, load=encr
799 encr = self.sa.encrypt(raw(plain))
800 trunc_len = self.sa.ike_integ_alg.trunc_len
801 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
802 tlen = plen + len(ikev2.IKEv2())
804 sk_p = ikev2.IKEv2_payload_Encrypted(
805 next_payload=first_payload, length=plen, load=encr
810 integ_data = raw(res)
811 hmac_data = self.sa.compute_hmac(
812 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
814 res = res / Raw(hmac_data[:trunc_len])
815 assert len(res) == tlen
818 def verify_udp_encap(self, ipsec_sa):
819 e = VppEnum.vl_api_ipsec_sad_flags_t
820 if self.sa.udp_encap or self.sa.natt:
821 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
823 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
825 def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
826 sas = self.vapi.ipsec_sa_dump()
829 # after rekey there is a short period of time in which old
830 # inbound SA is still present
834 self.assertEqual(len(sas), sa_count)
835 if self.sa.is_initiator:
850 c = self.sa.child_sas[0]
852 self.verify_udp_encap(sa0)
853 self.verify_udp_encap(sa1)
854 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
855 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
856 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
858 if self.sa.esp_integ is None:
861 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
862 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
863 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
866 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
867 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
868 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
869 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
873 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
874 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
875 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
876 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
878 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
879 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
881 def verify_keymat(self, api_keys, keys, name):
882 km = getattr(keys, name)
883 api_km = getattr(api_keys, name)
884 api_km_len = getattr(api_keys, name + "_len")
885 self.assertEqual(len(km), api_km_len)
886 self.assertEqual(km, api_km[:api_km_len])
888 def verify_id(self, api_id, exp_id):
889 self.assertEqual(api_id.type, IDType.value(exp_id.type))
890 self.assertEqual(api_id.data_len, exp_id.data_len)
891 self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
893 def verify_ike_sas(self, is_rekey=False):
894 r = self.vapi.ikev2_sa_dump()
901 self.assertEqual(len(r), sa_count)
902 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
903 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
905 if self.sa.is_initiator:
906 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
907 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
909 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
910 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
912 if self.sa.is_initiator:
913 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
914 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
916 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
917 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
918 self.verify_keymat(sa.keys, self.sa, "sk_d")
919 self.verify_keymat(sa.keys, self.sa, "sk_ai")
920 self.verify_keymat(sa.keys, self.sa, "sk_ar")
921 self.verify_keymat(sa.keys, self.sa, "sk_ei")
922 self.verify_keymat(sa.keys, self.sa, "sk_er")
923 self.verify_keymat(sa.keys, self.sa, "sk_pi")
924 self.verify_keymat(sa.keys, self.sa, "sk_pr")
926 self.assertEqual(sa.i_id.type, self.sa.id_type)
927 self.assertEqual(sa.r_id.type, self.sa.id_type)
928 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
929 self.assertEqual(sa.r_id.data_len, len(self.idr))
930 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
931 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
933 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
934 self.verify_nonce(n, self.sa.i_nonce)
935 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
936 self.verify_nonce(n, self.sa.r_nonce)
938 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
940 self.assertEqual(len(r), 0)
943 self.assertEqual(len(r), 1)
945 self.assertEqual(csa.sa_index, sa.sa_index)
946 c = self.sa.child_sas[0]
947 if hasattr(c, "sk_ai"):
948 self.verify_keymat(csa.keys, c, "sk_ai")
949 self.verify_keymat(csa.keys, c, "sk_ar")
950 self.verify_keymat(csa.keys, c, "sk_ei")
951 self.verify_keymat(csa.keys, c, "sk_er")
952 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
953 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
955 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
958 r = self.vapi.ikev2_traffic_selector_dump(
959 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
961 self.assertEqual(len(r), 1)
963 self.verify_ts(r[0].ts, tsi[0], True)
965 r = self.vapi.ikev2_traffic_selector_dump(
966 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
968 self.assertEqual(len(r), 1)
969 self.verify_ts(r[0].ts, tsr[0], False)
971 def verify_nonce(self, api_nonce, nonce):
972 self.assertEqual(api_nonce.data_len, len(nonce))
973 self.assertEqual(api_nonce.nonce, nonce)
975 def verify_ts(self, api_ts, ts, is_initiator):
977 self.assertTrue(api_ts.is_local)
979 self.assertFalse(api_ts.is_local)
982 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
983 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
985 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
986 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
987 self.assertEqual(api_ts.start_port, ts.start_port)
988 self.assertEqual(api_ts.end_port, ts.end_port)
989 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
992 class TemplateInitiator(IkePeer):
993 """initiator test template"""
995 def initiate_del_sa_from_initiator(self):
996 ispi = int.from_bytes(self.sa.ispi, "little")
997 self.pg0.enable_capture()
999 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
1000 capture = self.pg0.get_capture(1)
1001 ih = self.get_ike_header(capture[0])
1002 self.assertNotIn("Response", ih.flags)
1003 self.assertIn("Initiator", ih.flags)
1004 self.assertEqual(ih.init_SPI, self.sa.ispi)
1005 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1006 plain = self.sa.hmac_and_decrypt(ih)
1007 d = ikev2.IKEv2_payload_Delete(plain)
1008 self.assertEqual(d.proto, 1) # proto=IKEv2
1009 header = ikev2.IKEv2(
1010 init_SPI=self.sa.ispi,
1011 resp_SPI=self.sa.rspi,
1013 exch_type="INFORMATIONAL",
1015 next_payload="Encrypted",
1017 resp = self.encrypt_ike_msg(header, b"", None)
1018 self.send_and_assert_no_replies(self.pg0, resp)
1020 def verify_del_sa(self, packet):
1021 ih = self.get_ike_header(packet)
1022 self.assertEqual(ih.id, self.sa.msg_id)
1023 self.assertEqual(ih.exch_type, 37) # exchange informational
1024 self.assertIn("Response", ih.flags)
1025 self.assertIn("Initiator", ih.flags)
1026 plain = self.sa.hmac_and_decrypt(ih)
1027 self.assertEqual(plain, b"")
1029 def initiate_del_sa_from_responder(self):
1030 header = ikev2.IKEv2(
1031 init_SPI=self.sa.ispi,
1032 resp_SPI=self.sa.rspi,
1033 exch_type="INFORMATIONAL",
1034 id=self.sa.new_msg_id(),
1036 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1037 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1038 packet = self.create_packet(
1039 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1041 self.pg0.add_stream(packet)
1042 self.pg0.enable_capture()
1044 capture = self.pg0.get_capture(1)
1045 self.verify_del_sa(capture[0])
1048 def find_notify_payload(packet, notify_type):
1049 n = packet[ikev2.IKEv2_payload_Notify]
1050 while n is not None:
1051 if n.type == notify_type:
1056 def verify_nat_detection(self, packet):
1063 # NAT_DETECTION_SOURCE_IP
1064 s = self.find_notify_payload(packet, 16388)
1065 self.assertIsNotNone(s)
1066 src_sha = self.sa.compute_nat_sha1(
1067 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
1069 self.assertEqual(s.load, src_sha)
1071 # NAT_DETECTION_DESTINATION_IP
1072 s = self.find_notify_payload(packet, 16389)
1073 self.assertIsNotNone(s)
1074 dst_sha = self.sa.compute_nat_sha1(
1075 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1077 self.assertEqual(s.load, dst_sha)
1079 def verify_sa_init_request(self, packet):
1081 self.sa.dport = udp.sport
1082 ih = packet[ikev2.IKEv2]
1083 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1084 self.assertEqual(ih.exch_type, 34) # SA_INIT
1085 self.sa.ispi = ih.init_SPI
1086 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1087 self.assertIn("Initiator", ih.flags)
1088 self.assertNotIn("Response", ih.flags)
1089 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1090 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1092 prop = packet[ikev2.IKEv2_payload_Proposal]
1093 self.assertEqual(prop.proto, 1) # proto = ikev2
1094 self.assertEqual(prop.proposal, 1)
1095 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1097 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1099 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1100 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1101 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1102 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1104 self.verify_nat_detection(packet)
1105 self.sa.set_ike_props(
1106 crypto="AES-GCM-16ICV",
1109 prf="PRF_HMAC_SHA2_256",
1112 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1113 self.sa.generate_dh_data()
1114 self.sa.complete_dh_data()
1117 def update_esp_transforms(self, trans, sa):
1119 if trans.transform_type == 1: # ecryption
1120 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1121 elif trans.transform_type == 3: # integrity
1122 sa.esp_integ = INTEG_IDS[trans.transform_id]
1123 trans = trans.payload
1125 def verify_sa_auth_req(self, packet):
1127 self.sa.dport = udp.sport
1128 ih = self.get_ike_header(packet)
1129 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1130 self.assertEqual(ih.init_SPI, self.sa.ispi)
1131 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1132 self.assertIn("Initiator", ih.flags)
1133 self.assertNotIn("Response", ih.flags)
1136 self.verify_udp(udp)
1137 self.assertEqual(ih.id, self.sa.msg_id + 1)
1139 plain = self.sa.hmac_and_decrypt(ih)
1140 idi = ikev2.IKEv2_payload_IDi(plain)
1141 self.assertEqual(idi.load, self.sa.i_id)
1142 if self.no_idr_auth:
1143 self.assertEqual(idi.next_payload, 39) # AUTH
1145 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1146 self.assertEqual(idr.load, self.sa.r_id)
1147 prop = idi[ikev2.IKEv2_payload_Proposal]
1148 c = self.sa.child_sas[0]
1150 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1152 def send_init_response(self):
1153 tr_attr = self.sa.ike_crypto_attr()
1155 ikev2.IKEv2_payload_Transform(
1156 transform_type="Encryption",
1157 transform_id=self.sa.ike_crypto,
1159 key_length=tr_attr[0],
1161 / ikev2.IKEv2_payload_Transform(
1162 transform_type="Integrity", transform_id=self.sa.ike_integ
1164 / ikev2.IKEv2_payload_Transform(
1165 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1167 / ikev2.IKEv2_payload_Transform(
1168 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1171 props = ikev2.IKEv2_payload_Proposal(
1172 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1175 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1177 dst_address = b"\x0a\x0a\x0a\x0a"
1179 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1180 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1181 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1183 self.sa.init_resp_packet = (
1185 init_SPI=self.sa.ispi,
1186 resp_SPI=self.sa.rspi,
1187 exch_type="IKE_SA_INIT",
1190 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1191 / ikev2.IKEv2_payload_KE(
1192 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1194 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1195 / ikev2.IKEv2_payload_Notify(
1196 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1198 / ikev2.IKEv2_payload_Notify(
1199 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1203 ike_msg = self.create_packet(
1205 self.sa.init_resp_packet,
1211 self.pg_send(self.pg0, ike_msg)
1212 capture = self.pg0.get_capture(1)
1213 self.verify_sa_auth_req(capture[0])
1215 def initiate_sa_init(self):
1216 self.pg0.enable_capture()
1218 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1220 capture = self.pg0.get_capture(1)
1221 self.verify_sa_init_request(capture[0])
1222 self.send_init_response()
1224 def send_auth_response(self):
1225 tr_attr = self.sa.esp_crypto_attr()
1227 ikev2.IKEv2_payload_Transform(
1228 transform_type="Encryption",
1229 transform_id=self.sa.esp_crypto,
1231 key_length=tr_attr[0],
1233 / ikev2.IKEv2_payload_Transform(
1234 transform_type="Integrity", transform_id=self.sa.esp_integ
1236 / ikev2.IKEv2_payload_Transform(
1237 transform_type="Extended Sequence Number", transform_id="No ESN"
1239 / ikev2.IKEv2_payload_Transform(
1240 transform_type="Extended Sequence Number", transform_id="ESN"
1244 c = self.sa.child_sas[0]
1245 props = ikev2.IKEv2_payload_Proposal(
1246 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1249 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1251 ikev2.IKEv2_payload_IDi(
1252 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1254 / ikev2.IKEv2_payload_IDr(
1255 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1257 / ikev2.IKEv2_payload_AUTH(
1259 auth_type=AuthMethod.value(self.sa.auth_method),
1260 load=self.sa.auth_data,
1262 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1263 / ikev2.IKEv2_payload_TSi(
1264 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1266 / ikev2.IKEv2_payload_TSr(
1267 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1269 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1272 header = ikev2.IKEv2(
1273 init_SPI=self.sa.ispi,
1274 resp_SPI=self.sa.rspi,
1275 id=self.sa.new_msg_id(),
1277 exch_type="IKE_AUTH",
1280 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1281 packet = self.create_packet(
1282 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1284 self.pg_send(self.pg0, packet)
1286 def test_initiator(self):
1287 self.initiate_sa_init()
1289 self.sa.calc_child_keys()
1290 self.send_auth_response()
1291 self.verify_ike_sas()
1294 class TemplateResponder(IkePeer):
1295 """responder test template"""
1297 def initiate_del_sa_from_responder(self):
1298 self.pg0.enable_capture()
1300 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1301 capture = self.pg0.get_capture(1)
1302 ih = self.get_ike_header(capture[0])
1303 self.assertNotIn("Response", ih.flags)
1304 self.assertNotIn("Initiator", ih.flags)
1305 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1306 plain = self.sa.hmac_and_decrypt(ih)
1307 d = ikev2.IKEv2_payload_Delete(plain)
1308 self.assertEqual(d.proto, 1) # proto=IKEv2
1309 self.assertEqual(ih.init_SPI, self.sa.ispi)
1310 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1311 header = ikev2.IKEv2(
1312 init_SPI=self.sa.ispi,
1313 resp_SPI=self.sa.rspi,
1314 flags="Initiator+Response",
1315 exch_type="INFORMATIONAL",
1317 next_payload="Encrypted",
1319 resp = self.encrypt_ike_msg(header, b"", None)
1320 self.send_and_assert_no_replies(self.pg0, resp)
1322 def verify_del_sa(self, packet):
1323 ih = self.get_ike_header(packet)
1324 self.assertEqual(ih.id, self.sa.msg_id)
1325 self.assertEqual(ih.exch_type, 37) # exchange informational
1326 self.assertIn("Response", ih.flags)
1327 self.assertNotIn("Initiator", ih.flags)
1328 self.assertEqual(ih.next_payload, 46) # Encrypted
1329 self.assertEqual(ih.init_SPI, self.sa.ispi)
1330 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1331 plain = self.sa.hmac_and_decrypt(ih)
1332 self.assertEqual(plain, b"")
1334 def initiate_del_sa_from_initiator(self):
1335 header = ikev2.IKEv2(
1336 init_SPI=self.sa.ispi,
1337 resp_SPI=self.sa.rspi,
1339 exch_type="INFORMATIONAL",
1340 id=self.sa.new_msg_id(),
1342 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1343 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1344 packet = self.create_packet(
1345 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1347 self.pg0.add_stream(packet)
1348 self.pg0.enable_capture()
1350 capture = self.pg0.get_capture(1)
1351 self.verify_del_sa(capture[0])
1353 def generate_sa_init_payload(
1354 self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
1356 tr_attr = self.sa.ike_crypto_attr()
1358 ikev2.IKEv2_payload_Transform(
1359 transform_type="Encryption",
1360 transform_id=self.sa.ike_crypto,
1362 key_length=tr_attr[0],
1364 / ikev2.IKEv2_payload_Transform(
1365 transform_type="Integrity", transform_id=self.sa.ike_integ
1367 / ikev2.IKEv2_payload_Transform(
1368 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1370 / ikev2.IKEv2_payload_Transform(
1371 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1378 pargs = {"SPI": spi, "SPIsize": len(spi)}
1379 props = ikev2.IKEv2_payload_Proposal(
1388 ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1389 / ikev2.IKEv2_payload_KE(
1390 next_payload="Nonce",
1391 group=self.sa.ike_dh,
1392 load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
1394 / ikev2.IKEv2_payload_Nonce(
1395 next_payload=next_payload,
1396 load=self.sa.i_nonce if nonce is None else nonce,
1400 def send_sa_init_req(self):
1401 self.sa.init_req_packet = ikev2.IKEv2(
1402 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1403 ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
1407 src_address = b"\x0a\x0a\x0a\x01"
1409 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1412 dst_address = b"\x0a\x0a\x0a\x0a"
1414 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1416 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1417 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1418 nat_src_detection = ikev2.IKEv2_payload_Notify(
1419 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1421 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1422 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1424 self.sa.init_req_packet = (
1425 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1428 ike_msg = self.create_packet(
1430 self.sa.init_req_packet,
1436 self.pg0.add_stream(ike_msg)
1437 self.pg0.enable_capture()
1439 capture = self.pg0.get_capture(1)
1440 self.verify_sa_init(capture[0])
1442 def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1443 tr_attr = self.sa.esp_crypto_attr()
1444 last_payload = last_payload or "Notify"
1447 ikev2.IKEv2_payload_Transform(
1448 transform_type="Encryption",
1449 transform_id=self.sa.esp_crypto,
1451 key_length=tr_attr[0],
1453 / ikev2.IKEv2_payload_Transform(
1454 transform_type="Integrity", transform_id=self.sa.esp_integ
1456 / ikev2.IKEv2_payload_Transform(
1457 transform_type="Extended Sequence Number", transform_id="No ESN"
1459 / ikev2.IKEv2_payload_Transform(
1460 transform_type="Extended Sequence Number", transform_id="ESN"
1466 trans /= ikev2.IKEv2_payload_Transform(
1467 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1470 c = self.sa.child_sas[0]
1471 props = ikev2.IKEv2_payload_Proposal(
1480 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1482 ikev2.IKEv2_payload_AUTH(
1484 auth_type=AuthMethod.value(self.sa.auth_method),
1485 load=self.sa.auth_data,
1487 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1488 / ikev2.IKEv2_payload_TSi(
1489 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1491 / ikev2.IKEv2_payload_TSr(
1492 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1497 first_payload = "Nonce"
1499 head = ikev2.IKEv2_payload_Nonce(
1500 load=self.sa.i_nonce, next_payload="KE"
1501 ) / ikev2.IKEv2_payload_KE(
1502 group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1505 head = ikev2.IKEv2_payload_Nonce(
1506 load=self.sa.i_nonce, next_payload="SA"
1511 / ikev2.IKEv2_payload_Notify(
1515 length=8 + len(c.ispi),
1516 next_payload="Notify",
1518 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1521 first_payload = "IDi"
1522 if self.no_idr_auth:
1523 ids = ikev2.IKEv2_payload_IDi(
1524 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1527 ids = ikev2.IKEv2_payload_IDi(
1528 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1529 ) / ikev2.IKEv2_payload_IDr(
1530 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1533 return plain, first_payload
1535 def send_sa_auth(self):
1536 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1537 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1538 header = ikev2.IKEv2(
1539 init_SPI=self.sa.ispi,
1540 resp_SPI=self.sa.rspi,
1541 id=self.sa.new_msg_id(),
1543 exch_type="IKE_AUTH",
1546 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1547 packet = self.create_packet(
1548 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1550 self.pg0.add_stream(packet)
1551 self.pg0.enable_capture()
1553 capture = self.pg0.get_capture(1)
1554 self.verify_sa_auth_resp(capture[0])
1556 def verify_sa_init(self, packet):
1557 ih = self.get_ike_header(packet)
1559 self.assertEqual(ih.id, self.sa.msg_id)
1560 self.assertEqual(ih.exch_type, 34)
1561 self.assertIn("Response", ih.flags)
1562 self.assertEqual(ih.init_SPI, self.sa.ispi)
1563 self.assertNotEqual(ih.resp_SPI, 0)
1564 self.sa.rspi = ih.resp_SPI
1566 sa = ih[ikev2.IKEv2_payload_SA]
1567 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1568 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1569 except IndexError as e:
1570 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1571 self.logger.error(ih.show())
1573 self.sa.complete_dh_data()
1577 def verify_sa_auth_resp(self, packet):
1578 ike = self.get_ike_header(packet)
1580 self.verify_udp(udp)
1581 self.assertEqual(ike.id, self.sa.msg_id)
1582 plain = self.sa.hmac_and_decrypt(ike)
1583 idr = ikev2.IKEv2_payload_IDr(plain)
1584 prop = idr[ikev2.IKEv2_payload_Proposal]
1585 self.assertEqual(prop.SPIsize, 4)
1586 self.sa.child_sas[0].rspi = prop.SPI
1587 self.sa.calc_child_keys()
1589 IKE_NODE_SUFFIX = "ip4"
1591 def verify_counters(self):
1592 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1593 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1594 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1596 r = self.vapi.ikev2_sa_dump()
1598 self.assertEqual(1, s.n_sa_auth_req)
1599 self.assertEqual(1, s.n_sa_init_req)
1601 def test_responder(self):
1602 self.send_sa_init_req()
1604 self.verify_ipsec_sas()
1605 self.verify_ike_sas()
1606 self.verify_counters()
1609 class Ikev2Params(object):
1610 def config_params(self, params={}):
1611 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1612 ei = VppEnum.vl_api_ipsec_integ_alg_t
1614 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1615 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1616 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1617 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1618 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1619 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1620 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1621 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1622 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1623 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1626 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1628 self.vapi.cli("ikev2 dpd disable")
1629 self.del_sa_from_responder = (
1631 if "del_sa_from_responder" not in params
1632 else params["del_sa_from_responder"]
1634 i_natt = False if "i_natt" not in params else params["i_natt"]
1635 r_natt = False if "r_natt" not in params else params["r_natt"]
1636 self.p = Profile(self, "pr1")
1637 self.ip6 = False if "ip6" not in params else params["ip6"]
1639 if "auth" in params and params["auth"] == "rsa-sig":
1640 auth_method = "rsa-sig"
1641 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1642 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1644 client_file = work_dir + params["client-cert"]
1645 server_pem = open(work_dir + params["server-cert"]).read()
1646 client_priv = open(work_dir + params["client-key"]).read()
1647 client_priv = load_pem_private_key(
1648 str.encode(client_priv), None, default_backend()
1650 self.peer_cert = x509.load_pem_x509_certificate(
1651 str.encode(server_pem), default_backend()
1653 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1656 auth_data = b"$3cr3tpa$$w0rd"
1657 self.p.add_auth(method="shared-key", data=auth_data)
1658 auth_method = "shared-key"
1661 is_init = True if "is_initiator" not in params else params["is_initiator"]
1662 self.no_idr_auth = params.get("no_idr_in_auth", False)
1664 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1665 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1666 r_id = self.idr = idr["data"]
1667 i_id = self.idi = idi["data"]
1669 # scapy is initiator, VPP is responder
1670 self.p.add_local_id(**idr)
1671 self.p.add_remote_id(**idi)
1672 if self.no_idr_auth:
1675 # VPP is initiator, scapy is responder
1676 self.p.add_local_id(**idi)
1677 if not self.no_idr_auth:
1678 self.p.add_remote_id(**idr)
1681 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1682 if "loc_ts" not in params
1683 else params["loc_ts"]
1686 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1687 if "rem_ts" not in params
1688 else params["rem_ts"]
1690 self.p.add_local_ts(**loc_ts)
1691 self.p.add_remote_ts(**rem_ts)
1692 if "responder" in params:
1693 self.p.add_responder(params["responder"])
1694 if "ike_transforms" in params:
1695 self.p.add_ike_transforms(params["ike_transforms"])
1696 if "esp_transforms" in params:
1697 self.p.add_esp_transforms(params["esp_transforms"])
1699 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1701 self.p.set_udp_encap(True)
1703 if "responder_hostname" in params:
1704 hn = params["responder_hostname"]
1705 self.p.add_responder_hostname(hn)
1707 # configure static dns record
1708 self.vapi.dns_name_server_add_del(
1709 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1711 self.vapi.dns_enable_disable(enable=1)
1713 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1720 is_initiator=is_init,
1721 id_type=self.p.local_id["id_type"],
1724 priv_key=client_priv,
1725 auth_method=auth_method,
1726 nonce=params.get("nonce"),
1727 auth_data=auth_data,
1728 udp_encap=udp_encap,
1729 local_ts=self.p.remote_ts,
1730 remote_ts=self.p.local_ts,
1735 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1738 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1740 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1743 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1746 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1749 self.sa.set_ike_props(
1750 crypto=ike_crypto[0],
1751 crypto_key_len=ike_crypto[1],
1753 prf="PRF_HMAC_SHA2_256",
1756 self.sa.set_esp_props(
1757 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1761 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
1762 class TestApi(VppTestCase):
1763 """Test IKEV2 API"""
1766 def setUpClass(cls):
1767 super(TestApi, cls).setUpClass()
1770 def tearDownClass(cls):
1771 super(TestApi, cls).tearDownClass()
1774 super(TestApi, self).tearDown()
1775 self.p1.remove_vpp_config()
1776 self.p2.remove_vpp_config()
1777 r = self.vapi.ikev2_profile_dump()
1778 self.assertEqual(len(r), 0)
1780 def configure_profile(self, cfg):
1781 p = Profile(self, cfg["name"])
1782 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1783 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1784 p.add_local_ts(**cfg["loc_ts"])
1785 p.add_remote_ts(**cfg["rem_ts"])
1786 p.add_responder(cfg["responder"])
1787 p.add_ike_transforms(cfg["ike_ts"])
1788 p.add_esp_transforms(cfg["esp_ts"])
1789 p.add_auth(**cfg["auth"])
1790 p.set_udp_encap(cfg["udp_encap"])
1791 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1792 if "lifetime_data" in cfg:
1793 p.set_lifetime_data(cfg["lifetime_data"])
1794 if "tun_itf" in cfg:
1795 p.set_tunnel_interface(cfg["tun_itf"])
1796 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1801 def test_profile_api(self):
1802 """test profile dump API"""
1807 "start_addr": "3.3.3.2",
1808 "end_addr": "3.3.3.3",
1814 "start_addr": "4.5.76.80",
1815 "end_addr": "2.3.4.6",
1822 "start_addr": "ab::1",
1823 "end_addr": "ab::4",
1829 "start_addr": "cd::12",
1830 "end_addr": "cd::13",
1836 "natt_disabled": True,
1837 "loc_id": ("fqdn", b"vpp.home"),
1838 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1841 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1844 "crypto_key_size": 32,
1848 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1849 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1851 "ipsec_over_udp_port": 4501,
1854 "lifetime_maxdata": 20192,
1855 "lifetime_jitter": 9,
1861 "loc_id": ("ip4-addr", b"192.168.2.1"),
1862 "rem_id": ("ip6-addr", b"abcd::1"),
1865 "responder": {"sw_if_index": 4, "addr": "def::10"},
1868 "crypto_key_size": 16,
1872 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1873 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1875 "ipsec_over_udp_port": 4600,
1879 self.p1 = self.configure_profile(conf["p1"])
1880 self.p2 = self.configure_profile(conf["p2"])
1882 r = self.vapi.ikev2_profile_dump()
1883 self.assertEqual(len(r), 2)
1884 self.verify_profile(r[0].profile, conf["p1"])
1885 self.verify_profile(r[1].profile, conf["p2"])
1887 def verify_id(self, api_id, cfg_id):
1888 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1889 self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1891 def verify_ts(self, api_ts, cfg_ts):
1892 self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1893 self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1894 self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1895 self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1896 self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1898 def verify_responder(self, api_r, cfg_r):
1899 self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1900 self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1902 def verify_transforms(self, api_ts, cfg_ts):
1903 self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1904 self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1905 self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1907 def verify_ike_transforms(self, api_ts, cfg_ts):
1908 self.verify_transforms(api_ts, cfg_ts)
1909 self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1911 def verify_esp_transforms(self, api_ts, cfg_ts):
1912 self.verify_transforms(api_ts, cfg_ts)
1914 def verify_auth(self, api_auth, cfg_auth):
1915 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1916 self.assertEqual(api_auth.data, cfg_auth["data"])
1917 self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1919 def verify_lifetime_data(self, p, ld):
1920 self.assertEqual(p.lifetime, ld["lifetime"])
1921 self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1922 self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1923 self.assertEqual(p.handover, ld["handover"])
1925 def verify_profile(self, ap, cp):
1926 self.assertEqual(ap.name, cp["name"])
1927 self.assertEqual(ap.udp_encap, cp["udp_encap"])
1928 self.verify_id(ap.loc_id, cp["loc_id"])
1929 self.verify_id(ap.rem_id, cp["rem_id"])
1930 self.verify_ts(ap.loc_ts, cp["loc_ts"])
1931 self.verify_ts(ap.rem_ts, cp["rem_ts"])
1932 self.verify_responder(ap.responder, cp["responder"])
1933 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1934 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1935 self.verify_auth(ap.auth, cp["auth"])
1936 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1937 self.assertTrue(natt_dis == ap.natt_disabled)
1939 if "lifetime_data" in cp:
1940 self.verify_lifetime_data(ap, cp["lifetime_data"])
1941 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1943 self.assertEqual(ap.tun_itf, cp["tun_itf"])
1945 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1948 @tag_fixme_vpp_workers
1949 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1950 """test responder - responder behind NAT"""
1952 IKE_NODE_SUFFIX = "ip4-natt"
1954 def config_tc(self):
1955 self.config_params({"r_natt": True})
1958 @tag_fixme_vpp_workers
1959 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1960 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1962 def config_tc(self):
1966 "is_initiator": False, # seen from test case perspective
1967 # thus vpp is initiator
1969 "sw_if_index": self.pg0.sw_if_index,
1970 "addr": self.pg0.remote_ip4,
1972 "ike-crypto": ("AES-GCM-16ICV", 32),
1973 "ike-integ": "NULL",
1974 "ike-dh": "3072MODPgr",
1976 "crypto_alg": 20, # "aes-gcm-16"
1977 "crypto_key_size": 256,
1978 "dh_group": 15, # "modp-3072"
1981 "crypto_alg": 12, # "aes-cbc"
1982 "crypto_key_size": 256,
1983 # "hmac-sha2-256-128"
1990 @tag_fixme_vpp_workers
1991 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1992 """test ikev2 initiator - pre shared key auth"""
1994 def config_tc(self):
1997 "is_initiator": False, # seen from test case perspective
1998 # thus vpp is initiator
1999 "ike-crypto": ("AES-GCM-16ICV", 32),
2000 "ike-integ": "NULL",
2001 "ike-dh": "3072MODPgr",
2003 "crypto_alg": 20, # "aes-gcm-16"
2004 "crypto_key_size": 256,
2005 "dh_group": 15, # "modp-3072"
2008 "crypto_alg": 12, # "aes-cbc"
2009 "crypto_key_size": 256,
2010 # "hmac-sha2-256-128"
2013 "responder_hostname": {
2014 "hostname": "vpp.responder.org",
2015 "sw_if_index": self.pg0.sw_if_index,
2021 @tag_fixme_vpp_workers
2022 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
2023 """test initiator - request window size (1)"""
2025 def rekey_respond(self, req, update_child_sa_data):
2026 ih = self.get_ike_header(req)
2027 plain = self.sa.hmac_and_decrypt(ih)
2028 sa = ikev2.IKEv2_payload_SA(plain)
2029 if update_child_sa_data:
2030 prop = sa[ikev2.IKEv2_payload_Proposal]
2031 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2032 self.sa.r_nonce = self.sa.i_nonce
2033 self.sa.child_sas[0].ispi = prop.SPI
2034 self.sa.child_sas[0].rspi = prop.SPI
2035 self.sa.calc_child_keys()
2037 header = ikev2.IKEv2(
2038 init_SPI=self.sa.ispi,
2039 resp_SPI=self.sa.rspi,
2043 next_payload="Encrypted",
2045 resp = self.encrypt_ike_msg(header, sa, "SA")
2046 packet = self.create_packet(
2047 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2049 self.send_and_assert_no_replies(self.pg0, packet)
2051 def test_initiator(self):
2052 super(TestInitiatorRequestWindowSize, self).test_initiator()
2053 self.pg0.enable_capture()
2055 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
2056 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2057 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2058 capture = self.pg0.get_capture(2)
2060 # reply in reverse order
2061 self.rekey_respond(capture[1], True)
2062 self.rekey_respond(capture[0], False)
2064 # verify that only the second request was accepted
2065 self.verify_ike_sas()
2066 self.verify_ipsec_sas(is_rekey=True)
2069 @tag_fixme_vpp_workers
2070 class TestInitiatorRekey(TestInitiatorPsk):
2071 """test ikev2 initiator - rekey"""
2073 def rekey_from_initiator(self):
2074 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
2075 self.pg0.enable_capture()
2077 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2078 capture = self.pg0.get_capture(1)
2079 ih = self.get_ike_header(capture[0])
2080 self.assertEqual(ih.exch_type, 36) # CHILD_SA
2081 self.assertNotIn("Response", ih.flags)
2082 self.assertIn("Initiator", ih.flags)
2083 plain = self.sa.hmac_and_decrypt(ih)
2084 sa = ikev2.IKEv2_payload_SA(plain)
2085 prop = sa[ikev2.IKEv2_payload_Proposal]
2086 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2087 self.sa.r_nonce = self.sa.i_nonce
2088 # update new responder SPI
2089 self.sa.child_sas[0].ispi = prop.SPI
2090 self.sa.child_sas[0].rspi = prop.SPI
2091 self.sa.calc_child_keys()
2092 header = ikev2.IKEv2(
2093 init_SPI=self.sa.ispi,
2094 resp_SPI=self.sa.rspi,
2098 next_payload="Encrypted",
2100 resp = self.encrypt_ike_msg(header, sa, "SA")
2101 packet = self.create_packet(
2102 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2104 self.send_and_assert_no_replies(self.pg0, packet)
2106 def test_initiator(self):
2107 super(TestInitiatorRekey, self).test_initiator()
2108 self.rekey_from_initiator()
2109 self.verify_ike_sas()
2110 self.verify_ipsec_sas(is_rekey=True)
2113 @tag_fixme_vpp_workers
2114 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2115 """test ikev2 initiator - delete IKE SA from responder"""
2117 def config_tc(self):
2120 "del_sa_from_responder": True,
2121 "is_initiator": False, # seen from test case perspective
2122 # thus vpp is initiator
2124 "sw_if_index": self.pg0.sw_if_index,
2125 "addr": self.pg0.remote_ip4,
2127 "ike-crypto": ("AES-GCM-16ICV", 32),
2128 "ike-integ": "NULL",
2129 "ike-dh": "3072MODPgr",
2131 "crypto_alg": 20, # "aes-gcm-16"
2132 "crypto_key_size": 256,
2133 "dh_group": 15, # "modp-3072"
2136 "crypto_alg": 12, # "aes-cbc"
2137 "crypto_key_size": 256,
2138 # "hmac-sha2-256-128"
2141 "no_idr_in_auth": True,
2146 @tag_fixme_vpp_workers
2147 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2148 """test ikev2 responder - initiator behind NAT"""
2150 IKE_NODE_SUFFIX = "ip4-natt"
2152 def config_tc(self):
2153 self.config_params({"i_natt": True})
2156 @tag_fixme_vpp_workers
2157 class TestResponderPsk(TemplateResponder, Ikev2Params):
2158 """test ikev2 responder - pre shared key auth"""
2160 def config_tc(self):
2161 self.config_params()
2164 @tag_fixme_vpp_workers
2165 class TestResponderDpd(TestResponderPsk):
2167 Dead peer detection test
2170 def config_tc(self):
2171 self.config_params({"dpd_disabled": False})
2176 def test_responder(self):
2177 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2178 super(TestResponderDpd, self).test_responder()
2179 self.pg0.enable_capture()
2181 # capture empty request but don't reply
2182 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2183 ih = self.get_ike_header(capture[0])
2184 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2185 plain = self.sa.hmac_and_decrypt(ih)
2186 self.assertEqual(plain, b"")
2187 # wait for SA expiration
2189 ike_sas = self.vapi.ikev2_sa_dump()
2190 self.assertEqual(len(ike_sas), 0)
2191 ipsec_sas = self.vapi.ipsec_sa_dump()
2192 self.assertEqual(len(ipsec_sas), 0)
2195 @tag_fixme_vpp_workers
2196 class TestResponderRekey(TestResponderPsk):
2197 """test ikev2 responder - rekey"""
2201 def send_rekey_from_initiator(self):
2203 self.sa.generate_dh_data()
2204 packet = self.create_rekey_request(kex=self.WITH_KEX)
2205 self.pg0.add_stream(packet)
2206 self.pg0.enable_capture()
2208 capture = self.pg0.get_capture(1)
2211 def process_rekey_response(self, capture):
2212 ih = self.get_ike_header(capture[0])
2213 plain = self.sa.hmac_and_decrypt(ih)
2214 sa = ikev2.IKEv2_payload_SA(plain)
2215 prop = sa[ikev2.IKEv2_payload_Proposal]
2216 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2217 # update new responder SPI
2218 self.sa.child_sas[0].rspi = prop.SPI
2220 self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2221 self.sa.complete_dh_data()
2222 self.sa.calc_child_keys(kex=self.WITH_KEX)
2224 def test_responder(self):
2225 super(TestResponderRekey, self).test_responder()
2226 self.process_rekey_response(self.send_rekey_from_initiator())
2227 self.verify_ike_sas()
2228 self.verify_ipsec_sas(is_rekey=True)
2229 self.assert_counter(1, "rekey_req", "ip4")
2230 r = self.vapi.ikev2_sa_dump()
2231 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2234 @tag_fixme_vpp_workers
2235 class TestResponderRekeyRepeat(TestResponderRekey):
2236 """test ikev2 responder - rekey repeat"""
2238 def test_responder(self):
2239 super(TestResponderRekeyRepeat, self).test_responder()
2240 # rekey request is not accepted until old IPsec SA is expired
2241 capture = self.send_rekey_from_initiator()
2242 ih = self.get_ike_header(capture[0])
2243 plain = self.sa.hmac_and_decrypt(ih)
2244 notify = ikev2.IKEv2_payload_Notify(plain)
2245 self.assertEqual(notify.type, 43)
2246 self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2247 # rekey request is accepted after old IPsec SA was expired
2249 if len(self.vapi.ipsec_sa_dump()) != 3:
2253 self.fail("old IPsec SA not expired")
2254 self.process_rekey_response(self.send_rekey_from_initiator())
2255 self.verify_ike_sas()
2256 self.verify_ipsec_sas(sa_count=3)
2259 @tag_fixme_vpp_workers
2260 class TestResponderRekeyKEX(TestResponderRekey):
2261 """test ikev2 responder - rekey with key exchange"""
2266 @tag_fixme_vpp_workers
2267 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2268 """test ikev2 responder - rekey repeat with key exchange"""
2273 @tag_fixme_vpp_workers
2274 class TestResponderRekeySA(TestResponderPsk):
2275 """test ikev2 responder - rekey IKE SA"""
2277 def send_rekey_from_initiator(self, newsa):
2278 packet = self.create_sa_rekey_request(
2280 dh_pub_key=newsa.my_dh_pub_key,
2281 nonce=newsa.i_nonce,
2283 self.pg0.add_stream(packet)
2284 self.pg0.enable_capture()
2286 capture = self.pg0.get_capture(1)
2289 def process_rekey_response(self, newsa, capture):
2290 ih = self.get_ike_header(capture[0])
2291 plain = self.sa.hmac_and_decrypt(ih)
2292 sa = ikev2.IKEv2_payload_SA(plain)
2293 prop = sa[ikev2.IKEv2_payload_Proposal]
2294 newsa.rspi = prop.SPI
2295 newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2296 newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2297 newsa.complete_dh_data()
2298 newsa.calc_keys(sk_d=self.sa.sk_d)
2299 newsa.child_sas = self.sa.child_sas
2300 self.sa.child_sas = []
2302 def test_responder(self):
2303 super(TestResponderRekeySA, self).test_responder()
2304 newsa = self.sa.clone(self, spi=os.urandom(8))
2305 newsa.generate_dh_data()
2306 capture = self.send_rekey_from_initiator(newsa)
2307 self.process_rekey_response(newsa, capture)
2308 self.verify_ike_sas(is_rekey=True)
2309 self.assert_counter(1, "rekey_req", "ip4")
2310 r = self.vapi.ikev2_sa_dump()
2311 self.assertEqual(r[1].sa.stats.n_rekey_req, 1)
2312 self.initiate_del_sa_from_initiator()
2314 self.verify_ike_sas()
2317 @tag_fixme_ubuntu2204
2319 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2320 """test ikev2 responder - non-default table id"""
2323 def setUpClass(cls):
2324 import scapy.contrib.ikev2 as _ikev2
2326 globals()["ikev2"] = _ikev2
2327 super(IkePeer, cls).setUpClass()
2328 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
2332 cls.create_pg_interfaces(range(1))
2333 cls.vapi.cli("ip table add 1")
2334 cls.vapi.cli("set interface ip table pg0 1")
2335 for i in cls.pg_interfaces:
2342 def config_tc(self):
2343 self.config_params({"dpd_disabled": False})
2345 def test_responder(self):
2346 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2347 super(TestResponderVrf, self).test_responder()
2348 self.pg0.enable_capture()
2350 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2351 ih = self.get_ike_header(capture[0])
2352 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2353 plain = self.sa.hmac_and_decrypt(ih)
2354 self.assertEqual(plain, b"")
2357 @tag_fixme_vpp_workers
2358 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2359 """test ikev2 responder - cert based auth"""
2361 def config_tc(self):
2366 "server-key": "server-key.pem",
2367 "client-key": "client-key.pem",
2368 "client-cert": "client-cert.pem",
2369 "server-cert": "server-cert.pem",
2374 @tag_fixme_vpp_workers
2375 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2376 TemplateResponder, Ikev2Params
2379 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2382 def config_tc(self):
2385 "ike-crypto": ("AES-CBC", 16),
2386 "ike-integ": "SHA2-256-128",
2387 "esp-crypto": ("AES-CBC", 24),
2388 "esp-integ": "SHA2-384-192",
2389 "ike-dh": "2048MODPgr",
2390 "nonce": os.urandom(256),
2391 "no_idr_in_auth": True,
2396 @tag_fixme_vpp_workers
2397 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2398 TemplateResponder, Ikev2Params
2402 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2405 def config_tc(self):
2408 "ike-crypto": ("AES-CBC", 32),
2409 "ike-integ": "SHA2-256-128",
2410 "esp-crypto": ("AES-GCM-16ICV", 32),
2411 "esp-integ": "NULL",
2412 "ike-dh": "3072MODPgr",
2417 @tag_fixme_vpp_workers
2418 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2423 IKE_NODE_SUFFIX = "ip6"
2425 def config_tc(self):
2428 "del_sa_from_responder": True,
2431 "ike-crypto": ("AES-GCM-16ICV", 32),
2432 "ike-integ": "NULL",
2433 "ike-dh": "2048MODPgr",
2434 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2435 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2440 @tag_fixme_vpp_workers
2441 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2443 Test for keep alive messages
2446 def send_empty_req_from_responder(self):
2447 packet = self.create_empty_request()
2448 self.pg0.add_stream(packet)
2449 self.pg0.enable_capture()
2451 capture = self.pg0.get_capture(1)
2452 ih = self.get_ike_header(capture[0])
2453 self.assertEqual(ih.id, self.sa.msg_id)
2454 plain = self.sa.hmac_and_decrypt(ih)
2455 self.assertEqual(plain, b"")
2456 self.assert_counter(1, "keepalive", "ip4")
2457 r = self.vapi.ikev2_sa_dump()
2458 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2460 def test_initiator(self):
2461 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2462 self.send_empty_req_from_responder()
2465 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2466 """malformed packet test"""
2471 def config_tc(self):
2472 self.config_params()
2474 def create_ike_init_msg(self, length=None, payload=None):
2477 init_SPI="\x11" * 8,
2479 exch_type="IKE_SA_INIT",
2481 if payload is not None:
2483 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2485 def verify_bad_packet_length(self):
2486 ike_msg = self.create_ike_init_msg(length=0xDEAD)
2487 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2488 self.assert_counter(self.pkt_count, "bad_length")
2490 def verify_bad_sa_payload_length(self):
2491 p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2492 ike_msg = self.create_ike_init_msg(payload=p)
2493 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2494 self.assert_counter(self.pkt_count, "malformed_packet")
2496 def test_responder(self):
2497 self.pkt_count = 254
2498 self.verify_bad_packet_length()
2499 self.verify_bad_sa_payload_length()
2502 if __name__ == "__main__":
2503 unittest.main(testRunner=VppTestRunner)