3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
32 KEY_PAD = b"Key Pad for IKEv2"
39 # tuple structure is (p, g, key_len)
44 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
45 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
46 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
47 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
48 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
49 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
50 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
51 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
52 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
53 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
54 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
62 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
63 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
64 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
65 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
66 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
67 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
68 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
69 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
70 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
71 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
72 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
73 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
74 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
75 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
76 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
77 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
85 class CryptoAlgo(object):
86 def __init__(self, name, cipher, mode):
90 if self.cipher is not None:
91 self.bs = self.cipher.block_size // 8
93 if self.name == "AES-GCM-16ICV":
94 self.iv_len = GCM_IV_SIZE
98 def encrypt(self, data, key, aad=None):
99 iv = os.urandom(self.iv_len)
102 self.cipher(key), self.mode(iv), default_backend()
104 return iv + encryptor.update(data) + encryptor.finalize()
106 salt = key[-SALT_SIZE:]
109 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
111 encryptor.authenticate_additional_data(aad)
112 data = encryptor.update(data) + encryptor.finalize()
113 data += encryptor.tag[:GCM_ICV_SIZE]
116 def decrypt(self, data, key, aad=None, icv=None):
118 iv = data[: self.iv_len]
119 ct = data[self.iv_len :]
121 algorithms.AES(key), self.mode(iv), default_backend()
123 return decryptor.update(ct) + decryptor.finalize()
125 salt = key[-SALT_SIZE:]
126 nonce = salt + data[:GCM_IV_SIZE]
127 ct = data[GCM_IV_SIZE:]
128 key = key[:-SALT_SIZE]
130 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
132 decryptor.authenticate_additional_data(aad)
133 return decryptor.update(ct) + decryptor.finalize()
136 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
137 data = data + b"\x00" * (pad_len - 1)
138 return data + bytes([pad_len - 1])
141 class AuthAlgo(object):
142 def __init__(self, name, mac, mod, key_len, trunc_len=None):
146 self.key_len = key_len
147 self.trunc_len = trunc_len or key_len
151 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
152 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
153 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
157 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
158 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
159 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
160 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
161 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
165 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
182 class IKEv2ChildSA(object):
183 def __init__(self, local_ts, remote_ts, is_initiator):
191 self.local_ts = local_ts
192 self.remote_ts = remote_ts
195 class IKEv2SA(object):
202 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
208 auth_method="shared-key",
214 self.udp_encap = udp_encap
224 self.dh_params = None
226 self.priv_key = priv_key
227 self.is_initiator = is_initiator
228 nonce = nonce or os.urandom(32)
229 self.auth_data = auth_data
232 if isinstance(id_type, str):
233 self.id_type = IDType.value(id_type)
235 self.id_type = id_type
236 self.auth_method = auth_method
237 if self.is_initiator:
238 self.rspi = 8 * b"\x00"
243 self.ispi = 8 * b"\x00"
245 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
247 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return i_dh_data when this SA is the initiator, r_dh_data otherwise."""
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the other side's DH data: r_dh_data for an initiator, else i_dh_data."""
    return self.r_dh_data if self.is_initiator else self.i_dh_data
265 return self.i_natt or self.r_natt
267 def compute_secret(self):
268 priv = self.dh_private_key
269 peer = self.peer_dh_pub_key
270 p, g, l = self.ike_group
272 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
275 def generate_dh_data(self):
277 if self.ike_dh not in DH:
278 raise NotImplementedError("%s not in DH group" % self.ike_dh)
280 if self.dh_params is None:
281 dhg = DH[self.ike_dh]
282 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
283 self.dh_params = pn.parameters(default_backend())
285 priv = self.dh_params.generate_private_key()
286 pub = priv.public_key()
287 x = priv.private_numbers().x
288 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
289 y = pub.public_numbers().y
291 if self.is_initiator:
292 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
294 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
def complete_dh_data(self):
    """Run compute_secret() and keep its result in dh_shared_secret."""
    shared_secret = self.compute_secret()
    self.dh_shared_secret = shared_secret
