3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
32 KEY_PAD = b"Key Pad for IKEv2"
39 # tuple structure is (p, g, key_len)
44 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
45 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
46 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
47 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
48 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
49 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
50 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
51 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
52 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
53 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
54 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
62 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
63 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
64 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
65 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
66 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
67 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
68 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
69 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
70 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
71 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
72 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
73 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
74 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
75 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
76 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
77 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
85 class CryptoAlgo(object):
86 def __init__(self, name, cipher, mode):
90 if self.cipher is not None:
91 self.bs = self.cipher.block_size // 8
93 if self.name == "AES-GCM-16ICV":
94 self.iv_len = GCM_IV_SIZE
98 def encrypt(self, data, key, aad=None):
99 iv = os.urandom(self.iv_len)
102 self.cipher(key), self.mode(iv), default_backend()
104 return iv + encryptor.update(data) + encryptor.finalize()
106 salt = key[-SALT_SIZE:]
109 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
111 encryptor.authenticate_additional_data(aad)
112 data = encryptor.update(data) + encryptor.finalize()
113 data += encryptor.tag[:GCM_ICV_SIZE]
116 def decrypt(self, data, key, aad=None, icv=None):
118 iv = data[: self.iv_len]
119 ct = data[self.iv_len :]
121 algorithms.AES(key), self.mode(iv), default_backend()
123 return decryptor.update(ct) + decryptor.finalize()
125 salt = key[-SALT_SIZE:]
126 nonce = salt + data[:GCM_IV_SIZE]
127 ct = data[GCM_IV_SIZE:]
128 key = key[:-SALT_SIZE]
130 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
132 decryptor.authenticate_additional_data(aad)
133 return decryptor.update(ct) + decryptor.finalize()
136 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
137 data = data + b"\x00" * (pad_len - 1)
138 return data + bytes([pad_len - 1])
141 class AuthAlgo(object):
142 def __init__(self, name, mac, mod, key_len, trunc_len=None):
146 self.key_len = key_len
147 self.trunc_len = trunc_len or key_len
151 "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
152 "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
153 "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
157 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
158 "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
159 "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
160 "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
161 "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
165 "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166 "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
182 class IKEv2ChildSA(object):
183 def __init__(self, local_ts, remote_ts, is_initiator):
191 self.local_ts = local_ts
192 self.remote_ts = remote_ts
195 class IKEv2SA(object):
202 spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
208 auth_method="shared-key",
214 self.udp_encap = udp_encap
224 self.dh_params = None
226 self.priv_key = priv_key
227 self.is_initiator = is_initiator
228 nonce = nonce or os.urandom(32)
229 self.auth_data = auth_data
232 if isinstance(id_type, str):
233 self.id_type = IDType.value(id_type)
235 self.id_type = id_type
236 self.auth_method = auth_method
237 if self.is_initiator:
238 self.rspi = 8 * b"\x00"
243 self.ispi = 8 * b"\x00"
245 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
247 def new_msg_id(self):
252 def my_dh_pub_key(self):
253 if self.is_initiator:
254 return self.i_dh_data
255 return self.r_dh_data
258 def peer_dh_pub_key(self):
259 if self.is_initiator:
260 return self.r_dh_data
261 return self.i_dh_data
265 return self.i_natt or self.r_natt
267 def compute_secret(self):
268 priv = self.dh_private_key
269 peer = self.peer_dh_pub_key
270 p, g, l = self.ike_group
272 int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
275 def generate_dh_data(self):
277 if self.ike_dh not in DH:
278 raise NotImplementedError("%s not in DH group" % self.ike_dh)
280 if self.dh_params is None:
281 dhg = DH[self.ike_dh]
282 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
283 self.dh_params = pn.parameters(default_backend())
285 priv = self.dh_params.generate_private_key()
286 pub = priv.public_key()
287 x = priv.private_numbers().x
288 self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
289 y = pub.public_numbers().y
291 if self.is_initiator:
292 self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
294 self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
296 def complete_dh_data(self):
297 self.dh_shared_secret = self.compute_secret()
299 def calc_child_keys(self):
300 prf = self.ike_prf_alg.mod()
301 s = self.i_nonce + self.r_nonce
302 c = self.child_sas[0]
304 encr_key_len = self.esp_crypto_key_len
305 integ_key_len = self.esp_integ_alg.key_len
306 salt_len = 0 if integ_key_len else 4
308 l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
309 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
312 c.sk_ei = keymat[pos : pos + encr_key_len]
316 c.sk_ai = keymat[pos : pos + integ_key_len]
319 c.salt_ei = keymat[pos : pos + salt_len]
322 c.sk_er = keymat[pos : pos + encr_key_len]
326 c.sk_ar = keymat[pos : pos + integ_key_len]
329 c.salt_er = keymat[pos : pos + salt_len]
332 def calc_prfplus(self, prf, key, seed, length):
336 while len(r) < length and x < 255:
341 s = s + seed + bytes([x])
342 t = self.calc_prf(prf, key, s)
350 def calc_prf(self, prf, key, data):
351 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
356 prf = self.ike_prf_alg.mod()
357 # SKEYSEED = prf(Ni | Nr, g^ir)
358 s = self.i_nonce + self.r_nonce
359 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
361 # calculate S = Ni | Nr | SPIi SPIr
362 s = s + self.ispi + self.rspi
364 prf_key_trunc = self.ike_prf_alg.trunc_len
365 encr_key_len = self.ike_crypto_key_len
366 tr_prf_key_len = self.ike_prf_alg.key_len
367 integ_key_len = self.ike_integ_alg.key_len
368 if integ_key_len == 0:
380 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
383 self.sk_d = keymat[: pos + prf_key_trunc]
386 self.sk_ai = keymat[pos : pos + integ_key_len]
388 self.sk_ar = keymat[pos : pos + integ_key_len]
391 self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
392 pos += encr_key_len + salt_size
393 self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
394 pos += encr_key_len + salt_size
396 self.sk_pi = keymat[pos : pos + tr_prf_key_len]
397 pos += tr_prf_key_len
398 self.sk_pr = keymat[pos : pos + tr_prf_key_len]
400 def generate_authmsg(self, prf, packet):
401 if self.is_initiator:
409 data = bytes([self.id_type, 0, 0, 0]) + id
410 id_hash = self.calc_prf(prf, key, data)
411 return packet + nonce + id_hash
414 prf = self.ike_prf_alg.mod()
415 if self.is_initiator:
416 packet = self.init_req_packet
418 packet = self.init_resp_packet
419 authmsg = self.generate_authmsg(prf, raw(packet))
420 if self.auth_method == "shared-key":
421 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
422 self.auth_data = self.calc_prf(prf, psk, authmsg)
423 elif self.auth_method == "rsa-sig":
424 self.auth_data = self.priv_key.sign(
425 authmsg, padding.PKCS1v15(), hashes.SHA1()
428 raise TypeError("unknown auth method type!")
430 def encrypt(self, data, aad=None):
431 data = self.ike_crypto_alg.pad(data)
432 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
435 def peer_authkey(self):
436 if self.is_initiator:
441 def my_authkey(self):
442 if self.is_initiator:
447 def my_cryptokey(self):
448 if self.is_initiator:
453 def peer_cryptokey(self):
454 if self.is_initiator:
458 def concat(self, alg, key_len):
459 return alg + "-" + str(key_len * 8)
462 def vpp_ike_cypto_alg(self):
463 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
466 def vpp_esp_cypto_alg(self):
467 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
469 def verify_hmac(self, ikemsg):
470 integ_trunc = self.ike_integ_alg.trunc_len
471 exp_hmac = ikemsg[-integ_trunc:]
472 data = ikemsg[:-integ_trunc]
473 computed_hmac = self.compute_hmac(
474 self.ike_integ_alg.mod(), self.peer_authkey, data
476 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
478 def compute_hmac(self, integ, key, data):
479 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
483 def decrypt(self, data, aad=None, icv=None):
484 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
486 def hmac_and_decrypt(self, ike):
487 ep = ike[ikev2.IKEv2_payload_Encrypted]
488 if self.ike_crypto == "AES-GCM-16ICV":
489 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
490 ct = ep.load[:-GCM_ICV_SIZE]
491 tag = ep.load[-GCM_ICV_SIZE:]
492 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
494 self.verify_hmac(raw(ike))
495 integ_trunc = self.ike_integ_alg.trunc_len
497 # remove ICV and decrypt payload
498 ct = ep.load[:-integ_trunc]
499 plain = self.decrypt(ct)
502 return plain[: -pad_len - 1]
504 def build_ts_addr(self, ts, version):
506 "starting_address_v" + version: ts["start_addr"],
507 "ending_address_v" + version: ts["end_addr"],
510 def generate_ts(self, is_ip4):
511 c = self.child_sas[0]
512 ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
514 ts_data.update(self.build_ts_addr(c.local_ts, "4"))
515 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
516 ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
517 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
519 ts_data.update(self.build_ts_addr(c.local_ts, "6"))
520 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
521 ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
522 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
524 if self.is_initiator:
525 return ([ts1], [ts2])
526 return ([ts2], [ts1])
528 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
529 if crypto not in CRYPTO_ALGOS:
530 raise TypeError("unsupported encryption algo %r" % crypto)
531 self.ike_crypto = crypto
532 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
533 self.ike_crypto_key_len = crypto_key_len
535 if integ not in AUTH_ALGOS:
536 raise TypeError("unsupported auth algo %r" % integ)
537 self.ike_integ = None if integ == "NULL" else integ
538 self.ike_integ_alg = AUTH_ALGOS[integ]
540 if prf not in PRF_ALGOS:
541 raise TypeError("unsupported prf algo %r" % prf)
543 self.ike_prf_alg = PRF_ALGOS[prf]
545 self.ike_group = DH[self.ike_dh]
547 def set_esp_props(self, crypto, crypto_key_len, integ):
548 self.esp_crypto_key_len = crypto_key_len
549 if crypto not in CRYPTO_ALGOS:
550 raise TypeError("unsupported encryption algo %r" % crypto)
551 self.esp_crypto = crypto
552 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
554 if integ not in AUTH_ALGOS:
555 raise TypeError("unsupported auth algo %r" % integ)
556 self.esp_integ = None if integ == "NULL" else integ
557 self.esp_integ_alg = AUTH_ALGOS[integ]
559 def crypto_attr(self, key_len):
560 if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
561 return (0x800E << 16 | key_len << 3, 12)
563 raise Exception("unsupported attribute type")
565 def ike_crypto_attr(self):
566 return self.crypto_attr(self.ike_crypto_key_len)
568 def esp_crypto_attr(self):
569 return self.crypto_attr(self.esp_crypto_key_len)
571 def compute_nat_sha1(self, ip, port, rspi=None):
574 data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
575 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
577 return digest.finalize()
580 class IkePeer(VppTestCase):
581 """common class for initiator and responder"""
585 import scapy.contrib.ikev2 as _ikev2
587 globals()["ikev2"] = _ikev2
588 super(IkePeer, cls).setUpClass()
589 cls.create_pg_interfaces(range(2))
590 for i in cls.pg_interfaces:
598 def tearDownClass(cls):
599 super(IkePeer, cls).tearDownClass()
602 super(IkePeer, self).tearDown()
603 if self.del_sa_from_responder:
604 self.initiate_del_sa_from_responder()
606 self.initiate_del_sa_from_initiator()
607 r = self.vapi.ikev2_sa_dump()
608 self.assertEqual(len(r), 0)
609 sas = self.vapi.ipsec_sa_dump()
610 self.assertEqual(len(sas), 0)
611 self.p.remove_vpp_config()
612 self.assertIsNone(self.p.query_vpp_config())
615 super(IkePeer, self).setUp()
617 self.p.add_vpp_config()
618 self.assertIsNotNone(self.p.query_vpp_config())
619 if self.sa.is_initiator:
620 self.sa.generate_dh_data()
621 self.vapi.cli("ikev2 set logging level 4")
622 self.vapi.cli("event-lo clear")
624 def assert_counter(self, count, name, version="ip4"):
625 node_name = "/err/ikev2-%s/" % version + name
626 self.assertEqual(count, self.statistics.get_err_counter(node_name))
628 def create_rekey_request(self):
629 sa, first_payload = self.generate_auth_payload(is_rekey=True)
630 header = ikev2.IKEv2(
631 init_SPI=self.sa.ispi,
632 resp_SPI=self.sa.rspi,
633 id=self.sa.new_msg_id(),
635 exch_type="CREATE_CHILD_SA",
638 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
639 return self.create_packet(
640 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
643 def create_empty_request(self):
644 header = ikev2.IKEv2(
645 init_SPI=self.sa.ispi,
646 resp_SPI=self.sa.rspi,
647 id=self.sa.new_msg_id(),
649 exch_type="INFORMATIONAL",
650 next_payload="Encrypted",
653 msg = self.encrypt_ike_msg(header, b"", None)
654 return self.create_packet(
655 self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
659 self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
662 src_ip = src_if.remote_ip6
663 dst_ip = src_if.local_ip6
666 src_ip = src_if.remote_ip4
667 dst_ip = src_if.local_ip4
670 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
671 / ip_layer(src=src_ip, dst=dst_ip)
672 / UDP(sport=sport, dport=dport)
675 # insert non ESP marker
676 res = res / Raw(b"\x00" * 4)
679 def verify_udp(self, udp):
680 self.assertEqual(udp.sport, self.sa.sport)
681 self.assertEqual(udp.dport, self.sa.dport)
683 def get_ike_header(self, packet):
685 ih = packet[ikev2.IKEv2]
686 ih = self.verify_and_remove_non_esp_marker(ih)
687 except IndexError as e:
688 # this is a workaround for getting IKEv2 layer as both ikev2 and
689 # ipsec register for port 4500
691 ih = self.verify_and_remove_non_esp_marker(esp)
692 self.assertEqual(ih.version, 0x20)
693 self.assertNotIn("Version", ih.flags)
696 def verify_and_remove_non_esp_marker(self, packet):
698 # if we are in nat traversal mode check for non esp marker
701 self.assertEqual(data[:4], b"\x00" * 4)
702 return ikev2.IKEv2(data[4:])
706 def encrypt_ike_msg(self, header, plain, first_payload):
707 if self.sa.ike_crypto == "AES-GCM-16ICV":
708 data = self.sa.ike_crypto_alg.pad(raw(plain))
713 + len(ikev2.IKEv2_payload_Encrypted())
715 tlen = plen + len(ikev2.IKEv2())
718 sk_p = ikev2.IKEv2_payload_Encrypted(
719 next_payload=first_payload, length=plen
723 encr = self.sa.encrypt(raw(plain), raw(res))
724 sk_p = ikev2.IKEv2_payload_Encrypted(
725 next_payload=first_payload, length=plen, load=encr
729 encr = self.sa.encrypt(raw(plain))
730 trunc_len = self.sa.ike_integ_alg.trunc_len
731 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
732 tlen = plen + len(ikev2.IKEv2())
734 sk_p = ikev2.IKEv2_payload_Encrypted(
735 next_payload=first_payload, length=plen, load=encr
740 integ_data = raw(res)
741 hmac_data = self.sa.compute_hmac(
742 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
744 res = res / Raw(hmac_data[:trunc_len])
745 assert len(res) == tlen
748 def verify_udp_encap(self, ipsec_sa):
749 e = VppEnum.vl_api_ipsec_sad_flags_t
750 if self.sa.udp_encap or self.sa.natt:
751 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
753 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
755 def verify_ipsec_sas(self, is_rekey=False):
756 sas = self.vapi.ipsec_sa_dump()
758 # after rekey there is a short period of time in which old
759 # inbound SA is still present
763 self.assertEqual(len(sas), sa_count)
764 if self.sa.is_initiator:
779 c = self.sa.child_sas[0]
781 self.verify_udp_encap(sa0)
782 self.verify_udp_encap(sa1)
783 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
784 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
785 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
787 if self.sa.esp_integ is None:
790 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
791 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
792 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
795 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
796 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
797 self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
798 self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
802 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
803 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
804 self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
805 self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
807 self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
808 self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
810 def verify_keymat(self, api_keys, keys, name):
811 km = getattr(keys, name)
812 api_km = getattr(api_keys, name)
813 api_km_len = getattr(api_keys, name + "_len")
814 self.assertEqual(len(km), api_km_len)
815 self.assertEqual(km, api_km[:api_km_len])
817 def verify_id(self, api_id, exp_id):
818 self.assertEqual(api_id.type, IDType.value(exp_id.type))
819 self.assertEqual(api_id.data_len, exp_id.data_len)
820 self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
822 def verify_ike_sas(self):
823 r = self.vapi.ikev2_sa_dump()
824 self.assertEqual(len(r), 1)
826 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
827 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
829 if self.sa.is_initiator:
830 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
831 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
833 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
834 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
836 if self.sa.is_initiator:
837 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
838 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
840 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
841 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
842 self.verify_keymat(sa.keys, self.sa, "sk_d")
843 self.verify_keymat(sa.keys, self.sa, "sk_ai")
844 self.verify_keymat(sa.keys, self.sa, "sk_ar")
845 self.verify_keymat(sa.keys, self.sa, "sk_ei")
846 self.verify_keymat(sa.keys, self.sa, "sk_er")
847 self.verify_keymat(sa.keys, self.sa, "sk_pi")
848 self.verify_keymat(sa.keys, self.sa, "sk_pr")
850 self.assertEqual(sa.i_id.type, self.sa.id_type)
851 self.assertEqual(sa.r_id.type, self.sa.id_type)
852 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
853 self.assertEqual(sa.r_id.data_len, len(self.idr))
854 self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
855 self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
857 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
858 self.assertEqual(len(r), 1)
860 self.assertEqual(csa.sa_index, sa.sa_index)
861 c = self.sa.child_sas[0]
862 if hasattr(c, "sk_ai"):
863 self.verify_keymat(csa.keys, c, "sk_ai")
864 self.verify_keymat(csa.keys, c, "sk_ar")
865 self.verify_keymat(csa.keys, c, "sk_ei")
866 self.verify_keymat(csa.keys, c, "sk_er")
867 self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
868 self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
870 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
873 r = self.vapi.ikev2_traffic_selector_dump(
874 is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
876 self.assertEqual(len(r), 1)
878 self.verify_ts(r[0].ts, tsi[0], True)
880 r = self.vapi.ikev2_traffic_selector_dump(
881 is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
883 self.assertEqual(len(r), 1)
884 self.verify_ts(r[0].ts, tsr[0], False)
886 n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
887 self.verify_nonce(n, self.sa.i_nonce)
888 n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
889 self.verify_nonce(n, self.sa.r_nonce)
891 def verify_nonce(self, api_nonce, nonce):
892 self.assertEqual(api_nonce.data_len, len(nonce))
893 self.assertEqual(api_nonce.nonce, nonce)
895 def verify_ts(self, api_ts, ts, is_initiator):
897 self.assertTrue(api_ts.is_local)
899 self.assertFalse(api_ts.is_local)
902 self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
903 self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
905 self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
906 self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
907 self.assertEqual(api_ts.start_port, ts.start_port)
908 self.assertEqual(api_ts.end_port, ts.end_port)
909 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
912 class TemplateInitiator(IkePeer):
913 """initiator test template"""
915 def initiate_del_sa_from_initiator(self):
916 ispi = int.from_bytes(self.sa.ispi, "little")
917 self.pg0.enable_capture()
919 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
920 capture = self.pg0.get_capture(1)
921 ih = self.get_ike_header(capture[0])
922 self.assertNotIn("Response", ih.flags)
923 self.assertIn("Initiator", ih.flags)
924 self.assertEqual(ih.init_SPI, self.sa.ispi)
925 self.assertEqual(ih.resp_SPI, self.sa.rspi)
926 plain = self.sa.hmac_and_decrypt(ih)
927 d = ikev2.IKEv2_payload_Delete(plain)
928 self.assertEqual(d.proto, 1) # proto=IKEv2
929 header = ikev2.IKEv2(
930 init_SPI=self.sa.ispi,
931 resp_SPI=self.sa.rspi,
933 exch_type="INFORMATIONAL",
935 next_payload="Encrypted",
937 resp = self.encrypt_ike_msg(header, b"", None)
938 self.send_and_assert_no_replies(self.pg0, resp)
940 def verify_del_sa(self, packet):
941 ih = self.get_ike_header(packet)
942 self.assertEqual(ih.id, self.sa.msg_id)
943 self.assertEqual(ih.exch_type, 37) # exchange informational
944 self.assertIn("Response", ih.flags)
945 self.assertIn("Initiator", ih.flags)
946 plain = self.sa.hmac_and_decrypt(ih)
947 self.assertEqual(plain, b"")
949 def initiate_del_sa_from_responder(self):
950 header = ikev2.IKEv2(
951 init_SPI=self.sa.ispi,
952 resp_SPI=self.sa.rspi,
953 exch_type="INFORMATIONAL",
954 id=self.sa.new_msg_id(),
956 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
957 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
958 packet = self.create_packet(
959 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
961 self.pg0.add_stream(packet)
962 self.pg0.enable_capture()
964 capture = self.pg0.get_capture(1)
965 self.verify_del_sa(capture[0])
968 def find_notify_payload(packet, notify_type):
969 n = packet[ikev2.IKEv2_payload_Notify]
971 if n.type == notify_type:
976 def verify_nat_detection(self, packet):
983 # NAT_DETECTION_SOURCE_IP
984 s = self.find_notify_payload(packet, 16388)
985 self.assertIsNotNone(s)
986 src_sha = self.sa.compute_nat_sha1(
987 inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
989 self.assertEqual(s.load, src_sha)
991 # NAT_DETECTION_DESTINATION_IP
992 s = self.find_notify_payload(packet, 16389)
993 self.assertIsNotNone(s)
994 dst_sha = self.sa.compute_nat_sha1(
995 inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
997 self.assertEqual(s.load, dst_sha)
999 def verify_sa_init_request(self, packet):
1001 self.sa.dport = udp.sport
1002 ih = packet[ikev2.IKEv2]
1003 self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1004 self.assertEqual(ih.exch_type, 34) # SA_INIT
1005 self.sa.ispi = ih.init_SPI
1006 self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1007 self.assertIn("Initiator", ih.flags)
1008 self.assertNotIn("Response", ih.flags)
1009 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1010 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1012 prop = packet[ikev2.IKEv2_payload_Proposal]
1013 self.assertEqual(prop.proto, 1) # proto = ikev2
1014 self.assertEqual(prop.proposal, 1)
1015 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
1017 prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1019 self.assertEqual(prop.trans[1].transform_type, 2) # prf
1020 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
1021 self.assertEqual(prop.trans[2].transform_type, 4) # dh
1022 self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1024 self.verify_nat_detection(packet)
1025 self.sa.set_ike_props(
1026 crypto="AES-GCM-16ICV",
1029 prf="PRF_HMAC_SHA2_256",
1032 self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1033 self.sa.generate_dh_data()
1034 self.sa.complete_dh_data()
1037 def update_esp_transforms(self, trans, sa):
1039 if trans.transform_type == 1: # ecryption
1040 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1041 elif trans.transform_type == 3: # integrity
1042 sa.esp_integ = INTEG_IDS[trans.transform_id]
1043 trans = trans.payload
1045 def verify_sa_auth_req(self, packet):
1047 self.sa.dport = udp.sport
1048 ih = self.get_ike_header(packet)
1049 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1050 self.assertEqual(ih.init_SPI, self.sa.ispi)
1051 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
1052 self.assertIn("Initiator", ih.flags)
1053 self.assertNotIn("Response", ih.flags)
1056 self.verify_udp(udp)
1057 self.assertEqual(ih.id, self.sa.msg_id + 1)
1059 plain = self.sa.hmac_and_decrypt(ih)
1060 idi = ikev2.IKEv2_payload_IDi(plain)
1061 self.assertEqual(idi.load, self.sa.i_id)
1062 if self.no_idr_auth:
1063 self.assertEqual(idi.next_payload, 39) # AUTH
1065 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1066 self.assertEqual(idr.load, self.sa.r_id)
1067 prop = idi[ikev2.IKEv2_payload_Proposal]
1068 c = self.sa.child_sas[0]
1070 self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1072 def send_init_response(self):
1073 tr_attr = self.sa.ike_crypto_attr()
1075 ikev2.IKEv2_payload_Transform(
1076 transform_type="Encryption",
1077 transform_id=self.sa.ike_crypto,
1079 key_length=tr_attr[0],
1081 / ikev2.IKEv2_payload_Transform(
1082 transform_type="Integrity", transform_id=self.sa.ike_integ
1084 / ikev2.IKEv2_payload_Transform(
1085 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1087 / ikev2.IKEv2_payload_Transform(
1088 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1091 props = ikev2.IKEv2_payload_Proposal(
1092 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1095 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1097 dst_address = b"\x0a\x0a\x0a\x0a"
1099 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1100 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1101 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1103 self.sa.init_resp_packet = (
1105 init_SPI=self.sa.ispi,
1106 resp_SPI=self.sa.rspi,
1107 exch_type="IKE_SA_INIT",
1110 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1111 / ikev2.IKEv2_payload_KE(
1112 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1114 / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1115 / ikev2.IKEv2_payload_Notify(
1116 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1118 / ikev2.IKEv2_payload_Notify(
1119 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1123 ike_msg = self.create_packet(
1125 self.sa.init_resp_packet,
1131 self.pg_send(self.pg0, ike_msg)
1132 capture = self.pg0.get_capture(1)
1133 self.verify_sa_auth_req(capture[0])
1135 def initiate_sa_init(self):
1136 self.pg0.enable_capture()
1138 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1140 capture = self.pg0.get_capture(1)
1141 self.verify_sa_init_request(capture[0])
1142 self.send_init_response()
1144 def send_auth_response(self):
1145 tr_attr = self.sa.esp_crypto_attr()
1147 ikev2.IKEv2_payload_Transform(
1148 transform_type="Encryption",
1149 transform_id=self.sa.esp_crypto,
1151 key_length=tr_attr[0],
1153 / ikev2.IKEv2_payload_Transform(
1154 transform_type="Integrity", transform_id=self.sa.esp_integ
1156 / ikev2.IKEv2_payload_Transform(
1157 transform_type="Extended Sequence Number", transform_id="No ESN"
1159 / ikev2.IKEv2_payload_Transform(
1160 transform_type="Extended Sequence Number", transform_id="ESN"
1164 c = self.sa.child_sas[0]
1165 props = ikev2.IKEv2_payload_Proposal(
1166 proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1169 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1171 ikev2.IKEv2_payload_IDi(
1172 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1174 / ikev2.IKEv2_payload_IDr(
1175 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1177 / ikev2.IKEv2_payload_AUTH(
1179 auth_type=AuthMethod.value(self.sa.auth_method),
1180 load=self.sa.auth_data,
1182 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1183 / ikev2.IKEv2_payload_TSi(
1184 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1186 / ikev2.IKEv2_payload_TSr(
1187 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1189 / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1192 header = ikev2.IKEv2(
1193 init_SPI=self.sa.ispi,
1194 resp_SPI=self.sa.rspi,
1195 id=self.sa.new_msg_id(),
1197 exch_type="IKE_AUTH",
1200 ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1201 packet = self.create_packet(
1202 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1204 self.pg_send(self.pg0, packet)
1206 def test_initiator(self):
1207 self.initiate_sa_init()
1209 self.sa.calc_child_keys()
1210 self.send_auth_response()
1211 self.verify_ike_sas()
1214 class TemplateResponder(IkePeer):
1215 """responder test template"""
1217 def initiate_del_sa_from_responder(self):
1218 self.pg0.enable_capture()
1220 self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1221 capture = self.pg0.get_capture(1)
1222 ih = self.get_ike_header(capture[0])
1223 self.assertNotIn("Response", ih.flags)
1224 self.assertNotIn("Initiator", ih.flags)
1225 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1226 plain = self.sa.hmac_and_decrypt(ih)
1227 d = ikev2.IKEv2_payload_Delete(plain)
1228 self.assertEqual(d.proto, 1) # proto=IKEv2
1229 self.assertEqual(ih.init_SPI, self.sa.ispi)
1230 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1231 header = ikev2.IKEv2(
1232 init_SPI=self.sa.ispi,
1233 resp_SPI=self.sa.rspi,
1234 flags="Initiator+Response",
1235 exch_type="INFORMATIONAL",
1237 next_payload="Encrypted",
1239 resp = self.encrypt_ike_msg(header, b"", None)
1240 self.send_and_assert_no_replies(self.pg0, resp)
1242 def verify_del_sa(self, packet):
1243 ih = self.get_ike_header(packet)
1244 self.assertEqual(ih.id, self.sa.msg_id)
1245 self.assertEqual(ih.exch_type, 37) # exchange informational
1246 self.assertIn("Response", ih.flags)
1247 self.assertNotIn("Initiator", ih.flags)
1248 self.assertEqual(ih.next_payload, 46) # Encrypted
1249 self.assertEqual(ih.init_SPI, self.sa.ispi)
1250 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1251 plain = self.sa.hmac_and_decrypt(ih)
1252 self.assertEqual(plain, b"")
1254 def initiate_del_sa_from_initiator(self):
1255 header = ikev2.IKEv2(
1256 init_SPI=self.sa.ispi,
1257 resp_SPI=self.sa.rspi,
1259 exch_type="INFORMATIONAL",
1260 id=self.sa.new_msg_id(),
1262 del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1263 ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1264 packet = self.create_packet(
1265 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1267 self.pg0.add_stream(packet)
1268 self.pg0.enable_capture()
1270 capture = self.pg0.get_capture(1)
1271 self.verify_del_sa(capture[0])
1273 def send_sa_init_req(self):
1274 tr_attr = self.sa.ike_crypto_attr()
1276 ikev2.IKEv2_payload_Transform(
1277 transform_type="Encryption",
1278 transform_id=self.sa.ike_crypto,
1280 key_length=tr_attr[0],
1282 / ikev2.IKEv2_payload_Transform(
1283 transform_type="Integrity", transform_id=self.sa.ike_integ
1285 / ikev2.IKEv2_payload_Transform(
1286 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1288 / ikev2.IKEv2_payload_Transform(
1289 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1293 props = ikev2.IKEv2_payload_Proposal(
1294 proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1297 next_payload = None if self.ip6 else "Notify"
1299 self.sa.init_req_packet = (
1301 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1303 / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1304 / ikev2.IKEv2_payload_KE(
1305 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1307 / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1312 src_address = b"\x0a\x0a\x0a\x01"
1314 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1317 dst_address = b"\x0a\x0a\x0a\x0a"
1319 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1321 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1322 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1323 nat_src_detection = ikev2.IKEv2_payload_Notify(
1324 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1326 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1327 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1329 self.sa.init_req_packet = (
1330 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1333 ike_msg = self.create_packet(
1335 self.sa.init_req_packet,
1341 self.pg0.add_stream(ike_msg)
1342 self.pg0.enable_capture()
1344 capture = self.pg0.get_capture(1)
1345 self.verify_sa_init(capture[0])
1347 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1348 tr_attr = self.sa.esp_crypto_attr()
1349 last_payload = last_payload or "Notify"
1351 ikev2.IKEv2_payload_Transform(
1352 transform_type="Encryption",
1353 transform_id=self.sa.esp_crypto,
1355 key_length=tr_attr[0],
1357 / ikev2.IKEv2_payload_Transform(
1358 transform_type="Integrity", transform_id=self.sa.esp_integ
1360 / ikev2.IKEv2_payload_Transform(
1361 transform_type="Extended Sequence Number", transform_id="No ESN"
1363 / ikev2.IKEv2_payload_Transform(
1364 transform_type="Extended Sequence Number", transform_id="ESN"
1368 c = self.sa.child_sas[0]
1369 props = ikev2.IKEv2_payload_Proposal(
1370 proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
1373 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1375 ikev2.IKEv2_payload_AUTH(
1377 auth_type=AuthMethod.value(self.sa.auth_method),
1378 load=self.sa.auth_data,
1380 / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1381 / ikev2.IKEv2_payload_TSi(
1382 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1384 / ikev2.IKEv2_payload_TSr(
1385 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1390 first_payload = "Nonce"
1392 ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
1394 / ikev2.IKEv2_payload_Notify(
1398 length=8 + len(c.ispi),
1399 next_payload="Notify",
1401 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1404 first_payload = "IDi"
1405 if self.no_idr_auth:
1406 ids = ikev2.IKEv2_payload_IDi(
1407 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1410 ids = ikev2.IKEv2_payload_IDi(
1411 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1412 ) / ikev2.IKEv2_payload_IDr(
1413 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1416 return plain, first_payload
1418 def send_sa_auth(self):
1419 plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1420 plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1421 header = ikev2.IKEv2(
1422 init_SPI=self.sa.ispi,
1423 resp_SPI=self.sa.rspi,
1424 id=self.sa.new_msg_id(),
1426 exch_type="IKE_AUTH",
1429 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1430 packet = self.create_packet(
1431 self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1433 self.pg0.add_stream(packet)
1434 self.pg0.enable_capture()
1436 capture = self.pg0.get_capture(1)
1437 self.verify_sa_auth_resp(capture[0])
1439 def verify_sa_init(self, packet):
1440 ih = self.get_ike_header(packet)
1442 self.assertEqual(ih.id, self.sa.msg_id)
1443 self.assertEqual(ih.exch_type, 34)
1444 self.assertIn("Response", ih.flags)
1445 self.assertEqual(ih.init_SPI, self.sa.ispi)
1446 self.assertNotEqual(ih.resp_SPI, 0)
1447 self.sa.rspi = ih.resp_SPI
1449 sa = ih[ikev2.IKEv2_payload_SA]
1450 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1451 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1452 except IndexError as e:
1453 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1454 self.logger.error(ih.show())
1456 self.sa.complete_dh_data()
1460 def verify_sa_auth_resp(self, packet):
1461 ike = self.get_ike_header(packet)
1463 self.verify_udp(udp)
1464 self.assertEqual(ike.id, self.sa.msg_id)
1465 plain = self.sa.hmac_and_decrypt(ike)
1466 idr = ikev2.IKEv2_payload_IDr(plain)
1467 prop = idr[ikev2.IKEv2_payload_Proposal]
1468 self.assertEqual(prop.SPIsize, 4)
1469 self.sa.child_sas[0].rspi = prop.SPI
1470 self.sa.calc_child_keys()
1472 IKE_NODE_SUFFIX = "ip4"
1474 def verify_counters(self):
1475 self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1476 self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1477 self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1479 r = self.vapi.ikev2_sa_dump()
1481 self.assertEqual(1, s.n_sa_auth_req)
1482 self.assertEqual(1, s.n_sa_init_req)
1484 def test_responder(self):
1485 self.send_sa_init_req()
1487 self.verify_ipsec_sas()
1488 self.verify_ike_sas()
1489 self.verify_counters()
1492 class Ikev2Params(object):
1493 def config_params(self, params={}):
1494 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1495 ei = VppEnum.vl_api_ipsec_integ_alg_t
1497 "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1498 "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1499 "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1500 "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1501 "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1502 "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1503 "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1504 "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1505 "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1506 "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1509 dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1511 self.vapi.cli("ikev2 dpd disable")
1512 self.del_sa_from_responder = (
1514 if "del_sa_from_responder" not in params
1515 else params["del_sa_from_responder"]
1517 i_natt = False if "i_natt" not in params else params["i_natt"]
1518 r_natt = False if "r_natt" not in params else params["r_natt"]
1519 self.p = Profile(self, "pr1")
1520 self.ip6 = False if "ip6" not in params else params["ip6"]
1522 if "auth" in params and params["auth"] == "rsa-sig":
1523 auth_method = "rsa-sig"
1524 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1525 self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1527 client_file = work_dir + params["client-cert"]
1528 server_pem = open(work_dir + params["server-cert"]).read()
1529 client_priv = open(work_dir + params["client-key"]).read()
1530 client_priv = load_pem_private_key(
1531 str.encode(client_priv), None, default_backend()
1533 self.peer_cert = x509.load_pem_x509_certificate(
1534 str.encode(server_pem), default_backend()
1536 self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1539 auth_data = b"$3cr3tpa$$w0rd"
1540 self.p.add_auth(method="shared-key", data=auth_data)
1541 auth_method = "shared-key"
1544 is_init = True if "is_initiator" not in params else params["is_initiator"]
1545 self.no_idr_auth = params.get("no_idr_in_auth", False)
1547 idr = {"id_type": "fqdn", "data": b"vpp.home"}
1548 idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1549 r_id = self.idr = idr["data"]
1550 i_id = self.idi = idi["data"]
1552 # scapy is initiator, VPP is responder
1553 self.p.add_local_id(**idr)
1554 self.p.add_remote_id(**idi)
1555 if self.no_idr_auth:
1558 # VPP is initiator, scapy is responder
1559 self.p.add_local_id(**idi)
1560 if not self.no_idr_auth:
1561 self.p.add_remote_id(**idr)
1564 {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1565 if "loc_ts" not in params
1566 else params["loc_ts"]
1569 {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1570 if "rem_ts" not in params
1571 else params["rem_ts"]
1573 self.p.add_local_ts(**loc_ts)
1574 self.p.add_remote_ts(**rem_ts)
1575 if "responder" in params:
1576 self.p.add_responder(params["responder"])
1577 if "ike_transforms" in params:
1578 self.p.add_ike_transforms(params["ike_transforms"])
1579 if "esp_transforms" in params:
1580 self.p.add_esp_transforms(params["esp_transforms"])
1582 udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1584 self.p.set_udp_encap(True)
1586 if "responder_hostname" in params:
1587 hn = params["responder_hostname"]
1588 self.p.add_responder_hostname(hn)
1590 # configure static dns record
1591 self.vapi.dns_name_server_add_del(
1592 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1594 self.vapi.dns_enable_disable(enable=1)
1596 cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1603 is_initiator=is_init,
1604 id_type=self.p.local_id["id_type"],
1607 priv_key=client_priv,
1608 auth_method=auth_method,
1609 nonce=params.get("nonce"),
1610 auth_data=auth_data,
1611 udp_encap=udp_encap,
1612 local_ts=self.p.remote_ts,
1613 remote_ts=self.p.local_ts,
1618 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1621 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1623 ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1626 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1629 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1632 self.sa.set_ike_props(
1633 crypto=ike_crypto[0],
1634 crypto_key_len=ike_crypto[1],
1636 prf="PRF_HMAC_SHA2_256",
1639 self.sa.set_esp_props(
1640 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1644 class TestApi(VppTestCase):
1645 """Test IKEV2 API"""
1648 def setUpClass(cls):
1649 super(TestApi, cls).setUpClass()
1652 def tearDownClass(cls):
1653 super(TestApi, cls).tearDownClass()
1656 super(TestApi, self).tearDown()
1657 self.p1.remove_vpp_config()
1658 self.p2.remove_vpp_config()
1659 r = self.vapi.ikev2_profile_dump()
1660 self.assertEqual(len(r), 0)
1662 def configure_profile(self, cfg):
1663 p = Profile(self, cfg["name"])
1664 p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1665 p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1666 p.add_local_ts(**cfg["loc_ts"])
1667 p.add_remote_ts(**cfg["rem_ts"])
1668 p.add_responder(cfg["responder"])
1669 p.add_ike_transforms(cfg["ike_ts"])
1670 p.add_esp_transforms(cfg["esp_ts"])
1671 p.add_auth(**cfg["auth"])
1672 p.set_udp_encap(cfg["udp_encap"])
1673 p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1674 if "lifetime_data" in cfg:
1675 p.set_lifetime_data(cfg["lifetime_data"])
1676 if "tun_itf" in cfg:
1677 p.set_tunnel_interface(cfg["tun_itf"])
1678 if "natt_disabled" in cfg and cfg["natt_disabled"]:
1683 def test_profile_api(self):
1684 """test profile dump API"""
1689 "start_addr": "3.3.3.2",
1690 "end_addr": "3.3.3.3",
1696 "start_addr": "4.5.76.80",
1697 "end_addr": "2.3.4.6",
1704 "start_addr": "ab::1",
1705 "end_addr": "ab::4",
1711 "start_addr": "cd::12",
1712 "end_addr": "cd::13",
1718 "natt_disabled": True,
1719 "loc_id": ("fqdn", b"vpp.home"),
1720 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1723 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1726 "crypto_key_size": 32,
1730 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1731 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1733 "ipsec_over_udp_port": 4501,
1736 "lifetime_maxdata": 20192,
1737 "lifetime_jitter": 9,
1743 "loc_id": ("ip4-addr", b"192.168.2.1"),
1744 "rem_id": ("ip6-addr", b"abcd::1"),
1747 "responder": {"sw_if_index": 4, "addr": "def::10"},
1750 "crypto_key_size": 16,
1754 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1755 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1757 "ipsec_over_udp_port": 4600,
1761 self.p1 = self.configure_profile(conf["p1"])
1762 self.p2 = self.configure_profile(conf["p2"])
1764 r = self.vapi.ikev2_profile_dump()
1765 self.assertEqual(len(r), 2)
1766 self.verify_profile(r[0].profile, conf["p1"])
1767 self.verify_profile(r[1].profile, conf["p2"])
1769 def verify_id(self, api_id, cfg_id):
1770 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1771 self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1773 def verify_ts(self, api_ts, cfg_ts):
1774 self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1775 self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1776 self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1777 self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1778 self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1780 def verify_responder(self, api_r, cfg_r):
1781 self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1782 self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1784 def verify_transforms(self, api_ts, cfg_ts):
1785 self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1786 self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1787 self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1789 def verify_ike_transforms(self, api_ts, cfg_ts):
1790 self.verify_transforms(api_ts, cfg_ts)
1791 self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1793 def verify_esp_transforms(self, api_ts, cfg_ts):
1794 self.verify_transforms(api_ts, cfg_ts)
1796 def verify_auth(self, api_auth, cfg_auth):
1797 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1798 self.assertEqual(api_auth.data, cfg_auth["data"])
1799 self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1801 def verify_lifetime_data(self, p, ld):
1802 self.assertEqual(p.lifetime, ld["lifetime"])
1803 self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1804 self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1805 self.assertEqual(p.handover, ld["handover"])
1807 def verify_profile(self, ap, cp):
1808 self.assertEqual(ap.name, cp["name"])
1809 self.assertEqual(ap.udp_encap, cp["udp_encap"])
1810 self.verify_id(ap.loc_id, cp["loc_id"])
1811 self.verify_id(ap.rem_id, cp["rem_id"])
1812 self.verify_ts(ap.loc_ts, cp["loc_ts"])
1813 self.verify_ts(ap.rem_ts, cp["rem_ts"])
1814 self.verify_responder(ap.responder, cp["responder"])
1815 self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1816 self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1817 self.verify_auth(ap.auth, cp["auth"])
1818 natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1819 self.assertTrue(natt_dis == ap.natt_disabled)
1821 if "lifetime_data" in cp:
1822 self.verify_lifetime_data(ap, cp["lifetime_data"])
1823 self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1825 self.assertEqual(ap.tun_itf, cp["tun_itf"])
1827 self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1830 @tag_fixme_vpp_workers
1831 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1832 """test responder - responder behind NAT"""
1834 IKE_NODE_SUFFIX = "ip4-natt"
1836 def config_tc(self):
1837 self.config_params({"r_natt": True})
1840 @tag_fixme_vpp_workers
1841 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1842 """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1844 def config_tc(self):
1848 "is_initiator": False, # seen from test case perspective
1849 # thus vpp is initiator
1851 "sw_if_index": self.pg0.sw_if_index,
1852 "addr": self.pg0.remote_ip4,
1854 "ike-crypto": ("AES-GCM-16ICV", 32),
1855 "ike-integ": "NULL",
1856 "ike-dh": "3072MODPgr",
1858 "crypto_alg": 20, # "aes-gcm-16"
1859 "crypto_key_size": 256,
1860 "dh_group": 15, # "modp-3072"
1863 "crypto_alg": 12, # "aes-cbc"
1864 "crypto_key_size": 256,
1865 # "hmac-sha2-256-128"
1872 @tag_fixme_vpp_workers
1873 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1874 """test ikev2 initiator - pre shared key auth"""
1876 def config_tc(self):
1879 "is_initiator": False, # seen from test case perspective
1880 # thus vpp is initiator
1881 "ike-crypto": ("AES-GCM-16ICV", 32),
1882 "ike-integ": "NULL",
1883 "ike-dh": "3072MODPgr",
1885 "crypto_alg": 20, # "aes-gcm-16"
1886 "crypto_key_size": 256,
1887 "dh_group": 15, # "modp-3072"
1890 "crypto_alg": 12, # "aes-cbc"
1891 "crypto_key_size": 256,
1892 # "hmac-sha2-256-128"
1895 "responder_hostname": {
1896 "hostname": "vpp.responder.org",
1897 "sw_if_index": self.pg0.sw_if_index,
1903 @tag_fixme_vpp_workers
1904 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1905 """test initiator - request window size (1)"""
1907 def rekey_respond(self, req, update_child_sa_data):
1908 ih = self.get_ike_header(req)
1909 plain = self.sa.hmac_and_decrypt(ih)
1910 sa = ikev2.IKEv2_payload_SA(plain)
1911 if update_child_sa_data:
1912 prop = sa[ikev2.IKEv2_payload_Proposal]
1913 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1914 self.sa.r_nonce = self.sa.i_nonce
1915 self.sa.child_sas[0].ispi = prop.SPI
1916 self.sa.child_sas[0].rspi = prop.SPI
1917 self.sa.calc_child_keys()
1919 header = ikev2.IKEv2(
1920 init_SPI=self.sa.ispi,
1921 resp_SPI=self.sa.rspi,
1925 next_payload="Encrypted",
1927 resp = self.encrypt_ike_msg(header, sa, "SA")
1928 packet = self.create_packet(
1929 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1931 self.send_and_assert_no_replies(self.pg0, packet)
1933 def test_initiator(self):
1934 super(TestInitiatorRequestWindowSize, self).test_initiator()
1935 self.pg0.enable_capture()
1937 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1938 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1939 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1940 capture = self.pg0.get_capture(2)
1942 # reply in reverse order
1943 self.rekey_respond(capture[1], True)
1944 self.rekey_respond(capture[0], False)
1946 # verify that only the second request was accepted
1947 self.verify_ike_sas()
1948 self.verify_ipsec_sas(is_rekey=True)
1951 @tag_fixme_vpp_workers
1952 class TestInitiatorRekey(TestInitiatorPsk):
1953 """test ikev2 initiator - rekey"""
1955 def rekey_from_initiator(self):
1956 ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1957 self.pg0.enable_capture()
1959 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1960 capture = self.pg0.get_capture(1)
1961 ih = self.get_ike_header(capture[0])
1962 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1963 self.assertNotIn("Response", ih.flags)
1964 self.assertIn("Initiator", ih.flags)
1965 plain = self.sa.hmac_and_decrypt(ih)
1966 sa = ikev2.IKEv2_payload_SA(plain)
1967 prop = sa[ikev2.IKEv2_payload_Proposal]
1968 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1969 self.sa.r_nonce = self.sa.i_nonce
1970 # update new responder SPI
1971 self.sa.child_sas[0].ispi = prop.SPI
1972 self.sa.child_sas[0].rspi = prop.SPI
1973 self.sa.calc_child_keys()
1974 header = ikev2.IKEv2(
1975 init_SPI=self.sa.ispi,
1976 resp_SPI=self.sa.rspi,
1980 next_payload="Encrypted",
1982 resp = self.encrypt_ike_msg(header, sa, "SA")
1983 packet = self.create_packet(
1984 self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1986 self.send_and_assert_no_replies(self.pg0, packet)
1988 def test_initiator(self):
1989 super(TestInitiatorRekey, self).test_initiator()
1990 self.rekey_from_initiator()
1991 self.verify_ike_sas()
1992 self.verify_ipsec_sas(is_rekey=True)
1995 @tag_fixme_vpp_workers
1996 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1997 """test ikev2 initiator - delete IKE SA from responder"""
1999 def config_tc(self):
2002 "del_sa_from_responder": True,
2003 "is_initiator": False, # seen from test case perspective
2004 # thus vpp is initiator
2006 "sw_if_index": self.pg0.sw_if_index,
2007 "addr": self.pg0.remote_ip4,
2009 "ike-crypto": ("AES-GCM-16ICV", 32),
2010 "ike-integ": "NULL",
2011 "ike-dh": "3072MODPgr",
2013 "crypto_alg": 20, # "aes-gcm-16"
2014 "crypto_key_size": 256,
2015 "dh_group": 15, # "modp-3072"
2018 "crypto_alg": 12, # "aes-cbc"
2019 "crypto_key_size": 256,
2020 # "hmac-sha2-256-128"
2023 "no_idr_in_auth": True,
2028 @tag_fixme_vpp_workers
2029 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2030 """test ikev2 responder - initiator behind NAT"""
2032 IKE_NODE_SUFFIX = "ip4-natt"
2034 def config_tc(self):
2035 self.config_params({"i_natt": True})
2038 @tag_fixme_vpp_workers
2039 class TestResponderPsk(TemplateResponder, Ikev2Params):
2040 """test ikev2 responder - pre shared key auth"""
2042 def config_tc(self):
2043 self.config_params()
2046 @tag_fixme_vpp_workers
2047 class TestResponderDpd(TestResponderPsk):
2049 Dead peer detection test
2052 def config_tc(self):
2053 self.config_params({"dpd_disabled": False})
2058 def test_responder(self):
2059 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2060 super(TestResponderDpd, self).test_responder()
2061 self.pg0.enable_capture()
2063 # capture empty request but don't reply
2064 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2065 ih = self.get_ike_header(capture[0])
2066 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2067 plain = self.sa.hmac_and_decrypt(ih)
2068 self.assertEqual(plain, b"")
2069 # wait for SA expiration
2071 ike_sas = self.vapi.ikev2_sa_dump()
2072 self.assertEqual(len(ike_sas), 0)
2073 ipsec_sas = self.vapi.ipsec_sa_dump()
2074 self.assertEqual(len(ipsec_sas), 0)
2077 @tag_fixme_vpp_workers
2078 class TestResponderRekey(TestResponderPsk):
2079 """test ikev2 responder - rekey"""
2081 def rekey_from_initiator(self):
2082 packet = self.create_rekey_request()
2083 self.pg0.add_stream(packet)
2084 self.pg0.enable_capture()
2086 capture = self.pg0.get_capture(1)
2087 ih = self.get_ike_header(capture[0])
2088 plain = self.sa.hmac_and_decrypt(ih)
2089 sa = ikev2.IKEv2_payload_SA(plain)
2090 prop = sa[ikev2.IKEv2_payload_Proposal]
2091 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2092 # update new responder SPI
2093 self.sa.child_sas[0].rspi = prop.SPI
2095 def test_responder(self):
2096 super(TestResponderRekey, self).test_responder()
2097 self.rekey_from_initiator()
2098 self.sa.calc_child_keys()
2099 self.verify_ike_sas()
2100 self.verify_ipsec_sas(is_rekey=True)
2101 self.assert_counter(1, "rekey_req", "ip4")
2102 r = self.vapi.ikev2_sa_dump()
2103 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2106 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2107 """test ikev2 responder - non-default table id"""
2110 def setUpClass(cls):
2111 import scapy.contrib.ikev2 as _ikev2
2113 globals()["ikev2"] = _ikev2
2114 super(IkePeer, cls).setUpClass()
2115 cls.create_pg_interfaces(range(1))
2116 cls.vapi.cli("ip table add 1")
2117 cls.vapi.cli("set interface ip table pg0 1")
2118 for i in cls.pg_interfaces:
2125 def config_tc(self):
2126 self.config_params({"dpd_disabled": False})
2128 def test_responder(self):
2129 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2130 super(TestResponderVrf, self).test_responder()
2131 self.pg0.enable_capture()
2133 capture = self.pg0.get_capture(expected_count=1, timeout=5)
2134 ih = self.get_ike_header(capture[0])
2135 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
2136 plain = self.sa.hmac_and_decrypt(ih)
2137 self.assertEqual(plain, b"")
2140 @tag_fixme_vpp_workers
2141 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2142 """test ikev2 responder - cert based auth"""
2144 def config_tc(self):
2149 "server-key": "server-key.pem",
2150 "client-key": "client-key.pem",
2151 "client-cert": "client-cert.pem",
2152 "server-cert": "server-cert.pem",
2157 @tag_fixme_vpp_workers
2158 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2159 TemplateResponder, Ikev2Params
2162 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2165 def config_tc(self):
2168 "ike-crypto": ("AES-CBC", 16),
2169 "ike-integ": "SHA2-256-128",
2170 "esp-crypto": ("AES-CBC", 24),
2171 "esp-integ": "SHA2-384-192",
2172 "ike-dh": "2048MODPgr",
2173 "nonce": os.urandom(256),
2174 "no_idr_in_auth": True,
2179 @tag_fixme_vpp_workers
2180 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2181 TemplateResponder, Ikev2Params
2185 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2188 def config_tc(self):
2191 "ike-crypto": ("AES-CBC", 32),
2192 "ike-integ": "SHA2-256-128",
2193 "esp-crypto": ("AES-GCM-16ICV", 32),
2194 "esp-integ": "NULL",
2195 "ike-dh": "3072MODPgr",
2200 @tag_fixme_vpp_workers
2201 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2206 IKE_NODE_SUFFIX = "ip6"
2208 def config_tc(self):
2211 "del_sa_from_responder": True,
2214 "ike-crypto": ("AES-GCM-16ICV", 32),
2215 "ike-integ": "NULL",
2216 "ike-dh": "2048MODPgr",
2217 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2218 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2223 @tag_fixme_vpp_workers
2224 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2226 Test for keep alive messages
2229 def send_empty_req_from_responder(self):
2230 packet = self.create_empty_request()
2231 self.pg0.add_stream(packet)
2232 self.pg0.enable_capture()
2234 capture = self.pg0.get_capture(1)
2235 ih = self.get_ike_header(capture[0])
2236 self.assertEqual(ih.id, self.sa.msg_id)
2237 plain = self.sa.hmac_and_decrypt(ih)
2238 self.assertEqual(plain, b"")
2239 self.assert_counter(1, "keepalive", "ip4")
2240 r = self.vapi.ikev2_sa_dump()
2241 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2243 def test_initiator(self):
2244 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2245 self.send_empty_req_from_responder()
2248 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2249 """malformed packet test"""
2254 def config_tc(self):
2255 self.config_params()
2257 def create_ike_init_msg(self, length=None, payload=None):
2260 init_SPI="\x11" * 8,
2262 exch_type="IKE_SA_INIT",
2264 if payload is not None:
2266 return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2268 def verify_bad_packet_length(self):
2269 ike_msg = self.create_ike_init_msg(length=0xDEAD)
2270 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2271 self.assert_counter(self.pkt_count, "bad_length")
2273 def verify_bad_sa_payload_length(self):
2274 p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2275 ike_msg = self.create_ike_init_msg(payload=p)
2276 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2277 self.assert_counter(self.pkt_count, "malformed_packet")
2279 def test_responder(self):
2280 self.pkt_count = 254
2281 self.verify_bad_packet_length()
2282 self.verify_bad_sa_payload_length()
2285 if __name__ == "__main__":
2286 unittest.main(testRunner=VppTestRunner)