from scapy.layers.inet6 import IPv6
from scapy.packet import raw, Raw
from scapy.utils import long_converter
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import (
+ tag_fixme_vpp_workers,
+ tag_fixme_ubuntu2204,
+ tag_fixme_debian11,
+ is_distro_ubuntu2204,
+ is_distro_debian11,
+ VppTestRunner,
+)
from vpp_ikev2 import Profile, IDType, AuthMethod
from vpp_papi import VppEnum
def complete_dh_data(self):
self.dh_shared_secret = self.compute_secret()
- def calc_child_keys(self):
+ def calc_child_keys(self, kex=False):
prf = self.ike_prf_alg.mod()
s = self.i_nonce + self.r_nonce
+ if kex:
+ s = self.dh_shared_secret + s
c = self.child_sas[0]
encr_key_len = self.esp_crypto_key_len
h.update(data)
return h.finalize()
- def calc_keys(self):
+ def calc_keys(self, sk_d=None):
prf = self.ike_prf_alg.mod()
- # SKEYSEED = prf(Ni | Nr, g^ir)
- s = self.i_nonce + self.r_nonce
- self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
+ if sk_d is None:
+ # SKEYSEED = prf(Ni | Nr, g^ir)
+ self.skeyseed = self.calc_prf(
+ prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
+ )
+ else:
+ # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
+ self.skeyseed = self.calc_prf(
+ prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
+ )
# calculate S = Ni | Nr | SPIi SPIr
- s = s + self.ispi + self.rspi
+ s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
prf_key_trunc = self.ike_prf_alg.trunc_len
encr_key_len = self.ike_crypto_key_len
digest.update(data)
return digest.finalize()
+ def clone(self, test, **kwargs):
+ if "spi" not in kwargs:
+ kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
+ if "nonce" not in kwargs:
+ kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
+ if self.child_sas:
+ if "local_ts" not in kwargs:
+ kwargs["local_ts"] = self.child_sas[0].local_ts
+ if "remote_ts" not in kwargs:
+ kwargs["remote_ts"] = self.child_sas[0].remote_ts
+ sa = type(self)(
+ test,
+ is_initiator=self.is_initiator,
+ i_id=self.i_id,
+ r_id=self.r_id,
+ id_type=self.id_type,
+ auth_data=self.auth_data,
+ auth_method=self.auth_method,
+ priv_key=self.priv_key,
+ i_natt=self.i_natt,
+ r_natt=self.r_natt,
+ udp_encap=self.udp_encap,
+ **kwargs,
+ )
+ if sa.is_initiator:
+ sa.set_ike_props(
+ crypto=self.ike_crypto,
+ crypto_key_len=self.ike_crypto_key_len,
+ integ=self.ike_integ,
+ prf=self.ike_prf,
+ dh=self.ike_dh,
+ )
+ sa.set_esp_props(
+ crypto=self.esp_crypto,
+ crypto_key_len=self.esp_crypto_key_len,
+ integ=self.esp_integ,
+ )
+ return sa
+
+@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class IkePeer(VppTestCase):
"""common class for initiator and responder"""
self.initiate_del_sa_from_initiator()
r = self.vapi.ikev2_sa_dump()
self.assertEqual(len(r), 0)
+ r = self.vapi.ikev2_sa_v2_dump()
+ self.assertEqual(len(r), 0)
sas = self.vapi.ipsec_sa_dump()
self.assertEqual(len(sas), 0)
self.p.remove_vpp_config()
node_name = "/err/ikev2-%s/" % version + name
self.assertEqual(count, self.statistics.get_err_counter(node_name))
- def create_rekey_request(self):
- sa, first_payload = self.generate_auth_payload(is_rekey=True)
+ def create_rekey_request(self, kex=False):
+ sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
header = ikev2.IKEv2(
init_SPI=self.sa.ispi,
resp_SPI=self.sa.rspi,
self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
)
+ def create_sa_rekey_request(self, **kwargs):
+ sa = self.generate_sa_init_payload(**kwargs)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(),
+ flags="Initiator",
+ exch_type="CREATE_CHILD_SA",
+ )
+ ike_msg = self.encrypt_ike_msg(header, sa, "SA")
+ return self.create_packet(
+ self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
+
def create_empty_request(self):
header = ikev2.IKEv2(
init_SPI=self.sa.ispi,
else:
self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
- def verify_ipsec_sas(self, is_rekey=False):
+ def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
sas = self.vapi.ipsec_sa_dump()
- if is_rekey:
- # after rekey there is a short period of time in which old
- # inbound SA is still present
- sa_count = 3
- else:
- sa_count = 2
+ if sa_count is None:
+ if is_rekey:
+ # after rekey there is a short period of time in which old
+ # inbound SA is still present
+ sa_count = 3
+ else:
+ sa_count = 2
self.assertEqual(len(sas), sa_count)
if self.sa.is_initiator:
if is_rekey:
self.assertEqual(api_id.data_len, exp_id.data_len)
self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
- def verify_ike_sas(self):
+ def verify_ike_sas(self, is_rekey=False):
r = self.vapi.ikev2_sa_dump()
+ if is_rekey:
+ sa_count = 2
+ sa = r[1].sa
+ else:
+ sa_count = 1
+ sa = r[0].sa
+ self.assertEqual(len(r), sa_count)
+ self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
+ self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
+ if self.ip6:
+ if self.sa.is_initiator:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
+ else:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
+ else:
+ if self.sa.is_initiator:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+ else:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
+ self.verify_keymat(sa.keys, self.sa, "sk_d")
+ self.verify_keymat(sa.keys, self.sa, "sk_ai")
+ self.verify_keymat(sa.keys, self.sa, "sk_ar")
+ self.verify_keymat(sa.keys, self.sa, "sk_ei")
+ self.verify_keymat(sa.keys, self.sa, "sk_er")
+ self.verify_keymat(sa.keys, self.sa, "sk_pi")
+ self.verify_keymat(sa.keys, self.sa, "sk_pr")
+
+ self.assertEqual(sa.i_id.type, self.sa.id_type)
+ self.assertEqual(sa.r_id.type, self.sa.id_type)
+ self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
+ self.assertEqual(sa.r_id.data_len, len(self.idr))
+ self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
+ self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
+
+ n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.i_nonce)
+ n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.r_nonce)
+
+ r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
+ if is_rekey:
+ self.assertEqual(len(r), 0)
+ return
+
+ self.assertEqual(len(r), 1)
+ csa = r[0].child_sa
+ self.assertEqual(csa.sa_index, sa.sa_index)
+ c = self.sa.child_sas[0]
+ if hasattr(c, "sk_ai"):
+ self.verify_keymat(csa.keys, c, "sk_ai")
+ self.verify_keymat(csa.keys, c, "sk_ar")
+ self.verify_keymat(csa.keys, c, "sk_ei")
+ self.verify_keymat(csa.keys, c, "sk_er")
+ self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
+ self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
+
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
+ tsi = tsi[0]
+ tsr = tsr[0]
+ r = self.vapi.ikev2_traffic_selector_dump(
+ is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+ )
+ self.assertEqual(len(r), 1)
+ ts = r[0].ts
+ self.verify_ts(r[0].ts, tsi[0], True)
+
+ r = self.vapi.ikev2_traffic_selector_dump(
+ is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+ )
+ self.assertEqual(len(r), 1)
+ self.verify_ts(r[0].ts, tsr[0], False)
+
+ def verify_ike_sas_v2(self):
+ r = self.vapi.ikev2_sa_v2_dump()
self.assertEqual(len(r), 1)
sa = r[0].sa
+ self.assertEqual(self.p.profile_name, sa.profile_name)
self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
if self.ip6:
self.sa.calc_child_keys()
self.send_auth_response()
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
class TemplateResponder(IkePeer):
capture = self.pg0.get_capture(1)
self.verify_del_sa(capture[0])
- def send_sa_init_req(self):
+ def generate_sa_init_payload(
+ self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
+ ):
tr_attr = self.sa.ike_crypto_attr()
trans = (
ikev2.IKEv2_payload_Transform(
)
)
+ if spi is None:
+ pargs = {}
+ else:
+ pargs = {"SPI": spi, "SPIsize": len(spi)}
props = ikev2.IKEv2_payload_Proposal(
- proposal=1, proto="IKEv2", trans_nb=4, trans=trans
+ proposal=1,
+ proto="IKEv2",
+ trans_nb=4,
+ trans=trans,
+ **pargs,
)
- next_payload = None if self.ip6 else "Notify"
-
- self.sa.init_req_packet = (
- ikev2.IKEv2(
- init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
- )
- / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
+ return (
+ ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
/ ikev2.IKEv2_payload_KE(
- next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
+ next_payload="Nonce",
+ group=self.sa.ike_dh,
+ load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
+ )
+ / ikev2.IKEv2_payload_Nonce(
+ next_payload=next_payload,
+ load=self.sa.i_nonce if nonce is None else nonce,
)
- / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
)
+ def send_sa_init_req(self):
+ self.sa.init_req_packet = ikev2.IKEv2(
+ init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
+ ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
+
if not self.ip6:
if self.sa.i_natt:
src_address = b"\x0a\x0a\x0a\x01"
capture = self.pg0.get_capture(1)
self.verify_sa_init(capture[0])
- def generate_auth_payload(self, last_payload=None, is_rekey=False):
+ def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
tr_attr = self.sa.esp_crypto_attr()
last_payload = last_payload or "Notify"
+ trans_nb = 4
trans = (
ikev2.IKEv2_payload_Transform(
transform_type="Encryption",
)
)
+ if kex:
+ trans_nb += 1
+ trans /= ikev2.IKEv2_payload_Transform(
+ transform_type="GroupDesc", transform_id=self.sa.ike_dh
+ )
+
c = self.sa.child_sas[0]
props = ikev2.IKEv2_payload_Proposal(
- proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
+ proposal=1,
+ proto="ESP",
+ SPIsize=4,
+ SPI=c.ispi,
+ trans_nb=trans_nb,
+ trans=trans,
)
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
if is_rekey:
first_payload = "Nonce"
+ if kex:
+ head = ikev2.IKEv2_payload_Nonce(
+ load=self.sa.i_nonce, next_payload="KE"
+ ) / ikev2.IKEv2_payload_KE(
+ group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
+ )
+ else:
+ head = ikev2.IKEv2_payload_Nonce(
+ load=self.sa.i_nonce, next_payload="SA"
+ )
plain = (
- ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
+ head
/ plain
- / ikev2.IKEv2_payload_Notify(type="REKEY_SA", proto="ESP", SPI=c.ispi)
+ / ikev2.IKEv2_payload_Notify(
+ type="REKEY_SA",
+ proto="ESP",
+ SPI=c.ispi,
+ length=8 + len(c.ispi),
+ next_payload="Notify",
+ )
+ / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
)
else:
first_payload = "IDi"
self.assertEqual(1, s.n_sa_auth_req)
self.assertEqual(1, s.n_sa_init_req)
+ r = self.vapi.ikev2_sa_v2_dump()
+ s = r[0].sa.stats
+ self.assertEqual(1, s.n_sa_auth_req)
+ self.assertEqual(1, s.n_sa_init_req)
+
def test_responder(self):
self.send_sa_init_req()
self.send_sa_auth()
self.verify_ipsec_sas()
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
self.verify_counters()
)
+@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class TestApi(VppTestCase):
"""Test IKEV2 API"""
# verify that only the second request was accepted
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
self.verify_ipsec_sas(is_rekey=True)
super(TestInitiatorRekey, self).test_initiator()
self.rekey_from_initiator()
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
self.verify_ipsec_sas(is_rekey=True)
time.sleep(3)
ike_sas = self.vapi.ikev2_sa_dump()
self.assertEqual(len(ike_sas), 0)
+ ike_sas = self.vapi.ikev2_sa_v2_dump()
+ self.assertEqual(len(ike_sas), 0)
ipsec_sas = self.vapi.ipsec_sa_dump()
self.assertEqual(len(ipsec_sas), 0)
class TestResponderRekey(TestResponderPsk):
"""test ikev2 responder - rekey"""
- def rekey_from_initiator(self):
- packet = self.create_rekey_request()
+ WITH_KEX = False
+
+ def send_rekey_from_initiator(self):
+ if self.WITH_KEX:
+ self.sa.generate_dh_data()
+ packet = self.create_rekey_request(kex=self.WITH_KEX)
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
capture = self.pg0.get_capture(1)
+ return capture
+
+ def process_rekey_response(self, capture):
ih = self.get_ike_header(capture[0])
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
# update new responder SPI
self.sa.child_sas[0].rspi = prop.SPI
+ if self.WITH_KEX:
+ self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+ self.sa.complete_dh_data()
+ self.sa.calc_child_keys(kex=self.WITH_KEX)
def test_responder(self):
super(TestResponderRekey, self).test_responder()
- self.rekey_from_initiator()
- self.sa.calc_child_keys()
+ self.process_rekey_response(self.send_rekey_from_initiator())
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
self.verify_ipsec_sas(is_rekey=True)
self.assert_counter(1, "rekey_req", "ip4")
r = self.vapi.ikev2_sa_dump()
self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+ r = self.vapi.ikev2_sa_v2_dump()
+ self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeat(TestResponderRekey):
+ """test ikev2 responder - rekey repeat"""
+
+ def test_responder(self):
+ super(TestResponderRekeyRepeat, self).test_responder()
+ # rekey request is not accepted until old IPsec SA is expired
+ capture = self.send_rekey_from_initiator()
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ notify = ikev2.IKEv2_payload_Notify(plain)
+ self.assertEqual(notify.type, 43)
+ self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
+ # rekey request is accepted after old IPsec SA was expired
+ for _ in range(50):
+ if len(self.vapi.ipsec_sa_dump()) != 3:
+ break
+ time.sleep(0.2)
+ else:
+ self.fail("old IPsec SA not expired")
+ self.process_rekey_response(self.send_rekey_from_initiator())
+ self.verify_ike_sas()
+ self.verify_ike_sas_v2()
+ self.verify_ipsec_sas(sa_count=3)
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyKEX(TestResponderRekey):
+ """test ikev2 responder - rekey with key exchange"""
+
+ WITH_KEX = True
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
+ """test ikev2 responder - rekey repeat with key exchange"""
+ WITH_KEX = True
+
+@tag_fixme_vpp_workers
+class TestResponderRekeySA(TestResponderPsk):
+ """test ikev2 responder - rekey IKE SA"""
+
+ def send_rekey_from_initiator(self, newsa):
+ packet = self.create_sa_rekey_request(
+ spi=newsa.ispi,
+ dh_pub_key=newsa.my_dh_pub_key,
+ nonce=newsa.i_nonce,
+ )
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ return capture
+
+ def process_rekey_response(self, newsa, capture):
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ newsa.rspi = prop.SPI
+ newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+ newsa.complete_dh_data()
+ newsa.calc_keys(sk_d=self.sa.sk_d)
+ newsa.child_sas = self.sa.child_sas
+ self.sa.child_sas = []
+
+ def test_responder(self):
+ super(TestResponderRekeySA, self).test_responder()
+ newsa = self.sa.clone(self, spi=os.urandom(8))
+ newsa.generate_dh_data()
+ capture = self.send_rekey_from_initiator(newsa)
+ self.process_rekey_response(newsa, capture)
+ self.verify_ike_sas(is_rekey=True)
+ self.assert_counter(1, "rekey_req", "ip4")
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(r[1].sa.stats.n_rekey_req, 1)
+ self.initiate_del_sa_from_initiator()
+ self.sa = newsa
+ self.verify_ike_sas()
+
+
+@tag_fixme_ubuntu2204
+@tag_fixme_debian11
class TestResponderVrf(TestResponderPsk, Ikev2Params):
"""test ikev2 responder - non-default table id"""
globals()["ikev2"] = _ikev2
super(IkePeer, cls).setUpClass()
+ if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
+ cls, "vpp"
+ ):
+ return
cls.create_pg_interfaces(range(1))
cls.vapi.cli("ip table add 1")
cls.vapi.cli("set interface ip table pg0 1")
self.assert_counter(1, "keepalive", "ip4")
r = self.vapi.ikev2_sa_dump()
self.assertEqual(1, r[0].sa.stats.n_keepalives)
+ r = self.vapi.ikev2_sa_v2_dump()
+ self.assertEqual(1, r[0].sa.stats.n_keepalives)
def test_initiator(self):
super(TestInitiatorKeepaliveMsg, self).test_initiator()