algorithms,
modes,
)
-from ipaddress import IPv4Address
+from ipaddress import IPv4Address, IPv6Address, ip_address
from scapy.layers.ipsec import ESP
from scapy.layers.inet import IP, UDP, Ether
+from scapy.layers.inet6 import IPv6
from scapy.packet import raw, Raw
from scapy.utils import long_converter
from framework import VppTestCase, VppTestRunner
from vpp_ikev2 import Profile, IDType, AuthMethod
from vpp_papi import VppEnum
+try:
+ text_type = unicode
+except NameError:
+ text_type = str
KEY_PAD = b"Key Pad for IKEv2"
SALT_SIZE = 4
def pad(self, data):
pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
data = data + b'\x00' * (pad_len - 1)
- return data + bytes([pad_len])
+ return data + bytes([pad_len - 1])
class AuthAlgo(object):
class IKEv2SA(object):
- def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
- i_id=None, r_id=None, id_type='fqdn', nonce=None,
- auth_data=None, local_ts=None, remote_ts=None,
+ def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
+ spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
+ nonce=None, auth_data=None, local_ts=None, remote_ts=None,
auth_method='shared-key', priv_key=None, natt=False):
self.natt = natt
if natt:
else:
self.sport = 500
self.dport = 500
+ self.msg_id = 0
self.dh_params = None
self.test = test
self.priv_key = priv_key
self.id_type = id_type
self.auth_method = auth_method
if self.is_initiator:
- self.rspi = None
+ self.rspi = 8 * b'\x00'
self.ispi = spi
self.i_nonce = nonce
else:
self.rspi = spi
- self.ispi = None
+ self.ispi = 8 * b'\x00'
self.r_nonce = None
self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
+ def new_msg_id(self):
+ self.msg_id += 1
+ return self.msg_id
+
def dh_pub_key(self):
return self.i_dh_data
ct = ep.load[:-integ_trunc]
return self.decrypt(ct)
- def generate_ts(self):
+ def build_ts_addr(self, ts, version):
+ return {'starting_address_v' + version: ts['start_addr'],
+ 'ending_address_v' + version: ts['end_addr']}
+
+ def generate_ts(self, is_ip4):
c = self.child_sas[0]
- ts1 = ikev2.IPv4TrafficSelector(
- IP_protocol_ID=0,
- start_port=0,
- end_port=0xffff,
- starting_address_v4=c.local_ts['start_addr'],
- ending_address_v4=c.local_ts['end_addr'])
- ts2 = ikev2.IPv4TrafficSelector(
- IP_protocol_ID=0,
- starting_address_v4=c.remote_ts['start_addr'],
- ending_address_v4=c.remote_ts['end_addr'])
+ ts_data = {'IP_protocol_ID': 0,
+ 'start_port': 0,
+ 'end_port': 0xffff}
+ if is_ip4:
+ ts_data.update(self.build_ts_addr(c.local_ts, '4'))
+ ts1 = ikev2.IPv4TrafficSelector(**ts_data)
+ ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
+ ts2 = ikev2.IPv4TrafficSelector(**ts_data)
+ else:
+ ts_data.update(self.build_ts_addr(c.local_ts, '6'))
+ ts1 = ikev2.IPv6TrafficSelector(**ts_data)
+ ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
+ ts2 = ikev2.IPv6TrafficSelector(**ts_data)
return ([ts1], [ts2])
def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
return self.crypto_attr(self.esp_crypto_key_len)
def compute_nat_sha1(self, ip, port):
- data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
+ data = self.ispi + self.rspi + ip + (port).to_bytes(2, 'big')
digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
digest.update(data)
return digest.finalize()
i.admin_up()
i.config_ip4()
i.resolve_arp()
+ i.config_ip6()
+ i.resolve_ndp()
@classmethod
def tearDownClass(cls):
self.p.add_vpp_config()
self.assertIsNotNone(self.p.query_vpp_config())
self.sa.generate_dh_data()
+ self.vapi.cli('ikev2 set logging level 4')
+ self.vapi.cli('event-lo clear')
def tearDown(self):
super(TemplateResponder, self).tearDown()
+ if self.sa.is_initiator:
+ self.initiate_del_sa()
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(len(r), 0)
+
self.p.remove_vpp_config()
self.assertIsNone(self.p.query_vpp_config())
- def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
+ def verify_del_sa(self, packet):
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 37) # exchange informational
+
+ def initiate_del_sa(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Initiator', exch_type='INFORMATIONAL',
+ id=self.sa.new_msg_id())
+ del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+ ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+ packet = self.create_packet(self.pg0, ike_msg,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_del_sa(capture[0])
+
+ def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
+ use_ip6=False):
+ if use_ip6:
+ src_ip = src_if.remote_ip6
+ dst_ip = src_if.local_ip6
+ ip_layer = IPv6
+ else:
+ src_ip = src_if.remote_ip4
+ dst_ip = src_if.local_ip4
+ ip_layer = IP
res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
+ ip_layer(src=src_ip, dst=dst_ip) /
UDP(sport=sport, dport=dport))
if natt:
# insert non ESP marker
load=self.sa.i_nonce))
if behind_nat:
- src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
- self.sa.sport)
- nat_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_SOURCE_IP',
- load=src_nat)
- self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
-
- ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
- self.sa.sport, self.sa.dport,
- self.sa.natt)
+ src_address = b'\x0a\x0a\x0a\x01'
+ else:
+ src_address = bytes(self.pg0.local_ip4, 'ascii')
+
+ src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
+ self.sa.sport)
+ nat_src_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat)
+ nat_dst_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
+ self.sa.init_req_packet = (self.sa.init_req_packet /
+ nat_src_detection /
+ nat_dst_detection)
+
+ ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
self.pg0.add_stream(ike_msg)
self.pg0.enable_capture()
self.pg_start()
capture = self.pg0.get_capture(1)
self.verify_sa_init(capture[0])
+ def encrypt_ike_msg(self, header, plain, first_payload):
+ if self.sa.ike_crypto == 'AES-GCM-16ICV':
+ data = self.sa.ike_crypto_alg.pad(raw(plain))
+ plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
+ len(ikev2.IKEv2_payload_Encrypted())
+ tlen = plen + len(ikev2.IKEv2())
+
+ # prepare aad data
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
+ length=plen)
+ header.length = tlen
+ res = header / sk_p
+ encr = self.sa.encrypt(raw(plain), raw(res))
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
+ length=plen, load=encr)
+ res = header / sk_p
+ else:
+ encr = self.sa.encrypt(raw(plain))
+ trunc_len = self.sa.ike_integ_alg.trunc_len
+ plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
+ tlen = plen + len(ikev2.IKEv2())
+
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
+ length=plen, load=encr)
+ header.length = tlen
+ res = header / sk_p
+
+ integ_data = raw(res)
+ hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
+ self.sa.my_authkey, integ_data)
+ res = res / Raw(hmac_data[:trunc_len])
+ assert(len(res) == tlen)
+ return res
+
def send_sa_auth(self):
tr_attr = self.sa.esp_crypto_attr()
trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
- tsi, tsr = self.sa.generate_ts()
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
IDtype=self.sa.id_type, load=self.sa.i_id) /
ikev2.IKEv2_payload_IDr(next_payload='AUTH',
traffic_selector=tsr) /
ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
- if self.sa.ike_crypto == 'AES-GCM-16ICV':
- data = self.sa.ike_crypto_alg.pad(raw(plain))
- plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
- len(ikev2.IKEv2_payload_Encrypted())
- tlen = plen + len(ikev2.IKEv2())
-
- # prepare aad data
- sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
- length=plen)
- sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=1,
- length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
- sa_auth /= sk_p
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Initiator', exch_type='IKE_AUTH')
- encr = self.sa.encrypt(raw(plain), raw(sa_auth))
- sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
- length=plen, load=encr)
- sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=1,
- length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
- sa_auth /= sk_p
- else:
- encr = self.sa.encrypt(raw(plain))
- trunc_len = self.sa.ike_integ_alg.trunc_len
- plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
- tlen = plen + len(ikev2.IKEv2())
-
- sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
- length=plen, load=encr)
- sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=1,
- length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
- sa_auth /= sk_p
-
- integ_data = raw(sa_auth)
- hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
- self.sa.my_authkey, integ_data)
- sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
-
- assert(len(sa_auth) == tlen)
- packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
- self.sa.dport, self.sa.natt)
+ ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
+ packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
# ipsec register for port 4500
esp = packet[ESP]
ih = self.verify_and_remove_non_esp_marker(esp)
+
+ self.assertEqual(ih.version, 0x20)
return ih
def verify_sa_init(self, packet):
ih = self.get_ike_header(packet)
+ self.assertEqual(ih.id, self.sa.msg_id)
self.assertEqual(ih.exch_type, 34)
self.assertTrue('Response' in ih.flags)
self.assertEqual(ih.init_SPI, self.sa.ispi)
ike = self.get_ike_header(packet)
udp = packet[UDP]
self.verify_udp(udp)
+ self.assertEqual(ike.id, self.sa.msg_id)
plain = self.sa.hmac_and_decrypt(ike)
self.sa.calc_child_keys()
r = self.vapi.ikev2_sa_dump()
self.assertEqual(len(r), 1)
sa = r[0].sa
- self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'little'))
+ self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
- self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
- self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+ if self.ip6:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
+ else:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
self.verify_keymat(sa.keys, self.sa, 'sk_d')
self.verify_keymat(sa.keys, self.sa, 'sk_ai')
self.verify_keymat(sa.keys, self.sa, 'sk_ar')
self.verify_keymat(csa.keys, c, 'sk_ei')
self.verify_keymat(csa.keys, c, 'sk_er')
- tsi, tsr = self.sa.generate_ts()
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
tsi = tsi[0]
tsr = tsr[0]
r = self.vapi.ikev2_traffic_selector_dump(
self.assertTrue(api_ts.is_local)
else:
self.assertFalse(api_ts.is_local)
- self.assertEqual(api_ts.start_addr,
- IPv4Address(ts.starting_address_v4))
- self.assertEqual(api_ts.end_addr,
- IPv4Address(ts.ending_address_v4))
+
+ if self.p.ts_is_ip4:
+ self.assertEqual(api_ts.start_addr,
+ IPv4Address(ts.starting_address_v4))
+ self.assertEqual(api_ts.end_addr,
+ IPv4Address(ts.ending_address_v4))
+ else:
+ self.assertEqual(api_ts.start_addr,
+ IPv6Address(ts.starting_address_v6))
+ self.assertEqual(api_ts.end_addr,
+ IPv6Address(ts.ending_address_v6))
self.assertEqual(api_ts.start_port, ts.start_port)
self.assertEqual(api_ts.end_port, ts.end_port)
self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
is_natt = 'natt' in params and params['natt'] or False
self.p = Profile(self, 'pr1')
+ self.ip6 = False if 'ip6' not in params else params['ip6']
if 'auth' in params and params['auth'] == 'rsa-sig':
auth_method = 'rsa-sig'
self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
- self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
- self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')
+ loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
+ 'loc_ts' not in params else params['loc_ts']
+ rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
+ 'rem_ts' not in params else params['rem_ts']
+ self.p.add_local_ts(**loc_ts)
+ self.p.add_remote_ts(**rem_ts)
self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
r_id=self.p.local_id['data'],
def test_profile_api(self):
""" test profile dump API """
- loc_ts = {
+ loc_ts4 = {
'proto': 8,
'start_port': 1,
'end_port': 19,
'start_addr': '3.3.3.2',
'end_addr': '3.3.3.3',
}
- rem_ts = {
+ rem_ts4 = {
'proto': 9,
'start_port': 10,
'end_port': 119,
'end_addr': '2.3.4.6',
}
+ loc_ts6 = {
+ 'proto': 8,
+ 'start_port': 1,
+ 'end_port': 19,
+ 'start_addr': 'ab::1',
+ 'end_addr': 'ab::4',
+ }
+ rem_ts6 = {
+ 'proto': 9,
+ 'start_port': 10,
+ 'end_port': 119,
+ 'start_addr': 'cd::12',
+ 'end_addr': 'cd::13',
+ }
+
conf = {
'p1': {
'name': 'p1',
'loc_id': ('fqdn', b'vpp.home'),
'rem_id': ('fqdn', b'roadwarrior.example.com'),
- 'loc_ts': loc_ts,
- 'rem_ts': rem_ts,
- 'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
+ 'loc_ts': loc_ts4,
+ 'rem_ts': rem_ts4,
+ 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
'ike_ts': {
'crypto_alg': 20,
'crypto_key_size': 32,
'p2': {
'name': 'p2',
'loc_id': ('ip4-addr', b'192.168.2.1'),
- 'rem_id': ('ip4-addr', b'192.168.2.2'),
- 'loc_ts': loc_ts,
- 'rem_ts': rem_ts,
- 'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
+ 'rem_id': ('ip6-addr', b'abcd::1'),
+ 'loc_ts': loc_ts6,
+ 'rem_ts': rem_ts6,
+ 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
'ike_ts': {
'crypto_alg': 12,
'crypto_key_size': 16,
self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
- self.assertEqual(api_ts.start_addr, IPv4Address(cfg_ts['start_addr']))
- self.assertEqual(api_ts.end_addr, IPv4Address(cfg_ts['end_addr']))
+ self.assertEqual(api_ts.start_addr,
+ ip_address(text_type(cfg_ts['start_addr'])))
+ self.assertEqual(api_ts.end_addr,
+ ip_address(text_type(cfg_ts['end_addr'])))
def verify_responder(self, api_r, cfg_r):
self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
- self.assertEqual(api_r.ip4, IPv4Address(cfg_r['ip4']))
+ self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
def verify_transforms(self, api_ts, cfg_ts):
self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
"""
def config_tc(self):
self.config_params({
+ 'ip6': True,
+ 'natt': True,
'ike-crypto': ('AES-GCM-16ICV', 32),
'ike-integ': 'NULL',
- 'ike-dh': '2048MODPgr'})
+ 'ike-dh': '2048MODPgr',
+ 'loc_ts': {'start_addr': 'ab:cd::0',
+ 'end_addr': 'ab:cd::10'},
+ 'rem_ts': {'start_addr': '11::0',
+ 'end_addr': '11::100'}})
+
+
+class TestMalformedMessages(TemplateResponder, Ikev2Params):
+ """ malformed packet test """
+
+ def tearDown(self):
+ pass
+
+ def config_tc(self):
+ self.config_params()
+
+ def assert_counter(self, count, name, version='ip4'):
+ node_name = '/err/ikev2-%s/' % version + name
+ self.assertEqual(count, self.statistics.get_err_counter(node_name))
+
+ def create_ike_init_msg(self, length=None, payload=None):
+ msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
+ flags='Initiator', exch_type='IKE_SA_INIT')
+ if payload is not None:
+ msg /= payload
+ return self.create_packet(self.pg0, msg, self.sa.sport,
+ self.sa.dport)
+
+ def verify_bad_packet_length(self):
+ ike_msg = self.create_ike_init_msg(length=0xdead)
+ self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
+ self.assert_counter(self.pkt_count, 'Bad packet length')
+
+ def verify_bad_sa_payload_length(self):
+ p = ikev2.IKEv2_payload_SA(length=0xdead)
+ ike_msg = self.create_ike_init_msg(payload=p)
+ self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
+ self.assert_counter(self.pkt_count, 'Malformed packet')
+
+ def test_responder(self):
+ self.pkt_count = 254
+ self.verify_bad_packet_length()
+ self.verify_bad_sa_payload_length()
if __name__ == '__main__':