algorithms,
modes,
)
+from ipaddress import IPv4Address
from scapy.layers.ipsec import ESP
from scapy.layers.inet import IP, UDP, Ether
from scapy.packet import raw, Raw
from scapy.utils import long_converter
from framework import VppTestCase, VppTestRunner
from vpp_ikev2 import Profile, IDType, AuthMethod
+from vpp_papi import VppEnum
KEY_PAD = b"Key Pad for IKEv2"
+SALT_SIZE = 4
+GCM_ICV_SIZE = 16
+GCM_IV_SIZE = 8
# defined in rfc3526
670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
- 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256)
+ 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
+
+ '3072MODPgr': (long_converter("""
+ FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
+ 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
+ EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
+ E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
+ EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
+ C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
+ 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
+ 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
+ E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
+ DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
+ 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
+ ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
+ ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
+ F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
+ BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
+ 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
if self.cipher is not None:
self.bs = self.cipher.block_size // 8
- def encrypt(self, data, key):
- iv = os.urandom(self.bs)
- encryptor = Cipher(self.cipher(key), self.mode(iv),
- default_backend()).encryptor()
- return iv + encryptor.update(data) + encryptor.finalize()
-
- def decrypt(self, data, key, icv=None):
- iv = data[:self.bs]
- ct = data[self.bs:]
- decryptor = Cipher(algorithms.AES(key),
- modes.CBC(iv),
- default_backend()).decryptor()
- return decryptor.update(ct) + decryptor.finalize()
+ if self.name == 'AES-GCM-16ICV':
+ self.iv_len = GCM_IV_SIZE
+ else:
+ self.iv_len = self.bs
+
+ def encrypt(self, data, key, aad=None):
+ iv = os.urandom(self.iv_len)
+ if aad is None:
+ encryptor = Cipher(self.cipher(key), self.mode(iv),
+ default_backend()).encryptor()
+ return iv + encryptor.update(data) + encryptor.finalize()
+ else:
+ salt = key[-SALT_SIZE:]
+ nonce = salt + iv
+ encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
+ default_backend()).encryptor()
+ encryptor.authenticate_additional_data(aad)
+ data = encryptor.update(data) + encryptor.finalize()
+ data += encryptor.tag[:GCM_ICV_SIZE]
+ return iv + data
+
+ def decrypt(self, data, key, aad=None, icv=None):
+ if aad is None:
+ iv = data[:self.iv_len]
+ ct = data[self.iv_len:]
+ decryptor = Cipher(algorithms.AES(key),
+ self.mode(iv),
+ default_backend()).decryptor()
+ return decryptor.update(ct) + decryptor.finalize()
+ else:
+ salt = key[-SALT_SIZE:]
+ nonce = salt + data[:GCM_IV_SIZE]
+ ct = data[GCM_IV_SIZE:]
+ key = key[:-SALT_SIZE]
+ decryptor = Cipher(algorithms.AES(key),
+ self.mode(nonce, icv, len(icv)),
+ default_backend()).decryptor()
+ decryptor.authenticate_additional_data(aad)
+ pt = decryptor.update(ct) + decryptor.finalize()
+ pad_len = pt[-1] + 1
+ return pt[:-pad_len]
def pad(self, data):
pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
CRYPTO_ALGOS = {
'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
+ 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
+ mode=modes.GCM),
}
AUTH_ALGOS = {
'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
+ 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
+ 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
+ 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
}
PRF_ALGOS = {
c = self.child_sas[0]
encr_key_len = self.esp_crypto_key_len
- integ_key_len = self.ike_integ_alg.key_len
+ integ_key_len = self.esp_integ_alg.key_len
+ salt_len = 0 if integ_key_len else 4
+
l = (integ_key_len * 2 +
- encr_key_len * 2)
+ encr_key_len * 2 +
+ salt_len * 2)
keymat = self.calc_prfplus(prf, self.sk_d, s, l)
pos = 0
c.sk_ei = keymat[pos:pos+encr_key_len]
pos += encr_key_len
- c.sk_ai = keymat[pos:pos+integ_key_len]
- pos += integ_key_len
+ if integ_key_len:
+ c.sk_ai = keymat[pos:pos+integ_key_len]
+ pos += integ_key_len
+ else:
+ c.salt_ei = keymat[pos:pos+salt_len]
+ pos += salt_len
c.sk_er = keymat[pos:pos+encr_key_len]
pos += encr_key_len
- c.sk_ar = keymat[pos:pos+integ_key_len]
- pos += integ_key_len
+ if integ_key_len:
+ c.sk_ar = keymat[pos:pos+integ_key_len]
+ pos += integ_key_len
+ else:
+ c.salt_er = keymat[pos:pos+salt_len]
+ pos += salt_len
def calc_prfplus(self, prf, key, seed, length):
r = b''
return r
def calc_prf(self, prf, key, data):
- h = self.ike_integ_alg.mac(key, prf, backend=default_backend())
+ h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
h.update(data)
return h.finalize()
encr_key_len = self.ike_crypto_key_len
tr_prf_key_len = self.ike_prf_alg.key_len
integ_key_len = self.ike_integ_alg.key_len
+ if integ_key_len == 0:
+ salt_size = 4
+ else:
+ salt_size = 0
+
l = (prf_key_trunc +
integ_key_len * 2 +
encr_key_len * 2 +
- tr_prf_key_len * 2)
+ tr_prf_key_len * 2 +
+ salt_size * 2)
keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
pos = 0
self.sk_ar = keymat[pos:pos+integ_key_len]
pos += integ_key_len
- self.sk_ei = keymat[pos:pos+encr_key_len]
- pos += encr_key_len
- self.sk_er = keymat[pos:pos+encr_key_len]
- pos += encr_key_len
+ self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
+ pos += encr_key_len + salt_size
+ self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
+ pos += encr_key_len + salt_size
self.sk_pi = keymat[pos:pos+tr_prf_key_len]
pos += tr_prf_key_len
else:
raise TypeError('unknown auth method type!')
- def encrypt(self, data):
+ def encrypt(self, data, aad=None):
data = self.ike_crypto_alg.pad(data)
- return self.ike_crypto_alg.encrypt(data, self.my_cryptokey)
+ return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
@property
def peer_authkey(self):
return self.sk_er
return self.sk_ei
+ def concat(self, alg, key_len):
+ return alg + '-' + str(key_len * 8)
+
+ @property
+ def vpp_ike_cypto_alg(self):
+ return self.concat(self.ike_crypto, self.ike_crypto_key_len)
+
+ @property
+ def vpp_esp_cypto_alg(self):
+ return self.concat(self.esp_crypto, self.esp_crypto_key_len)
+
def verify_hmac(self, ikemsg):
integ_trunc = self.ike_integ_alg.trunc_len
exp_hmac = ikemsg[-integ_trunc:]
h.update(data)
return h.finalize()
- def decrypt(self, data):
- return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey)
+ def decrypt(self, data, aad=None, icv=None):
+ return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
def hmac_and_decrypt(self, ike):
ep = ike[ikev2.IKEv2_payload_Encrypted]
- self.verify_hmac(raw(ike))
- integ_trunc = self.ike_integ_alg.trunc_len
+ if self.ike_crypto == 'AES-GCM-16ICV':
+ aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
+ ct = ep.load[:-GCM_ICV_SIZE]
+ tag = ep.load[-GCM_ICV_SIZE:]
+ return self.decrypt(ct, raw(ike)[:aad_len], tag)
+ else:
+ self.verify_hmac(raw(ike))
+ integ_trunc = self.ike_integ_alg.trunc_len
- # remove ICV and decrypt payload
- ct = ep.load[:-integ_trunc]
- return self.decrypt(ct)
+ # remove ICV and decrypt payload
+ ct = ep.load[:-integ_trunc]
+ return self.decrypt(ct)
def generate_ts(self):
c = self.child_sas[0]
ts1 = ikev2.IPv4TrafficSelector(
IP_protocol_ID=0,
+ start_port=0,
+ end_port=0xffff,
starting_address_v4=c.local_ts['start_addr'],
ending_address_v4=c.local_ts['end_addr'])
ts2 = ikev2.IPv4TrafficSelector(
if integ not in AUTH_ALGOS:
raise TypeError('unsupported auth algo %r' % integ)
- self.ike_integ = integ
+ self.ike_integ = None if integ == 'NULL' else integ
self.ike_integ_alg = AUTH_ALGOS[integ]
if prf not in PRF_ALGOS:
if integ not in AUTH_ALGOS:
raise TypeError('unsupported auth algo %r' % integ)
- self.esp_integ = integ
+ self.esp_integ = None if integ == 'NULL' else integ
self.esp_integ_alg = AUTH_ALGOS[integ]
def crypto_attr(self, key_len):
- if self.ike_crypto in ['AES-CBC', 'AES-GCM']:
+ if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
return (0x800e << 16 | key_len << 3, 12)
else:
raise Exception('unsupported attribute type')
class TemplateResponder(VppTestCase):
- """ responder test """
+ """ responder test template """
@classmethod
def setUpClass(cls):
super(TemplateResponder, self).setUp()
self.config_tc()
self.p.add_vpp_config()
+ self.assertIsNotNone(self.p.query_vpp_config())
self.sa.generate_dh_data()
+ def tearDown(self):
+ super(TemplateResponder, self).tearDown()
+ self.p.remove_vpp_config()
+ self.assertIsNone(self.p.query_vpp_config())
+
def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
number_of_TSs=len(tsr),
traffic_selector=tsr) /
ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
- encr = self.sa.encrypt(raw(plain))
-
- trunc_len = self.sa.ike_integ_alg.trunc_len
- plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
- tlen = plen + len(ikev2.IKEv2())
-
- sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
- length=plen, load=encr)
- sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1))
- sa_auth /= sk_p
-
- integ_data = raw(sa_auth)
- hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
- self.sa.my_authkey, integ_data)
- sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
- assert(len(sa_auth) == tlen)
+ if self.sa.ike_crypto == 'AES-GCM-16ICV':
+ data = self.sa.ike_crypto_alg.pad(raw(plain))
+ plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
+ len(ikev2.IKEv2_payload_Encrypted())
+ tlen = plen + len(ikev2.IKEv2())
+
+ # prepare aad data
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
+ length=plen)
+ sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=1,
+ length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
+ sa_auth /= sk_p
+
+ encr = self.sa.encrypt(raw(plain), raw(sa_auth))
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
+ length=plen, load=encr)
+ sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=1,
+ length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
+ sa_auth /= sk_p
+ else:
+ encr = self.sa.encrypt(raw(plain))
+ trunc_len = self.sa.ike_integ_alg.trunc_len
+ plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
+ tlen = plen + len(ikev2.IKEv2())
+
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
+ length=plen, load=encr)
+ sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=1,
+ length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
+ sa_auth /= sk_p
+
+ integ_data = raw(sa_auth)
+ hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
+ self.sa.my_authkey, integ_data)
+ sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
+
+ assert(len(sa_auth) == tlen)
packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
self.sa.dport, self.sa.natt)
self.pg0.add_stream(packet)
sa = ih[ikev2.IKEv2_payload_SA]
self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
- except AttributeError as e:
+ except IndexError as e:
self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
+ self.logger.error(ih.show())
raise
self.sa.complete_dh_data()
self.sa.calc_keys()
plain = self.sa.hmac_and_decrypt(ike)
self.sa.calc_child_keys()
- def verify_child_sas(self):
+ def verify_ipsec_sas(self):
sas = self.vapi.ipsec_sa_dump()
self.assertEqual(len(sas), 2)
sa0 = sas[0].entry
sa1 = sas[1].entry
c = self.sa.child_sas[0]
+ vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
+ self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
+ self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
+
+ if self.sa.esp_integ is None:
+ vpp_integ_alg = 0
+ else:
+ vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
+ self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
+ self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
+
# verify crypto keys
self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
# verify integ keys
- self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
- self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
- self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
- self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
+ if vpp_integ_alg:
+ self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
+ self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
+ self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
+ self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
+ else:
+ self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
+ self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
+
+ def verify_keymat(self, api_keys, keys, name):
+ km = getattr(keys, name)
+ api_km = getattr(api_keys, name)
+ api_km_len = getattr(api_keys, name + '_len')
+ self.assertEqual(len(km), api_km_len)
+ self.assertEqual(km, api_km[:api_km_len])
+
+ def verify_id(self, api_id, exp_id):
+ self.assertEqual(api_id.type, IDType.value(exp_id.type))
+ self.assertEqual(api_id.data_len, exp_id.data_len)
+ self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
+
+ def verify_ike_sas(self):
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(len(r), 1)
+ sa = r[0].sa
+ self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'little'))
+ self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+ self.verify_keymat(sa.keys, self.sa, 'sk_d')
+ self.verify_keymat(sa.keys, self.sa, 'sk_ai')
+ self.verify_keymat(sa.keys, self.sa, 'sk_ar')
+ self.verify_keymat(sa.keys, self.sa, 'sk_ei')
+ self.verify_keymat(sa.keys, self.sa, 'sk_er')
+ self.verify_keymat(sa.keys, self.sa, 'sk_pi')
+ self.verify_keymat(sa.keys, self.sa, 'sk_pr')
+
+ self.assertEqual(sa.i_id.type, self.sa.id_type)
+ self.assertEqual(sa.r_id.type, self.sa.id_type)
+ self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
+ self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
+ self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
+ self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
+
+ r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
+ self.assertEqual(len(r), 1)
+ csa = r[0].child_sa
+ self.assertEqual(csa.sa_index, sa.sa_index)
+ c = self.sa.child_sas[0]
+ if hasattr(c, 'sk_ai'):
+ self.verify_keymat(csa.keys, c, 'sk_ai')
+ self.verify_keymat(csa.keys, c, 'sk_ar')
+ self.verify_keymat(csa.keys, c, 'sk_ei')
+ self.verify_keymat(csa.keys, c, 'sk_er')
+
+ tsi, tsr = self.sa.generate_ts()
+ tsi = tsi[0]
+ tsr = tsr[0]
+ r = self.vapi.ikev2_traffic_selector_dump(
+ is_initiator=True, sa_index=sa.sa_index,
+ child_sa_index=csa.child_sa_index)
+ self.assertEqual(len(r), 1)
+ ts = r[0].ts
+ self.verify_ts(r[0].ts, tsi[0], True)
+
+ r = self.vapi.ikev2_traffic_selector_dump(
+ is_initiator=False, sa_index=sa.sa_index,
+ child_sa_index=csa.child_sa_index)
+ self.assertEqual(len(r), 1)
+ self.verify_ts(r[0].ts, tsr[0], False)
+
+ n = self.vapi.ikev2_nonce_get(is_initiator=True,
+ sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.i_nonce)
+ n = self.vapi.ikev2_nonce_get(is_initiator=False,
+ sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.r_nonce)
+
+ def verify_nonce(self, api_nonce, nonce):
+ self.assertEqual(api_nonce.data_len, len(nonce))
+ self.assertEqual(api_nonce.nonce, nonce)
+
+ def verify_ts(self, api_ts, ts, is_initiator):
+ if is_initiator:
+ self.assertTrue(api_ts.is_local)
+ else:
+ self.assertFalse(api_ts.is_local)
+ self.assertEqual(api_ts.start_addr,
+ IPv4Address(ts.starting_address_v4))
+ self.assertEqual(api_ts.end_addr,
+ IPv4Address(ts.ending_address_v4))
+ self.assertEqual(api_ts.start_port, ts.start_port)
+ self.assertEqual(api_ts.end_port, ts.end_port)
+ self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
def test_responder(self):
self.send_sa_init(self.sa.natt)
self.send_sa_auth()
- self.verify_child_sas()
+ self.verify_ipsec_sas()
+ self.verify_ike_sas()
class Ikev2Params(object):
def config_params(self, params={}):
+ ec = VppEnum.vl_api_ipsec_crypto_alg_t
+ ei = VppEnum.vl_api_ipsec_integ_alg_t
+ self.vpp_enums = {
+ 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
+ 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
+ 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
+ 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
+ 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
+ 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
+
+ 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
+ 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
+ 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
+ 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
+
is_natt = 'natt' in params and params['natt'] or False
self.p = Profile(self, 'pr1')
self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
- self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
- self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
+ self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
+ self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')
self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
r_id=self.p.local_id['data'],
auth_data=auth_data,
local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
- self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
- integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
- dh='2048MODPgr')
- self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
- integ='HMAC-SHA1-96')
+ ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
+ params['ike-crypto']
+ ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
+ params['ike-integ']
+ ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
+
+ esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
+ params['esp-crypto']
+ esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
+ params['esp-integ']
+
+ self.sa.set_ike_props(
+ crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
+ integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
+ self.sa.set_esp_props(
+ crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
+ integ=esp_integ)
+
+
+class TestApi(VppTestCase):
+ """ Test IKEV2 API """
+ @classmethod
+ def setUpClass(cls):
+ super(TestApi, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestApi, cls).tearDownClass()
+
+ def tearDown(self):
+ super(TestApi, self).tearDown()
+ self.p1.remove_vpp_config()
+ self.p2.remove_vpp_config()
+ r = self.vapi.ikev2_profile_dump()
+ self.assertEqual(len(r), 0)
+
+ def configure_profile(self, cfg):
+ p = Profile(self, cfg['name'])
+ p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
+ p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
+ p.add_local_ts(**cfg['loc_ts'])
+ p.add_remote_ts(**cfg['rem_ts'])
+ p.add_responder(cfg['responder'])
+ p.add_ike_transforms(cfg['ike_ts'])
+ p.add_esp_transforms(cfg['esp_ts'])
+ p.add_auth(**cfg['auth'])
+ p.set_udp_encap(cfg['udp_encap'])
+ p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
+ if 'lifetime_data' in cfg:
+ p.set_lifetime_data(cfg['lifetime_data'])
+ if 'tun_itf' in cfg:
+ p.set_tunnel_interface(cfg['tun_itf'])
+ p.add_vpp_config()
+ return p
+
+ def test_profile_api(self):
+ """ test profile dump API """
+ loc_ts = {
+ 'proto': 8,
+ 'start_port': 1,
+ 'end_port': 19,
+ 'start_addr': '3.3.3.2',
+ 'end_addr': '3.3.3.3',
+ }
+ rem_ts = {
+ 'proto': 9,
+ 'start_port': 10,
+ 'end_port': 119,
+ 'start_addr': '4.5.76.80',
+ 'end_addr': '2.3.4.6',
+ }
+
+ conf = {
+ 'p1': {
+ 'name': 'p1',
+ 'loc_id': ('fqdn', b'vpp.home'),
+ 'rem_id': ('fqdn', b'roadwarrior.example.com'),
+ 'loc_ts': loc_ts,
+ 'rem_ts': rem_ts,
+ 'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
+ 'ike_ts': {
+ 'crypto_alg': 20,
+ 'crypto_key_size': 32,
+ 'integ_alg': 1,
+ 'dh_group': 1},
+ 'esp_ts': {
+ 'crypto_alg': 13,
+ 'crypto_key_size': 24,
+ 'integ_alg': 2},
+ 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
+ 'udp_encap': True,
+ 'ipsec_over_udp_port': 4501,
+ 'lifetime_data': {
+ 'lifetime': 123,
+ 'lifetime_maxdata': 20192,
+ 'lifetime_jitter': 9,
+ 'handover': 132},
+ },
+ 'p2': {
+ 'name': 'p2',
+ 'loc_id': ('ip4-addr', b'192.168.2.1'),
+ 'rem_id': ('ip4-addr', b'192.168.2.2'),
+ 'loc_ts': loc_ts,
+ 'rem_ts': rem_ts,
+ 'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
+ 'ike_ts': {
+ 'crypto_alg': 12,
+ 'crypto_key_size': 16,
+ 'integ_alg': 3,
+ 'dh_group': 3},
+ 'esp_ts': {
+ 'crypto_alg': 9,
+ 'crypto_key_size': 24,
+ 'integ_alg': 4},
+ 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
+ 'udp_encap': False,
+ 'ipsec_over_udp_port': 4600,
+ 'tun_itf': 0}
+ }
+ self.p1 = self.configure_profile(conf['p1'])
+ self.p2 = self.configure_profile(conf['p2'])
+
+ r = self.vapi.ikev2_profile_dump()
+ self.assertEqual(len(r), 2)
+ self.verify_profile(r[0].profile, conf['p1'])
+ self.verify_profile(r[1].profile, conf['p2'])
+
+ def verify_id(self, api_id, cfg_id):
+ self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
+ self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
+
+ def verify_ts(self, api_ts, cfg_ts):
+ self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
+ self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
+ self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
+ self.assertEqual(api_ts.start_addr, IPv4Address(cfg_ts['start_addr']))
+ self.assertEqual(api_ts.end_addr, IPv4Address(cfg_ts['end_addr']))
+
+ def verify_responder(self, api_r, cfg_r):
+ self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
+ self.assertEqual(api_r.ip4, IPv4Address(cfg_r['ip4']))
+
+ def verify_transforms(self, api_ts, cfg_ts):
+ self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
+ self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
+ self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
+
+ def verify_ike_transforms(self, api_ts, cfg_ts):
+ self.verify_transforms(api_ts, cfg_ts)
+ self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
+
+ def verify_esp_transforms(self, api_ts, cfg_ts):
+ self.verify_transforms(api_ts, cfg_ts)
+
+ def verify_auth(self, api_auth, cfg_auth):
+ self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
+ self.assertEqual(api_auth.data, cfg_auth['data'])
+ self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
+
+ def verify_lifetime_data(self, p, ld):
+ self.assertEqual(p.lifetime, ld['lifetime'])
+ self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
+ self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
+ self.assertEqual(p.handover, ld['handover'])
+
+ def verify_profile(self, ap, cp):
+ self.assertEqual(ap.name, cp['name'])
+ self.assertEqual(ap.udp_encap, cp['udp_encap'])
+ self.verify_id(ap.loc_id, cp['loc_id'])
+ self.verify_id(ap.rem_id, cp['rem_id'])
+ self.verify_ts(ap.loc_ts, cp['loc_ts'])
+ self.verify_ts(ap.rem_ts, cp['rem_ts'])
+ self.verify_responder(ap.responder, cp['responder'])
+ self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
+ self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
+ self.verify_auth(ap.auth, cp['auth'])
+ if 'lifetime_data' in cp:
+ self.verify_lifetime_data(ap, cp['lifetime_data'])
+ self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
+ if 'tun_itf' in cp:
+ self.assertEqual(ap.tun_itf, cp['tun_itf'])
+ else:
+ self.assertEqual(ap.tun_itf, 0xffffffff)
class TestResponderNATT(TemplateResponder, Ikev2Params):
'client-cert': 'client-cert.pem',
'server-cert': 'server-cert.pem'})
+
+class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
+ (TemplateResponder, Ikev2Params):
+ """
+ IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
+ """
+ def config_tc(self):
+ self.config_params({
+ 'ike-crypto': ('AES-CBC', 16),
+ 'ike-integ': 'SHA2-256-128',
+ 'esp-crypto': ('AES-CBC', 24),
+ 'esp-integ': 'SHA2-384-192',
+ 'ike-dh': '2048MODPgr'})
+
+
+class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
+ (TemplateResponder, Ikev2Params):
+ """
+ IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
+ """
+ def config_tc(self):
+ self.config_params({
+ 'ike-crypto': ('AES-CBC', 32),
+ 'ike-integ': 'SHA2-256-128',
+ 'esp-crypto': ('AES-GCM-16ICV', 32),
+ 'esp-integ': 'NULL',
+ 'ike-dh': '3072MODPgr'})
+
+
+class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
+ """
+ IKE:AES_GCM_16_256
+ """
+ def config_tc(self):
+ self.config_params({
+ 'ike-crypto': ('AES-GCM-16ICV', 32),
+ 'ike-integ': 'NULL',
+ 'ike-dh': '2048MODPgr'})
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)