3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import VppTestCase
23 from asfframework import (
24 tag_fixme_vpp_workers,
31 from vpp_ikev2 import Profile, IDType, AuthMethod
32 from vpp_papi import VppEnum
39 KEY_PAD = b"Key Pad for IKEv2"
46 # tuple structure is (p, g, key_len)
51 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
69 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
70 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
71 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
72 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
73 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
74 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
75 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
76 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
77 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
78 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
79 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
80 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
81 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
82 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
83 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
84 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
92 class CryptoAlgo(object):
93 def __init__(self, name, cipher, mode):
97 if self.cipher is not None:
98 self.bs = self.cipher.block_size // 8
100 if self.name == "AES-GCM-16ICV":
101 self.iv_len = GCM_IV_SIZE
103 self.iv_len = self.bs
105 def encrypt(self, data, key, aad=None):
106 iv = os.urandom(self.iv_len)
109 self.cipher(key), self.mode(iv), default_backend()
111 return iv + encryptor.update(data) + encryptor.finalize()
113 salt = key[-SALT_SIZE:]
116 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
118 encryptor.authenticate_additional_data(aad)
119 data = encryptor.update(data) + encryptor.finalize()
120 data += encryptor.tag[:GCM_ICV_SIZE]
123 def decrypt(self, data, key, aad=None, icv=None):
125 iv = data[: self.iv_len]
126 ct = data[self.iv_len :]
128 algorithms.AES(key), self.mode(iv), default_backend()
130 return decryptor.update(ct) + decryptor.finalize()
132 salt = key[-SALT_SIZE:]
133 nonce = salt + data[:GCM_IV_SIZE]
134 ct = data[GCM_IV_SIZE:]
135 key = key[:-SALT_SIZE]
137 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
139 decryptor.authenticate_additional_data(aad)
140 return decryptor.update(ct) + decryptor.finalize()
143 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
144 data = data + b"\x00" * (pad_len - 1)
145 return data + bytes([pad_len - 1])
148 class AuthAlgo(object):
149 def __init__(self, name, mac, mod, key_len, trunc_len=None):
153 self.key_len = key_len
154 self.trunc_len = trunc_len or key_len
158 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
159 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
160 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
164 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
165 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
166 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
167 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
168 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
172 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
173 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
189 class IKEv2ChildSA(object):
190 def __init__(self, local_ts, remote_ts, is_initiator):
198 self.local_ts = local_ts
199 self.remote_ts = remote_ts
202 class IKEv2SA(object):
209 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
215 auth_method="shared-key",
221 self.udp_encap = udp_encap
231 self.dh_params = None
233 self.priv_key = priv_key
234 self.is_initiator = is_initiator
235 nonce = nonce or os.urandom(32)
236 self.auth_data = auth_data
239 if isinstance(id_type, str):
240 self.id_type = IDType.value(id_type)
242 self.id_type = id_type
243 self.auth_method = auth_method
244 if self.is_initiator:
245 self.rspi = 8 * b"\x00"
250 self.ispi = 8 * b"\x00"
252 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
254 def new_msg_id(self):
259 def my_dh_pub_key(self):
260 if self.is_initiator:
261 return self.i_dh_data
262 return self.r_dh_data
265 def peer_dh_pub_key(self):
266 if self.is_initiator:
267 return self.r_dh_data
268 return self.i_dh_data
272 return self.i_natt or self.r_natt
274 def compute_secret(self):
275 priv = self.dh_private_key
276 peer = self.peer_dh_pub_key
277 p, g, l = self.ike_group
279 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
282 def generate_dh_data(self):
284 if self.ike_dh not in DH:
285 raise NotImplementedError("%s not in DH group" % self.ike_dh)
287 if self.dh_params is None:
288 dhg = DH[self.ike_dh]
289 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
290 self.dh_params = pn.parameters(default_backend())
292 priv = self.dh_params.generate_private_key()
293 pub = priv.public_key()
294 x = priv.private_numbers().x
295 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
296 y = pub.public_numbers().y
298 if self.is_initiator:
299 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
301 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
303 def complete_dh_data(self):
304 self.dh_shared_secret = self.compute_secret()
306 def calc_child_keys(self, kex=False):
307 prf = self.ike_prf_alg.mod()
308 s = self.i_nonce + self.r_nonce
310 s = self.dh_shared_secret + s
311 c = self.child_sas[0]
313 encr_key_len = self.esp_crypto_key_len
314 integ_key_len = self.esp_integ_alg.key_len
315 salt_len = 0 if integ_key_len else 4
317 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
318 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
321 c.sk_ei = keymat[pos : pos + encr_key_len]
325 c.sk_ai = keymat[pos : pos + integ_key_len]
328 c.salt_ei = keymat[pos : pos + salt_len]
331 c.sk_er = keymat[pos : pos + encr_key_len]
335 c.sk_ar = keymat[pos : pos + integ_key_len]
338 c.salt_er = keymat[pos : pos + salt_len]
341 def calc_prfplus(self, prf, key, seed, length):
345 while len(r) < length and x < 255:
350 s = s + seed + bytes([x])
351 t = self.calc_prf(prf, key, s)
359 def calc_prf(self, prf, key, data):
360 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
365 prf = self.ike_prf_alg.mod()
366 # SKEYSEED = prf(Ni | Nr, g^ir)
367 s = self.i_nonce + self.r_nonce
368 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
370 # calculate S = Ni | Nr | SPIi SPIr
371 s = s + self.ispi + self.rspi
373 prf_key_trunc = self.ike_prf_alg.trunc_len
374 encr_key_len = self.ike_crypto_key_len
375 tr_prf_key_len = self.ike_prf_alg.key_len
376 integ_key_len = self.ike_integ_alg.key_len
377 if integ_key_len == 0:
389 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
392 self.sk_d = keymat[: pos + prf_key_trunc]
395 self.sk_ai = keymat[pos : pos + integ_key_len]
397 self.sk_ar = keymat[pos : pos + integ_key_len]
400 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
401 pos += encr_key_len + salt_size
402 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
403 pos += encr_key_len + salt_size
405 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
406 pos += tr_prf_key_len
407 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
409 def generate_authmsg(self, prf, packet):
410 if self.is_initiator:
418 data = bytes([self.id_type, 0, 0, 0]) + id
419 id_hash = self.calc_prf(prf, key, data)
420 return packet + nonce + id_hash
423 prf = self.ike_prf_alg.mod()
424 if self.is_initiator:
425 packet = self.init_req_packet
427 packet = self.init_resp_packet
428 authmsg = self.generate_authmsg(prf, raw(packet))
429 if self.auth_method == "shared-key":
430 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
431 self.auth_data = self.calc_prf(prf, psk, authmsg)
432 elif self.auth_method == "rsa-sig":
433 self.auth_data = self.priv_key.sign(
434 authmsg, padding.PKCS1v15(), hashes.SHA1()
437 raise TypeError("unknown auth method type!")
439 def encrypt(self, data, aad=None):
440 data = self.ike_crypto_alg.pad(data)
441 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
444 def peer_authkey(self):
445 if self.is_initiator:
450 def my_authkey(self):
451 if self.is_initiator:
456 def my_cryptokey(self):
457 if self.is_initiator:
462 def peer_cryptokey(self):
463 if self.is_initiator:
467 def concat(self, alg, key_len):
468 return alg + "-" + str(key_len * 8)
471 def vpp_ike_cypto_alg(self):
472 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
475 def vpp_esp_cypto_alg(self):
476 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
478 def verify_hmac(self, ikemsg):
479 integ_trunc = self.ike_integ_alg.trunc_len
480 exp_hmac = ikemsg[-integ_trunc:]
481 data = ikemsg[:-integ_trunc]
482 computed_hmac = self.compute_hmac(
483 self.ike_integ_alg.mod(), self.peer_authkey, data
485 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
487 def compute_hmac(self, integ, key, data):
488 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
492 def decrypt(self, data, aad=None, icv=None):
493 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
495 def hmac_and_decrypt(self, ike):
496 ep = ike[ikev2.IKEv2_payload_Encrypted]
497 if self.ike_crypto == "AES-GCM-16ICV":
498 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
499 ct = ep.load[:-GCM_ICV_SIZE]
500 tag = ep.load[-GCM_ICV_SIZE:]
501 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
503 self.verify_hmac(raw(ike))
504 integ_trunc = self.ike_integ_alg.trunc_len
506 # remove ICV and decrypt payload
507 ct = ep.load[:-integ_trunc]
508 plain = self.decrypt(ct)
511 return plain[: -pad_len - 1]
513 def build_ts_addr(self, ts, version):
515 "starting_address_v" + version: ts["start_addr"],
516 "ending_address_v" + version: ts["end_addr"],
519 def generate_ts(self, is_ip4):
520 c = self.child_sas[0]
521 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
523 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
524 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
525 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
526 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
528 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
529 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
530 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
531 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
533 if self.is_initiator:
534 return ([ts1], [ts2])
535 return ([ts2], [ts1])
537 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
538 if crypto not in CRYPTO_ALGOS:
539 raise TypeError("unsupported encryption algo %r" % crypto)
540 self.ike_crypto = crypto
541 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
542 self.ike_crypto_key_len = crypto_key_len
544 if integ not in AUTH_ALGOS:
545 raise TypeError("unsupported auth algo %r" % integ)
546 self.ike_integ = None if integ == "NULL" else integ
547 self.ike_integ_alg = AUTH_ALGOS[integ]
549 if prf not in PRF_ALGOS:
550 raise TypeError("unsupported prf algo %r" % prf)
552 self.ike_prf_alg = PRF_ALGOS[prf]
554 self.ike_group = DH[self.ike_dh]
556 def set_esp_props(self, crypto, crypto_key_len, integ):
557 self.esp_crypto_key_len = crypto_key_len
558 if crypto not in CRYPTO_ALGOS:
559 raise TypeError("unsupported encryption algo %r" % crypto)
560 self.esp_crypto = crypto
561 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
563 if integ not in AUTH_ALGOS:
564 raise TypeError("unsupported auth algo %r" % integ)
565 self.esp_integ = None if integ == "NULL" else integ
566 self.esp_integ_alg = AUTH_ALGOS[integ]
568 def crypto_attr(self, key_len):
569 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
570 return (0x800E << 16 | key_len << 3, 12)
572 raise Exception("unsupported attribute type")
574 def ike_crypto_attr(self):
575 return self.crypto_attr(self.ike_crypto_key_len)
577 def esp_crypto_attr(self):
578 return self.crypto_attr(self.esp_crypto_key_len)
580 def compute_nat_sha1(self, ip, port, rspi=None):
583 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
584 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
586 return digest.finalize()
589 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
590 class IkePeer(VppTestCase):
591 """common class for initiator and responder"""
595 import scapy.contrib.ikev2 as _ikev2
597 globals()["ikev2"] = _ikev2
598 super(IkePeer, cls).setUpClass()
599 cls.create_pg_interfaces(range(2))
600 for i in cls.pg_interfaces:
608 def tearDownClass(cls):
609 super(IkePeer, cls).tearDownClass()
612 super(IkePeer, self).tearDown()
613 if self.del_sa_from_responder:
614 self.initiate_del_sa_from_responder()
616 self.initiate_del_sa_from_initiator()
617 r = self.vapi.ikev2_sa_dump()
618 self.assertEqual(len(r), 0)
619 sas = self.vapi.ipsec_sa_dump()
620 self.assertEqual(len(sas), 0)
621 self.p.remove_vpp_config()
622 self.assertIsNone(self.p.query_vpp_config())
625 super(IkePeer, self).setUp()
627 self.p.add_vpp_config()
628 self.assertIsNotNone(self.p.query_vpp_config())
629 if self.sa.is_initiator:
630 self.sa.generate_dh_data()
631 self.vapi.cli("ikev2 set logging level 4")
632 self.vapi.cli("event-lo clear")
634 def assert_counter(self, count, name, version="ip4"):
635 node_name = "/err/ikev2-%s/" % version + name
636 self.assertEqual(count, self.statistics.get_err_counter(node_name))
638 def create_rekey_request(self, kex=False):
639 sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
640 header = ikev2.IKEv2(
641 init_SPI=self.sa.ispi,
642 resp_SPI=self.sa.rspi,
643 id=self.sa.new_msg_id(),
645 exch_type="CREATE_CHILD_SA",
648 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
649 return self.create_packet(
650 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
653 def create_empty_request(self):
654 header = ikev2.IKEv2(
655 init_SPI=self.sa.ispi,
656 resp_SPI=self.sa.rspi,
657 id=self.sa.new_msg_id(),
659 exch_type="INFORMATIONAL",
660 next_payload="Encrypted",
663 msg = self.encrypt_ike_msg(header, b"", None)
664 return self.create_packet(
665 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
669 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
672 src_ip = src_if.remote_ip6
673 dst_ip = src_if.local_ip6
676 src_ip = src_if.remote_ip4
677 dst_ip = src_if.local_ip4
680 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
681 / ip_layer(src=src_ip, dst=dst_ip)
682 / UDP(sport=sport, dport=dport)
685 # insert non ESP marker
686 res = res / Raw(b"\x00" * 4)
689 def verify_udp(self, udp):
690 self.assertEqual(udp.sport, self.sa.sport)
691 self.assertEqual(udp.dport, self.sa.dport)
693 def get_ike_header(self, packet):
695 ih = packet[ikev2.IKEv2]
696 ih = self.verify_and_remove_non_esp_marker(ih)
697 except IndexError as e:
698 # this is a workaround for getting IKEv2 layer as both ikev2 and
699 # ipsec register for port 4500
701 ih = self.verify_and_remove_non_esp_marker(esp)
702 self.assertEqual(ih.version, 0x20)
703 self.assertNotIn("Version", ih.flags)
706 def verify_and_remove_non_esp_marker(self, packet):
708 # if we are in nat traversal mode check for non esp marker
711 self.assertEqual(data[:4], b"\x00" * 4)
712 return ikev2.IKEv2(data[4:])
716 def encrypt_ike_msg(self, header, plain, first_payload):
717 if self.sa.ike_crypto == "AES-GCM-16ICV":
718 data = self.sa.ike_crypto_alg.pad(raw(plain))
723 + len(ikev2.IKEv2_payload_Encrypted())
725 tlen = plen + len(ikev2.IKEv2())
728 sk_p = ikev2.IKEv2_payload_Encrypted(
729 next_payload=first_payload, length=plen
733 encr = self.sa.encrypt(raw(plain), raw(res))
734 sk_p = ikev2.IKEv2_payload_Encrypted(
735 next_payload=first_payload, length=plen, load=encr
739 encr = self.sa.encrypt(raw(plain))
740 trunc_len = self.sa.ike_integ_alg.trunc_len
741 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
742 tlen = plen + len(ikev2.IKEv2())
744 sk_p = ikev2.IKEv2_payload_Encrypted(
745 next_payload=first_payload, length=plen, load=encr
750 integ_data = raw(res)
751 hmac_data = self.sa.compute_hmac(
752 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
754 res = res / Raw(hmac_data[:trunc_len])
755 assert len(res) == tlen
758 def verify_udp_encap(self, ipsec_sa):
759 e = VppEnum.vl_api_ipsec_sad_flags_t
760 if self.sa.udp_encap or self.sa.natt:
761 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
763 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
765 def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
766 sas = self.vapi.ipsec_sa_dump()
769 # after rekey there is a short period of time in which old
770 # inbound SA is still present
774 self.assertEqual(len(sas), sa_count)
775 if self.sa.is_initiator:
790 c = self.sa.child_sas[0]
792 self.verify_udp_encap(sa0)
793 self.verify_udp_encap(sa1)
794 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
795 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
796 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
798 if self.sa.esp_integ is None:
801 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
802 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
803 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
806 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
807 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
808 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
809 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
813 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
814 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
815 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
816 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
818 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
819 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
821 def verify_keymat(self, api_keys, keys, name):
822 km = getattr(keys, name)
823 api_km = getattr(api_keys, name)
824 api_km_len = getattr(api_keys, name + "_len")
825 self.assertEqual(len(km), api_km_len)
826 self.assertEqual(km, api_km[:api_km_len])
828 def verify_id(self, api_id, exp_id):
829 self.assertEqual(api_id.type, IDType.value(exp_id.type))
830 self.assertEqual(api_id.data_len, exp_id.data_len)
831 self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
833 def verify_ike_sas(self):
834 r = self.vapi.ikev2_sa_dump()
835 self.assertEqual(len(r), 1)
837 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
838 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
840 if self.sa.is_initiator:
841 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
842 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
844 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
845 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
847 if self.sa.is_initiator:
848 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
849 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
851 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
852 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
853 self.verify_keymat(sa.keys, self.sa, "sk_d")
854 self.verify_keymat(sa.keys, self.sa, "sk_ai")
855 self.verify_keymat(sa.keys, self.sa, "sk_ar")
856 self.verify_keymat(sa.keys, self.sa, "sk_ei")
857 self.verify_keymat(sa.keys, self.sa, "sk_er")
858 self.verify_keymat(sa.keys, self.sa, "sk_pi")
859 self.verify_keymat(sa.keys, self.sa, "sk_pr")
861 self.assertEqual(sa.i_id.type, self.sa.id_type)
862 self.assertEqual(sa.r_id.type, self.sa.id_type)
863 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
864 self.assertEqual(sa.r_id.data_len, len(self.idr))
865 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
866 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
868 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
869 self.assertEqual(len(r), 1)
871 self.assertEqual(csa.sa_index, sa.sa_index)
872 c = self.sa.child_sas[0]
873 if hasattr(c, "sk_ai"):
874 self.verify_keymat(csa.keys, c, "sk_ai")
875 self.verify_keymat(csa.keys, c, "sk_ar")
876 self.verify_keymat(csa.keys, c, "sk_ei")
877 self.verify_keymat(csa.keys, c, "sk_er")
878 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
879 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
881 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
884 r = self.vapi.ikev2_traffic_selector_dump(
885 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
887 self.assertEqual(len(r), 1)
889 self.verify_ts(r[0].ts, tsi[0], True)
891 r = self.vapi.ikev2_traffic_selector_dump(
892 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
894 self.assertEqual(len(r), 1)
895 self.verify_ts(r[0].ts, tsr[0], False)
897 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
898 self.verify_nonce(n, self.sa.i_nonce)
899 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
900 self.verify_nonce(n, self.sa.r_nonce)
902 def verify_nonce(self, api_nonce, nonce):
903 self.assertEqual(api_nonce.data_len, len(nonce))
904 self.assertEqual(api_nonce.nonce, nonce)
906 def verify_ts(self, api_ts, ts, is_initiator):
908 self.assertTrue(api_ts.is_local)
910 self.assertFalse(api_ts.is_local)
913 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
914 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
916 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
917 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
918 self.assertEqual(api_ts.start_port, ts.start_port)
919 self.assertEqual(api_ts.end_port, ts.end_port)
920 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
923 class TemplateInitiator(IkePeer):
924 """initiator test template"""
926 def initiate_del_sa_from_initiator(self):
927 ispi = int.from_bytes(self.sa.ispi, "little")
928 self.pg0.enable_capture()
930 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
931 capture = self.pg0.get_capture(1)
932 ih = self.get_ike_header(capture[0])
933 self.assertNotIn("Response", ih.flags)
934 self.assertIn("Initiator", ih.flags)
935 self.assertEqual(ih.init_SPI, self.sa.ispi)
936 self.assertEqual(ih.resp_SPI, self.sa.rspi)
937 plain = self.sa.hmac_and_decrypt(ih)
938 d = ikev2.IKEv2_payload_Delete(plain)
939 self.assertEqual(d.proto, 1) # proto=IKEv2
940 header = ikev2.IKEv2(
941 init_SPI=self.sa.ispi,
942 resp_SPI=self.sa.rspi,
944 exch_type="INFORMATIONAL",
946 next_payload="Encrypted",
948 resp = self.encrypt_ike_msg(header, b"", None)
949 self.send_and_assert_no_replies(self.pg0, resp)
951 def verify_del_sa(self, packet):
952 ih = self.get_ike_header(packet)
953 self.assertEqual(ih.id, self.sa.msg_id)
954 self.assertEqual(ih.exch_type, 37) # exchange informational
955 self.assertIn("Response", ih.flags)
956 self.assertIn("Initiator", ih.flags)
957 plain = self.sa.hmac_and_decrypt(ih)
958 self.assertEqual(plain, b"")
960 def initiate_del_sa_from_responder(self):
961 header = ikev2.IKEv2(
962 init_SPI=self.sa.ispi,
963 resp_SPI=self.sa.rspi,
964 exch_type="INFORMATIONAL",
965 id=self.sa.new_msg_id(),
967 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
968 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
969 packet = self.create_packet(
970 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
972 self.pg0.add_stream(packet)
973 self.pg0.enable_capture()
975 capture = self.pg0.get_capture(1)
976 self.verify_del_sa(capture[0])
979 def find_notify_payload(packet, notify_type):
980 n = packet[ikev2.IKEv2_payload_Notify]
982 if n.type == notify_type:
987 def verify_nat_detection(self, packet):
994 # NAT_DETECTION_SOURCE_IP
995 s = self.find_notify_payload(packet, 16388)
996 self.assertIsNotNone(s)
997 src_sha = self.sa.compute_nat_sha1(
998 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
1000 self.assertEqual(s.load, src_sha)
1002 # NAT_DETECTION_DESTINATION_IP
1003 s = self.find_notify_payload(packet, 16389)
1004 self.assertIsNotNone(s)
1005 dst_sha = self.sa.compute_nat_sha1(
1006 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1008 self.assertEqual(s.load, dst_sha)
1010 def verify_sa_init_request(self, packet):
1012 self.sa.dport = udp.sport
1013 ih = packet[ikev2.IKEv2]
1014 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1015 self.assertEqual(ih.exch_type, 34) # SA_INIT
1016 self.sa.ispi = ih.init_SPI
1017 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1018 self.assertIn("Initiator", ih.flags)
1019 self.assertNotIn("Response", ih.flags)
1020 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1021 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1023 prop = packet[ikev2.IKEv2_payload_Proposal]
1024 self.assertEqual(prop.proto, 1) # proto = ikev2
1025 self.assertEqual(prop.proposal, 1)
1026 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1028 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1030 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1031 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1032 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1033 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1035 self.verify_nat_detection(packet)
1036 self.sa.set_ike_props(
1037 crypto="AES-GCM-16ICV",
1040 prf="PRF_HMAC_SHA2_256",
1043 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1044 self.sa.generate_dh_data()
1045 self.sa.complete_dh_data()
1048 def update_esp_transforms(self, trans, sa):
1050 if trans.transform_type == 1: # ecryption
1051 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1052 elif trans.transform_type == 3: # integrity
1053 sa.esp_integ = INTEG_IDS[trans.transform_id]
1054 trans = trans.payload
1056 def verify_sa_auth_req(self, packet):
1058 self.sa.dport = udp.sport
1059 ih = self.get_ike_header(packet)
1060 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1061 self.assertEqual(ih.init_SPI, self.sa.ispi)
1062 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1063 self.assertIn("Initiator", ih.flags)
1064 self.assertNotIn("Response", ih.flags)
1067 self.verify_udp(udp)
1068 self.assertEqual(ih.id, self.sa.msg_id + 1)
1070 plain = self.sa.hmac_and_decrypt(ih)
1071 idi = ikev2.IKEv2_payload_IDi(plain)
1072 self.assertEqual(idi.load, self.sa.i_id)
1073 if self.no_idr_auth:
1074 self.assertEqual(idi.next_payload, 39) # AUTH
1076 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1077 self.assertEqual(idr.load, self.sa.r_id)
1078 prop = idi[ikev2.IKEv2_payload_Proposal]
1079 c = self.sa.child_sas[0]
1081 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1083 def send_init_response(self):
1084 tr_attr = self.sa.ike_crypto_attr()
1086 ikev2.IKEv2_payload_Transform(
1087 transform_type="Encryption",
1088 transform_id=self.sa.ike_crypto,
1090 key_length=tr_attr[0],
1092 / ikev2.IKEv2_payload_Transform(
1093 transform_type="Integrity", transform_id=self.sa.ike_integ
1095 / ikev2.IKEv2_payload_Transform(
1096 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1098 / ikev2.IKEv2_payload_Transform(
1099 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1102 props = ikev2.IKEv2_payload_Proposal(
1103 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1106 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1108 dst_address = b"\x0a\x0a\x0a\x0a"
1110 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1111 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1112 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1114 self.sa.init_resp_packet = (
1116 init_SPI=self.sa.ispi,
1117 resp_SPI=self.sa.rspi,
1118 exch_type="IKE_SA_INIT",
1121 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1122 / ikev2.IKEv2_payload_KE(
1123 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1125 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1126 / ikev2.IKEv2_payload_Notify(
1127 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1129 / ikev2.IKEv2_payload_Notify(
1130 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1134 ike_msg = self.create_packet(
1136 self.sa.init_resp_packet,
1142 self.pg_send(self.pg0, ike_msg)
1143 capture = self.pg0.get_capture(1)
1144 self.verify_sa_auth_req(capture[0])
1146 def initiate_sa_init(self):
1147 self.pg0.enable_capture()
1149 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1151 capture = self.pg0.get_capture(1)
1152 self.verify_sa_init_request(capture[0])
1153 self.send_init_response()
1155 def send_auth_response(self):
1156 tr_attr = self.sa.esp_crypto_attr()
1158 ikev2.IKEv2_payload_Transform(
1159 transform_type="Encryption",
1160 transform_id=self.sa.esp_crypto,
1162 key_length=tr_attr[0],
1164 / ikev2.IKEv2_payload_Transform(
1165 transform_type="Integrity", transform_id=self.sa.esp_integ
1167 / ikev2.IKEv2_payload_Transform(
1168 transform_type="Extended Sequence Number", transform_id="No ESN"
1170 / ikev2.IKEv2_payload_Transform(
1171 transform_type="Extended Sequence Number", transform_id="ESN"
1175 c = self.sa.child_sas[0]
1176 props = ikev2.IKEv2_payload_Proposal(
1177 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1180 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1182 ikev2.IKEv2_payload_IDi(
1183 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1185 / ikev2.IKEv2_payload_IDr(
1186 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1188 / ikev2.IKEv2_payload_AUTH(
1190 auth_type=AuthMethod.value(self.sa.auth_method),
1191 load=self.sa.auth_data,
1193 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1194 / ikev2.IKEv2_payload_TSi(
1195 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1197 / ikev2.IKEv2_payload_TSr(
1198 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1200 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1203 header = ikev2.IKEv2(
1204 init_SPI=self.sa.ispi,
1205 resp_SPI=self.sa.rspi,
1206 id=self.sa.new_msg_id(),
1208 exch_type="IKE_AUTH",
1211 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1212 packet = self.create_packet(
1213 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1215 self.pg_send(self.pg0, packet)
1217 def test_initiator(self):
1218 self.initiate_sa_init()
1220 self.sa.calc_child_keys()
1221 self.send_auth_response()
1222 self.verify_ike_sas()
1225 class TemplateResponder(IkePeer):
1226 """responder test template"""
1228 def initiate_del_sa_from_responder(self):
1229 self.pg0.enable_capture()
1231 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1232 capture = self.pg0.get_capture(1)
1233 ih = self.get_ike_header(capture[0])
1234 self.assertNotIn("Response", ih.flags)
1235 self.assertNotIn("Initiator", ih.flags)
1236 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1237 plain = self.sa.hmac_and_decrypt(ih)
1238 d = ikev2.IKEv2_payload_Delete(plain)
1239 self.assertEqual(d.proto, 1) # proto=IKEv2
1240 self.assertEqual(ih.init_SPI, self.sa.ispi)
1241 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1242 header = ikev2.IKEv2(
1243 init_SPI=self.sa.ispi,
1244 resp_SPI=self.sa.rspi,
1245 flags="Initiator+Response",
1246 exch_type="INFORMATIONAL",
1248 next_payload="Encrypted",
1250 resp = self.encrypt_ike_msg(header, b"", None)
1251 self.send_and_assert_no_replies(self.pg0, resp)
1253 def verify_del_sa(self, packet):
1254 ih = self.get_ike_header(packet)
1255 self.assertEqual(ih.id, self.sa.msg_id)
1256 self.assertEqual(ih.exch_type, 37) # exchange informational
1257 self.assertIn("Response", ih.flags)
1258 self.assertNotIn("Initiator", ih.flags)
1259 self.assertEqual(ih.next_payload, 46) # Encrypted
1260 self.assertEqual(ih.init_SPI, self.sa.ispi)
1261 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1262 plain = self.sa.hmac_and_decrypt(ih)
1263 self.assertEqual(plain, b"")
1265 def initiate_del_sa_from_initiator(self):
1266 header = ikev2.IKEv2(
1267 init_SPI=self.sa.ispi,
1268 resp_SPI=self.sa.rspi,
1270 exch_type="INFORMATIONAL",
1271 id=self.sa.new_msg_id(),
1273 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1274 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1275 packet = self.create_packet(
1276 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1278 self.pg0.add_stream(packet)
1279 self.pg0.enable_capture()
1281 capture = self.pg0.get_capture(1)
1282 self.verify_del_sa(capture[0])
1284 def send_sa_init_req(self):
1285 tr_attr = self.sa.ike_crypto_attr()
1287 ikev2.IKEv2_payload_Transform(
1288 transform_type="Encryption",
1289 transform_id=self.sa.ike_crypto,
1291 key_length=tr_attr[0],
1293 / ikev2.IKEv2_payload_Transform(
1294 transform_type="Integrity", transform_id=self.sa.ike_integ
1296 / ikev2.IKEv2_payload_Transform(
1297 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1299 / ikev2.IKEv2_payload_Transform(
1300 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1304 props = ikev2.IKEv2_payload_Proposal(
1305 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1308 next_payload = None if self.ip6 else "Notify"
1310 self.sa.init_req_packet = (
1312 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1314 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1315 / ikev2.IKEv2_payload_KE(
1316 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1318 / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1323 src_address = b"\x0a\x0a\x0a\x01"
1325 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1328 dst_address = b"\x0a\x0a\x0a\x0a"
1330 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1332 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1333 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1334 nat_src_detection = ikev2.IKEv2_payload_Notify(
1335 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1337 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1338 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1340 self.sa.init_req_packet = (
1341 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1344 ike_msg = self.create_packet(
1346 self.sa.init_req_packet,
1352 self.pg0.add_stream(ike_msg)
1353 self.pg0.enable_capture()
1355 capture = self.pg0.get_capture(1)
1356 self.verify_sa_init(capture[0])
1358 def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1359 tr_attr = self.sa.esp_crypto_attr()
1360 last_payload = last_payload or "Notify"
1363 ikev2.IKEv2_payload_Transform(
1364 transform_type="Encryption",
1365 transform_id=self.sa.esp_crypto,
1367 key_length=tr_attr[0],
1369 / ikev2.IKEv2_payload_Transform(
1370 transform_type="Integrity", transform_id=self.sa.esp_integ
1372 / ikev2.IKEv2_payload_Transform(
1373 transform_type="Extended Sequence Number", transform_id="No ESN"
1375 / ikev2.IKEv2_payload_Transform(
1376 transform_type="Extended Sequence Number", transform_id="ESN"
1382 trans /= ikev2.IKEv2_payload_Transform(
1383 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1386 c = self.sa.child_sas[0]
1387 props = ikev2.IKEv2_payload_Proposal(
1396 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1398 ikev2.IKEv2_payload_AUTH(
1400 auth_type=AuthMethod.value(self.sa.auth_method),
1401 load=self.sa.auth_data,
1403 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1404 / ikev2.IKEv2_payload_TSi(
1405 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1407 / ikev2.IKEv2_payload_TSr(
1408 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1413 first_payload = "Nonce"
1415 head = ikev2.IKEv2_payload_Nonce(
1416 load=self.sa.i_nonce, next_payload="KE"
1417 ) / ikev2.IKEv2_payload_KE(
1418 group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1421 head = ikev2.IKEv2_payload_Nonce(
1422 load=self.sa.i_nonce, next_payload="SA"
1427 / ikev2.IKEv2_payload_Notify(
1431 length=8 + len(c.ispi),
1432 next_payload="Notify",
1434 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1437 first_payload = "IDi"
1438 if self.no_idr_auth:
1439 ids = ikev2.IKEv2_payload_IDi(
1440 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1443 ids = ikev2.IKEv2_payload_IDi(
1444 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1445 ) / ikev2.IKEv2_payload_IDr(
1446 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1449 return plain, first_payload
1451 def send_sa_auth(self):
1452 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1453 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1454 header = ikev2.IKEv2(
1455 init_SPI=self.sa.ispi,
1456 resp_SPI=self.sa.rspi,
1457 id=self.sa.new_msg_id(),
1459 exch_type="IKE_AUTH",
1462 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1463 packet = self.create_packet(
1464 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1466 self.pg0.add_stream(packet)
1467 self.pg0.enable_capture()
1469 capture = self.pg0.get_capture(1)
1470 self.verify_sa_auth_resp(capture[0])
1472 def verify_sa_init(self, packet):
1473 ih = self.get_ike_header(packet)
1475 self.assertEqual(ih.id, self.sa.msg_id)
1476 self.assertEqual(ih.exch_type, 34)
1477 self.assertIn("Response", ih.flags)
1478 self.assertEqual(ih.init_SPI, self.sa.ispi)
1479 self.assertNotEqual(ih.resp_SPI, 0)
1480 self.sa.rspi = ih.resp_SPI
1482 sa = ih[ikev2.IKEv2_payload_SA]
1483 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1484 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1485 except IndexError as e:
1486 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1487 self.logger.error(ih.show())
1489 self.sa.complete_dh_data()
1493 def verify_sa_auth_resp(self, packet):
1494 ike = self.get_ike_header(packet)
1496 self.verify_udp(udp)
1497 self.assertEqual(ike.id, self.sa.msg_id)
1498 plain = self.sa.hmac_and_decrypt(ike)
1499 idr = ikev2.IKEv2_payload_IDr(plain)
1500 prop = idr[ikev2.IKEv2_payload_Proposal]
1501 self.assertEqual(prop.SPIsize, 4)
1502 self.sa.child_sas[0].rspi = prop.SPI
1503 self.sa.calc_child_keys()
1505 IKE_NODE_SUFFIX = "ip4"
1507 def verify_counters(self):
1508 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1509 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1510 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1512 r = self.vapi.ikev2_sa_dump()
1514 self.assertEqual(1, s.n_sa_auth_req)
1515 self.assertEqual(1, s.n_sa_init_req)
1517 def test_responder(self):
1518 self.send_sa_init_req()
1520 self.verify_ipsec_sas()
1521 self.verify_ike_sas()
1522 self.verify_counters()
1525 class Ikev2Params(object):
1526 def config_params(self, params={}):
1527 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1528 ei = VppEnum.vl_api_ipsec_integ_alg_t
1530 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1531 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1532 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1533 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1534 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1535 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1536 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1537 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1538 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1539 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1542 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1544 self.vapi.cli("ikev2 dpd disable")
1545 self.del_sa_from_responder = (
1547 if "del_sa_from_responder" not in params
1548 else params["del_sa_from_responder"]
1550 i_natt = False if "i_natt" not in params else params["i_natt"]
1551 r_natt = False if "r_natt" not in params else params["r_natt"]
1552 self.p = Profile(self, "pr1")
1553 self.ip6 = False if "ip6" not in params else params["ip6"]
1555 if "auth" in params and params["auth"] == "rsa-sig":
1556 auth_method = "rsa-sig"
1557 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1558 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1560 client_file = work_dir + params["client-cert"]
1561 server_pem = open(work_dir + params["server-cert"]).read()
1562 client_priv = open(work_dir + params["client-key"]).read()
1563 client_priv = load_pem_private_key(
1564 str.encode(client_priv), None, default_backend()
1566 self.peer_cert = x509.load_pem_x509_certificate(
1567 str.encode(server_pem), default_backend()
1569 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1572 auth_data = b"$3cr3tpa$$w0rd"
1573 self.p.add_auth(method="shared-key", data=auth_data)
1574 auth_method = "shared-key"
1577 is_init = True if "is_initiator" not in params else params["is_initiator"]
1578 self.no_idr_auth = params.get("no_idr_in_auth", False)
1580 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1581 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1582 r_id = self.idr = idr["data"]
1583 i_id = self.idi = idi["data"]
1585 # scapy is initiator, VPP is responder
1586 self.p.add_local_id(**idr)
1587 self.p.add_remote_id(**idi)
1588 if self.no_idr_auth:
1591 # VPP is initiator, scapy is responder
1592 self.p.add_local_id(**idi)
1593 if not self.no_idr_auth:
1594 self.p.add_remote_id(**idr)
1597 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1598 if "loc_ts" not in params
1599 else params["loc_ts"]
1602 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1603 if "rem_ts" not in params
1604 else params["rem_ts"]
1606 self.p.add_local_ts(**loc_ts)
1607 self.p.add_remote_ts(**rem_ts)
1608 if "responder" in params:
1609 self.p.add_responder(params["responder"])
1610 if "ike_transforms" in params:
1611 self.p.add_ike_transforms(params["ike_transforms"])
1612 if "esp_transforms" in params:
1613 self.p.add_esp_transforms(params["esp_transforms"])
1615 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1617 self.p.set_udp_encap(True)
1619 if "responder_hostname" in params:
1620 hn = params["responder_hostname"]
1621 self.p.add_responder_hostname(hn)
1623 # configure static dns record
1624 self.vapi.dns_name_server_add_del(
1625 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1627 self.vapi.dns_enable_disable(enable=1)
1629 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1636 is_initiator=is_init,
1637 id_type=self.p.local_id["id_type"],
1640 priv_key=client_priv,
1641 auth_method=auth_method,
1642 nonce=params.get("nonce"),
1643 auth_data=auth_data,
1644 udp_encap=udp_encap,
1645 local_ts=self.p.remote_ts,
1646 remote_ts=self.p.local_ts,
1651 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1654 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1656 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1659 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1662 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1665 self.sa.set_ike_props(
1666 crypto=ike_crypto[0],
1667 crypto_key_len=ike_crypto[1],
1669 prf="PRF_HMAC_SHA2_256",
1672 self.sa.set_esp_props(
1673 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1677 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
1678 class TestApi(VppTestCase):
1679 """Test IKEV2 API"""
1682 def setUpClass(cls):
1683 super(TestApi, cls).setUpClass()
1686 def tearDownClass(cls):
1687 super(TestApi, cls).tearDownClass()
1690 super(TestApi, self).tearDown()
1691 self.p1.remove_vpp_config()
1692 self.p2.remove_vpp_config()
1693 r = self.vapi.ikev2_profile_dump()
1694 self.assertEqual(len(r), 0)
1696 def configure_profile(self, cfg):
1697 p = Profile(self, cfg["name"])
1698 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1699 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1700 p.add_local_ts(**cfg["loc_ts"])
1701 p.add_remote_ts(**cfg["rem_ts"])
1702 p.add_responder(cfg["responder"])
1703 p.add_ike_transforms(cfg["ike_ts"])
1704 p.add_esp_transforms(cfg["esp_ts"])
1705 p.add_auth(**cfg["auth"])
1706 p.set_udp_encap(cfg["udp_encap"])
1707 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1708 if "lifetime_data" in cfg:
1709 p.set_lifetime_data(cfg["lifetime_data"])
1710 if "tun_itf" in cfg:
1711 p.set_tunnel_interface(cfg["tun_itf"])
1712 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1717 def test_profile_api(self):
1718 """test profile dump API"""
1723 "start_addr": "3.3.3.2",
1724 "end_addr": "3.3.3.3",
1730 "start_addr": "4.5.76.80",
1731 "end_addr": "2.3.4.6",
1738 "start_addr": "ab::1",
1739 "end_addr": "ab::4",
1745 "start_addr": "cd::12",
1746 "end_addr": "cd::13",
1752 "natt_disabled": True,
1753 "loc_id": ("fqdn", b"vpp.home"),
1754 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1757 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1760 "crypto_key_size": 32,
1764 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1765 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1767 "ipsec_over_udp_port": 4501,
1770 "lifetime_maxdata": 20192,
1771 "lifetime_jitter": 9,
1777 "loc_id": ("ip4-addr", b"192.168.2.1"),
1778 "rem_id": ("ip6-addr", b"abcd::1"),
1781 "responder": {"sw_if_index": 4, "addr": "def::10"},
1784 "crypto_key_size": 16,
1788 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1789 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1791 "ipsec_over_udp_port": 4600,
1795 self.p1 = self.configure_profile(conf["p1"])
1796 self.p2 = self.configure_profile(conf["p2"])
1798 r = self.vapi.ikev2_profile_dump()
1799 self.assertEqual(len(r), 2)
1800 self.verify_profile(r[0].profile, conf["p1"])
1801 self.verify_profile(r[1].profile, conf["p2"])
1803 def verify_id(self, api_id, cfg_id):
1804 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1805 self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1807 def verify_ts(self, api_ts, cfg_ts):
1808 self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1809 self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1810 self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1811 self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1812 self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1814 def verify_responder(self, api_r, cfg_r):
1815 self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1816 self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1818 def verify_transforms(self, api_ts, cfg_ts):
1819 self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1820 self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1821 self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1823 def verify_ike_transforms(self, api_ts, cfg_ts):
1824 self.verify_transforms(api_ts, cfg_ts)
1825 self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1827 def verify_esp_transforms(self, api_ts, cfg_ts):
1828 self.verify_transforms(api_ts, cfg_ts)
1830 def verify_auth(self, api_auth, cfg_auth):
1831 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1832 self.assertEqual(api_auth.data, cfg_auth["data"])
1833 self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1835 def verify_lifetime_data(self, p, ld):
1836 self.assertEqual(p.lifetime, ld["lifetime"])
1837 self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1838 self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1839 self.assertEqual(p.handover, ld["handover"])
1841 def verify_profile(self, ap, cp):
1842 self.assertEqual(ap.name, cp["name"])
1843 self.assertEqual(ap.udp_encap, cp["udp_encap"])
1844 self.verify_id(ap.loc_id, cp["loc_id"])
1845 self.verify_id(ap.rem_id, cp["rem_id"])
1846 self.verify_ts(ap.loc_ts, cp["loc_ts"])
1847 self.verify_ts(ap.rem_ts, cp["rem_ts"])
1848 self.verify_responder(ap.responder, cp["responder"])
1849 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1850 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1851 self.verify_auth(ap.auth, cp["auth"])
1852 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1853 self.assertTrue(natt_dis == ap.natt_disabled)
1855 if "lifetime_data" in cp:
1856 self.verify_lifetime_data(ap, cp["lifetime_data"])
1857 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1859 self.assertEqual(ap.tun_itf, cp["tun_itf"])
1861 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1864 @tag_fixme_vpp_workers
1865 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1866 """test responder - responder behind NAT"""
1868 IKE_NODE_SUFFIX = "ip4-natt"
1870 def config_tc(self):
1871 self.config_params({"r_natt": True})
1874 @tag_fixme_vpp_workers
1875 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1876 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1878 def config_tc(self):
1882 "is_initiator": False, # seen from test case perspective
1883 # thus vpp is initiator
1885 "sw_if_index": self.pg0.sw_if_index,
1886 "addr": self.pg0.remote_ip4,
1888 "ike-crypto": ("AES-GCM-16ICV", 32),
1889 "ike-integ": "NULL",
1890 "ike-dh": "3072MODPgr",
1892 "crypto_alg": 20, # "aes-gcm-16"
1893 "crypto_key_size": 256,
1894 "dh_group": 15, # "modp-3072"
1897 "crypto_alg": 12, # "aes-cbc"
1898 "crypto_key_size": 256,
1899 # "hmac-sha2-256-128"
1906 @tag_fixme_vpp_workers
1907 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1908 """test ikev2 initiator - pre shared key auth"""
1910 def config_tc(self):
1913 "is_initiator": False, # seen from test case perspective
1914 # thus vpp is initiator
1915 "ike-crypto": ("AES-GCM-16ICV", 32),
1916 "ike-integ": "NULL",
1917 "ike-dh": "3072MODPgr",
1919 "crypto_alg": 20, # "aes-gcm-16"
1920 "crypto_key_size": 256,
1921 "dh_group": 15, # "modp-3072"
1924 "crypto_alg": 12, # "aes-cbc"
1925 "crypto_key_size": 256,
1926 # "hmac-sha2-256-128"
1929 "responder_hostname": {
1930 "hostname": "vpp.responder.org",
1931 "sw_if_index": self.pg0.sw_if_index,
1937 @tag_fixme_vpp_workers
1938 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1939 """test initiator - request window size (1)"""
1941 def rekey_respond(self, req, update_child_sa_data):
1942 ih = self.get_ike_header(req)
1943 plain = self.sa.hmac_and_decrypt(ih)
1944 sa = ikev2.IKEv2_payload_SA(plain)
1945 if update_child_sa_data:
1946 prop = sa[ikev2.IKEv2_payload_Proposal]
1947 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1948 self.sa.r_nonce = self.sa.i_nonce
1949 self.sa.child_sas[0].ispi = prop.SPI
1950 self.sa.child_sas[0].rspi = prop.SPI
1951 self.sa.calc_child_keys()
1953 header = ikev2.IKEv2(
1954 init_SPI=self.sa.ispi,
1955 resp_SPI=self.sa.rspi,
1959 next_payload="Encrypted",
1961 resp = self.encrypt_ike_msg(header, sa, "SA")
1962 packet = self.create_packet(
1963 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1965 self.send_and_assert_no_replies(self.pg0, packet)
1967 def test_initiator(self):
1968 super(TestInitiatorRequestWindowSize, self).test_initiator()
1969 self.pg0.enable_capture()
1971 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1972 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1973 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1974 capture = self.pg0.get_capture(2)
1976 # reply in reverse order
1977 self.rekey_respond(capture[1], True)
1978 self.rekey_respond(capture[0], False)
1980 # verify that only the second request was accepted
1981 self.verify_ike_sas()
1982 self.verify_ipsec_sas(is_rekey=True)
1985 @tag_fixme_vpp_workers
1986 class TestInitiatorRekey(TestInitiatorPsk):
1987 """test ikev2 initiator - rekey"""
1989 def rekey_from_initiator(self):
1990 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1991 self.pg0.enable_capture()
1993 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1994 capture = self.pg0.get_capture(1)
1995 ih = self.get_ike_header(capture[0])
1996 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1997 self.assertNotIn("Response", ih.flags)
1998 self.assertIn("Initiator", ih.flags)
1999 plain = self.sa.hmac_and_decrypt(ih)
2000 sa = ikev2.IKEv2_payload_SA(plain)
2001 prop = sa[ikev2.IKEv2_payload_Proposal]
2002 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2003 self.sa.r_nonce = self.sa.i_nonce
2004 # update new responder SPI
2005 self.sa.child_sas[0].ispi = prop.SPI
2006 self.sa.child_sas[0].rspi = prop.SPI
2007 self.sa.calc_child_keys()
2008 header = ikev2.IKEv2(
2009 init_SPI=self.sa.ispi,
2010 resp_SPI=self.sa.rspi,
2014 next_payload="Encrypted",
2016 resp = self.encrypt_ike_msg(header, sa, "SA")
2017 packet = self.create_packet(
2018 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2020 self.send_and_assert_no_replies(self.pg0, packet)
2022 def test_initiator(self):
2023 super(TestInitiatorRekey, self).test_initiator()
2024 self.rekey_from_initiator()
2025 self.verify_ike_sas()
2026 self.verify_ipsec_sas(is_rekey=True)
2029 @tag_fixme_vpp_workers
2030 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2031 """test ikev2 initiator - delete IKE SA from responder"""
2033 def config_tc(self):
2036 "del_sa_from_responder": True,
2037 "is_initiator": False, # seen from test case perspective
2038 # thus vpp is initiator
2040 "sw_if_index": self.pg0.sw_if_index,
2041 "addr": self.pg0.remote_ip4,
2043 "ike-crypto": ("AES-GCM-16ICV", 32),
2044 "ike-integ": "NULL",
2045 "ike-dh": "3072MODPgr",
2047 "crypto_alg": 20, # "aes-gcm-16"
2048 "crypto_key_size": 256,
2049 "dh_group": 15, # "modp-3072"
2052 "crypto_alg": 12, # "aes-cbc"
2053 "crypto_key_size": 256,
2054 # "hmac-sha2-256-128"
2057 "no_idr_in_auth": True,
2062 @tag_fixme_vpp_workers
2063 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2064 """test ikev2 responder - initiator behind NAT"""
2066 IKE_NODE_SUFFIX = "ip4-natt"
2068 def config_tc(self):
2069 self.config_params({"i_natt": True})
2072 @tag_fixme_vpp_workers
2073 class TestResponderPsk(TemplateResponder, Ikev2Params):
2074 """test ikev2 responder - pre shared key auth"""
2076 def config_tc(self):
2077 self.config_params()
2080 @tag_fixme_vpp_workers
2081 class TestResponderDpd(TestResponderPsk):
2083 Dead peer detection test
2086 def config_tc(self):
2087 self.config_params({"dpd_disabled": False})
2092 def test_responder(self):
2093 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2094 super(TestResponderDpd, self).test_responder()
2095 self.pg0.enable_capture()
2097 # capture empty request but don't reply
2098 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2099 ih = self.get_ike_header(capture[0])
2100 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2101 plain = self.sa.hmac_and_decrypt(ih)
2102 self.assertEqual(plain, b"")
2103 # wait for SA expiration
2105 ike_sas = self.vapi.ikev2_sa_dump()
2106 self.assertEqual(len(ike_sas), 0)
2107 ipsec_sas = self.vapi.ipsec_sa_dump()
2108 self.assertEqual(len(ipsec_sas), 0)
2111 @tag_fixme_vpp_workers
2112 class TestResponderRekey(TestResponderPsk):
2113 """test ikev2 responder - rekey"""
2117 def send_rekey_from_initiator(self):
2119 self.sa.generate_dh_data()
2120 packet = self.create_rekey_request(kex=self.WITH_KEX)
2121 self.pg0.add_stream(packet)
2122 self.pg0.enable_capture()
2124 capture = self.pg0.get_capture(1)
2127 def process_rekey_response(self, capture):
2128 ih = self.get_ike_header(capture[0])
2129 plain = self.sa.hmac_and_decrypt(ih)
2130 sa = ikev2.IKEv2_payload_SA(plain)
2131 prop = sa[ikev2.IKEv2_payload_Proposal]
2132 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2133 # update new responder SPI
2134 self.sa.child_sas[0].rspi = prop.SPI
2136 self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2137 self.sa.complete_dh_data()
2138 self.sa.calc_child_keys(kex=self.WITH_KEX)
2140 def test_responder(self):
2141 super(TestResponderRekey, self).test_responder()
2142 self.process_rekey_response(self.send_rekey_from_initiator())
2143 self.verify_ike_sas()
2144 self.verify_ipsec_sas(is_rekey=True)
2145 self.assert_counter(1, "rekey_req", "ip4")
2146 r = self.vapi.ikev2_sa_dump()
2147 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2150 @tag_fixme_vpp_workers
2151 class TestResponderRekeyRepeat(TestResponderRekey):
2152 """test ikev2 responder - rekey repeat"""
2154 def test_responder(self):
2155 super(TestResponderRekeyRepeat, self).test_responder()
2156 # rekey request is not accepted until old IPsec SA is expired
2157 capture = self.send_rekey_from_initiator()
2158 ih = self.get_ike_header(capture[0])
2159 plain = self.sa.hmac_and_decrypt(ih)
2160 notify = ikev2.IKEv2_payload_Notify(plain)
2161 self.assertEqual(notify.type, 43)
2162 self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2163 # rekey request is accepted after old IPsec SA was expired
2165 if len(self.vapi.ipsec_sa_dump()) != 3:
2169 self.fail("old IPsec SA not expired")
2170 self.process_rekey_response(self.send_rekey_from_initiator())
2171 self.verify_ike_sas()
2172 self.verify_ipsec_sas(sa_count=3)
2175 @tag_fixme_vpp_workers
2176 class TestResponderRekeyKEX(TestResponderRekey):
2177 """test ikev2 responder - rekey with key exchange"""
2182 @tag_fixme_vpp_workers
2183 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2184 """test ikev2 responder - rekey repeat with key exchange"""
2189 @tag_fixme_ubuntu2204
2191 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2192 """test ikev2 responder - non-default table id"""
2195 def setUpClass(cls):
2196 import scapy.contrib.ikev2 as _ikev2
2198 globals()["ikev2"] = _ikev2
2199 super(IkePeer, cls).setUpClass()
2200 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
2204 cls.create_pg_interfaces(range(1))
2205 cls.vapi.cli("ip table add 1")
2206 cls.vapi.cli("set interface ip table pg0 1")
2207 for i in cls.pg_interfaces:
2214 def config_tc(self):
2215 self.config_params({"dpd_disabled": False})
2217 def test_responder(self):
2218 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2219 super(TestResponderVrf, self).test_responder()
2220 self.pg0.enable_capture()
2222 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2223 ih = self.get_ike_header(capture[0])
2224 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2225 plain = self.sa.hmac_and_decrypt(ih)
2226 self.assertEqual(plain, b"")
2229 @tag_fixme_vpp_workers
2230 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2231 """test ikev2 responder - cert based auth"""
2233 def config_tc(self):
2238 "server-key": "server-key.pem",
2239 "client-key": "client-key.pem",
2240 "client-cert": "client-cert.pem",
2241 "server-cert": "server-cert.pem",
2246 @tag_fixme_vpp_workers
2247 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2248 TemplateResponder, Ikev2Params
2251 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2254 def config_tc(self):
2257 "ike-crypto": ("AES-CBC", 16),
2258 "ike-integ": "SHA2-256-128",
2259 "esp-crypto": ("AES-CBC", 24),
2260 "esp-integ": "SHA2-384-192",
2261 "ike-dh": "2048MODPgr",
2262 "nonce": os.urandom(256),
2263 "no_idr_in_auth": True,
2268 @tag_fixme_vpp_workers
2269 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2270 TemplateResponder, Ikev2Params
2274 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2277 def config_tc(self):
2280 "ike-crypto": ("AES-CBC", 32),
2281 "ike-integ": "SHA2-256-128",
2282 "esp-crypto": ("AES-GCM-16ICV", 32),
2283 "esp-integ": "NULL",
2284 "ike-dh": "3072MODPgr",
2289 @tag_fixme_vpp_workers
2290 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2295 IKE_NODE_SUFFIX = "ip6"
2297 def config_tc(self):
2300 "del_sa_from_responder": True,
2303 "ike-crypto": ("AES-GCM-16ICV", 32),
2304 "ike-integ": "NULL",
2305 "ike-dh": "2048MODPgr",
2306 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2307 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2312 @tag_fixme_vpp_workers
2313 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2315 Test for keep alive messages
2318 def send_empty_req_from_responder(self):
2319 packet = self.create_empty_request()
2320 self.pg0.add_stream(packet)
2321 self.pg0.enable_capture()
2323 capture = self.pg0.get_capture(1)
2324 ih = self.get_ike_header(capture[0])
2325 self.assertEqual(ih.id, self.sa.msg_id)
2326 plain = self.sa.hmac_and_decrypt(ih)
2327 self.assertEqual(plain, b"")
2328 self.assert_counter(1, "keepalive", "ip4")
2329 r = self.vapi.ikev2_sa_dump()
2330 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2332 def test_initiator(self):
2333 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2334 self.send_empty_req_from_responder()
2337 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2338 """malformed packet test"""
2343 def config_tc(self):
2344 self.config_params()
2346 def create_ike_init_msg(self, length=None, payload=None):
2349 init_SPI="\x11" * 8,
2351 exch_type="IKE_SA_INIT",
2353 if payload is not None:
2355 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2357 def verify_bad_packet_length(self):
2358 ike_msg = self.create_ike_init_msg(length=0xDEAD)
2359 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2360 self.assert_counter(self.pkt_count, "bad_length")
2362 def verify_bad_sa_payload_length(self):
2363 p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2364 ike_msg = self.create_ike_init_msg(payload=p)
2365 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2366 self.assert_counter(self.pkt_count, "malformed_packet")
2368 def test_responder(self):
2369 self.pkt_count = 254
2370 self.verify_bad_packet_length()
2371 self.verify_bad_sa_payload_length()
2374 if __name__ == "__main__":
2375 unittest.main(testRunner=VppTestRunner)