import os
+import time
from socket import inet_pton
from cryptography import x509
from cryptography.hazmat.backends import default_backend
modes,
)
from ipaddress import IPv4Address, IPv6Address, ip_address
+import unittest
from scapy.layers.ipsec import ESP
from scapy.layers.inet import IP, UDP, Ether
from scapy.layers.inet6 import IPv6
from scapy.packet import raw, Raw
from scapy.utils import long_converter
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner
from vpp_ikev2 import Profile, IDType, AuthMethod
from vpp_papi import VppEnum
self.mode(nonce, icv, len(icv)),
default_backend()).decryptor()
decryptor.authenticate_additional_data(aad)
- pt = decryptor.update(ct) + decryptor.finalize()
- pad_len = pt[-1] + 1
- return pt[:-pad_len]
+ return decryptor.update(ct) + decryptor.finalize()
def pad(self, data):
pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
hashes.SHA256, 32),
}
+CRYPTO_IDS = {
+ 12: 'AES-CBC',
+ 20: 'AES-GCM-16ICV',
+}
+
+INTEG_IDS = {
+ 2: 'HMAC-SHA1-96',
+ 12: 'SHA2-256-128',
+ 13: 'SHA2-384-192',
+ 14: 'SHA2-512-256',
+}
+
class IKEv2ChildSA(object):
- def __init__(self, local_ts, remote_ts, spi=None):
- self.spi = spi or os.urandom(4)
+ def __init__(self, local_ts, remote_ts, is_initiator):
+ spi = os.urandom(4)
+ if is_initiator:
+ self.ispi = spi
+ self.rspi = None
+ else:
+ self.rspi = spi
+ self.ispi = None
self.local_ts = local_ts
self.remote_ts = remote_ts
def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
nonce=None, auth_data=None, local_ts=None, remote_ts=None,
- auth_method='shared-key', priv_key=None, natt=False):
- self.natt = natt
- if natt:
+ auth_method='shared-key', priv_key=None, i_natt=False,
+ r_natt=False, udp_encap=False):
+ self.udp_encap = udp_encap
+ self.i_natt = i_natt
+ self.r_natt = r_natt
+ if i_natt or r_natt:
self.sport = 4500
self.dport = 4500
else:
self.rspi = spi
self.ispi = 8 * b'\x00'
self.r_nonce = nonce
- self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
+ self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
+ self.is_initiator)]
def new_msg_id(self):
self.msg_id += 1
return self.r_dh_data
return self.i_dh_data
+ @property
+ def natt(self):
+ return self.i_natt or self.r_natt
+
def compute_secret(self):
priv = self.dh_private_key
peer = self.peer_dh_pub_key
aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
ct = ep.load[:-GCM_ICV_SIZE]
tag = ep.load[-GCM_ICV_SIZE:]
- return self.decrypt(ct, raw(ike)[:aad_len], tag)
+ plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
else:
self.verify_hmac(raw(ike))
integ_trunc = self.ike_integ_alg.trunc_len
# remove ICV and decrypt payload
ct = ep.load[:-integ_trunc]
- return self.decrypt(ct)
+ plain = self.decrypt(ct)
+ # remove padding
+ pad_len = plain[-1]
+ return plain[:-pad_len - 1]
def build_ts_addr(self, ts, version):
return {'starting_address_v' + version: ts['start_addr'],
def tearDownClass(cls):
super(IkePeer, cls).tearDownClass()
+ def tearDown(self):
+ super(IkePeer, self).tearDown()
+ if self.del_sa_from_responder:
+ self.initiate_del_sa_from_responder()
+ else:
+ self.initiate_del_sa_from_initiator()
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(len(r), 0)
+ sas = self.vapi.ipsec_sa_dump()
+ self.assertEqual(len(sas), 0)
+ self.p.remove_vpp_config()
+ self.assertIsNone(self.p.query_vpp_config())
+
def setUp(self):
super(IkePeer, self).setUp()
self.config_tc()
self.p.add_vpp_config()
self.assertIsNotNone(self.p.query_vpp_config())
- self.sa.generate_dh_data()
+ if self.sa.is_initiator:
+ self.sa.generate_dh_data()
self.vapi.cli('ikev2 set logging level 4')
self.vapi.cli('event-lo clear')
+ def assert_counter(self, count, name, version='ip4'):
+ node_name = '/err/ikev2-%s/' % version + name
+ self.assertEqual(count, self.statistics.get_err_counter(node_name))
+
+ def create_rekey_request(self):
+ sa, first_payload = self.generate_auth_payload(is_rekey=True)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Initiator', exch_type='CREATE_CHILD_SA')
+
+ ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
+ return self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+
+ def create_empty_request(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(), flags='Initiator',
+ exch_type='INFORMATIONAL',
+ next_payload='Encrypted')
+
+ msg = self.encrypt_ike_msg(header, b'', None)
+ return self.create_packet(self.pg0, msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+
def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
use_ip6=False):
if use_ip6:
def get_ike_header(self, packet):
try:
ih = packet[ikev2.IKEv2]
+ ih = self.verify_and_remove_non_esp_marker(ih)
except IndexError as e:
# this is a workaround for getting IKEv2 layer as both ikev2 and
# ipsec register for port 4500
esp = packet[ESP]
ih = self.verify_and_remove_non_esp_marker(esp)
self.assertEqual(ih.version, 0x20)
+ self.assertNotIn('Version', ih.flags)
return ih
def verify_and_remove_non_esp_marker(self, packet):
assert(len(res) == tlen)
return res
- def verify_ipsec_sas(self):
- sas = self.vapi.ipsec_sa_dump()
- self.assertEqual(len(sas), 2)
+ def verify_udp_encap(self, ipsec_sa):
e = VppEnum.vl_api_ipsec_sad_flags_t
+ if self.sa.udp_encap or self.sa.natt:
+ self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+ else:
+ self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+
+ def verify_ipsec_sas(self, is_rekey=False):
+ sas = self.vapi.ipsec_sa_dump()
+ if is_rekey:
+ # after rekey there is a short period of time in which old
+ # inbound SA is still present
+ sa_count = 3
+ else:
+ sa_count = 2
+ self.assertEqual(len(sas), sa_count)
if self.sa.is_initiator:
- sa0 = sas[0].entry
- sa1 = sas[1].entry
+ if is_rekey:
+ sa0 = sas[0].entry
+ sa1 = sas[2].entry
+ else:
+ sa0 = sas[0].entry
+ sa1 = sas[1].entry
else:
- sa1 = sas[0].entry
- sa0 = sas[1].entry
+ if is_rekey:
+ sa0 = sas[2].entry
+ sa1 = sas[0].entry
+ else:
+ sa1 = sas[0].entry
+ sa0 = sas[1].entry
c = self.sa.child_sas[0]
+ self.verify_udp_encap(sa0)
+ self.verify_udp_encap(sa1)
vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
self.verify_keymat(csa.keys, c, 'sk_ar')
self.verify_keymat(csa.keys, c, 'sk_ei')
self.verify_keymat(csa.keys, c, 'sk_er')
+ self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
+ self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
tsi = tsi[0]
class TemplateInitiator(IkePeer):
""" initiator test template """
- def tearDown(self):
- super(TemplateInitiator, self).tearDown()
+ def initiate_del_sa_from_initiator(self):
+ ispi = int.from_bytes(self.sa.ispi, 'little')
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertNotIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ plain = self.sa.hmac_and_decrypt(ih)
+ d = ikev2.IKEv2_payload_Delete(plain)
+ self.assertEqual(d.proto, 1) # proto=IKEv2
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type='INFORMATIONAL',
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, b'', None)
+ self.send_and_assert_no_replies(self.pg0, resp)
+
+ def verify_del_sa(self, packet):
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 37) # exchange informational
+ self.assertIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+ def initiate_del_sa_from_responder(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ exch_type='INFORMATIONAL',
+ id=self.sa.new_msg_id())
+ del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+ ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+ packet = self.create_packet(self.pg0, ike_msg,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_del_sa(capture[0])
@staticmethod
def find_notify_payload(packet, notify_type):
self.assertEqual(s.load, dst_sha)
def verify_sa_init_request(self, packet):
+ udp = packet[UDP]
+ self.sa.dport = udp.sport
ih = packet[ikev2.IKEv2]
self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
self.assertEqual(ih.exch_type, 34) # SA_INIT
self.p.ike_transforms['dh_group'])
self.verify_nat_detection(packet)
+ self.sa.set_ike_props(
+ crypto='AES-GCM-16ICV', crypto_key_len=32,
+ integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
+ self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
+ integ='SHA2-256-128')
+ self.sa.generate_dh_data()
self.sa.complete_dh_data()
self.sa.calc_keys()
+ def update_esp_transforms(self, trans, sa):
+ while trans:
+ if trans.transform_type == 1: # ecryption
+ sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
+ elif trans.transform_type == 3: # integrity
+ sa.esp_integ = INTEG_IDS[trans.transform_id]
+ trans = trans.payload
+
def verify_sa_auth_req(self, packet):
+ udp = packet[UDP]
+ self.sa.dport = udp.sport
ih = self.get_ike_header(packet)
self.assertEqual(ih.resp_SPI, self.sa.rspi)
self.assertEqual(ih.init_SPI, self.sa.ispi)
idr = ikev2.IKEv2_payload_IDr(idi.payload)
self.assertEqual(idi.load, self.sa.i_id)
self.assertEqual(idr.load, self.sa.r_id)
+ prop = idi[ikev2.IKEv2_payload_Proposal]
+ c = self.sa.child_sas[0]
+ c.ispi = prop.SPI
+ self.update_esp_transforms(
+ prop[ikev2.IKEv2_payload_Transform], self.sa)
def send_init_response(self):
tr_attr = self.sa.ike_crypto_attr()
transform_id=self.sa.ike_dh))
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
trans_nb=4, trans=trans))
+
+ src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+ if self.sa.natt:
+ dst_address = b'\x0a\x0a\x0a\x0a'
+ else:
+ dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+ src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+
self.sa.init_resp_packet = (
ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
exch_type='IKE_SA_INIT', flags='Response') /
ikev2.IKEv2_payload_KE(next_payload='Nonce',
group=self.sa.ike_dh,
load=self.sa.my_dh_pub_key) /
- ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
+ ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
+ next_payload='Notify') /
+ ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+ next_payload='Notify') / ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
self.sa.sport, self.sa.dport,
- self.sa.natt, self.ip6)
+ False, self.ip6)
self.pg_send(self.pg0, ike_msg)
capture = self.pg0.get_capture(1)
self.verify_sa_auth_req(capture[0])
transform_type='Extended Sequence Number',
transform_id='ESN'))
+ c = self.sa.child_sas[0]
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
- SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
+ SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
class TemplateResponder(IkePeer):
""" responder test template """
- def tearDown(self):
- super(TemplateResponder, self).tearDown()
- if self.sa.is_initiator:
- self.initiate_del_sa()
- r = self.vapi.ikev2_sa_dump()
- self.assertEqual(len(r), 0)
-
- self.p.remove_vpp_config()
- self.assertIsNone(self.p.query_vpp_config())
+ def initiate_del_sa_from_responder(self):
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_del_ike_sa(
+ ispi=int.from_bytes(self.sa.ispi, 'little'))
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertNotIn('Response', ih.flags)
+ self.assertNotIn('Initiator', ih.flags)
+ self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
+ plain = self.sa.hmac_and_decrypt(ih)
+ d = ikev2.IKEv2_payload_Delete(plain)
+ self.assertEqual(d.proto, 1) # proto=IKEv2
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Initiator+Response',
+ exch_type='INFORMATIONAL',
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, b'', None)
+ self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
ih = self.get_ike_header(packet)
self.assertEqual(ih.id, self.sa.msg_id)
self.assertEqual(ih.exch_type, 37) # exchange informational
+ self.assertIn('Response', ih.flags)
+ self.assertNotIn('Initiator', ih.flags)
+ self.assertEqual(ih.next_payload, 46) # Encrypted
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
- def initiate_del_sa(self):
+ def initiate_del_sa_from_initiator(self):
header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
flags='Initiator', exch_type='INFORMATIONAL',
id=self.sa.new_msg_id())
capture = self.pg0.get_capture(1)
self.verify_del_sa(capture[0])
- def send_sa_init_req(self, behind_nat=False):
+ def send_sa_init_req(self):
tr_attr = self.sa.ike_crypto_attr()
trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
transform_id=self.sa.ike_crypto, length=tr_attr[1],
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
trans_nb=4, trans=trans))
+ next_payload = None if self.ip6 else 'Notify'
+
self.sa.init_req_packet = (
ikev2.IKEv2(init_SPI=self.sa.ispi,
flags='Initiator', exch_type='IKE_SA_INIT') /
ikev2.IKEv2_payload_KE(next_payload='Nonce',
group=self.sa.ike_dh,
load=self.sa.my_dh_pub_key) /
- ikev2.IKEv2_payload_Nonce(next_payload='Notify',
+ ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
load=self.sa.i_nonce))
- if behind_nat:
- src_address = b'\x0a\x0a\x0a\x01'
- else:
- src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+ if not self.ip6:
+ if self.sa.i_natt:
+ src_address = b'\x0a\x0a\x0a\x01'
+ else:
+ src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
- src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
- dst_nat = self.sa.compute_nat_sha1(
- inet_pton(socket.AF_INET, self.pg0.local_ip4),
- self.sa.dport)
- nat_src_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_SOURCE_IP', load=src_nat,
- next_payload='Notify')
- nat_dst_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
- self.sa.init_req_packet = (self.sa.init_req_packet /
- nat_src_detection /
- nat_dst_detection)
+ if self.sa.r_natt:
+ dst_address = b'\x0a\x0a\x0a\x0a'
+ else:
+ dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+
+ src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+ nat_src_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+ next_payload='Notify')
+ nat_dst_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
+ self.sa.init_req_packet = (self.sa.init_req_packet /
+ nat_src_detection /
+ nat_dst_detection)
ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
self.sa.sport, self.sa.dport,
capture = self.pg0.get_capture(1)
self.verify_sa_init(capture[0])
- def send_sa_auth(self):
+ def generate_auth_payload(self, last_payload=None, is_rekey=False):
tr_attr = self.sa.esp_crypto_attr()
+ last_payload = last_payload or 'Notify'
trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
transform_id=self.sa.esp_crypto, length=tr_attr[1],
key_length=tr_attr[0]) /
transform_type='Extended Sequence Number',
transform_id='ESN'))
+ c = self.sa.child_sas[0]
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
- SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
+ SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
- plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
- IDtype=self.sa.id_type, load=self.sa.i_id) /
- ikev2.IKEv2_payload_IDr(next_payload='AUTH',
- IDtype=self.sa.id_type, load=self.sa.r_id) /
- ikev2.IKEv2_payload_AUTH(next_payload='SA',
+ plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
auth_type=AuthMethod.value(self.sa.auth_method),
load=self.sa.auth_data) /
ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
ikev2.IKEv2_payload_TSi(next_payload='TSr',
- number_of_TSs=len(tsi),
- traffic_selector=tsi) /
- ikev2.IKEv2_payload_TSr(next_payload='Notify',
- number_of_TSs=len(tsr),
- traffic_selector=tsr) /
- ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
+ number_of_TSs=len(tsi), traffic_selector=tsi) /
+ ikev2.IKEv2_payload_TSr(next_payload=last_payload,
+ number_of_TSs=len(tsr), traffic_selector=tsr))
+
+ if is_rekey:
+ first_payload = 'Nonce'
+ plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
+ next_payload='SA') / plain /
+ ikev2.IKEv2_payload_Notify(type='REKEY_SA',
+ proto='ESP', SPI=c.ispi))
+ else:
+ first_payload = 'IDi'
+ ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
+ IDtype=self.sa.id_type, load=self.sa.i_id) /
+ ikev2.IKEv2_payload_IDr(next_payload='AUTH',
+ IDtype=self.sa.id_type, load=self.sa.r_id))
+ plain = ids / plain
+ return plain, first_payload
+ def send_sa_auth(self):
+ plain, first_payload = self.generate_auth_payload(
+ last_payload='Notify')
+ plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
header = ikev2.IKEv2(
init_SPI=self.sa.ispi,
resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
flags='Initiator', exch_type='IKE_AUTH')
- ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
+ ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
self.sa.dport, self.sa.natt, self.ip6)
self.pg0.add_stream(packet)
self.assertEqual(ih.id, self.sa.msg_id)
self.assertEqual(ih.exch_type, 34)
- self.assertTrue('Response' in ih.flags)
+ self.assertIn('Response', ih.flags)
self.assertEqual(ih.init_SPI, self.sa.ispi)
self.assertNotEqual(ih.resp_SPI, 0)
self.sa.rspi = ih.resp_SPI
self.verify_udp(udp)
self.assertEqual(ike.id, self.sa.msg_id)
plain = self.sa.hmac_and_decrypt(ike)
+ idr = ikev2.IKEv2_payload_IDr(plain)
+ prop = idr[ikev2.IKEv2_payload_Proposal]
+ self.assertEqual(prop.SPIsize, 4)
+ self.sa.child_sas[0].rspi = prop.SPI
self.sa.calc_child_keys()
+ IKE_NODE_SUFFIX = 'ip4'
+
+ def verify_counters(self):
+ self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
+
+ r = self.vapi.ikev2_sa_dump()
+ s = r[0].sa.stats
+ self.assertEqual(1, s.n_sa_auth_req)
+ self.assertEqual(1, s.n_sa_init_req)
+
def test_responder(self):
- self.send_sa_init_req(self.sa.natt)
+ self.send_sa_init_req()
self.send_sa_auth()
self.verify_ipsec_sas()
self.verify_ike_sas()
+ self.verify_counters()
class Ikev2Params(object):
'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
- is_natt = 'natt' in params and params['natt'] or False
+ dpd_disabled = True if 'dpd_disabled' not in params else\
+ params['dpd_disabled']
+ if dpd_disabled:
+ self.vapi.cli('ikev2 dpd disable')
+ self.del_sa_from_responder = False if 'del_sa_from_responder'\
+ not in params else params['del_sa_from_responder']
+ i_natt = False if 'i_natt' not in params else params['i_natt']
+ r_natt = False if 'r_natt' not in params else params['r_natt']
self.p = Profile(self, 'pr1')
self.ip6 = False if 'ip6' not in params else params['ip6']
if 'esp_transforms' in params:
self.p.add_esp_transforms(params['esp_transforms'])
+ udp_encap = False if 'udp_encap' not in params else\
+ params['udp_encap']
+ if udp_encap:
+ self.p.set_udp_encap(True)
+
+ if 'responder_hostname' in params:
+ hn = params['responder_hostname']
+ self.p.add_responder_hostname(hn)
+
+ # configure static dns record
+ self.vapi.dns_name_server_add_del(
+ is_ip6=0, is_add=1,
+ server_address=IPv4Address(u'8.8.8.8').packed)
+ self.vapi.dns_enable_disable(enable=1)
+
+ cmd = "dns cache add {} {}".format(hn['hostname'],
+ self.pg0.remote_ip4)
+ self.vapi.cli(cmd)
+
self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
is_initiator=is_init,
- id_type=self.p.local_id['id_type'], natt=is_natt,
+ id_type=self.p.local_id['id_type'],
+ i_natt=i_natt, r_natt=r_natt,
priv_key=client_priv, auth_method=auth_method,
- auth_data=auth_data,
+ auth_data=auth_data, udp_encap=udp_encap,
local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
-
- ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
- params['ike-crypto']
- ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
- params['ike-integ']
- ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
- params['ike-dh']
-
- esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
- params['esp-crypto']
- esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
- params['esp-integ']
-
- self.sa.set_ike_props(
- crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
- integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
- self.sa.set_esp_props(
- crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
- integ=esp_integ)
+ if is_init:
+ ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
+ params['ike-crypto']
+ ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
+ params['ike-integ']
+ ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
+ params['ike-dh']
+
+ esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
+ params['esp-crypto']
+ esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
+ params['esp-integ']
+
+ self.sa.set_ike_props(
+ crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
+ integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
+ self.sa.set_esp_props(
+ crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
+ integ=esp_integ)
class TestApi(VppTestCase):
p.set_lifetime_data(cfg['lifetime_data'])
if 'tun_itf' in cfg:
p.set_tunnel_interface(cfg['tun_itf'])
+ if 'natt_disabled' in cfg and cfg['natt_disabled']:
+ p.disable_natt()
p.add_vpp_config()
return p
conf = {
'p1': {
'name': 'p1',
+ 'natt_disabled': True,
'loc_id': ('fqdn', b'vpp.home'),
'rem_id': ('fqdn', b'roadwarrior.example.com'),
'loc_ts': loc_ts4,
self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
self.verify_auth(ap.auth, cp['auth'])
+ natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
+ self.assertTrue(natt_dis == ap.natt_disabled)
+
if 'lifetime_data' in cp:
self.verify_lifetime_data(ap, cp['lifetime_data'])
self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
self.assertEqual(ap.tun_itf, 0xffffffff)
+@tag_fixme_vpp_workers
+class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
+ """ test responder - responder behind NAT """
+
+ IKE_NODE_SUFFIX = 'ip4-natt'
+
+ def config_tc(self):
+ self.config_params({'r_natt': True})
+
+
+@tag_fixme_vpp_workers
+class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
+ """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
+
+ def config_tc(self):
+ self.config_params({
+ 'i_natt': True,
+ 'is_initiator': False, # seen from test case perspective
+ # thus vpp is initiator
+ 'responder': {'sw_if_index': self.pg0.sw_if_index,
+ 'addr': self.pg0.remote_ip4},
+ 'ike-crypto': ('AES-GCM-16ICV', 32),
+ 'ike-integ': 'NULL',
+ 'ike-dh': '3072MODPgr',
+ 'ike_transforms': {
+ 'crypto_alg': 20, # "aes-gcm-16"
+ 'crypto_key_size': 256,
+ 'dh_group': 15, # "modp-3072"
+ },
+ 'esp_transforms': {
+ 'crypto_alg': 12, # "aes-cbc"
+ 'crypto_key_size': 256,
+ # "hmac-sha2-256-128"
+ 'integ_alg': 12}})
+
+
+@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - pre shared key auth """
+
+ def config_tc(self):
+ self.config_params({
+ 'is_initiator': False, # seen from test case perspective
+ # thus vpp is initiator
+ 'ike-crypto': ('AES-GCM-16ICV', 32),
+ 'ike-integ': 'NULL',
+ 'ike-dh': '3072MODPgr',
+ 'ike_transforms': {
+ 'crypto_alg': 20, # "aes-gcm-16"
+ 'crypto_key_size': 256,
+ 'dh_group': 15, # "modp-3072"
+ },
+ 'esp_transforms': {
+ 'crypto_alg': 12, # "aes-cbc"
+ 'crypto_key_size': 256,
+ # "hmac-sha2-256-128"
+ 'integ_alg': 12},
+ 'responder_hostname': {'hostname': 'vpp.responder.org',
+ 'sw_if_index': self.pg0.sw_if_index}})
+
+
+@tag_fixme_vpp_workers
+class TestInitiatorRequestWindowSize(TestInitiatorPsk):
+ """ test initiator - request window size (1) """
+
+ def rekey_respond(self, req, update_child_sa_data):
+ ih = self.get_ike_header(req)
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ if update_child_sa_data:
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ self.sa.r_nonce = self.sa.i_nonce
+ self.sa.child_sas[0].ispi = prop.SPI
+ self.sa.child_sas[0].rspi = prop.SPI
+ self.sa.calc_child_keys()
+
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type=36,
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, sa, 'SA')
+ packet = self.create_packet(self.pg0, resp, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.send_and_assert_no_replies(self.pg0, packet)
+
+ def test_initiator(self):
+ super(TestInitiatorRequestWindowSize, self).test_initiator()
+ self.pg0.enable_capture()
+ self.pg_start()
+ ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+ self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+ self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+ capture = self.pg0.get_capture(2)
+
+ # reply in reverse order
+ self.rekey_respond(capture[1], True)
+ self.rekey_respond(capture[0], False)
+
+ # verify that only the second request was accepted
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(is_rekey=True)
+
+
+@tag_fixme_vpp_workers
+class TestInitiatorRekey(TestInitiatorPsk):
+ """ test ikev2 initiator - rekey """
+
+ def rekey_from_initiator(self):
+ ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.exch_type, 36) # CHILD_SA
+ self.assertNotIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ self.sa.r_nonce = self.sa.i_nonce
+ # update new responder SPI
+ self.sa.child_sas[0].ispi = prop.SPI
+ self.sa.child_sas[0].rspi = prop.SPI
+ self.sa.calc_child_keys()
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type=36,
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, sa, 'SA')
+ packet = self.create_packet(self.pg0, resp, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.send_and_assert_no_replies(self.pg0, packet)
+
+ def test_initiator(self):
+ super(TestInitiatorRekey, self).test_initiator()
+ self.rekey_from_initiator()
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(is_rekey=True)
+
+
+@tag_fixme_vpp_workers
+class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
+ """ test ikev2 initiator - delete IKE SA from responder """
+
def config_tc(self):
self.config_params({
+ 'del_sa_from_responder': True,
'is_initiator': False, # seen from test case perspective
# thus vpp is initiator
'responder': {'sw_if_index': self.pg0.sw_if_index,
'integ_alg': 12}})
-class TestResponderNATT(TemplateResponder, Ikev2Params):
- """ test ikev2 responder - nat traversal """
+@tag_fixme_vpp_workers
+class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
+ """ test ikev2 responder - initiator behind NAT """
+
+ IKE_NODE_SUFFIX = 'ip4-natt'
+
def config_tc(self):
self.config_params(
- {'natt': True})
+ {'i_natt': True})
+@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
""" test ikev2 responder - pre shared key auth """
def config_tc(self):
self.config_params()
+@tag_fixme_vpp_workers
+class TestResponderDpd(TestResponderPsk):
+ """
+ Dead peer detection test
+ """
+ def config_tc(self):
+ self.config_params({'dpd_disabled': False})
+
+ def tearDown(self):
+ pass
+
+ def test_responder(self):
+ self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+ super(TestResponderDpd, self).test_responder()
+ self.pg0.enable_capture()
+ self.pg_start()
+ # capture empty request but don't reply
+ capture = self.pg0.get_capture(expected_count=1, timeout=5)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+ # wait for SA expiration
+ time.sleep(3)
+ ike_sas = self.vapi.ikev2_sa_dump()
+ self.assertEqual(len(ike_sas), 0)
+ ipsec_sas = self.vapi.ipsec_sa_dump()
+ self.assertEqual(len(ipsec_sas), 0)
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekey(TestResponderPsk):
+ """ test ikev2 responder - rekey """
+
+ def rekey_from_initiator(self):
+ packet = self.create_rekey_request()
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ # update new responder SPI
+ self.sa.child_sas[0].rspi = prop.SPI
+
+ def test_responder(self):
+ super(TestResponderRekey, self).test_responder()
+ self.rekey_from_initiator()
+ self.sa.calc_child_keys()
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(is_rekey=True)
+ self.assert_counter(1, 'rekey_req', 'ip4')
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+
+
+class TestResponderVrf(TestResponderPsk, Ikev2Params):
+ """ test ikev2 responder - non-default table id """
+
+ @classmethod
+ def setUpClass(cls):
+ import scapy.contrib.ikev2 as _ikev2
+ globals()['ikev2'] = _ikev2
+ super(IkePeer, cls).setUpClass()
+ cls.create_pg_interfaces(range(1))
+ cls.vapi.cli("ip table add 1")
+ cls.vapi.cli("set interface ip table pg0 1")
+ for i in cls.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def config_tc(self):
+ self.config_params({'dpd_disabled': False})
+
+ def test_responder(self):
+ self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+ super(TestResponderVrf, self).test_responder()
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(expected_count=1, timeout=5)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+
+@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
""" test ikev2 responder - cert based auth """
def config_tc(self):
self.config_params({
+ 'udp_encap': True,
'auth': 'rsa-sig',
'server-key': 'server-key.pem',
'client-key': 'client-key.pem',
'server-cert': 'server-cert.pem'})
+@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
(TemplateResponder, Ikev2Params):
"""
'ike-dh': '2048MODPgr'})
+@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
(TemplateResponder, Ikev2Params):
+
"""
IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
"""
'ike-dh': '3072MODPgr'})
+@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
"""
IKE:AES_GCM_16_256
"""
+
+ IKE_NODE_SUFFIX = 'ip6'
+
def config_tc(self):
self.config_params({
+ 'del_sa_from_responder': True,
'ip6': True,
'natt': True,
'ike-crypto': ('AES-GCM-16ICV', 32),
'end_addr': '11::100'}})
+@tag_fixme_vpp_workers
+class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
+ """
+ Test for keep alive messages
+ """
+
+ def send_empty_req_from_responder(self):
+ packet = self.create_empty_request()
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.id, self.sa.msg_id)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+ self.assert_counter(1, 'keepalive', 'ip4')
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(1, r[0].sa.stats.n_keepalives)
+
+ def test_initiator(self):
+ super(TestInitiatorKeepaliveMsg, self).test_initiator()
+ self.send_empty_req_from_responder()
+
+
class TestMalformedMessages(TemplateResponder, Ikev2Params):
""" malformed packet test """
def config_tc(self):
self.config_params()
- def assert_counter(self, count, name, version='ip4'):
- node_name = '/err/ikev2-%s/' % version + name
- self.assertEqual(count, self.statistics.get_err_counter(node_name))
-
def create_ike_init_msg(self, length=None, payload=None):
msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
flags='Initiator', exch_type='IKE_SA_INIT')
def verify_bad_packet_length(self):
ike_msg = self.create_ike_init_msg(length=0xdead)
self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
- self.assert_counter(self.pkt_count, 'Bad packet length')
+ self.assert_counter(self.pkt_count, 'bad_length')
def verify_bad_sa_payload_length(self):
p = ikev2.IKEv2_payload_SA(length=0xdead)
ike_msg = self.create_ike_init_msg(payload=p)
self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
- self.assert_counter(self.pkt_count, 'Malformed packet')
+ self.assert_counter(self.pkt_count, 'malformed_packet')
def test_responder(self):
self.pkt_count = 254