3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
32 KEY_PAD = b"Key Pad for IKEv2"
39 # tuple structure is (p, g, key_len)
44 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
45 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
46 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
47 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
48 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
49 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
50 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
51 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
52 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
53 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
54 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
62 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
63 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
64 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
65 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
66 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
67 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
68 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
69 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
70 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
71 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
72 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
73 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
74 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
75 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
76 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
77 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
85 class CryptoAlgo(object):
86 def __init__(self, name, cipher, mode):
90 if self.cipher is not None:
91 self.bs = self.cipher.block_size // 8
93 if self.name == "AES-GCM-16ICV":
94 self.iv_len = GCM_IV_SIZE
98 def encrypt(self, data, key, aad=None):
99 iv = os.urandom(self.iv_len)
102 self.cipher(key), self.mode(iv), default_backend()
104 return iv + encryptor.update(data) + encryptor.finalize()
106 salt = key[-SALT_SIZE:]
109 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
111 encryptor.authenticate_additional_data(aad)
112 data = encryptor.update(data) + encryptor.finalize()
113 data += encryptor.tag[:GCM_ICV_SIZE]
116 def decrypt(self, data, key, aad=None, icv=None):
118 iv = data[: self.iv_len]
119 ct = data[self.iv_len :]
121 algorithms.AES(key), self.mode(iv), default_backend()
123 return decryptor.update(ct) + decryptor.finalize()
125 salt = key[-SALT_SIZE:]
126 nonce = salt + data[:GCM_IV_SIZE]
127 ct = data[GCM_IV_SIZE:]
128 key = key[:-SALT_SIZE]
130 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
132 decryptor.authenticate_additional_data(aad)
133 return decryptor.update(ct) + decryptor.finalize()
136 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
137 data = data + b"\x00" * (pad_len - 1)
138 return data + bytes([pad_len - 1])
141 class AuthAlgo(object):
142 def __init__(self, name, mac, mod, key_len, trunc_len=None):
146 self.key_len = key_len
147 self.trunc_len = trunc_len or key_len
151 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
152 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
153 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
157 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
158 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
159 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
160 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
161 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
165 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
182 class IKEv2ChildSA(object):
183 def __init__(self, local_ts, remote_ts, is_initiator):
191 self.local_ts = local_ts
192 self.remote_ts = remote_ts
195 class IKEv2SA(object):
202 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
208 auth_method="shared-key",
214 self.udp_encap = udp_encap
224 self.dh_params = None
226 self.priv_key = priv_key
227 self.is_initiator = is_initiator
228 nonce = nonce or os.urandom(32)
229 self.auth_data = auth_data
232 if isinstance(id_type, str):
233 self.id_type = IDType.value(id_type)
235 self.id_type = id_type
236 self.auth_method = auth_method
237 if self.is_initiator:
238 self.rspi = 8 * b"\x00"
243 self.ispi = 8 * b"\x00"
245 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
247 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return this side's own DH public value.

    Picks ``i_dh_data`` when acting as initiator, ``r_dh_data`` otherwise.
    NOTE(review): callers in this file read it as an attribute, so it is
    presumably wrapped in ``@property`` on a line not visible here.
    """
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the remote side's DH public value.

    Mirror image of ``my_dh_pub_key``: the initiator reads ``r_dh_data``,
    the responder reads ``i_dh_data``.
    NOTE(review): used as an attribute by ``compute_secret``, so it is
    presumably decorated with ``@property`` on a line not visible here.
    """
    return self.r_dh_data if self.is_initiator else self.i_dh_data
265 return self.i_natt or self.r_natt
267 def compute_secret(self):
268 priv = self.dh_private_key
269 peer = self.peer_dh_pub_key
270 p, g, l = self.ike_group
272 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
275 def generate_dh_data(self):
277 if self.ike_dh not in DH:
278 raise NotImplementedError("%s not in DH group" % self.ike_dh)
280 if self.dh_params is None:
281 dhg = DH[self.ike_dh]
282 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
283 self.dh_params = pn.parameters(default_backend())
285 priv = self.dh_params.generate_private_key()
286 pub = priv.public_key()
287 x = priv.private_numbers().x
288 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
289 y = pub.public_numbers().y
291 if self.is_initiator:
292 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
294 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
def complete_dh_data(self):
    """Store the result of ``compute_secret()`` as ``dh_shared_secret``."""
    shared = self.compute_secret()
    self.dh_shared_secret = shared
299 def calc_child_keys(self):
300 prf = self.ike_prf_alg.mod()
301 s = self.i_nonce + self.r_nonce
302 c = self.child_sas[0]
304 encr_key_len = self.esp_crypto_key_len
305 integ_key_len = self.esp_integ_alg.key_len
306 salt_len = 0 if integ_key_len else 4
308 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
309 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
312 c.sk_ei = keymat[pos : pos + encr_key_len]
316 c.sk_ai = keymat[pos : pos + integ_key_len]
319 c.salt_ei = keymat[pos : pos + salt_len]
322 c.sk_er = keymat[pos : pos + encr_key_len]
326 c.sk_ar = keymat[pos : pos + integ_key_len]
329 c.salt_er = keymat[pos : pos + salt_len]
332 def calc_prfplus(self, prf, key, seed, length):
336 while len(r) < length and x < 255:
341 s = s + seed + bytes([x])
342 t = self.calc_prf(prf, key, s)
350 def calc_prf(self, prf, key, data):
351 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
356 prf = self.ike_prf_alg.mod()
357 # SKEYSEED = prf(Ni | Nr, g^ir)
358 s = self.i_nonce + self.r_nonce
359 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
        # calculate S = Ni | Nr | SPIi | SPIr