299 def calc_child_keys(self):
300 prf = self.ike_prf_alg.mod()
301 s = self.i_nonce + self.r_nonce
302 c = self.child_sas[0]
304 encr_key_len = self.esp_crypto_key_len
305 integ_key_len = self.esp_integ_alg.key_len
306 salt_len = 0 if integ_key_len else 4
308 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
309 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
312 c.sk_ei = keymat[pos : pos + encr_key_len]
316 c.sk_ai = keymat[pos : pos + integ_key_len]
319 c.salt_ei = keymat[pos : pos + salt_len]
322 c.sk_er = keymat[pos : pos + encr_key_len]
326 c.sk_ar = keymat[pos : pos + integ_key_len]
329 c.salt_er = keymat[pos : pos + salt_len]
332 def calc_prfplus(self, prf, key, seed, length):
336 while len(r) < length and x < 255:
341 s = s + seed + bytes([x])
342 t = self.calc_prf(prf, key, s)
350 def calc_prf(self, prf, key, data):
351 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
356 prf = self.ike_prf_alg.mod()
357 # SKEYSEED = prf(Ni | Nr, g^ir)
358 s = self.i_nonce + self.r_nonce
359 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
361 # calculate S = Ni | Nr | SPIi SPIr
362 s = s + self.ispi + self.rspi
364 prf_key_trunc = self.ike_prf_alg.trunc_len
365 encr_key_len = self.ike_crypto_key_len
366 tr_prf_key_len = self.ike_prf_alg.key_len
367 integ_key_len = self.ike_integ_alg.key_len
368 if integ_key_len == 0:
380 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
383 self.sk_d = keymat[: pos + prf_key_trunc]
386 self.sk_ai = keymat[pos : pos + integ_key_len]
388 self.sk_ar = keymat[pos : pos + integ_key_len]
391 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
392 pos += encr_key_len + salt_size
393 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
394 pos += encr_key_len + salt_size
396 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
397 pos += tr_prf_key_len
398 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
400 def generate_authmsg(self, prf, packet):
401 if self.is_initiator:
409 data = bytes([self.id_type, 0, 0, 0]) + id
410 id_hash = self.calc_prf(prf, key, data)
411 return packet + nonce + id_hash
414 prf = self.ike_prf_alg.mod()
415 if self.is_initiator:
416 packet = self.init_req_packet
418 packet = self.init_resp_packet
419 authmsg = self.generate_authmsg(prf, raw(packet))
420 if self.auth_method == "shared-key":
421 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
422 self.auth_data = self.calc_prf(prf, psk, authmsg)
423 elif self.auth_method == "rsa-sig":
424 self.auth_data = self.priv_key.sign(
425 authmsg, padding.PKCS1v15(), hashes.SHA1()
428 raise TypeError("unknown auth method type!")
def encrypt(self, data, aad=None):
    """Pad data with the IKE cipher's pad() and encrypt it under my_cryptokey.

    aad is handed through unchanged to the cipher's encrypt().
    """
    padded = self.ike_crypto_alg.pad(data)
    do_encrypt = self.ike_crypto_alg.encrypt
    return do_encrypt(padded, self.my_cryptokey, aad)
435 def peer_authkey(self):
436 if self.is_initiator:
441 def my_authkey(self):
442 if self.is_initiator:
447 def my_cryptokey(self):
448 if self.is_initiator:
453 def peer_cryptokey(self):
454 if self.is_initiator:
def concat(self, alg, key_len):
    """Return "<alg>-<bits>", where bits is key_len (in bytes) times eight."""
    bits = key_len * 8
    return "-".join((alg, str(bits)))
def vpp_ike_cypto_alg(self):
    """Return concat() applied to the IKE cipher name and its key length."""
    join = self.concat
    return join(self.ike_crypto, self.ike_crypto_key_len)
def vpp_esp_cypto_alg(self):
    """Return concat() applied to the ESP cipher name and its key length."""
    join = self.concat
    return join(self.esp_crypto, self.esp_crypto_key_len)
469 def verify_hmac(self, ikemsg):
470 integ_trunc = self.ike_integ_alg.trunc_len
471 exp_hmac = ikemsg[-integ_trunc:]
472 data = ikemsg[:-integ_trunc]
473 computed_hmac = self.compute_hmac(
474 self.ike_integ_alg.mod(), self.peer_authkey, data
476 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
478 def compute_hmac(self, integ, key, data):
479 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt data with the IKE cipher under peer_cryptokey.

    aad and icv are forwarded as-is to the cipher's decrypt().
    """
    do_decrypt = self.ike_crypto_alg.decrypt
    return do_decrypt(data, self.peer_cryptokey, aad, icv)
486 def hmac_and_decrypt(self, ike):
487 ep = ike[ikev2.IKEv2_payload_Encrypted]
488 if self.ike_crypto == "AES-GCM-16ICV":
489 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
490 ct = ep.load[:-GCM_ICV_SIZE]
491 tag = ep.load[-GCM_ICV_SIZE:]
492 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
494 self.verify_hmac(raw(ike))
495 integ_trunc = self.ike_integ_alg.trunc_len
497 # remove ICV and decrypt payload
498 ct = ep.load[:-integ_trunc]
499 plain = self.decrypt(ct)
502 return plain[: -pad_len - 1]
504 def build_ts_addr(self, ts, version):
506 "starting_address_v" + version: ts["start_addr"],
507 "ending_address_v" + version: ts["end_addr"],
510 def generate_ts(self, is_ip4):
511 c = self.child_sas[0]
512 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
514 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
515 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
516 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
517 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
519 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
520 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
521 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
522 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
524 if self.is_initiator:
525 return ([ts1], [ts2])
526 return ([ts2], [ts1])
528 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
529 if crypto not in CRYPTO_ALGOS:
530 raise TypeError("unsupported encryption algo %r" % crypto)
531 self.ike_crypto = crypto
532 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
533 self.ike_crypto_key_len = crypto_key_len
535 if integ not in AUTH_ALGOS:
536 raise TypeError("unsupported auth algo %r" % integ)
537 self.ike_integ = None if integ == "NULL" else integ
538 self.ike_integ_alg = AUTH_ALGOS[integ]
540 if prf not in PRF_ALGOS:
541 raise TypeError("unsupported prf algo %r" % prf)
543 self.ike_prf_alg = PRF_ALGOS[prf]
545 self.ike_group = DH[self.ike_dh]
547 def set_esp_props(self, crypto, crypto_key_len, integ):
548 self.esp_crypto_key_len = crypto_key_len
549 if crypto not in CRYPTO_ALGOS:
550 raise TypeError("unsupported encryption algo %r" % crypto)
551 self.esp_crypto = crypto
552 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
554 if integ not in AUTH_ALGOS:
555 raise TypeError("unsupported auth algo %r" % integ)
556 self.esp_integ = None if integ == "NULL" else integ
557 self.esp_integ_alg = AUTH_ALGOS[integ]
559 def crypto_attr(self, key_len):
560 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
561 return (0x800E << 16 | key_len << 3, 12)
563 raise Exception("unsupported attribute type")
def ike_crypto_attr(self):
    """Return crypto_attr() evaluated for the IKE cipher key length."""
    build_attr = self.crypto_attr
    return build_attr(self.ike_crypto_key_len)
def esp_crypto_attr(self):
    """Return crypto_attr() evaluated for the ESP cipher key length."""
    build_attr = self.crypto_attr
    return build_attr(self.esp_crypto_key_len)
571 def compute_nat_sha1(self, ip, port, rspi=None):
574 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
575 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
577 return digest.finalize()
580 class IkePeer(VppTestCase):
581 """common class for initiator and responder"""
585 import scapy.contrib.ikev2 as _ikev2
587 globals()["ikev2"] = _ikev2
588 super(IkePeer, cls).setUpClass()
589 cls.create_pg_interfaces(range(2))
590 for i in cls.pg_interfaces:
598 def tearDownClass(cls):
599 super(IkePeer, cls).tearDownClass()
602 super(IkePeer, self).tearDown()
603 if self.del_sa_from_responder:
604 self.initiate_del_sa_from_responder()
606 self.initiate_del_sa_from_initiator()
607 r = self.vapi.ikev2_sa_dump()
608 self.assertEqual(len(r), 0)
609 sas = self.vapi.ipsec_sa_dump()
610 self.assertEqual(len(sas), 0)
611 self.p.remove_vpp_config()
612 self.assertIsNone(self.p.query_vpp_config())
615 super(IkePeer, self).setUp()
617 self.p.add_vpp_config()
618 self.assertIsNotNone(self.p.query_vpp_config())
619 if self.sa.is_initiator:
620 self.sa.generate_dh_data()
621 self.vapi.cli("ikev2 set logging level 4")
622 self.vapi.cli("event-lo clear")
def assert_counter(self, count, name, version="ip4"):
    """Assert that an ikev2 error counter equals count.

    The counter path is "/err/ikev2-<version>/<name>" and its value is read
    through self.statistics.get_err_counter().
    """
    prefix = "/err/ikev2-%s/" % version
    counter_path = prefix + name
    check = self.assertEqual
    check(count, self.statistics.get_err_counter(counter_path))
628 def create_rekey_request(self):
629 sa, first_payload = self.generate_auth_payload(is_rekey=True)
630 header = ikev2.IKEv2(
631 init_SPI=self.sa.ispi,
632 resp_SPI=self.sa.rspi,
633 id=self.sa.new_msg_id(),
635 exch_type="CREATE_CHILD_SA",
638 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
639 return self.create_packet(
640 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
643 def create_empty_request(self):
644 header = ikev2.IKEv2(
645 init_SPI=self.sa.ispi,
646 resp_SPI=self.sa.rspi,
647 id=self.sa.new_msg_id(),
649 exch_type="INFORMATIONAL",
650 next_payload="Encrypted",
653 msg = self.encrypt_ike_msg(header, b"", None)
654 return self.create_packet(
655 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
659 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
662 src_ip = src_if.remote_ip6
663 dst_ip = src_if.local_ip6
666 src_ip = src_if.remote_ip4
667 dst_ip = src_if.local_ip4
670 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
671 / ip_layer(src=src_ip, dst=dst_ip)
672 / UDP(sport=sport, dport=dport)
675 # insert non ESP marker
676 res = res / Raw(b"\x00" * 4)
def verify_udp(self, udp):
    """Assert that the UDP source and destination ports equal self.sa's sport/dport."""
    check = self.assertEqual
    check(udp.sport, self.sa.sport)
    check(udp.dport, self.sa.dport)
683 def get_ike_header(self, packet):
685 ih = packet[ikev2.IKEv2]
686 ih = self.verify_and_remove_non_esp_marker(ih)
687 except IndexError as e:
688 # this is a workaround for getting IKEv2 layer as both ikev2 and
689 # ipsec register for port 4500
691 ih = self.verify_and_remove_non_esp_marker(esp)
692 self.assertEqual(ih.version, 0x20)
693 self.assertNotIn("Version", ih.flags)
696 def verify_and_remove_non_esp_marker(self, packet):
698 # if we are in nat traversal mode check for non esp marker
701 self.assertEqual(data[:4], b"\x00" * 4)
702 return ikev2.IKEv2(data[4:])
706 def encrypt_ike_msg(self, header, plain, first_payload):
707 if self.sa.ike_crypto == "AES-GCM-16ICV":
708 data = self.sa.ike_crypto_alg.pad(raw(plain))
713 + len(ikev2.IKEv2_payload_Encrypted())
715 tlen = plen + len(ikev2.IKEv2())
718 sk_p = ikev2.IKEv2_payload_Encrypted(
719 next_payload=first_payload, length=plen
723 encr = self.sa.encrypt(raw(plain), raw(res))
724 sk_p = ikev2.IKEv2_payload_Encrypted(
725 next_payload=first_payload, length=plen, load=encr
729 encr = self.sa.encrypt(raw(plain))
730 trunc_len = self.sa.ike_integ_alg.trunc_len
731 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
732 tlen = plen + len(ikev2.IKEv2())
734 sk_p = ikev2.IKEv2_payload_Encrypted(
735 next_payload=first_payload, length=plen, load=encr
740 integ_data = raw(res)
741 hmac_data = self.sa.compute_hmac(
742 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
744 res = res / Raw(hmac_data[:trunc_len])
745 assert len(res) == tlen
748 def verify_udp_encap(self, ipsec_sa):
749 e = VppEnum.vl_api_ipsec_sad_flags_t
750 if self.sa.udp_encap or self.sa.natt:
751 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
753 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
755 def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
756 sas = self.vapi.ipsec_sa_dump()
759 # after rekey there is a short period of time in which old
760 # inbound SA is still present
764 self.assertEqual(len(sas), sa_count)
765 if self.sa.is_initiator:
780 c = self.sa.child_sas[0]
782 self.verify_udp_encap(sa0)
783 self.verify_udp_encap(sa1)
784 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
785 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
786 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
788 if self.sa.esp_integ is None:
791 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
792 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
793 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
796 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
797 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
798 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
799 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
803 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
804 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
805 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
806 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
808 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
809 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one piece of key material between an API reply and local state.

    api_keys is expected to carry both <name> and <name>_len; only the first
    <name>_len bytes of the API value are compared with the local key.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + "_len")
    check = self.assertEqual
    check(len(expected), reported_len)
    check(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Assert that an identity returned by the API matches the expected one.

    Checks the id type (translated through IDType.value), the data length and
    the data itself.
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The identity payload has to be compared with the expected payload, not
    # with the type name (bytes never compare equal to the type string).
    # NOTE(review): assumes exp_id exposes .data next to .data_len - confirm.
    self.assertEqual(bytes(api_id.data, "ascii"), exp_id.data)
823 def verify_ike_sas(self):
824 r = self.vapi.ikev2_sa_dump()
825 self.assertEqual(len(r), 1)
827 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
828 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
830 if self.sa.is_initiator:
831 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
832 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
834 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
835 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
837 if self.sa.is_initiator:
838 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
839 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
841 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
842 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
843 self.verify_keymat(sa.keys, self.sa, "sk_d")
844 self.verify_keymat(sa.keys, self.sa, "sk_ai")
845 self.verify_keymat(sa.keys, self.sa, "sk_ar")
846 self.verify_keymat(sa.keys, self.sa, "sk_ei")
847 self.verify_keymat(sa.keys, self.sa, "sk_er")
848 self.verify_keymat(sa.keys, self.sa, "sk_pi")
849 self.verify_keymat(sa.keys, self.sa, "sk_pr")
851 self.assertEqual(sa.i_id.type, self.sa.id_type)
852 self.assertEqual(sa.r_id.type, self.sa.id_type)
853 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
854 self.assertEqual(sa.r_id.data_len, len(self.idr))
855 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
856 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
858 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
859 self.assertEqual(len(r), 1)
861 self.assertEqual(csa.sa_index, sa.sa_index)
862 c = self.sa.child_sas[0]
863 if hasattr(c, "sk_ai"):
864 self.verify_keymat(csa.keys, c, "sk_ai")
865 self.verify_keymat(csa.keys, c, "sk_ar")
866 self.verify_keymat(csa.keys, c, "sk_ei")
867 self.verify_keymat(csa.keys, c, "sk_er")
868 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
869 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
871 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
874 r = self.vapi.ikev2_traffic_selector_dump(
875 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
877 self.assertEqual(len(r), 1)
879 self.verify_ts(r[0].ts, tsi[0], True)
881 r = self.vapi.ikev2_traffic_selector_dump(
882 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
884 self.assertEqual(len(r), 1)
885 self.verify_ts(r[0].ts, tsr[0], False)
887 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
888 self.verify_nonce(n, self.sa.i_nonce)
889 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
890 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API nonce record has the length and bytes of nonce."""
    check = self.assertEqual
    check(api_nonce.data_len, len(nonce))
    check(api_nonce.nonce, nonce)
896 def verify_ts(self, api_ts, ts, is_initiator):
898 self.assertTrue(api_ts.is_local)
900 self.assertFalse(api_ts.is_local)
903 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
904 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
906 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
907 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
908 self.assertEqual(api_ts.start_port, ts.start_port)
909 self.assertEqual(api_ts.end_port, ts.end_port)
910 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
913 class TemplateInitiator(IkePeer):
914 """initiator test template"""
916 def initiate_del_sa_from_initiator(self):
917 ispi = int.from_bytes(self.sa.ispi, "little")
918 self.pg0.enable_capture()
920 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
921 capture = self.pg0.get_capture(1)
922 ih = self.get_ike_header(capture[0])
923 self.assertNotIn("Response", ih.flags)
924 self.assertIn("Initiator", ih.flags)
925 self.assertEqual(ih.init_SPI, self.sa.ispi)
926 self.assertEqual(ih.resp_SPI, self.sa.rspi)
927 plain = self.sa.hmac_and_decrypt(ih)
928 d = ikev2.IKEv2_payload_Delete(plain)
929 self.assertEqual(d.proto, 1) # proto=IKEv2
930 header = ikev2.IKEv2(
931 init_SPI=self.sa.ispi,
932 resp_SPI=self.sa.rspi,
934 exch_type="INFORMATIONAL",
936 next_payload="Encrypted",
938 resp = self.encrypt_ike_msg(header, b"", None)
939 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Check the reply to a delete request sent by the test.

    The header must carry the current message id, exchange type 37
    (INFORMATIONAL) and both the Response and Initiator flags; the decrypted
    payload must be empty.
    """
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, 37)  # exchange informational
    for flag in ("Response", "Initiator"):
        self.assertIn(flag, header.flags)
    self.assertEqual(self.sa.hmac_and_decrypt(header), b"")
950 def initiate_del_sa_from_responder(self):
951 header = ikev2.IKEv2(
952 init_SPI=self.sa.ispi,
953 resp_SPI=self.sa.rspi,
954 exch_type="INFORMATIONAL",
955 id=self.sa.new_msg_id(),
957 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
958 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
959 packet = self.create_packet(
960 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
962 self.pg0.add_stream(packet)
963 self.pg0.enable_capture()
965 capture = self.pg0.get_capture(1)
966 self.verify_del_sa(capture[0])
969 def find_notify_payload(packet, notify_type):
970 n = packet[ikev2.IKEv2_payload_Notify]
972 if n.type == notify_type:
977 def verify_nat_detection(self, packet):
984 # NAT_DETECTION_SOURCE_IP
985 s = self.find_notify_payload(packet, 16388)
986 self.assertIsNotNone(s)
987 src_sha = self.sa.compute_nat_sha1(
988 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
990 self.assertEqual(s.load, src_sha)
992 # NAT_DETECTION_DESTINATION_IP
993 s = self.find_notify_payload(packet, 16389)
994 self.assertIsNotNone(s)
995 dst_sha = self.sa.compute_nat_sha1(
996 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
998 self.assertEqual(s.load, dst_sha)
1000 def verify_sa_init_request(self, packet):
1002 self.sa.dport = udp.sport
1003 ih = packet[ikev2.IKEv2]
1004 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1005 self.assertEqual(ih.exch_type, 34) # SA_INIT
1006 self.sa.ispi = ih.init_SPI
1007 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1008 self.assertIn("Initiator", ih.flags)
1009 self.assertNotIn("Response", ih.flags)
1010 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1011 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1013 prop = packet[ikev2.IKEv2_payload_Proposal]
1014 self.assertEqual(prop.proto, 1) # proto = ikev2
1015 self.assertEqual(prop.proposal, 1)
1016 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1018 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1020 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1021 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1022 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1023 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1025 self.verify_nat_detection(packet)
1026 self.sa.set_ike_props(
1027 crypto="AES-GCM-16ICV",
1030 prf="PRF_HMAC_SHA2_256",
1033 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1034 self.sa.generate_dh_data()
1035 self.sa.complete_dh_data()
1038 def update_esp_transforms(self, trans, sa):
1040 if trans.transform_type == 1: # ecryption
1041 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1042 elif trans.transform_type == 3: # integrity
1043 sa.esp_integ = INTEG_IDS[trans.transform_id]
1044 trans = trans.payload
1046 def verify_sa_auth_req(self, packet):
1048 self.sa.dport = udp.sport
1049 ih = self.get_ike_header(packet)
1050 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1051 self.assertEqual(ih.init_SPI, self.sa.ispi)
1052 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1053 self.assertIn("Initiator", ih.flags)
1054 self.assertNotIn("Response", ih.flags)
1057 self.verify_udp(udp)
1058 self.assertEqual(ih.id, self.sa.msg_id + 1)
1060 plain = self.sa.hmac_and_decrypt(ih)
1061 idi = ikev2.IKEv2_payload_IDi(plain)
1062 self.assertEqual(idi.load, self.sa.i_id)
1063 if self.no_idr_auth:
1064 self.assertEqual(idi.next_payload, 39) # AUTH
1066 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1067 self.assertEqual(idr.load, self.sa.r_id)
1068 prop = idi[ikev2.IKEv2_payload_Proposal]
1069 c = self.sa.child_sas[0]
1071 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1073 def send_init_response(self):
1074 tr_attr = self.sa.ike_crypto_attr()
1076 ikev2.IKEv2_payload_Transform(
1077 transform_type="Encryption",
1078 transform_id=self.sa.ike_crypto,
1080 key_length=tr_attr[0],
1082 / ikev2.IKEv2_payload_Transform(
1083 transform_type="Integrity", transform_id=self.sa.ike_integ
1085 / ikev2.IKEv2_payload_Transform(
1086 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1088 / ikev2.IKEv2_payload_Transform(
1089 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1092 props = ikev2.IKEv2_payload_Proposal(
1093 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1096 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1098 dst_address = b"\x0a\x0a\x0a\x0a"
1100 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1101 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1102 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1104 self.sa.init_resp_packet = (
1106 init_SPI=self.sa.ispi,
1107 resp_SPI=self.sa.rspi,
1108 exch_type="IKE_SA_INIT",
1111 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1112 / ikev2.IKEv2_payload_KE(
1113 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1115 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1116 / ikev2.IKEv2_payload_Notify(
1117 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1119 / ikev2.IKEv2_payload_Notify(
1120 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1124 ike_msg = self.create_packet(
1126 self.sa.init_resp_packet,
1132 self.pg_send(self.pg0, ike_msg)
1133 capture = self.pg0.get_capture(1)
1134 self.verify_sa_auth_req(capture[0])
1136 def initiate_sa_init(self):
1137 self.pg0.enable_capture()
1139 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1141 capture = self.pg0.get_capture(1)
1142 self.verify_sa_init_request(capture[0])
1143 self.send_init_response()
1145 def send_auth_response(self):
1146 tr_attr = self.sa.esp_crypto_attr()
1148 ikev2.IKEv2_payload_Transform(
1149 transform_type="Encryption",
1150 transform_id=self.sa.esp_crypto,
1152 key_length=tr_attr[0],
1154 / ikev2.IKEv2_payload_Transform(
1155 transform_type="Integrity", transform_id=self.sa.esp_integ
1157 / ikev2.IKEv2_payload_Transform(
1158 transform_type="Extended Sequence Number", transform_id="No ESN"
1160 / ikev2.IKEv2_payload_Transform(
1161 transform_type="Extended Sequence Number", transform_id="ESN"
1165 c = self.sa.child_sas[0]
1166 props = ikev2.IKEv2_payload_Proposal(
1167 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1170 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1172 ikev2.IKEv2_payload_IDi(
1173 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1175 / ikev2.IKEv2_payload_IDr(
1176 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1178 / ikev2.IKEv2_payload_AUTH(
1180 auth_type=AuthMethod.value(self.sa.auth_method),
1181 load=self.sa.auth_data,
1183 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1184 / ikev2.IKEv2_payload_TSi(
1185 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1187 / ikev2.IKEv2_payload_TSr(
1188 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1190 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1193 header = ikev2.IKEv2(
1194 init_SPI=self.sa.ispi,
1195 resp_SPI=self.sa.rspi,
1196 id=self.sa.new_msg_id(),
1198 exch_type="IKE_AUTH",
1201 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1202 packet = self.create_packet(
1203 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1205 self.pg_send(self.pg0, packet)
1207 def test_initiator(self):
1208 self.initiate_sa_init()
1210 self.sa.calc_child_keys()
1211 self.send_auth_response()
1212 self.verify_ike_sas()
1215 class TemplateResponder(IkePeer):
1216 """responder test template"""
1218 def initiate_del_sa_from_responder(self):
1219 self.pg0.enable_capture()
1221 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1222 capture = self.pg0.get_capture(1)
1223 ih = self.get_ike_header(capture[0])
1224 self.assertNotIn("Response", ih.flags)
1225 self.assertNotIn("Initiator", ih.flags)
1226 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1227 plain = self.sa.hmac_and_decrypt(ih)
1228 d = ikev2.IKEv2_payload_Delete(plain)
1229 self.assertEqual(d.proto, 1) # proto=IKEv2
1230 self.assertEqual(ih.init_SPI, self.sa.ispi)
1231 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1232 header = ikev2.IKEv2(
1233 init_SPI=self.sa.ispi,
1234 resp_SPI=self.sa.rspi,
1235 flags="Initiator+Response",
1236 exch_type="INFORMATIONAL",
1238 next_payload="Encrypted",
1240 resp = self.encrypt_ike_msg(header, b"", None)
1241 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Check that *packet* is a valid reply to an IKE SA delete request.

    The IKE header must be an INFORMATIONAL response that does not carry
    the "Initiator" flag, must match the current message id and SPI pair
    of ``self.sa``, and its encrypted part must decrypt to an empty body.
    """
    hdr = self.get_ike_header(packet)
    self.assertEqual(hdr.id, self.sa.msg_id)
    self.assertEqual(hdr.exch_type, 37)  # INFORMATIONAL exchange
    self.assertIn("Response", hdr.flags)
    self.assertNotIn("Initiator", hdr.flags)
    self.assertEqual(hdr.next_payload, 46)  # Encrypted payload
    self.assertEqual(hdr.init_SPI, self.sa.ispi)
    self.assertEqual(hdr.resp_SPI, self.sa.rspi)
    # the reply is expected to contain no inner payloads at all
    self.assertEqual(self.sa.hmac_and_decrypt(hdr), b"")
1255 def initiate_del_sa_from_initiator(self):
1256 header = ikev2.IKEv2(
1257 init_SPI=self.sa.ispi,
1258 resp_SPI=self.sa.rspi,
1260 exch_type="INFORMATIONAL",
1261 id=self.sa.new_msg_id(),
1263 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1264 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1265 packet = self.create_packet(
1266 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1268 self.pg0.add_stream(packet)
1269 self.pg0.enable_capture()
1271 capture = self.pg0.get_capture(1)
1272 self.verify_del_sa(capture[0])
1274 def send_sa_init_req(self):
1275 tr_attr = self.sa.ike_crypto_attr()
1277 ikev2.IKEv2_payload_Transform(
1278 transform_type="Encryption",
1279 transform_id=self.sa.ike_crypto,
1281 key_length=tr_attr[0],
1283 / ikev2.IKEv2_payload_Transform(
1284 transform_type="Integrity", transform_id=self.sa.ike_integ
1286 / ikev2.IKEv2_payload_Transform(
1287 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1289 / ikev2.IKEv2_payload_Transform(
1290 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1294 props = ikev2.IKEv2_payload_Proposal(
1295 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1298 next_payload = None if self.ip6 else "Notify"
1300 self.sa.init_req_packet = (
1302 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1304 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1305 / ikev2.IKEv2_payload_KE(
1306 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1308 / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1313 src_address = b"\x0a\x0a\x0a\x01"
1315 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1318 dst_address = b"\x0a\x0a\x0a\x0a"
1320 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1322 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1323 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1324 nat_src_detection = ikev2.IKEv2_payload_Notify(
1325 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1327 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1328 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1330 self.sa.init_req_packet = (
1331 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1334 ike_msg = self.create_packet(
1336 self.sa.init_req_packet,
1342 self.pg0.add_stream(ike_msg)
1343 self.pg0.enable_capture()
1345 capture = self.pg0.get_capture(1)
1346 self.verify_sa_init(capture[0])
1348 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1349 tr_attr = self.sa.esp_crypto_attr()
1350 last_payload = last_payload or "Notify"
1352 ikev2.IKEv2_payload_Transform(
1353 transform_type="Encryption",
1354 transform_id=self.sa.esp_crypto,
1356 key_length=tr_attr[0],
1358 / ikev2.IKEv2_payload_Transform(
1359 transform_type="Integrity", transform_id=self.sa.esp_integ
1361 / ikev2.IKEv2_payload_Transform(
1362 transform_type="Extended Sequence Number", transform_id="No ESN"
1364 / ikev2.IKEv2_payload_Transform(
1365 transform_type="Extended Sequence Number", transform_id="ESN"
1369 c = self.sa.child_sas[0]
1370 props = ikev2.IKEv2_payload_Proposal(
1371 proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
1374 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1376 ikev2.IKEv2_payload_AUTH(
1378 auth_type=AuthMethod.value(self.sa.auth_method),
1379 load=self.sa.auth_data,
1381 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1382 / ikev2.IKEv2_payload_TSi(
1383 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1385 / ikev2.IKEv2_payload_TSr(
1386 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1391 first_payload = "Nonce"
1393 ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
1395 / ikev2.IKEv2_payload_Notify(
1399 length=8 + len(c.ispi),
1400 next_payload="Notify",
1402 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1405 first_payload = "IDi"
1406 if self.no_idr_auth:
1407 ids = ikev2.IKEv2_payload_IDi(
1408 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1411 ids = ikev2.IKEv2_payload_IDi(
1412 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1413 ) / ikev2.IKEv2_payload_IDr(
1414 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1417 return plain, first_payload
1419 def send_sa_auth(self):
1420 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1421 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1422 header = ikev2.IKEv2(
1423 init_SPI=self.sa.ispi,
1424 resp_SPI=self.sa.rspi,
1425 id=self.sa.new_msg_id(),
1427 exch_type="IKE_AUTH",
1430 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1431 packet = self.create_packet(
1432 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1434 self.pg0.add_stream(packet)
1435 self.pg0.enable_capture()
1437 capture = self.pg0.get_capture(1)
1438 self.verify_sa_auth_resp(capture[0])
1440 def verify_sa_init(self, packet):
1441 ih = self.get_ike_header(packet)
1443 self.assertEqual(ih.id, self.sa.msg_id)
1444 self.assertEqual(ih.exch_type, 34)
1445 self.assertIn("Response", ih.flags)
1446 self.assertEqual(ih.init_SPI, self.sa.ispi)
1447 self.assertNotEqual(ih.resp_SPI, 0)
1448 self.sa.rspi = ih.resp_SPI
1450 sa = ih[ikev2.IKEv2_payload_SA]
1451 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1452 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1453 except IndexError as e:
1454 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1455 self.logger.error(ih.show())
1457 self.sa.complete_dh_data()
1461 def verify_sa_auth_resp(self, packet):
1462 ike = self.get_ike_header(packet)
1464 self.verify_udp(udp)
1465 self.assertEqual(ike.id, self.sa.msg_id)
1466 plain = self.sa.hmac_and_decrypt(ike)
1467 idr = ikev2.IKEv2_payload_IDr(plain)
1468 prop = idr[ikev2.IKEv2_payload_Proposal]
1469 self.assertEqual(prop.SPIsize, 4)
1470 self.sa.child_sas[0].rspi = prop.SPI
1471 self.sa.calc_child_keys()
1473 IKE_NODE_SUFFIX = "ip4"
1475 def verify_counters(self):
1476 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1477 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1478 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1480 r = self.vapi.ikev2_sa_dump()
1482 self.assertEqual(1, s.n_sa_auth_req)
1483 self.assertEqual(1, s.n_sa_init_req)
1485 def test_responder(self):
1486 self.send_sa_init_req()
1488 self.verify_ipsec_sas()
1489 self.verify_ike_sas()
1490 self.verify_counters()
1493 class Ikev2Params(object):
1494 def config_params(self, params={}):
1495 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1496 ei = VppEnum.vl_api_ipsec_integ_alg_t
1498 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1499 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1500 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1501 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1502 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1503 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1504 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1505 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1506 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1507 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1510 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1512 self.vapi.cli("ikev2 dpd disable")
1513 self.del_sa_from_responder = (
1515 if "del_sa_from_responder" not in params
1516 else params["del_sa_from_responder"]
1518 i_natt = False if "i_natt" not in params else params["i_natt"]
1519 r_natt = False if "r_natt" not in params else params["r_natt"]
1520 self.p = Profile(self, "pr1")
1521 self.ip6 = False if "ip6" not in params else params["ip6"]
1523 if "auth" in params and params["auth"] == "rsa-sig":
1524 auth_method = "rsa-sig"
1525 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1526 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1528 client_file = work_dir + params["client-cert"]
1529 server_pem = open(work_dir + params["server-cert"]).read()
1530 client_priv = open(work_dir + params["client-key"]).read()
1531 client_priv = load_pem_private_key(
1532 str.encode(client_priv), None, default_backend()
1534 self.peer_cert = x509.load_pem_x509_certificate(
1535 str.encode(server_pem), default_backend()
1537 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1540 auth_data = b"$3cr3tpa$$w0rd"
1541 self.p.add_auth(method="shared-key", data=auth_data)
1542 auth_method = "shared-key"
1545 is_init = True if "is_initiator" not in params else params["is_initiator"]
1546 self.no_idr_auth = params.get("no_idr_in_auth", False)
1548 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1549 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1550 r_id = self.idr = idr["data"]
1551 i_id = self.idi = idi["data"]
1553 # scapy is initiator, VPP is responder
1554 self.p.add_local_id(**idr)
1555 self.p.add_remote_id(**idi)
1556 if self.no_idr_auth:
1559 # VPP is initiator, scapy is responder
1560 self.p.add_local_id(**idi)
1561 if not self.no_idr_auth:
1562 self.p.add_remote_id(**idr)
1565 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1566 if "loc_ts" not in params
1567 else params["loc_ts"]
1570 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1571 if "rem_ts" not in params
1572 else params["rem_ts"]
1574 self.p.add_local_ts(**loc_ts)
1575 self.p.add_remote_ts(**rem_ts)
1576 if "responder" in params:
1577 self.p.add_responder(params["responder"])
1578 if "ike_transforms" in params:
1579 self.p.add_ike_transforms(params["ike_transforms"])
1580 if "esp_transforms" in params:
1581 self.p.add_esp_transforms(params["esp_transforms"])
1583 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1585 self.p.set_udp_encap(True)
1587 if "responder_hostname" in params:
1588 hn = params["responder_hostname"]
1589 self.p.add_responder_hostname(hn)
1591 # configure static dns record
1592 self.vapi.dns_name_server_add_del(
1593 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1595 self.vapi.dns_enable_disable(enable=1)
1597 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1604 is_initiator=is_init,
1605 id_type=self.p.local_id["id_type"],
1608 priv_key=client_priv,
1609 auth_method=auth_method,
1610 nonce=params.get("nonce"),
1611 auth_data=auth_data,
1612 udp_encap=udp_encap,
1613 local_ts=self.p.remote_ts,
1614 remote_ts=self.p.local_ts,
1619 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1622 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1624 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1627 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1630 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1633 self.sa.set_ike_props(
1634 crypto=ike_crypto[0],
1635 crypto_key_len=ike_crypto[1],
1637 prf="PRF_HMAC_SHA2_256",
1640 self.sa.set_esp_props(
1641 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1645 class TestApi(VppTestCase):
1646 """Test IKEV2 API"""
1649 def setUpClass(cls):
1650 super(TestApi, cls).setUpClass()
1653 def tearDownClass(cls):
1654 super(TestApi, cls).tearDownClass()
1657 super(TestApi, self).tearDown()
1658 self.p1.remove_vpp_config()
1659 self.p2.remove_vpp_config()
1660 r = self.vapi.ikev2_profile_dump()
1661 self.assertEqual(len(r), 0)
1663 def configure_profile(self, cfg):
1664 p = Profile(self, cfg["name"])
1665 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1666 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1667 p.add_local_ts(**cfg["loc_ts"])
1668 p.add_remote_ts(**cfg["rem_ts"])
1669 p.add_responder(cfg["responder"])
1670 p.add_ike_transforms(cfg["ike_ts"])
1671 p.add_esp_transforms(cfg["esp_ts"])
1672 p.add_auth(**cfg["auth"])
1673 p.set_udp_encap(cfg["udp_encap"])
1674 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1675 if "lifetime_data" in cfg:
1676 p.set_lifetime_data(cfg["lifetime_data"])
1677 if "tun_itf" in cfg:
1678 p.set_tunnel_interface(cfg["tun_itf"])
1679 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1684 def test_profile_api(self):
1685 """test profile dump API"""
1690 "start_addr": "3.3.3.2",
1691 "end_addr": "3.3.3.3",
1697 "start_addr": "4.5.76.80",
1698 "end_addr": "2.3.4.6",
1705 "start_addr": "ab::1",
1706 "end_addr": "ab::4",
1712 "start_addr": "cd::12",
1713 "end_addr": "cd::13",
1719 "natt_disabled": True,
1720 "loc_id": ("fqdn", b"vpp.home"),
1721 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1724 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1727 "crypto_key_size": 32,
1731 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1732 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1734 "ipsec_over_udp_port": 4501,
1737 "lifetime_maxdata": 20192,
1738 "lifetime_jitter": 9,
1744 "loc_id": ("ip4-addr", b"192.168.2.1"),
1745 "rem_id": ("ip6-addr", b"abcd::1"),
1748 "responder": {"sw_if_index": 4, "addr": "def::10"},
1751 "crypto_key_size": 16,
1755 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1756 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1758 "ipsec_over_udp_port": 4600,
1762 self.p1 = self.configure_profile(conf["p1"])
1763 self.p2 = self.configure_profile(conf["p2"])
1765 r = self.vapi.ikev2_profile_dump()
1766 self.assertEqual(len(r), 2)
1767 self.verify_profile(r[0].profile, conf["p1"])
1768 self.verify_profile(r[1].profile, conf["p2"])
def verify_id(self, api_id, cfg_id):
    """Compare an identity returned by the API with its configured value.

    *cfg_id* is indexed as ``(id_type_name, data_bytes)``.  The API side
    data is converted to ASCII bytes before it is compared.
    """
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(api_id.type, expected_type)
    reported_data = bytes(api_id.data, "ascii")
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare a traffic selector returned by the API with its config dict.

    Protocol id and port range are compared directly; the configured
    start/end addresses are parsed with ``ip_address`` first.
    """
    # (attribute on the API object, key in the configuration dict)
    plain_fields = (
        ("protocol_id", "proto"),
        ("start_port", "start_port"),
        ("end_port", "end_port"),
    )
    for api_attr, cfg_key in plain_fields:
        self.assertEqual(getattr(api_ts, api_attr), cfg_ts[cfg_key])
    for addr_field in ("start_addr", "end_addr"):
        expected = ip_address(text_type(cfg_ts[addr_field]))
        self.assertEqual(getattr(api_ts, addr_field), expected)
def verify_responder(self, api_r, cfg_r):
    """Compare the responder reported by the API with its config dict.

    Checks the interface index and the address; the configured address is
    parsed with ``ip_address`` before the comparison.
    """
    self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
    expected_addr = ip_address(cfg_r["addr"])
    self.assertEqual(api_r.addr, expected_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Compare the transform fields shared by IKE and ESP transform sets.

    Each field is read as an attribute of *api_ts* and as a key of
    *cfg_ts* under the same name.
    """
    for field in ("crypto_alg", "crypto_key_size", "integ_alg"):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Verify IKE transforms: the common fields plus the DH group."""
    self.verify_transforms(api_ts, cfg_ts)
    expected_group = cfg_ts["dh_group"]
    self.assertEqual(api_ts.dh_group, expected_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Verify ESP transforms; only the common transform fields are checked."""
    self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Compare the auth settings reported by the API with the config dict.

    The configured method name is translated with ``AuthMethod.value``;
    the auth data and its reported length are compared with the
    configured ``"data"`` entry.
    """
    self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
    configured_data = cfg_auth["data"]
    self.assertEqual(api_auth.data, configured_data)
    self.assertEqual(api_auth.data_len, len(configured_data))
def verify_lifetime_data(self, p, ld):
    """Compare the lifetime settings of profile *p* with the dict *ld*.

    Each field is read as an attribute of *p* and as a key of *ld* under
    the same name.
    """
    for field in ("lifetime", "lifetime_maxdata", "lifetime_jitter", "handover"):
        self.assertEqual(getattr(p, field), ld[field])
1808 def verify_profile(self, ap, cp):
1809 self.assertEqual(ap.name, cp["name"])
1810 self.assertEqual(ap.udp_encap, cp["udp_encap"])
1811 self.verify_id(ap.loc_id, cp["loc_id"])
1812 self.verify_id(ap.rem_id, cp["rem_id"])
1813 self.verify_ts(ap.loc_ts, cp["loc_ts"])
1814 self.verify_ts(ap.rem_ts, cp["rem_ts"])
1815 self.verify_responder(ap.responder, cp["responder"])
1816 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1817 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1818 self.verify_auth(ap.auth, cp["auth"])
1819 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1820 self.assertTrue(natt_dis == ap.natt_disabled)
1822 if "lifetime_data" in cp:
1823 self.verify_lifetime_data(ap, cp["lifetime_data"])
1824 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1826 self.assertEqual(ap.tun_itf, cp["tun_itf"])
1828 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """test responder - responder behind NAT"""

    # suffix passed to assert_counter() by the inherited verify_counters()
    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        # the only non-default parameter: "r_natt" is switched on
        params = {"r_natt": True}
        self.config_params(params)
1841 @tag_fixme_vpp_workers
1842 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1843 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1845 def config_tc(self):
1849 "is_initiator": False, # seen from test case perspective
1850 # thus vpp is initiator
1852 "sw_if_index": self.pg0.sw_if_index,
1853 "addr": self.pg0.remote_ip4,
1855 "ike-crypto": ("AES-GCM-16ICV", 32),
1856 "ike-integ": "NULL",
1857 "ike-dh": "3072MODPgr",
1859 "crypto_alg": 20, # "aes-gcm-16"
1860 "crypto_key_size": 256,
1861 "dh_group": 15, # "modp-3072"
1864 "crypto_alg": 12, # "aes-cbc"
1865 "crypto_key_size": 256,
1866 # "hmac-sha2-256-128"
1873 @tag_fixme_vpp_workers
1874 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1875 """test ikev2 initiator - pre shared key auth"""
1877 def config_tc(self):
1880 "is_initiator": False, # seen from test case perspective
1881 # thus vpp is initiator
1882 "ike-crypto": ("AES-GCM-16ICV", 32),
1883 "ike-integ": "NULL",
1884 "ike-dh": "3072MODPgr",
1886 "crypto_alg": 20, # "aes-gcm-16"
1887 "crypto_key_size": 256,
1888 "dh_group": 15, # "modp-3072"
1891 "crypto_alg": 12, # "aes-cbc"
1892 "crypto_key_size": 256,
1893 # "hmac-sha2-256-128"
1896 "responder_hostname": {
1897 "hostname": "vpp.responder.org",
1898 "sw_if_index": self.pg0.sw_if_index,
1904 @tag_fixme_vpp_workers
1905 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1906 """test initiator - request window size (1)"""
1908 def rekey_respond(self, req, update_child_sa_data):
1909 ih = self.get_ike_header(req)
1910 plain = self.sa.hmac_and_decrypt(ih)
1911 sa = ikev2.IKEv2_payload_SA(plain)
1912 if update_child_sa_data:
1913 prop = sa[ikev2.IKEv2_payload_Proposal]
1914 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1915 self.sa.r_nonce = self.sa.i_nonce
1916 self.sa.child_sas[0].ispi = prop.SPI
1917 self.sa.child_sas[0].rspi = prop.SPI
1918 self.sa.calc_child_keys()
1920 header = ikev2.IKEv2(
1921 init_SPI=self.sa.ispi,
1922 resp_SPI=self.sa.rspi,
1926 next_payload="Encrypted",
1928 resp = self.encrypt_ike_msg(header, sa, "SA")
1929 packet = self.create_packet(
1930 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1932 self.send_and_assert_no_replies(self.pg0, packet)
1934 def test_initiator(self):
1935 super(TestInitiatorRequestWindowSize, self).test_initiator()
1936 self.pg0.enable_capture()
1938 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1939 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1940 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1941 capture = self.pg0.get_capture(2)
1943 # reply in reverse order
1944 self.rekey_respond(capture[1], True)
1945 self.rekey_respond(capture[0], False)
1947 # verify that only the second request was accepted
1948 self.verify_ike_sas()
1949 self.verify_ipsec_sas(is_rekey=True)
1952 @tag_fixme_vpp_workers
1953 class TestInitiatorRekey(TestInitiatorPsk):
1954 """test ikev2 initiator - rekey"""
1956 def rekey_from_initiator(self):
1957 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1958 self.pg0.enable_capture()
1960 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1961 capture = self.pg0.get_capture(1)
1962 ih = self.get_ike_header(capture[0])
1963 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1964 self.assertNotIn("Response", ih.flags)
1965 self.assertIn("Initiator", ih.flags)
1966 plain = self.sa.hmac_and_decrypt(ih)
1967 sa = ikev2.IKEv2_payload_SA(plain)
1968 prop = sa[ikev2.IKEv2_payload_Proposal]
1969 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1970 self.sa.r_nonce = self.sa.i_nonce
1971 # update new responder SPI
1972 self.sa.child_sas[0].ispi = prop.SPI
1973 self.sa.child_sas[0].rspi = prop.SPI
1974 self.sa.calc_child_keys()
1975 header = ikev2.IKEv2(
1976 init_SPI=self.sa.ispi,
1977 resp_SPI=self.sa.rspi,
1981 next_payload="Encrypted",
1983 resp = self.encrypt_ike_msg(header, sa, "SA")
1984 packet = self.create_packet(
1985 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1987 self.send_and_assert_no_replies(self.pg0, packet)
def test_initiator(self):
    # run the parent scenario first
    super(TestInitiatorRekey, self).test_initiator()
    # then rekey the child SA and re-check both IKE and IPsec SA state
    self.rekey_from_initiator()
    self.verify_ike_sas()
    self.verify_ipsec_sas(is_rekey=True)
1996 @tag_fixme_vpp_workers
1997 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1998 """test ikev2 initiator - delete IKE SA from responder"""
2000 def config_tc(self):
2003 "del_sa_from_responder": True,
2004 "is_initiator": False, # seen from test case perspective
2005 # thus vpp is initiator
2007 "sw_if_index": self.pg0.sw_if_index,
2008 "addr": self.pg0.remote_ip4,
2010 "ike-crypto": ("AES-GCM-16ICV", 32),
2011 "ike-integ": "NULL",
2012 "ike-dh": "3072MODPgr",
2014 "crypto_alg": 20, # "aes-gcm-16"
2015 "crypto_key_size": 256,
2016 "dh_group": 15, # "modp-3072"
2019 "crypto_alg": 12, # "aes-cbc"
2020 "crypto_key_size": 256,
2021 # "hmac-sha2-256-128"
2024 "no_idr_in_auth": True,
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """test ikev2 responder - initiator behind NAT"""

    # suffix passed to assert_counter() by the inherited verify_counters()
    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        # the only non-default parameter: "i_natt" is switched on
        params = {"i_natt": True}
        self.config_params(params)
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """test ikev2 responder - pre shared key auth"""

    def config_tc(self):
        # no overrides: config_params() falls back to its defaults
        self.config_params()
2047 @tag_fixme_vpp_workers
2048 class TestResponderDpd(TestResponderPsk):
2050 Dead peer detection test
2053 def config_tc(self):
2054 self.config_params({"dpd_disabled": False})
2059 def test_responder(self):
2060 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2061 super(TestResponderDpd, self).test_responder()
2062 self.pg0.enable_capture()
2064 # capture empty request but don't reply
2065 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2066 ih = self.get_ike_header(capture[0])
2067 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2068 plain = self.sa.hmac_and_decrypt(ih)
2069 self.assertEqual(plain, b"")
2070 # wait for SA expiration
2072 ike_sas = self.vapi.ikev2_sa_dump()
2073 self.assertEqual(len(ike_sas), 0)
2074 ipsec_sas = self.vapi.ipsec_sa_dump()
2075 self.assertEqual(len(ipsec_sas), 0)
2078 @tag_fixme_vpp_workers
2079 class TestResponderRekey(TestResponderPsk):
2080 """test ikev2 responder - rekey"""
2082 def send_rekey_from_initiator(self):
2083 packet = self.create_rekey_request()
2084 self.pg0.add_stream(packet)
2085 self.pg0.enable_capture()
2087 capture = self.pg0.get_capture(1)
def process_rekey_response(self, capture):
    """Update ``self.sa`` from the rekey response in ``capture[0]``.

    The first captured packet is decrypted and parsed as an SA payload;
    its Nonce becomes the responder nonce and the SPI of its Proposal
    becomes the responder SPI of the first child SA.
    """
    ike_hdr = self.get_ike_header(capture[0])
    decrypted = self.sa.hmac_and_decrypt(ike_hdr)
    sa_payload = ikev2.IKEv2_payload_SA(decrypted)
    proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
    self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
    # update new responder SPI
    self.sa.child_sas[0].rspi = proposal.SPI
def test_responder(self):
    # run the parent scenario first
    super(TestResponderRekey, self).test_responder()
    # rekey: send the request, absorb the response, derive new child keys
    rekey_capture = self.send_rekey_from_initiator()
    self.process_rekey_response(rekey_capture)
    self.sa.calc_child_keys()
    self.verify_ike_sas()
    self.verify_ipsec_sas(is_rekey=True)
    # the error counter and the per-SA stats must both show one rekey
    self.assert_counter(1, "rekey_req", "ip4")
    ike_sas = self.vapi.ikev2_sa_dump()
    self.assertEqual(ike_sas[0].sa.stats.n_rekey_req, 1)
2110 @tag_fixme_vpp_workers
2111 class TestResponderRekeyRepeat(TestResponderRekey):
2112 """test ikev2 responder - rekey repeat"""
2114 def test_responder(self):
2115 super(TestResponderRekeyRepeat, self).test_responder()
2116 # rekey request is not accepted until old IPsec SA is expired
2117 capture = self.send_rekey_from_initiator()
2118 ih = self.get_ike_header(capture[0])
2119 plain = self.sa.hmac_and_decrypt(ih)
2120 notify = ikev2.IKEv2_payload_Notify(plain)
2121 self.assertEqual(notify.type, 43)
2122 self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2123 # rekey request is accepted after old IPsec SA was expired
2125 if len(self.vapi.ipsec_sa_dump()) != 3:
2129 self.fail("old IPsec SA not expired")
2130 self.process_rekey_response(self.send_rekey_from_initiator())
2131 self.sa.calc_child_keys()
2132 self.verify_ike_sas()
2133 self.verify_ipsec_sas(sa_count=3)
2136 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2137 """test ikev2 responder - non-default table id"""
2140 def setUpClass(cls):
2141 import scapy.contrib.ikev2 as _ikev2
2143 globals()["ikev2"] = _ikev2
2144 super(IkePeer, cls).setUpClass()
2145 cls.create_pg_interfaces(range(1))
2146 cls.vapi.cli("ip table add 1")
2147 cls.vapi.cli("set interface ip table pg0 1")
2148 for i in cls.pg_interfaces:
def config_tc(self):
    # override the "dpd_disabled" default (True in config_params);
    # presumably this leaves dead peer detection enabled -- TODO confirm
    params = {"dpd_disabled": False}
    self.config_params(params)
2158 def test_responder(self):
2159 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2160 super(TestResponderVrf, self).test_responder()
2161 self.pg0.enable_capture()
2163 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2164 ih = self.get_ike_header(capture[0])
2165 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2166 plain = self.sa.hmac_and_decrypt(ih)
2167 self.assertEqual(plain, b"")
2170 @tag_fixme_vpp_workers
2171 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2172 """test ikev2 responder - cert based auth"""
2174 def config_tc(self):
2179 "server-key": "server-key.pem",
2180 "client-key": "client-key.pem",
2181 "client-cert": "client-cert.pem",
2182 "server-cert": "server-cert.pem",
2187 @tag_fixme_vpp_workers
2188 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2189 TemplateResponder, Ikev2Params
2192 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2195 def config_tc(self):
2198 "ike-crypto": ("AES-CBC", 16),
2199 "ike-integ": "SHA2-256-128",
2200 "esp-crypto": ("AES-CBC", 24),
2201 "esp-integ": "SHA2-384-192",
2202 "ike-dh": "2048MODPgr",
2203 "nonce": os.urandom(256),
2204 "no_idr_in_auth": True,
2209 @tag_fixme_vpp_workers
2210 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2211 TemplateResponder, Ikev2Params
2215 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2218 def config_tc(self):
2221 "ike-crypto": ("AES-CBC", 32),
2222 "ike-integ": "SHA2-256-128",
2223 "esp-crypto": ("AES-GCM-16ICV", 32),
2224 "esp-integ": "NULL",
2225 "ike-dh": "3072MODPgr",
2230 @tag_fixme_vpp_workers
2231 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2236 IKE_NODE_SUFFIX = "ip6"
2238 def config_tc(self):
2241 "del_sa_from_responder": True,
2244 "ike-crypto": ("AES-GCM-16ICV", 32),
2245 "ike-integ": "NULL",
2246 "ike-dh": "2048MODPgr",
2247 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2248 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2253 @tag_fixme_vpp_workers
2254 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2256 Test for keep alive messages
2259 def send_empty_req_from_responder(self):
2260 packet = self.create_empty_request()
2261 self.pg0.add_stream(packet)
2262 self.pg0.enable_capture()
2264 capture = self.pg0.get_capture(1)
2265 ih = self.get_ike_header(capture[0])
2266 self.assertEqual(ih.id, self.sa.msg_id)
2267 plain = self.sa.hmac_and_decrypt(ih)
2268 self.assertEqual(plain, b"")
2269 self.assert_counter(1, "keepalive", "ip4")
2270 r = self.vapi.ikev2_sa_dump()
2271 self.assertEqual(1, r[0].sa.stats.n_keepalives)
def test_initiator(self):
    # run the parent scenario first
    super(TestInitiatorKeepaliveMsg, self).test_initiator()
    # then send an empty request and check the keepalive handling
    self.send_empty_req_from_responder()
2278 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2279 """malformed packet test"""
def config_tc(self):
    # no overrides: config_params() falls back to its defaults
    self.config_params()
2287 def create_ike_init_msg(self, length=None, payload=None):
2290 init_SPI="\x11" * 8,
2292 exch_type="IKE_SA_INIT",
2294 if payload is not None:
2296 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
def verify_bad_packet_length(self):
    """Send init messages built with a bogus ``length`` of 0xDEAD.

    ``self.pkt_count`` copies are sent on pg0; no replies are expected and
    the "bad_length" counter must equal the number of packets sent.
    """
    bad_msg = self.create_ike_init_msg(length=0xDEAD)
    burst = bad_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, "bad_length")
def verify_bad_sa_payload_length(self):
    """Send init messages whose SA payload has a bogus ``length`` of 0xDEAD.

    ``self.pkt_count`` copies are sent on pg0; no replies are expected and
    the "malformed_packet" counter must equal the number of packets sent.
    """
    bad_sa = ikev2.IKEv2_payload_SA(length=0xDEAD)
    bad_msg = self.create_ike_init_msg(payload=bad_sa)
    burst = bad_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, "malformed_packet")
2309 def test_responder(self):
2310 self.pkt_count = 254
2311 self.verify_bad_packet_length()
2312 self.verify_bad_sa_payload_length()
# when executed as a script, run the tests with the VPP test runner
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)