import os
+import time
from socket import inet_pton
from cryptography import x509
from cryptography.hazmat.backends import default_backend
def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
nonce=None, auth_data=None, local_ts=None, remote_ts=None,
- auth_method='shared-key', priv_key=None, natt=False):
+ auth_method='shared-key', priv_key=None, natt=False,
+ udp_encap=False):
+ self.udp_encap = udp_encap
self.natt = natt
if natt:
self.sport = 4500
self.sa.generate_dh_data()
self.vapi.cli('ikev2 set logging level 4')
self.vapi.cli('event-lo clear')
- self.vapi.cli('ikev2 dpd disable')
+
+ def create_rekey_request(self):
+ sa, first_payload = self.generate_auth_payload(is_rekey=True)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Initiator', exch_type='CREATE_CHILD_SA')
+
+ ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
+ return self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+
+ def create_empty_request(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(), flags='Initiator',
+ exch_type='INFORMATIONAL',
+ next_payload='Encrypted')
+
+ msg = self.encrypt_ike_msg(header, b'', None)
+ return self.create_packet(self.pg0, msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
use_ip6=False):
def get_ike_header(self, packet):
try:
ih = packet[ikev2.IKEv2]
+ ih = self.verify_and_remove_non_esp_marker(ih)
except IndexError as e:
# this is a workaround for getting IKEv2 layer as both ikev2 and
# ipsec register for port 4500
assert(len(res) == tlen)
return res
+ def verify_udp_encap(self, ipsec_sa):
+ e = VppEnum.vl_api_ipsec_sad_flags_t
+ if self.sa.udp_encap or self.sa.natt:
+ self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+ else:
+ self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+
def verify_ipsec_sas(self, is_rekey=False):
sas = self.vapi.ipsec_sa_dump()
if is_rekey:
else:
sa_count = 2
self.assertEqual(len(sas), sa_count)
- e = VppEnum.vl_api_ipsec_sad_flags_t
if self.sa.is_initiator:
if is_rekey:
sa0 = sas[0].entry
c = self.sa.child_sas[0]
+ self.verify_udp_encap(sa0)
+ self.verify_udp_encap(sa1)
vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
self.assertEqual(s.load, dst_sha)
def verify_sa_init_request(self, packet):
+ udp = packet[UDP]
+ self.sa.dport = udp.sport
ih = packet[ikev2.IKEv2]
self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
self.assertEqual(ih.exch_type, 34) # SA_INIT
trans = trans.payload
def verify_sa_auth_req(self, packet):
+ udp = packet[UDP]
+ self.sa.dport = udp.sport
ih = self.get_ike_header(packet)
self.assertEqual(ih.resp_SPI, self.sa.rspi)
self.assertEqual(ih.init_SPI, self.sa.ispi)
transform_id=self.sa.ike_dh))
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
trans_nb=4, trans=trans))
+
+ src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+ if self.sa.natt:
+ dst_address = b'\x0a\x0a\x0a\x0a'
+ else:
+ dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+ src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+
self.sa.init_resp_packet = (
ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
exch_type='IKE_SA_INIT', flags='Response') /
ikev2.IKEv2_payload_KE(next_payload='Nonce',
group=self.sa.ike_dh,
load=self.sa.my_dh_pub_key) /
- ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
+ ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
+ next_payload='Notify') /
+ ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+ next_payload='Notify') / ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
self.sa.sport, self.sa.dport,
- self.sa.natt, self.ip6)
+ False, self.ip6)
self.pg_send(self.pg0, ike_msg)
capture = self.pg0.get_capture(1)
self.verify_sa_auth_req(capture[0])
'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
+ dpd_disabled = True if 'dpd_disabled' not in params else\
+ params['dpd_disabled']
+ if dpd_disabled:
+ self.vapi.cli('ikev2 dpd disable')
self.del_sa_from_responder = False if 'del_sa_from_responder'\
not in params else params['del_sa_from_responder']
is_natt = 'natt' in params and params['natt'] or False
if 'esp_transforms' in params:
self.p.add_esp_transforms(params['esp_transforms'])
+ udp_encap = False if 'udp_encap' not in params else\
+ params['udp_encap']
+ if udp_encap:
+ self.p.set_udp_encap(True)
+
self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
is_initiator=is_init,
id_type=self.p.local_id['id_type'], natt=is_natt,
priv_key=client_priv, auth_method=auth_method,
- auth_data=auth_data,
+ auth_data=auth_data, udp_encap=udp_encap,
local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
-
if is_init:
ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
params['ike-crypto']
p.set_lifetime_data(cfg['lifetime_data'])
if 'tun_itf' in cfg:
p.set_tunnel_interface(cfg['tun_itf'])
+ if 'natt_disabled' in cfg and cfg['natt_disabled']:
+ p.disable_natt()
p.add_vpp_config()
return p
conf = {
'p1': {
'name': 'p1',
+ 'natt_disabled': True,
'loc_id': ('fqdn', b'vpp.home'),
'rem_id': ('fqdn', b'roadwarrior.example.com'),
'loc_ts': loc_ts4,
self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
self.verify_auth(ap.auth, cp['auth'])
+ natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
+ self.assertTrue(natt_dis == ap.natt_disabled)
+
if 'lifetime_data' in cp:
self.verify_lifetime_data(ap, cp['lifetime_data'])
self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
self.assertEqual(ap.tun_itf, 0xffffffff)
+class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
+ """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
+
+ def config_tc(self):
+ self.config_params({
+ 'natt': True,
+ 'is_initiator': False, # seen from test case perspective
+ # thus vpp is initiator
+ 'responder': {'sw_if_index': self.pg0.sw_if_index,
+ 'addr': self.pg0.remote_ip4},
+ 'ike-crypto': ('AES-GCM-16ICV', 32),
+ 'ike-integ': 'NULL',
+ 'ike-dh': '3072MODPgr',
+ 'ike_transforms': {
+ 'crypto_alg': 20, # "aes-gcm-16"
+ 'crypto_key_size': 256,
+ 'dh_group': 15, # "modp-3072"
+ },
+ 'esp_transforms': {
+ 'crypto_alg': 12, # "aes-cbc"
+ 'crypto_key_size': 256,
+ # "hmac-sha2-256-128"
+ 'integ_alg': 12}})
+
+
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - pre shared key auth """
'integ_alg': 12}})
+class TestInitiatorRequestWindowSize(TestInitiatorPsk):
+ """ test initiator - request window size (1) """
+
+ def rekey_respond(self, req, update_child_sa_data):
+ ih = self.get_ike_header(req)
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ if update_child_sa_data:
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ self.sa.r_nonce = self.sa.i_nonce
+ self.sa.child_sas[0].ispi = prop.SPI
+ self.sa.child_sas[0].rspi = prop.SPI
+ self.sa.calc_child_keys()
+
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type=36,
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, sa, 'SA')
+ packet = self.create_packet(self.pg0, resp, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.send_and_assert_no_replies(self.pg0, packet)
+
+ def test_initiator(self):
+ super(TestInitiatorRequestWindowSize, self).test_initiator()
+ self.pg0.enable_capture()
+ self.pg_start()
+ ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+ self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+ self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+ capture = self.pg0.get_capture(2)
+
+ # reply in reverse order
+ self.rekey_respond(capture[1], True)
+ self.rekey_respond(capture[0], False)
+
+ # verify that only the second request was accepted
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(is_rekey=True)
+
+
class TestInitiatorRekey(TestInitiatorPsk):
""" test ikev2 initiator - rekey """
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
prop = sa[ikev2.IKEv2_payload_Proposal]
- nonce = sa[ikev2.IKEv2_payload_Nonce]
self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
self.sa.r_nonce = self.sa.i_nonce
# update new responder SPI
self.config_params()
+class TestResponderDpd(TestResponderPsk):
+ """
+ Dead peer detection test
+ """
+ def config_tc(self):
+ self.config_params({'dpd_disabled': False})
+
+ def tearDown(self):
+ pass
+
+ def test_responder(self):
+ self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+ super(TestResponderDpd, self).test_responder()
+ self.pg0.enable_capture()
+ self.pg_start()
+ # capture empty request but don't reply
+ capture = self.pg0.get_capture(expected_count=1, timeout=5)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+ # wait for SA expiration
+ time.sleep(3)
+ ike_sas = self.vapi.ikev2_sa_dump()
+ self.assertEqual(len(ike_sas), 0)
+ ipsec_sas = self.vapi.ipsec_sa_dump()
+ self.assertEqual(len(ipsec_sas), 0)
+
+
class TestResponderRekey(TestResponderPsk):
""" test ikev2 responder - rekey """
def rekey_from_initiator(self):
- sa, first_payload = self.generate_auth_payload(is_rekey=True)
- header = ikev2.IKEv2(
- init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
- flags='Initiator', exch_type='CREATE_CHILD_SA')
-
- ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
- packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
+ packet = self.create_rekey_request()
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
prop = sa[ikev2.IKEv2_payload_Proposal]
- nonce = sa[ikev2.IKEv2_payload_Nonce]
self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
# update new responder SPI
self.sa.child_sas[0].rspi = prop.SPI
""" test ikev2 responder - cert based auth """
def config_tc(self):
self.config_params({
+ 'udp_encap': True,
'auth': 'rsa-sig',
'server-key': 'server-key.pem',
'client-key': 'client-key.pem',
'end_addr': '11::100'}})
+class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
+ """
+ Test for keep alive messages
+ """
+
+ def send_empty_req_from_responder(self):
+ packet = self.create_empty_request()
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.id, self.sa.msg_id)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+ def test_initiator(self):
+ super(TestInitiatorKeepaliveMsg, self).test_initiator()
+ self.send_empty_req_from_responder()
+
+
class TestMalformedMessages(TemplateResponder, Ikev2Params):
""" malformed packet test """