362 s = s + self.ispi + self.rspi
364 prf_key_trunc = self.ike_prf_alg.trunc_len
365 encr_key_len = self.ike_crypto_key_len
366 tr_prf_key_len = self.ike_prf_alg.key_len
367 integ_key_len = self.ike_integ_alg.key_len
368 if integ_key_len == 0:
380 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
383 self.sk_d = keymat[: pos + prf_key_trunc]
386 self.sk_ai = keymat[pos : pos + integ_key_len]
388 self.sk_ar = keymat[pos : pos + integ_key_len]
391 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
392 pos += encr_key_len + salt_size
393 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
394 pos += encr_key_len + salt_size
396 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
397 pos += tr_prf_key_len
398 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
400 def generate_authmsg(self, prf, packet):
401 if self.is_initiator:
409 data = bytes([self.id_type, 0, 0, 0]) + id
410 id_hash = self.calc_prf(prf, key, data)
411 return packet + nonce + id_hash
414 prf = self.ike_prf_alg.mod()
415 if self.is_initiator:
416 packet = self.init_req_packet
418 packet = self.init_resp_packet
419 authmsg = self.generate_authmsg(prf, raw(packet))
420 if self.auth_method == "shared-key":
421 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
422 self.auth_data = self.calc_prf(prf, psk, authmsg)
423 elif self.auth_method == "rsa-sig":
424 self.auth_data = self.priv_key.sign(
425 authmsg, padding.PKCS1v15(), hashes.SHA1()
428 raise TypeError("unknown auth method type!")
def encrypt(self, data, aad=None):
    """Pad ``data`` with the IKE cipher's ``pad`` and encrypt it.

    The padded buffer is encrypted with ``my_cryptokey``; ``aad`` is
    forwarded unchanged to the cipher object's ``encrypt``.
    """
    padded = self.ike_crypto_alg.pad(data)
    run = self.ike_crypto_alg.encrypt
    return run(padded, self.my_cryptokey, aad)
435 def peer_authkey(self):
436 if self.is_initiator:
441 def my_authkey(self):
442 if self.is_initiator:
447 def my_cryptokey(self):
448 if self.is_initiator:
453 def peer_cryptokey(self):
454 if self.is_initiator:
def concat(self, alg, key_len):
    """Build ``"<alg>-<bits>"`` where bits is ``key_len`` (bytes) times 8."""
    bits = key_len * 8
    return "-".join((alg, str(bits)))
def vpp_ike_cypto_alg(self):
    """Return ``concat()`` of the IKE cipher name and its key length."""
    build = self.concat
    return build(self.ike_crypto, self.ike_crypto_key_len)
def vpp_esp_cypto_alg(self):
    """Return ``concat()`` of the ESP cipher name and its key length."""
    build = self.concat
    return build(self.esp_crypto, self.esp_crypto_key_len)
469 def verify_hmac(self, ikemsg):
470 integ_trunc = self.ike_integ_alg.trunc_len
471 exp_hmac = ikemsg[-integ_trunc:]
472 data = ikemsg[:-integ_trunc]
473 computed_hmac = self.compute_hmac(
474 self.ike_integ_alg.mod(), self.peer_authkey, data
476 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
478 def compute_hmac(self, integ, key, data):
479 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the IKE cipher using ``peer_cryptokey``.

    ``aad`` and ``icv`` are handed through to the cipher object's
    ``decrypt`` unchanged.
    """
    run = self.ike_crypto_alg.decrypt
    return run(data, self.peer_cryptokey, aad, icv)
486 def hmac_and_decrypt(self, ike):
487 ep = ike[ikev2.IKEv2_payload_Encrypted]
488 if self.ike_crypto == "AES-GCM-16ICV":
489 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
490 ct = ep.load[:-GCM_ICV_SIZE]
491 tag = ep.load[-GCM_ICV_SIZE:]
492 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
494 self.verify_hmac(raw(ike))
495 integ_trunc = self.ike_integ_alg.trunc_len
497 # remove ICV and decrypt payload
498 ct = ep.load[:-integ_trunc]
499 plain = self.decrypt(ct)
502 return plain[: -pad_len - 1]
504 def build_ts_addr(self, ts, version):
506 "starting_address_v" + version: ts["start_addr"],
507 "ending_address_v" + version: ts["end_addr"],
510 def generate_ts(self, is_ip4):
511 c = self.child_sas[0]
512 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
514 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
515 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
516 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
517 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
519 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
520 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
521 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
522 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
524 if self.is_initiator:
525 return ([ts1], [ts2])
526 return ([ts2], [ts1])
528 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
529 if crypto not in CRYPTO_ALGOS:
530 raise TypeError("unsupported encryption algo %r" % crypto)
531 self.ike_crypto = crypto
532 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
533 self.ike_crypto_key_len = crypto_key_len
535 if integ not in AUTH_ALGOS:
536 raise TypeError("unsupported auth algo %r" % integ)
537 self.ike_integ = None if integ == "NULL" else integ
538 self.ike_integ_alg = AUTH_ALGOS[integ]
540 if prf not in PRF_ALGOS:
541 raise TypeError("unsupported prf algo %r" % prf)
543 self.ike_prf_alg = PRF_ALGOS[prf]
545 self.ike_group = DH[self.ike_dh]
547 def set_esp_props(self, crypto, crypto_key_len, integ):
548 self.esp_crypto_key_len = crypto_key_len
549 if crypto not in CRYPTO_ALGOS:
550 raise TypeError("unsupported encryption algo %r" % crypto)
551 self.esp_crypto = crypto
552 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
554 if integ not in AUTH_ALGOS:
555 raise TypeError("unsupported auth algo %r" % integ)
556 self.esp_integ = None if integ == "NULL" else integ
557 self.esp_integ_alg = AUTH_ALGOS[integ]
559 def crypto_attr(self, key_len):
560 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
561 return (0x800E << 16 | key_len << 3, 12)
563 raise Exception("unsupported attribute type")
def ike_crypto_attr(self):
    """Return ``crypto_attr()`` evaluated for the IKE cipher key length."""
    attr_for = self.crypto_attr
    return attr_for(self.ike_crypto_key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr()`` evaluated for the ESP cipher key length."""
    attr_for = self.crypto_attr
    return attr_for(self.esp_crypto_key_len)
571 def compute_nat_sha1(self, ip, port, rspi=None):
574 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
575 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
577 return digest.finalize()
580 class IkePeer(VppTestCase):
581 """common class for initiator and responder"""
585 import scapy.contrib.ikev2 as _ikev2
587 globals()["ikev2"] = _ikev2
588 super(IkePeer, cls).setUpClass()
589 cls.create_pg_interfaces(range(2))
590 for i in cls.pg_interfaces:
598 def tearDownClass(cls):
599 super(IkePeer, cls).tearDownClass()
602 super(IkePeer, self).tearDown()
603 if self.del_sa_from_responder:
604 self.initiate_del_sa_from_responder()
606 self.initiate_del_sa_from_initiator()
607 r = self.vapi.ikev2_sa_dump()
608 self.assertEqual(len(r), 0)
609 sas = self.vapi.ipsec_sa_dump()
610 self.assertEqual(len(sas), 0)
611 self.p.remove_vpp_config()
612 self.assertIsNone(self.p.query_vpp_config())
615 super(IkePeer, self).setUp()
617 self.p.add_vpp_config()
618 self.assertIsNotNone(self.p.query_vpp_config())
619 if self.sa.is_initiator:
620 self.sa.generate_dh_data()
621 self.vapi.cli("ikev2 set logging level 4")
622 self.vapi.cli("event-lo clear")
def assert_counter(self, count, name, version="ip4"):
    """Assert that an ikev2 error counter equals ``count``.

    The counter path is ``/err/ikev2-<version>/<name>`` and is read via
    ``self.statistics.get_err_counter``.
    """
    prefix = "/err/ikev2-%s/" % version
    observed = self.statistics.get_err_counter(prefix + name)
    self.assertEqual(count, observed)
628 def create_rekey_request(self):
629 sa, first_payload = self.generate_auth_payload(is_rekey=True)
630 header = ikev2.IKEv2(
631 init_SPI=self.sa.ispi,
632 resp_SPI=self.sa.rspi,
633 id=self.sa.new_msg_id(),
635 exch_type="CREATE_CHILD_SA",
638 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
639 return self.create_packet(
640 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
643 def create_empty_request(self):
644 header = ikev2.IKEv2(
645 init_SPI=self.sa.ispi,
646 resp_SPI=self.sa.rspi,
647 id=self.sa.new_msg_id(),
649 exch_type="INFORMATIONAL",
650 next_payload="Encrypted",
653 msg = self.encrypt_ike_msg(header, b"", None)
654 return self.create_packet(
655 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
659 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
662 src_ip = src_if.remote_ip6
663 dst_ip = src_if.local_ip6
666 src_ip = src_if.remote_ip4
667 dst_ip = src_if.local_ip4
670 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
671 / ip_layer(src=src_ip, dst=dst_ip)
672 / UDP(sport=sport, dport=dport)
675 # insert non ESP marker
676 res = res / Raw(b"\x00" * 4)
def verify_udp(self, udp):
    """Check that ``udp`` carries the source and destination ports of the SA."""
    for field in ("sport", "dport"):
        self.assertEqual(getattr(udp, field), getattr(self.sa, field))
683 def get_ike_header(self, packet):
685 ih = packet[ikev2.IKEv2]
686 ih = self.verify_and_remove_non_esp_marker(ih)
687 except IndexError as e:
688 # this is a workaround for getting IKEv2 layer as both ikev2 and
689 # ipsec register for port 4500
691 ih = self.verify_and_remove_non_esp_marker(esp)
692 self.assertEqual(ih.version, 0x20)
693 self.assertNotIn("Version", ih.flags)
696 def verify_and_remove_non_esp_marker(self, packet):
698 # if we are in nat traversal mode check for non esp marker
701 self.assertEqual(data[:4], b"\x00" * 4)
702 return ikev2.IKEv2(data[4:])
706 def encrypt_ike_msg(self, header, plain, first_payload):
707 if self.sa.ike_crypto == "AES-GCM-16ICV":
708 data = self.sa.ike_crypto_alg.pad(raw(plain))
713 + len(ikev2.IKEv2_payload_Encrypted())
715 tlen = plen + len(ikev2.IKEv2())
718 sk_p = ikev2.IKEv2_payload_Encrypted(
719 next_payload=first_payload, length=plen
723 encr = self.sa.encrypt(raw(plain), raw(res))
724 sk_p = ikev2.IKEv2_payload_Encrypted(
725 next_payload=first_payload, length=plen, load=encr
729 encr = self.sa.encrypt(raw(plain))
730 trunc_len = self.sa.ike_integ_alg.trunc_len
731 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
732 tlen = plen + len(ikev2.IKEv2())
734 sk_p = ikev2.IKEv2_payload_Encrypted(
735 next_payload=first_payload, length=plen, load=encr
740 integ_data = raw(res)
741 hmac_data = self.sa.compute_hmac(
742 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
744 res = res / Raw(hmac_data[:trunc_len])
745 assert len(res) == tlen
748 def verify_udp_encap(self, ipsec_sa):
749 e = VppEnum.vl_api_ipsec_sad_flags_t
750 if self.sa.udp_encap or self.sa.natt:
751 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
753 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
755 def verify_ipsec_sas(self, is_rekey=False):
756 sas = self.vapi.ipsec_sa_dump()
758 # after rekey there is a short period of time in which old
759 # inbound SA is still present
763 self.assertEqual(len(sas), sa_count)
764 if self.sa.is_initiator:
779 c = self.sa.child_sas[0]
781 self.verify_udp_encap(sa0)
782 self.verify_udp_encap(sa1)
783 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
784 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
785 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
787 if self.sa.esp_integ is None:
790 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
791 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
792 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
795 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
796 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
797 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
798 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
802 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
803 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
804 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
805 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
807 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
808 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one locally derived key against the value reported via API.

    ``name`` selects the attribute on both ``keys`` and ``api_keys``; the API
    side additionally exposes ``<name>_len``, and only that many leading
    bytes of the API buffer are compared.
    """
    length_field = name + "_len"
    local_key = getattr(keys, name)
    reported_key = getattr(api_keys, name)
    reported_len = getattr(api_keys, length_field)
    self.assertEqual(len(local_key), reported_len)
    self.assertEqual(local_key, reported_key[:reported_len])
def verify_id(self, api_id, exp_id):
    """Check an identification reported by the API against the expected one.

    :param api_id: object exposing ``type``, ``data_len`` and ``data``.
    :param exp_id: expected identity exposing ``type`` (a name accepted by
        ``IDType.value``), ``data_len`` and the identity payload.
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The payload was previously compared against exp_id.type, i.e. the ID
    # type name that was just translated above, which can never equal the
    # identity bytes. Compare against the expected payload instead.
    # NOTE(review): assumes exp_id exposes the payload as ``data`` (bytes),
    # matching api_id's field name -- confirm against the caller.
    self.assertEqual(bytes(api_id.data, "ascii"), exp_id.data)
822 def verify_ike_sas(self):
823 r = self.vapi.ikev2_sa_dump()
824 self.assertEqual(len(r), 1)
826 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
827 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
829 if self.sa.is_initiator:
830 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
831 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
833 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
834 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
836 if self.sa.is_initiator:
837 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
838 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
840 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
841 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
842 self.verify_keymat(sa.keys, self.sa, "sk_d")
843 self.verify_keymat(sa.keys, self.sa, "sk_ai")
844 self.verify_keymat(sa.keys, self.sa, "sk_ar")
845 self.verify_keymat(sa.keys, self.sa, "sk_ei")
846 self.verify_keymat(sa.keys, self.sa, "sk_er")
847 self.verify_keymat(sa.keys, self.sa, "sk_pi")
848 self.verify_keymat(sa.keys, self.sa, "sk_pr")
850 self.assertEqual(sa.i_id.type, self.sa.id_type)
851 self.assertEqual(sa.r_id.type, self.sa.id_type)
852 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
853 self.assertEqual(sa.r_id.data_len, len(self.idr))
854 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
855 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
857 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
858 self.assertEqual(len(r), 1)
860 self.assertEqual(csa.sa_index, sa.sa_index)
861 c = self.sa.child_sas[0]
862 if hasattr(c, "sk_ai"):
863 self.verify_keymat(csa.keys, c, "sk_ai")
864 self.verify_keymat(csa.keys, c, "sk_ar")
865 self.verify_keymat(csa.keys, c, "sk_ei")
866 self.verify_keymat(csa.keys, c, "sk_er")
867 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
868 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
870 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
873 r = self.vapi.ikev2_traffic_selector_dump(
874 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
876 self.assertEqual(len(r), 1)
878 self.verify_ts(r[0].ts, tsi[0], True)
880 r = self.vapi.ikev2_traffic_selector_dump(
881 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
883 self.assertEqual(len(r), 1)
884 self.verify_ts(r[0].ts, tsr[0], False)
886 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
887 self.verify_nonce(n, self.sa.i_nonce)
888 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
889 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API nonce record matches ``nonce`` in length and value."""
    expected_len = len(nonce)
    self.assertEqual(api_nonce.data_len, expected_len)
    self.assertEqual(api_nonce.nonce, nonce)
895 def verify_ts(self, api_ts, ts, is_initiator):
897 self.assertTrue(api_ts.is_local)
899 self.assertFalse(api_ts.is_local)
902 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
903 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
905 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
906 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
907 self.assertEqual(api_ts.start_port, ts.start_port)
908 self.assertEqual(api_ts.end_port, ts.end_port)
909 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
912 class TemplateInitiator(IkePeer):
913 """initiator test template"""
915 def initiate_del_sa_from_initiator(self):
916 ispi = int.from_bytes(self.sa.ispi, "little")
917 self.pg0.enable_capture()
919 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
920 capture = self.pg0.get_capture(1)
921 ih = self.get_ike_header(capture[0])
922 self.assertNotIn("Response", ih.flags)
923 self.assertIn("Initiator", ih.flags)
924 self.assertEqual(ih.init_SPI, self.sa.ispi)
925 self.assertEqual(ih.resp_SPI, self.sa.rspi)
926 plain = self.sa.hmac_and_decrypt(ih)
927 d = ikev2.IKEv2_payload_Delete(plain)
928 self.assertEqual(d.proto, 1) # proto=IKEv2
929 header = ikev2.IKEv2(
930 init_SPI=self.sa.ispi,
931 resp_SPI=self.sa.rspi,
933 exch_type="INFORMATIONAL",
935 next_payload="Encrypted",
937 resp = self.encrypt_ike_msg(header, b"", None)
938 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Validate the reply to a delete request sent towards the initiator.

    The IKE header must echo the current message id, be an INFORMATIONAL
    exchange (37), carry both the Response and Initiator flags, and decrypt
    to an empty payload.
    """
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, 37)  # exchange informational
    for flag in ("Response", "Initiator"):
        self.assertIn(flag, header.flags)
    payload = self.sa.hmac_and_decrypt(header)
    self.assertEqual(payload, b"")
949 def initiate_del_sa_from_responder(self):
950 header = ikev2.IKEv2(
951 init_SPI=self.sa.ispi,
952 resp_SPI=self.sa.rspi,
953 exch_type="INFORMATIONAL",
954 id=self.sa.new_msg_id(),
956 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
957 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
958 packet = self.create_packet(
959 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
961 self.pg0.add_stream(packet)
962 self.pg0.enable_capture()
964 capture = self.pg0.get_capture(1)
965 self.verify_del_sa(capture[0])
968 def find_notify_payload(packet, notify_type):
969 n = packet[ikev2.IKEv2_payload_Notify]
971 if n.type == notify_type:
976 def verify_nat_detection(self, packet):
983 # NAT_DETECTION_SOURCE_IP
984 s = self.find_notify_payload(packet, 16388)
985 self.assertIsNotNone(s)
986 src_sha = self.sa.compute_nat_sha1(
987 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
989 self.assertEqual(s.load, src_sha)
991 # NAT_DETECTION_DESTINATION_IP
992 s = self.find_notify_payload(packet, 16389)
993 self.assertIsNotNone(s)
994 dst_sha = self.sa.compute_nat_sha1(
995 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
997 self.assertEqual(s.load, dst_sha)
999 def verify_sa_init_request(self, packet):
1001 self.sa.dport = udp.sport
1002 ih = packet[ikev2.IKEv2]
1003 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1004 self.assertEqual(ih.exch_type, 34) # SA_INIT
1005 self.sa.ispi = ih.init_SPI
1006 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1007 self.assertIn("Initiator", ih.flags)
1008 self.assertNotIn("Response", ih.flags)
1009 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1010 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1012 prop = packet[ikev2.IKEv2_payload_Proposal]
1013 self.assertEqual(prop.proto, 1) # proto = ikev2
1014 self.assertEqual(prop.proposal, 1)
1015 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1017 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1019 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1020 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1021 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1022 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1024 self.verify_nat_detection(packet)
1025 self.sa.set_ike_props(
1026 crypto="AES-GCM-16ICV",
1029 prf="PRF_HMAC_SHA2_256",
1032 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1033 self.sa.generate_dh_data()
1034 self.sa.complete_dh_data()
1037 def update_esp_transforms(self, trans, sa):
1039 if trans.transform_type == 1: # ecryption
1040 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1041 elif trans.transform_type == 3: # integrity
1042 sa.esp_integ = INTEG_IDS[trans.transform_id]
1043 trans = trans.payload
1045 def verify_sa_auth_req(self, packet):
1047 self.sa.dport = udp.sport
1048 ih = self.get_ike_header(packet)
1049 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1050 self.assertEqual(ih.init_SPI, self.sa.ispi)
1051 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1052 self.assertIn("Initiator", ih.flags)
1053 self.assertNotIn("Response", ih.flags)
1056 self.verify_udp(udp)
1057 self.assertEqual(ih.id, self.sa.msg_id + 1)
1059 plain = self.sa.hmac_and_decrypt(ih)
1060 idi = ikev2.IKEv2_payload_IDi(plain)
1061 self.assertEqual(idi.load, self.sa.i_id)
1062 if self.no_idr_auth:
1063 self.assertEqual(idi.next_payload, 39) # AUTH
1065 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1066 self.assertEqual(idr.load, self.sa.r_id)
1067 prop = idi[ikev2.IKEv2_payload_Proposal]
1068 c = self.sa.child_sas[0]
1070 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1072 def send_init_response(self):
1073 tr_attr = self.sa.ike_crypto_attr()
1075 ikev2.IKEv2_payload_Transform(
1076 transform_type="Encryption",
1077 transform_id=self.sa.ike_crypto,
1079 key_length=tr_attr[0],
1081 / ikev2.IKEv2_payload_Transform(
1082 transform_type="Integrity", transform_id=self.sa.ike_integ
1084 / ikev2.IKEv2_payload_Transform(
1085 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1087 / ikev2.IKEv2_payload_Transform(
1088 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1091 props = ikev2.IKEv2_payload_Proposal(
1092 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1095 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1097 dst_address = b"\x0a\x0a\x0a\x0a"
1099 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1100 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1101 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1103 self.sa.init_resp_packet = (
1105 init_SPI=self.sa.ispi,
1106 resp_SPI=self.sa.rspi,
1107 exch_type="IKE_SA_INIT",
1110 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1111 / ikev2.IKEv2_payload_KE(
1112 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1114 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1115 / ikev2.IKEv2_payload_Notify(
1116 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1118 / ikev2.IKEv2_payload_Notify(
1119 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1123 ike_msg = self.create_packet(
1125 self.sa.init_resp_packet,
1131 self.pg_send(self.pg0, ike_msg)
1132 capture = self.pg0.get_capture(1)
1133 self.verify_sa_auth_req(capture[0])
1135 def initiate_sa_init(self):
1136 self.pg0.enable_capture()
1138 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1140 capture = self.pg0.get_capture(1)
1141 self.verify_sa_init_request(capture[0])
1142 self.send_init_response()
1144 def send_auth_response(self):
1145 tr_attr = self.sa.esp_crypto_attr()
1147 ikev2.IKEv2_payload_Transform(
1148 transform_type="Encryption",
1149 transform_id=self.sa.esp_crypto,
1151 key_length=tr_attr[0],
1153 / ikev2.IKEv2_payload_Transform(
1154 transform_type="Integrity", transform_id=self.sa.esp_integ
1156 / ikev2.IKEv2_payload_Transform(
1157 transform_type="Extended Sequence Number", transform_id="No ESN"
1159 / ikev2.IKEv2_payload_Transform(
1160 transform_type="Extended Sequence Number", transform_id="ESN"
1164 c = self.sa.child_sas[0]
1165 props = ikev2.IKEv2_payload_Proposal(
1166 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1169 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1171 ikev2.IKEv2_payload_IDi(
1172 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1174 / ikev2.IKEv2_payload_IDr(
1175 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1177 / ikev2.IKEv2_payload_AUTH(
1179 auth_type=AuthMethod.value(self.sa.auth_method),
1180 load=self.sa.auth_data,
1182 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1183 / ikev2.IKEv2_payload_TSi(
1184 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1186 / ikev2.IKEv2_payload_TSr(
1187 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1189 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1192 header = ikev2.IKEv2(
1193 init_SPI=self.sa.ispi,
1194 resp_SPI=self.sa.rspi,
1195 id=self.sa.new_msg_id(),
1197 exch_type="IKE_AUTH",
1200 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1201 packet = self.create_packet(
1202 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1204 self.pg_send(self.pg0, packet)
1206 def test_initiator(self):
1207 self.initiate_sa_init()
1209 self.sa.calc_child_keys()
1210 self.send_auth_response()
1211 self.verify_ike_sas()
1214 class TemplateResponder(IkePeer):
1215 """responder test template"""
# Trigger an IKE SA delete through the VPP API, check the request VPP emits
# and answer it with an empty encrypted INFORMATIONAL response.
1217 def initiate_del_sa_from_responder(self):
1218 self.pg0.enable_capture()
# the API argument is an integer: the stored SPI bytes are read as little-endian
1220 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1221 capture = self.pg0.get_capture(1)
1222 ih = self.get_ike_header(capture[0])
# the captured message must be a request with neither Response nor Initiator flag set
1223 self.assertNotIn("Response", ih.flags)
1224 self.assertNotIn("Initiator", ih.flags)
1225 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1226 plain = self.sa.hmac_and_decrypt(ih)
# the decrypted payload is parsed as a Delete payload for the IKE SA itself
1227 d = ikev2.IKEv2_payload_Delete(plain)
1228 self.assertEqual(d.proto, 1) # proto=IKEv2
1229 self.assertEqual(ih.init_SPI, self.sa.ispi)
1230 self.assertEqual(ih.resp_SPI, self.sa.rspi)
# build the reply header; its body is encrypted from an empty plaintext below
1231 header = ikev2.IKEv2(
1232 init_SPI=self.sa.ispi,
1233 resp_SPI=self.sa.rspi,
1234 flags="Initiator+Response",
1235 exch_type="INFORMATIONAL",
1237 next_payload="Encrypted",
1239 resp = self.encrypt_ike_msg(header, b"", None)
# nothing is expected back after the response has been sent
1240 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Validate the response captured after an IKE SA delete request.

    The IKE header of *packet* must carry the current message id, be an
    INFORMATIONAL exchange flagged as a response (and not as initiator),
    announce an Encrypted payload and hold both SPIs of the SA.  The
    payload must decrypt to an empty byte string.
    """
    informational = 37  # exchange informational
    encrypted = 46  # Encrypted

    hdr = self.get_ike_header(packet)
    self.assertEqual(hdr.id, self.sa.msg_id)
    self.assertEqual(hdr.exch_type, informational)
    self.assertIn("Response", hdr.flags)
    self.assertNotIn("Initiator", hdr.flags)
    self.assertEqual(hdr.next_payload, encrypted)
    self.assertEqual(hdr.init_SPI, self.sa.ispi)
    self.assertEqual(hdr.resp_SPI, self.sa.rspi)

    decrypted = self.sa.hmac_and_decrypt(hdr)
    self.assertEqual(decrypted, b"")
# Send an encrypted INFORMATIONAL message carrying a Delete payload for the
# IKE SA, then hand the single captured reply to verify_del_sa().
1254 def initiate_del_sa_from_initiator(self):
1255 header = ikev2.IKEv2(
1256 init_SPI=self.sa.ispi,
1257 resp_SPI=self.sa.rspi,
1259 exch_type="INFORMATIONAL",
# a fresh message id is allocated for this request
1260 id=self.sa.new_msg_id(),
1262 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1263 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1264 packet = self.create_packet(
1265 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1267 self.pg0.add_stream(packet)
1268 self.pg0.enable_capture()
1270 capture = self.pg0.get_capture(1)
1271 self.verify_del_sa(capture[0])
# Build the IKE_SA_INIT request (SA proposal, KE, Nonce and NAT-detection
# notifies), store it in self.sa.init_req_packet, send it on pg0 and pass the
# single captured reply to verify_sa_init().
1273 def send_sa_init_req(self):
1274 tr_attr = self.sa.ike_crypto_attr()
# the proposal offers four transforms: encryption, integrity, PRF and DH group
1276 ikev2.IKEv2_payload_Transform(
1277 transform_type="Encryption",
1278 transform_id=self.sa.ike_crypto,
1280 key_length=tr_attr[0],
1282 / ikev2.IKEv2_payload_Transform(
1283 transform_type="Integrity", transform_id=self.sa.ike_integ
1285 / ikev2.IKEv2_payload_Transform(
1286 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1288 / ikev2.IKEv2_payload_Transform(
1289 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1293 props = ikev2.IKEv2_payload_Proposal(
1294 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
# the Nonce payload announces a following Notify only when ip6 is not set
1297 next_payload = None if self.ip6 else "Notify"
1299 self.sa.init_req_packet = (
1301 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1303 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1304 / ikev2.IKEv2_payload_KE(
1305 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1307 / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
# source/destination address fed to the NAT-detection hash: either a fixed
# 10.10.10.x value or the real pg0 address.
# NOTE(review): the conditions selecting between them are not visible here --
# presumably the NAT flags of the SA; confirm.
1312 src_address = b"\x0a\x0a\x0a\x01"
1314 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1317 dst_address = b"\x0a\x0a\x0a\x0a"
1319 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1321 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1322 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1323 nat_src_detection = ikev2.IKEv2_payload_Notify(
1324 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1326 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1327 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
# append both NAT-detection notifies to the stored request
1329 self.sa.init_req_packet = (
1330 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1333 ike_msg = self.create_packet(
1335 self.sa.init_req_packet,
1341 self.pg0.add_stream(ike_msg)
1342 self.pg0.enable_capture()
1344 capture = self.pg0.get_capture(1)
1345 self.verify_sa_init(capture[0])
# Build the plaintext payload chain for the child SA negotiation.
# last_payload: next_payload value written into the TSr payload
#               (defaults to "Notify" when falsy).
# is_rekey:     NOTE(review): presumably selects the Nonce/REKEY_SA variant
#               over the IDi(/IDr) variant -- the branch lines are not
#               visible here; confirm.
# Returns (plain, first_payload), first_payload being "Nonce" or "IDi".
1347 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1348 tr_attr = self.sa.esp_crypto_attr()
1349 last_payload = last_payload or "Notify"
# ESP proposal: encryption, integrity and both ESN variants (four transforms)
1351 ikev2.IKEv2_payload_Transform(
1352 transform_type="Encryption",
1353 transform_id=self.sa.esp_crypto,
1355 key_length=tr_attr[0],
1357 / ikev2.IKEv2_payload_Transform(
1358 transform_type="Integrity", transform_id=self.sa.esp_integ
1360 / ikev2.IKEv2_payload_Transform(
1361 transform_type="Extended Sequence Number", transform_id="No ESN"
1363 / ikev2.IKEv2_payload_Transform(
1364 transform_type="Extended Sequence Number", transform_id="ESN"
# the proposal carries the initiator SPI of the first child SA
1368 c = self.sa.child_sas[0]
1369 props = ikev2.IKEv2_payload_Proposal(
1370 proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
1373 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1375 ikev2.IKEv2_payload_AUTH(
1377 auth_type=AuthMethod.value(self.sa.auth_method),
1378 load=self.sa.auth_data,
1380 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1381 / ikev2.IKEv2_payload_TSi(
1382 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1384 / ikev2.IKEv2_payload_TSr(
1385 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
# variant starting with a Nonce and carrying a REKEY_SA notify for the ESP SPI
1390 first_payload = "Nonce"
1392 ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
1394 / ikev2.IKEv2_payload_Notify(type="REKEY_SA", proto="ESP", SPI=c.ispi)
# variant starting with IDi; IDr is left out when self.no_idr_auth is set
1397 first_payload = "IDi"
1398 if self.no_idr_auth:
1399 ids = ikev2.IKEv2_payload_IDi(
1400 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1403 ids = ikev2.IKEv2_payload_IDi(
1404 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1405 ) / ikev2.IKEv2_payload_IDr(
1406 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1409 return plain, first_payload
# Send the encrypted IKE_AUTH request (auth payload chain followed by an
# INITIAL_CONTACT notify) and pass the single captured reply to
# verify_sa_auth_resp().
1411 def send_sa_auth(self):
1412 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
# TSr was told above that a Notify follows; this is that payload
1413 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1414 header = ikev2.IKEv2(
1415 init_SPI=self.sa.ispi,
1416 resp_SPI=self.sa.rspi,
1417 id=self.sa.new_msg_id(),
1419 exch_type="IKE_AUTH",
1422 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1423 packet = self.create_packet(
1424 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1426 self.pg0.add_stream(packet)
1427 self.pg0.enable_capture()
1429 capture = self.pg0.get_capture(1)
1430 self.verify_sa_auth_resp(capture[0])
# Validate the IKE_SA_INIT response in `packet` and record the responder's
# SPI, nonce and DH public value on self.sa before completing the DH data.
1432 def verify_sa_init(self, packet):
1433 ih = self.get_ike_header(packet)
1435 self.assertEqual(ih.id, self.sa.msg_id)
# 34 is compared against the exchange type of the reply to the IKE_SA_INIT request
1436 self.assertEqual(ih.exch_type, 34)
1437 self.assertIn("Response", ih.flags)
1438 self.assertEqual(ih.init_SPI, self.sa.ispi)
1439 self.assertNotEqual(ih.resp_SPI, 0)
# remember the SPI chosen by the responder
1440 self.sa.rspi = ih.resp_SPI
# indexing a scapy packet by a layer that is absent raises IndexError
1442 sa = ih[ikev2.IKEv2_payload_SA]
1443 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1444 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1445 except IndexError as e:
# NOTE(review): this path is reached when a payload is missing, so the message
# probably should read "... payload not found!" -- confirm and fix.
1446 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
# NOTE(review): scapy's Packet.show() prints and returns None unless called
# with dump=True, so this logs "None" -- confirm and switch to show(dump=True).
1447 self.logger.error(ih.show())
1449 self.sa.complete_dh_data()
# Validate the IKE_AUTH response in `packet`, store the responder's child SA
# SPI taken from the proposal and derive the child SA keys.
1453 def verify_sa_auth_resp(self, packet):
1454 ike = self.get_ike_header(packet)
1456 self.verify_udp(udp)
1457 self.assertEqual(ike.id, self.sa.msg_id)
1458 plain = self.sa.hmac_and_decrypt(ike)
# the decrypted payload chain is parsed starting at IDr
1459 idr = ikev2.IKEv2_payload_IDr(plain)
1460 prop = idr[ikev2.IKEv2_payload_Proposal]
1461 self.assertEqual(prop.SPIsize, 4)
1462 self.sa.child_sas[0].rspi = prop.SPI
1463 self.sa.calc_child_keys()
# Suffix passed to assert_counter() below; subclasses override it
# (e.g. "ip4-natt", "ip6").
1465 IKE_NODE_SUFFIX = "ip4"
# Check the node counters and the per-SA statistics after one
# IKE_SA_INIT + IKE_AUTH exchange: two processed packets, one request each.
1467 def verify_counters(self):
1468 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1469 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1470 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1472 r = self.vapi.ikev2_sa_dump()
1474 self.assertEqual(1, s.n_sa_auth_req)
1475 self.assertEqual(1, s.n_sa_init_req)
# Responder test flow as visible here: send the IKE_SA_INIT request, then
# verify the IPsec SAs, the IKE SAs and the counters.
1477 def test_responder(self):
1478 self.send_sa_init_req()
1480 self.verify_ipsec_sas()
1481 self.verify_ike_sas()
1482 self.verify_counters()
# Mixin that turns a `params` dict into a configured VPP profile (self.p) and
# the matching peer-side SA state (self.sa).
1485 class Ikev2Params(object):
# params: optional overrides; every key is looked up with a fallback default.
# NOTE(review): `params={}` is a mutable default argument. The visible code
# only reads from it, but `params=None` would be the safer signature.
1486 def config_params(self, params={}):
1487 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1488 ei = VppEnum.vl_api_ipsec_integ_alg_t
# algorithm name -> VPP API enum value (crypto first, then integrity)
1490 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1491 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1492 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1493 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1494 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1495 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1496 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1497 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1498 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1499 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
# dead peer detection is treated as disabled unless the test asks otherwise
1502 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1504 self.vapi.cli("ikev2 dpd disable")
1505 self.del_sa_from_responder = (
1507 if "del_sa_from_responder" not in params
1508 else params["del_sa_from_responder"]
1510 i_natt = False if "i_natt" not in params else params["i_natt"]
1511 r_natt = False if "r_natt" not in params else params["r_natt"]
1512 self.p = Profile(self, "pr1")
1513 self.ip6 = False if "ip6" not in params else params["ip6"]
# certificate based authentication: key/cert file names come from params and
# are resolved against the ikev2 plugin's test certs directory
1515 if "auth" in params and params["auth"] == "rsa-sig":
1516 auth_method = "rsa-sig"
1517 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1518 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1520 client_file = work_dir + params["client-cert"]
# NOTE(review): both files are opened without being closed explicitly;
# a `with open(...)` block would release the handles deterministically.
1521 server_pem = open(work_dir + params["server-cert"]).read()
1522 client_priv = open(work_dir + params["client-key"]).read()
1523 client_priv = load_pem_private_key(
1524 str.encode(client_priv), None, default_backend()
1526 self.peer_cert = x509.load_pem_x509_certificate(
1527 str.encode(server_pem), default_backend()
1529 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
# pre-shared key authentication with a fixed secret
1532 auth_data = b"$3cr3tpa$$w0rd"
1533 self.p.add_auth(method="shared-key", data=auth_data)
1534 auth_method = "shared-key"
1537 is_init = True if "is_initiator" not in params else params["is_initiator"]
1538 self.no_idr_auth = params.get("no_idr_in_auth", False)
# fixed FQDN identities for responder (idr) and initiator (idi)
1540 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1541 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1542 r_id = self.idr = idr["data"]
1543 i_id = self.idi = idi["data"]
1545 # scapy is initiator, VPP is responder
1546 self.p.add_local_id(**idr)
1547 self.p.add_remote_id(**idi)
1548 if self.no_idr_auth:
1551 # VPP is initiator, scapy is responder
1552 self.p.add_local_id(**idi)
1553 if not self.no_idr_auth:
1554 self.p.add_remote_id(**idr)
# default traffic selectors, overridable through "loc_ts" / "rem_ts"
1557 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1558 if "loc_ts" not in params
1559 else params["loc_ts"]
1562 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1563 if "rem_ts" not in params
1564 else params["rem_ts"]
1566 self.p.add_local_ts(**loc_ts)
1567 self.p.add_remote_ts(**rem_ts)
# optional profile settings are applied only when the test supplies them
1568 if "responder" in params:
1569 self.p.add_responder(params["responder"])
1570 if "ike_transforms" in params:
1571 self.p.add_ike_transforms(params["ike_transforms"])
1572 if "esp_transforms" in params:
1573 self.p.add_esp_transforms(params["esp_transforms"])
1575 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1577 self.p.set_udp_encap(True)
1579 if "responder_hostname" in params:
1580 hn = params["responder_hostname"]
1581 self.p.add_responder_hostname(hn)
1583 # configure static dns record
1584 self.vapi.dns_name_server_add_del(
1585 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1587 self.vapi.dns_enable_disable(enable=1)
# the hostname is mapped to pg0's remote address in the DNS cache
1589 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1596 is_initiator=is_init,
1597 id_type=self.p.local_id["id_type"],
1600 priv_key=client_priv,
1601 auth_method=auth_method,
1602 nonce=params.get("nonce"),
1603 auth_data=auth_data,
1604 udp_encap=udp_encap,
# traffic selectors are swapped: the profile's remote TS is the peer's local TS
1605 local_ts=self.p.remote_ts,
1606 remote_ts=self.p.local_ts,
# defaults for the IKE / ESP algorithms, overridable through params
1611 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1614 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1616 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1619 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1622 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
# crypto entries are (algorithm name, key length) pairs
1625 self.sa.set_ike_props(
1626 crypto=ike_crypto[0],
1627 crypto_key_len=ike_crypto[1],
1629 prf="PRF_HMAC_SHA2_256",
1632 self.sa.set_esp_props(
1633 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1637 class TestApi(VppTestCase):
1638 """Test IKEV2 API"""
# class-level fixtures simply delegate to the base class
1641 def setUpClass(cls):
1642 super(TestApi, cls).setUpClass()
1645 def tearDownClass(cls):
1646 super(TestApi, cls).tearDownClass()
# per-test cleanup: remove both profiles created by the test (self.p1,
# self.p2) and check that the profile dump is empty afterwards
1649 super(TestApi, self).tearDown()
1650 self.p1.remove_vpp_config()
1651 self.p2.remove_vpp_config()
1652 r = self.vapi.ikev2_profile_dump()
1653 self.assertEqual(len(r), 0)
# Create a Profile named cfg["name"] and populate it from the cfg dict.
# Required keys (indexed unconditionally): loc_id, rem_id, loc_ts, rem_ts,
# responder, ike_ts, esp_ts, auth, udp_encap, ipsec_over_udp_port.
# Optional keys: lifetime_data, tun_itf, natt_disabled.
1655 def configure_profile(self, cfg):
1656 p = Profile(self, cfg["name"])
# ids are (id_type, data) pairs
1657 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1658 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1659 p.add_local_ts(**cfg["loc_ts"])
1660 p.add_remote_ts(**cfg["rem_ts"])
1661 p.add_responder(cfg["responder"])
1662 p.add_ike_transforms(cfg["ike_ts"])
1663 p.add_esp_transforms(cfg["esp_ts"])
1664 p.add_auth(**cfg["auth"])
1665 p.set_udp_encap(cfg["udp_encap"])
1666 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1667 if "lifetime_data" in cfg:
1668 p.set_lifetime_data(cfg["lifetime_data"])
1669 if "tun_itf" in cfg:
1670 p.set_tunnel_interface(cfg["tun_itf"])
1671 if "natt_disabled" in cfg and cfg["natt_disabled"]:
# Configure two profiles ("p1" and "p2" of the conf dict) and check that the
# profile dump returns both, in that order, with the configured values.
1676 def test_profile_api(self):
1677 """test profile dump API"""
# IPv4 traffic selector ranges
1682 "start_addr": "3.3.3.2",
1683 "end_addr": "3.3.3.3",
1689 "start_addr": "4.5.76.80",
1690 "end_addr": "2.3.4.6",
# IPv6 traffic selector ranges
1697 "start_addr": "ab::1",
1698 "end_addr": "ab::4",
1704 "start_addr": "cd::12",
1705 "end_addr": "cd::13",
# settings visible for the first profile
1711 "natt_disabled": True,
1712 "loc_id": ("fqdn", b"vpp.home"),
1713 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1716 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1719 "crypto_key_size": 32,
1723 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1724 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1726 "ipsec_over_udp_port": 4501,
1729 "lifetime_maxdata": 20192,
1730 "lifetime_jitter": 9,
# settings visible for the second profile
1736 "loc_id": ("ip4-addr", b"192.168.2.1"),
1737 "rem_id": ("ip6-addr", b"abcd::1"),
1740 "responder": {"sw_if_index": 4, "addr": "def::10"},
1743 "crypto_key_size": 16,
1747 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1748 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1750 "ipsec_over_udp_port": 4600,
# kept on self so that tearDown can remove them again
1754 self.p1 = self.configure_profile(conf["p1"])
1755 self.p2 = self.configure_profile(conf["p2"])
1757 r = self.vapi.ikev2_profile_dump()
1758 self.assertEqual(len(r), 2)
1759 self.verify_profile(r[0].profile, conf["p1"])
1760 self.verify_profile(r[1].profile, conf["p2"])
def verify_id(self, api_id, cfg_id):
    """Compare an identity from the profile dump with its configuration.

    ``cfg_id`` is an ``(id type name, data bytes)`` pair; ``api_id`` exposes
    ``type`` and a text ``data`` attribute that is ASCII-encoded before the
    comparison.
    """
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(api_id.type, expected_type)

    dumped_data = bytes(api_id.data, "ascii")
    self.assertEqual(dumped_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare a traffic selector from the profile dump with its configuration.

    ``api_ts`` exposes ``protocol_id``, ``start_port``, ``end_port``,
    ``start_addr`` and ``end_addr``; ``cfg_ts`` is the dict the profile was
    configured from (keys ``proto``, ``start_port``, ``end_port``,
    ``start_addr``, ``end_addr``).  Configured addresses are converted with
    ``ipaddress.ip_address`` before they are compared.
    """
    self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
    self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
    self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
    # plain str() replaces the Python 2 era ``text_type`` alias, so this
    # method no longer depends on a module-level compatibility shim
    self.assertEqual(api_ts.start_addr, ip_address(str(cfg_ts["start_addr"])))
    self.assertEqual(api_ts.end_addr, ip_address(str(cfg_ts["end_addr"])))
def verify_responder(self, api_r, cfg_r):
    """Compare the responder entry of the profile dump with its configuration.

    ``cfg_r`` holds ``sw_if_index`` and a textual ``addr`` that is converted
    with ``ipaddress.ip_address`` before being compared to ``api_r.addr``.
    """
    self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])

    configured_addr = ip_address(cfg_r["addr"])
    self.assertEqual(api_r.addr, configured_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Check the transform fields shared by the IKE and ESP transform sets.

    Each of ``crypto_alg``, ``crypto_key_size`` and ``integ_alg`` on the
    dumped object must equal the entry of the same name in ``cfg_ts``.
    """
    for field in ("crypto_alg", "crypto_key_size", "integ_alg"):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Check an IKE transform set: the common fields plus the DH group."""
    self.verify_transforms(api_ts, cfg_ts)

    configured_group = cfg_ts["dh_group"]
    self.assertEqual(api_ts.dh_group, configured_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check an ESP transform set; only the common fields apply to it."""
    self.verify_transforms(api_ts=api_ts, cfg_ts=cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Compare the auth section of the profile dump with its configuration.

    The dumped method must equal the enum value of the configured method
    name, and the dumped data and its length must match the configured
    data.
    """
    expected_method = AuthMethod.value(cfg_auth["method"])
    self.assertEqual(api_auth.method, expected_method)

    configured_data = cfg_auth["data"]
    self.assertEqual(api_auth.data, configured_data)
    self.assertEqual(api_auth.data_len, len(configured_data))
def verify_lifetime_data(self, p, ld):
    """Check the lifetime related fields of a dumped profile.

    ``lifetime``, ``lifetime_maxdata``, ``lifetime_jitter`` and ``handover``
    on ``p`` must equal the entries of the same name in ``ld``.
    """
    for field in ("lifetime", "lifetime_maxdata", "lifetime_jitter", "handover"):
        self.assertEqual(getattr(p, field), ld[field])
def verify_profile(self, ap, cp):
    """Compare one dumped profile ``ap`` with the dict ``cp`` it was built from.

    Name, UDP encapsulation, ids, traffic selectors, responder, IKE/ESP
    transforms, auth data and the IPsec-over-UDP port are always checked.
    ``natt_disabled`` is expected to be False and ``tun_itf`` to be
    0xFFFFFFFF when the configuration does not mention them; lifetime data
    is verified only when configured.
    """
    self.assertEqual(ap.name, cp["name"])
    self.assertEqual(ap.udp_encap, cp["udp_encap"])
    self.verify_id(ap.loc_id, cp["loc_id"])
    self.verify_id(ap.rem_id, cp["rem_id"])
    self.verify_ts(ap.loc_ts, cp["loc_ts"])
    self.verify_ts(ap.rem_ts, cp["rem_ts"])
    self.verify_responder(ap.responder, cp["responder"])
    self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
    self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
    self.verify_auth(ap.auth, cp["auth"])
    # assertEqual instead of assertTrue(a == b): a failure then reports
    # both values rather than just "False is not true"
    self.assertEqual(cp.get("natt_disabled", False), ap.natt_disabled)

    if "lifetime_data" in cp:
        self.verify_lifetime_data(ap, cp["lifetime_data"])
    self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
    # 0xFFFFFFFF is the value expected when no tunnel interface is configured
    self.assertEqual(ap.tun_itf, cp.get("tun_itf", 0xFFFFFFFF))
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """test responder - responder behind NAT"""

    # counter suffix used by the inherited verify_counters()
    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        """Configure the test case with only ``r_natt`` switched on."""
        overrides = {"r_natt": True}
        self.config_params(overrides)
# Initiator test in which VPP initiates and the test case acts as responder;
# the responder is reached through pg0 and IKE uses AES-GCM-16ICV with the
# 3072-bit MODP group.
1833 @tag_fixme_vpp_workers
1834 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1835 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1837 def config_tc(self):
1841 "is_initiator": False, # seen from test case perspective
1842 # thus vpp is initiator
1844 "sw_if_index": self.pg0.sw_if_index,
1845 "addr": self.pg0.remote_ip4,
# peer-side algorithm names
1847 "ike-crypto": ("AES-GCM-16ICV", 32),
1848 "ike-integ": "NULL",
1849 "ike-dh": "3072MODPgr",
# numeric transform ids; the trailing comments name them
1851 "crypto_alg": 20, # "aes-gcm-16"
1852 "crypto_key_size": 256,
1853 "dh_group": 15, # "modp-3072"
1856 "crypto_alg": 12, # "aes-cbc"
1857 "crypto_key_size": 256,
1858 # "hmac-sha2-256-128"
# Initiator test with pre-shared key authentication; VPP initiates and the
# responder is given by hostname ("vpp.responder.org") on pg0.
1865 @tag_fixme_vpp_workers
1866 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1867 """test ikev2 initiator - pre shared key auth"""
1869 def config_tc(self):
1872 "is_initiator": False, # seen from test case perspective
1873 # thus vpp is initiator
# peer-side algorithm names
1874 "ike-crypto": ("AES-GCM-16ICV", 32),
1875 "ike-integ": "NULL",
1876 "ike-dh": "3072MODPgr",
# numeric transform ids; the trailing comments name them
1878 "crypto_alg": 20, # "aes-gcm-16"
1879 "crypto_key_size": 256,
1880 "dh_group": 15, # "modp-3072"
1883 "crypto_alg": 12, # "aes-cbc"
1884 "crypto_key_size": 256,
1885 # "hmac-sha2-256-128"
# config_params() registers a static DNS cache entry for this hostname
1888 "responder_hostname": {
1889 "hostname": "vpp.responder.org",
1890 "sw_if_index": self.pg0.sw_if_index,
# Two child SA rekey requests are triggered back to back and answered in
# reverse order; only the second request is expected to take effect.
1896 @tag_fixme_vpp_workers
1897 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1898 """test initiator - request window size (1)"""
# Answer the rekey request `req` by echoing its decrypted SA payload back.
# When update_child_sa_data is true, the nonce and SPI of the request are
# also stored on self.sa and the child keys are recalculated.
1900 def rekey_respond(self, req, update_child_sa_data):
1901 ih = self.get_ike_header(req)
1902 plain = self.sa.hmac_and_decrypt(ih)
1903 sa = ikev2.IKEv2_payload_SA(plain)
1904 if update_child_sa_data:
1905 prop = sa[ikev2.IKEv2_payload_Proposal]
# the request is echoed, so the same nonce and SPI are recorded for both sides
1906 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1907 self.sa.r_nonce = self.sa.i_nonce
1908 self.sa.child_sas[0].ispi = prop.SPI
1909 self.sa.child_sas[0].rspi = prop.SPI
1910 self.sa.calc_child_keys()
1912 header = ikev2.IKEv2(
1913 init_SPI=self.sa.ispi,
1914 resp_SPI=self.sa.rspi,
1918 next_payload="Encrypted",
1920 resp = self.encrypt_ike_msg(header, sa, "SA")
1921 packet = self.create_packet(
1922 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1924 self.send_and_assert_no_replies(self.pg0, packet)
# Run the base initiator test, then trigger two rekeys of the same child SA.
1926 def test_initiator(self):
1927 super(TestInitiatorRequestWindowSize, self).test_initiator()
1928 self.pg0.enable_capture()
1930 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1931 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1932 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1933 capture = self.pg0.get_capture(2)
1935 # reply in reverse order
1936 self.rekey_respond(capture[1], True)
1937 self.rekey_respond(capture[0], False)
1939 # verify that only the second request was accepted
1940 self.verify_ike_sas()
1941 self.verify_ipsec_sas(is_rekey=True)
# Child SA rekey initiated by VPP: the test answers VPP's request and then
# verifies the IKE and IPsec SAs.
1944 @tag_fixme_vpp_workers
1945 class TestInitiatorRekey(TestInitiatorPsk):
1946 """test ikev2 initiator - rekey"""
# Trigger the rekey through the API, validate the captured request and
# answer it by echoing the decrypted SA payload back.
1948 def rekey_from_initiator(self):
# the API takes the child SA's initiator SPI as a little-endian integer
1949 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1950 self.pg0.enable_capture()
1952 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1953 capture = self.pg0.get_capture(1)
1954 ih = self.get_ike_header(capture[0])
1955 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1956 self.assertNotIn("Response", ih.flags)
1957 self.assertIn("Initiator", ih.flags)
1958 plain = self.sa.hmac_and_decrypt(ih)
1959 sa = ikev2.IKEv2_payload_SA(plain)
1960 prop = sa[ikev2.IKEv2_payload_Proposal]
# the request is echoed, so the same nonce is recorded for both sides
1961 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1962 self.sa.r_nonce = self.sa.i_nonce
1963 # update new responder SPI
1964 self.sa.child_sas[0].ispi = prop.SPI
1965 self.sa.child_sas[0].rspi = prop.SPI
1966 self.sa.calc_child_keys()
1967 header = ikev2.IKEv2(
1968 init_SPI=self.sa.ispi,
1969 resp_SPI=self.sa.rspi,
1973 next_payload="Encrypted",
1975 resp = self.encrypt_ike_msg(header, sa, "SA")
1976 packet = self.create_packet(
1977 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1979 self.send_and_assert_no_replies(self.pg0, packet)
# Run the base initiator test, perform the rekey and verify the SAs.
1981 def test_initiator(self):
1982 super(TestInitiatorRekey, self).test_initiator()
1983 self.rekey_from_initiator()
1984 self.verify_ike_sas()
1985 self.verify_ipsec_sas(is_rekey=True)
# Initiator test with "del_sa_from_responder" set and the IDr payload left
# out of IKE_AUTH ("no_idr_in_auth"); VPP initiates towards pg0.
1988 @tag_fixme_vpp_workers
1989 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1990 """test ikev2 initiator - delete IKE SA from responder"""
1992 def config_tc(self):
1995 "del_sa_from_responder": True,
1996 "is_initiator": False, # seen from test case perspective
1997 # thus vpp is initiator
1999 "sw_if_index": self.pg0.sw_if_index,
2000 "addr": self.pg0.remote_ip4,
# peer-side algorithm names
2002 "ike-crypto": ("AES-GCM-16ICV", 32),
2003 "ike-integ": "NULL",
2004 "ike-dh": "3072MODPgr",
# numeric transform ids; the trailing comments name them
2006 "crypto_alg": 20, # "aes-gcm-16"
2007 "crypto_key_size": 256,
2008 "dh_group": 15, # "modp-3072"
2011 "crypto_alg": 12, # "aes-cbc"
2012 "crypto_key_size": 256,
2013 # "hmac-sha2-256-128"
2016 "no_idr_in_auth": True,
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """test ikev2 responder - initiator behind NAT"""

    # counter suffix used by the inherited verify_counters()
    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        """Configure the test case with only ``i_natt`` switched on."""
        overrides = {"i_natt": True}
        self.config_params(overrides)
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """test ikev2 responder - pre shared key auth"""

    def config_tc(self):
        """Configure the test case with the defaults of config_params()."""
        # no overrides: every setting keeps its default value
        self.config_params()
# Responder test with dead peer detection left enabled: the liveness request
# from VPP is captured but never answered, after which no SAs may remain.
2039 @tag_fixme_vpp_workers
2040 class TestResponderDpd(TestResponderPsk):
2042 Dead peer detection test
2045 def config_tc(self):
# overrides the default (DPD disabled) of config_params()
2046 self.config_params({"dpd_disabled": False})
2051 def test_responder(self):
2052 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2053 super(TestResponderDpd, self).test_responder()
2054 self.pg0.enable_capture()
2056 # capture empty request but don't reply
2057 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2058 ih = self.get_ike_header(capture[0])
2059 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2060 plain = self.sa.hmac_and_decrypt(ih)
2061 self.assertEqual(plain, b"")
2062 # wait for SA expiration
# both the IKE SA and the IPsec SAs must be gone by now
2064 ike_sas = self.vapi.ikev2_sa_dump()
2065 self.assertEqual(len(ike_sas), 0)
2066 ipsec_sas = self.vapi.ipsec_sa_dump()
2067 self.assertEqual(len(ipsec_sas), 0)
# Child SA rekey requested by the test case, with VPP as responder.
2070 @tag_fixme_vpp_workers
2071 class TestResponderRekey(TestResponderPsk):
2072 """test ikev2 responder - rekey"""
# Send the rekey request and store the responder's nonce and new child SA
# SPI taken from the decrypted reply.
2074 def rekey_from_initiator(self):
2075 packet = self.create_rekey_request()
2076 self.pg0.add_stream(packet)
2077 self.pg0.enable_capture()
2079 capture = self.pg0.get_capture(1)
2080 ih = self.get_ike_header(capture[0])
2081 plain = self.sa.hmac_and_decrypt(ih)
2082 sa = ikev2.IKEv2_payload_SA(plain)
2083 prop = sa[ikev2.IKEv2_payload_Proposal]
2084 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2085 # update new responder SPI
2086 self.sa.child_sas[0].rspi = prop.SPI
# Run the base responder test, rekey, recalculate the child keys and verify
# the SAs together with the rekey counter and per-SA statistic.
2088 def test_responder(self):
2089 super(TestResponderRekey, self).test_responder()
2090 self.rekey_from_initiator()
2091 self.sa.calc_child_keys()
2092 self.verify_ike_sas()
2093 self.verify_ipsec_sas(is_rekey=True)
2094 self.assert_counter(1, "rekey_req", "ip4")
2095 r = self.vapi.ikev2_sa_dump()
2096 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
# Responder test with pg0 moved into IP table 1.
2099 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2100 """test ikev2 responder - non-default table id"""
2103 def setUpClass(cls):
# scapy's ikev2 module is imported here and published as the module-level
# name `ikev2` used by the rest of the file
2104 import scapy.contrib.ikev2 as _ikev2
2106 globals()["ikev2"] = _ikev2
# super(IkePeer, cls) starts the lookup after IkePeer in the MRO, so
# IkePeer's own setUpClass is bypassed and the setup is done here instead
2107 super(IkePeer, cls).setUpClass()
2108 cls.create_pg_interfaces(range(1))
2109 cls.vapi.cli("ip table add 1")
2110 cls.vapi.cli("set interface ip table pg0 1")
2111 for i in cls.pg_interfaces:
2118 def config_tc(self):
# DPD stays enabled so that the liveness request below is emitted
2119 self.config_params({"dpd_disabled": False})
# Run the base responder test, then expect one empty INFORMATIONAL request.
2121 def test_responder(self):
2122 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2123 super(TestResponderVrf, self).test_responder()
2124 self.pg0.enable_capture()
2126 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2127 ih = self.get_ike_header(capture[0])
2128 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2129 plain = self.sa.hmac_and_decrypt(ih)
2130 self.assertEqual(plain, b"")
# Responder test using certificate based authentication; the PEM file names
# below are resolved by config_params() against the test certs directory.
2133 @tag_fixme_vpp_workers
2134 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2135 """test ikev2 responder - cert based auth"""
2137 def config_tc(self):
2142 "server-key": "server-key.pem",
2143 "client-key": "client-key.pem",
2144 "client-cert": "client-cert.pem",
2145 "server-cert": "server-cert.pem",
# Responder test for the algorithm combination named by the class, run with
# a 256 byte random nonce and without the IDr payload in IKE_AUTH.
2150 @tag_fixme_vpp_workers
2151 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2152 TemplateResponder, Ikev2Params
2155 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2158 def config_tc(self):
# crypto entries are (algorithm name, key length) pairs
2161 "ike-crypto": ("AES-CBC", 16),
2162 "ike-integ": "SHA2-256-128",
2163 "esp-crypto": ("AES-CBC", 24),
2164 "esp-integ": "SHA2-384-192",
2165 "ike-dh": "2048MODPgr",
2166 "nonce": os.urandom(256),
2167 "no_idr_in_auth": True,
# Responder test: IKE with AES-CBC / SHA2-256-128 over the 3072-bit MODP
# group, ESP with AES-GCM-16ICV and no separate integrity algorithm.
# NOTE(review): the class name and description say AES_CBC_128 while the
# configured IKE key length is 32 -- confirm which one is intended.
2172 @tag_fixme_vpp_workers
2173 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2174 TemplateResponder, Ikev2Params
2178 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2181 def config_tc(self):
2184 "ike-crypto": ("AES-CBC", 32),
2185 "ike-integ": "SHA2-256-128",
2186 "esp-crypto": ("AES-GCM-16ICV", 32),
2187 "esp-integ": "NULL",
2188 "ike-dh": "3072MODPgr",
# Responder test using AES-GCM-16ICV for IKE with IPv6 traffic selectors;
# counters are checked with the "ip6" suffix.
2193 @tag_fixme_vpp_workers
2194 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2199 IKE_NODE_SUFFIX = "ip6"
2201 def config_tc(self):
2204 "del_sa_from_responder": True,
2207 "ike-crypto": ("AES-GCM-16ICV", 32),
2208 "ike-integ": "NULL",
2209 "ike-dh": "2048MODPgr",
# IPv6 ranges replace the IPv4 defaults of config_params()
2210 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2211 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
# After the base initiator test, an empty request is sent to VPP and its
# empty reply plus the keepalive counter and statistic are verified.
2216 @tag_fixme_vpp_workers
2217 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2219 Test for keep alive messages
# Send an empty request and check the single captured reply.
2222 def send_empty_req_from_responder(self):
2223 packet = self.create_empty_request()
2224 self.pg0.add_stream(packet)
2225 self.pg0.enable_capture()
2227 capture = self.pg0.get_capture(1)
2228 ih = self.get_ike_header(capture[0])
2229 self.assertEqual(ih.id, self.sa.msg_id)
# the reply must decrypt to an empty payload
2230 plain = self.sa.hmac_and_decrypt(ih)
2231 self.assertEqual(plain, b"")
2232 self.assert_counter(1, "keepalive", "ip4")
2233 r = self.vapi.ikev2_sa_dump()
2234 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2236 def test_initiator(self):
2237 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2238 self.send_empty_req_from_responder()
# Sends deliberately malformed IKE_SA_INIT messages and checks the error
# counters.
2241 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2242 """malformed packet test"""
2247 def config_tc(self):
2248 self.config_params()
# Build an IKE_SA_INIT packet for pg0.
# length:  optional value for the IKE header -- NOTE(review): presumably its
#          length field; the line using it is not visible here.
# payload: optional payload, handled only when it is not None.
2250 def create_ike_init_msg(self, length=None, payload=None):
# NOTE(review): init_SPI is given as a str here while other code in this file
# handles SPIs as bytes (int.from_bytes(self.sa.ispi, ...)) -- confirm intended.
2253 init_SPI="\x11" * 8,
2255 exch_type="IKE_SA_INIT",
2257 if payload is not None:
2259 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
def verify_bad_packet_length(self):
    """Send ``pkt_count`` init messages built with ``length=0xDEAD``.

    No reply is expected for any of them and the "bad_length" counter must
    equal the number of packets sent.
    """
    bad_length_msg = self.create_ike_init_msg(length=0xDEAD)
    burst = bad_length_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, "bad_length")
def verify_bad_sa_payload_length(self):
    """Send ``pkt_count`` init messages whose SA payload claims length 0xDEAD.

    No reply is expected for any of them and the "malformed_packet" counter
    must equal the number of packets sent.
    """
    bad_sa_payload = ikev2.IKEv2_payload_SA(length=0xDEAD)
    malformed_msg = self.create_ike_init_msg(payload=bad_sa_payload)
    burst = malformed_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, "malformed_packet")
def test_responder(self):
    """Run both malformed-message checks with 254 packets each."""
    self.pkt_count = 254
    for check in (
        self.verify_bad_packet_length,
        self.verify_bad_sa_payload_length,
    ):
        check()
# When the file is executed directly, run the tests with the VPP test runner.
2278 if __name__ == "__main__":
2279 unittest.main(testRunner=VppTestRunner)