2 from cryptography.hazmat.backends import default_backend
3 from cryptography.hazmat.primitives import hashes, hmac
4 from cryptography.hazmat.primitives.asymmetric import dh
5 from cryptography.hazmat.primitives.ciphers import (
10 from scapy.layers.inet import IP, UDP, Ether
11 from scapy.packet import raw, Raw
12 from scapy.utils import long_converter
13 from framework import VppTestCase, VppTestRunner
14 from vpp_ikev2 import Profile, IDType
# Constant pad fed as the data argument to the PRF, keyed with the shared
# secret, when the pre-shared-key AUTH value is derived.
KEY_PAD = b"Key Pad for IKEv2"
21 # tuple structure is (p, g, key_len)
23 '2048MODPgr': (long_converter("""
24 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
25 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
26 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
27 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
28 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
29 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
30 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
31 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
32 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
33 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
34 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256)
38 class CryptoAlgo(object):
39 def __init__(self, name, cipher, mode):
43 if self.cipher is not None:
44 self.bs = self.cipher.block_size // 8
def encrypt(self, data, key):
    """Encrypt ``data`` with ``key``; return the IV followed by the ciphertext.

    A fresh random IV of ``self.bs`` bytes is drawn on every call. ``data``
    is passed to the cipher unchanged.
    """
    init_vector = os.urandom(self.bs)
    cipher = Cipher(self.cipher(key), self.mode(init_vector),
                    default_backend())
    enc = cipher.encryptor()
    ciphertext = enc.update(data) + enc.finalize()
    return init_vector + ciphertext
52 def decrypt(self, data, key, icv=None):
55 decryptor = Cipher(algorithms.AES(key),
57 default_backend()).decryptor()
58 return decryptor.update(ct) + decryptor.finalize()
61 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
62 data = data + b'\x00' * (pad_len - 1)
63 return data + bytes([pad_len])
66 class AuthAlgo(object):
67 def __init__(self, name, mac, mod, key_len, trunc_len=None):
71 self.key_len = key_len
72 self.trunc_len = trunc_len or key_len
76 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
77 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
81 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
82 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
86 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
87 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
class IKEv2ChildSA(object):
    """Holder for one child SA: its SPI and the two traffic selectors."""

    def __init__(self, local_ts, remote_ts, spi=None):
        """Store the traffic selectors and the SPI.

        A falsy ``spi`` (``None``, empty bytes, ...) is replaced by four
        random bytes.
        """
        if not spi:
            spi = os.urandom(4)
        self.spi = spi
        self.local_ts, self.remote_ts = local_ts, remote_ts
99 class IKEv2SA(object):
100 def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
101 id=None, id_type='fqdn', nonce=None, auth_data=None,
102 local_ts=None, remote_ts=None, auth_method='shared-key'):
103 self.dh_params = None
105 self.is_initiator = is_initiator
106 nonce = nonce or os.urandom(32)
107 self.auth_data = auth_data
108 if isinstance(id_type, str):
109 self.id_type = IDType.value(id_type)
111 self.id_type = id_type
112 self.auth_method = auth_method
113 if self.is_initiator:
123 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
def dh_pub_key(self):
    """Return the stored ``i_dh_data`` bytes (this SA's DH public value)."""
    public_value = self.i_dh_data
    return public_value
def compute_secret(self):
    """Return the DH shared secret as a fixed-width big-endian byte string.

    The peer's public value (``r_dh_data``) is raised to our private
    exponent (``dh_private_key``) modulo the group prime. ``ike_group`` is a
    ``(p, g, key_len)`` tuple; the generator is not needed here.
    """
    exponent_bytes = self.dh_private_key
    base_bytes = self.r_dh_data
    prime, _generator, width = self.ike_group
    base = int.from_bytes(base_bytes, 'big')
    exponent = int.from_bytes(exponent_bytes, 'big')
    shared = pow(base, exponent, prime)
    return shared.to_bytes(width, 'big')
135 def generate_dh_data(self):
137 if self.is_initiator:
138 if self.ike_dh not in DH:
139 raise NotImplementedError('%s not in DH group' % self.ike_dh)
140 if self.dh_params is None:
141 dhg = DH[self.ike_dh]
142 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
143 self.dh_params = pn.parameters(default_backend())
144 priv = self.dh_params.generate_private_key()
145 pub = priv.public_key()
146 x = priv.private_numbers().x
147 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
148 y = pub.public_numbers().y
149 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Compute the DH shared secret and keep it in ``dh_shared_secret``."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
154 def calc_child_keys(self):
155 prf = self.ike_prf_alg.mod()
156 s = self.i_nonce + self.r_nonce
157 c = self.child_sas[0]
159 encr_key_len = self.esp_crypto_key_len
160 integ_key_len = self.ike_integ_alg.key_len
161 l = (integ_key_len * 2 +
163 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
166 c.sk_ei = keymat[pos:pos+encr_key_len]
169 c.sk_ai = keymat[pos:pos+integ_key_len]
172 c.sk_er = keymat[pos:pos+encr_key_len]
175 c.sk_ar = keymat[pos:pos+integ_key_len]
178 def calc_prfplus(self, prf, key, seed, length):
182 while len(r) < length and x < 255:
187 s = s + seed + bytes([x])
188 t = self.calc_prf(prf, key, s)
196 def calc_prf(self, prf, key, data):
197 h = self.ike_integ_alg.mac(key, prf, backend=default_backend())
202 prf = self.ike_prf_alg.mod()
203 # SKEYSEED = prf(Ni | Nr, g^ir)
204 s = self.i_nonce + self.r_nonce
205 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
# calculate S = Ni | Nr | SPIi | SPIr
208 s = s + self.ispi + self.rspi
210 prf_key_trunc = self.ike_prf_alg.trunc_len
211 encr_key_len = self.ike_crypto_key_len
212 tr_prf_key_len = self.ike_prf_alg.key_len
213 integ_key_len = self.ike_integ_alg.key_len
218 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
221 self.sk_d = keymat[:pos+prf_key_trunc]
224 self.sk_ai = keymat[pos:pos+integ_key_len]
226 self.sk_ar = keymat[pos:pos+integ_key_len]
229 self.sk_ei = keymat[pos:pos+encr_key_len]
231 self.sk_er = keymat[pos:pos+encr_key_len]
234 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
235 pos += tr_prf_key_len
236 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
238 def generate_authmsg(self, prf, packet):
239 if self.is_initiator:
243 data = bytes([self.id_type, 0, 0, 0]) + id
244 id_hash = self.calc_prf(prf, key, data)
245 return packet + nonce + id_hash
248 prf = self.ike_prf_alg.mod()
249 authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
250 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
251 self.auth_data = self.calc_prf(prf, psk, authmsg)
def encrypt(self, data):
    """Pad ``data`` and encrypt it with this side's IKE encryption key."""
    algo = self.ike_crypto_alg
    padded = algo.pad(data)
    return algo.encrypt(padded, self.my_cryptokey)
258 def peer_authkey(self):
259 if self.is_initiator:
264 def my_authkey(self):
265 if self.is_initiator:
270 def my_cryptokey(self):
271 if self.is_initiator:
276 def peer_cryptokey(self):
277 if self.is_initiator:
def verify_hmac(self, ikemsg):
    """Check the integrity checksum (ICV) at the tail of a raw IKE message.

    The last ``trunc_len`` bytes of ``ikemsg`` are taken as the received
    ICV, the HMAC is recomputed over everything before them with the peer's
    authentication key, and the test case asserts that both values match.

    :param ikemsg: raw bytes of the whole IKE message, ICV included
    """
    icv_len = self.ike_integ_alg.trunc_len
    # Split by absolute offset: with a negative index a zero-length ICV would
    # make ``ikemsg[-0:]`` the whole message and ``ikemsg[:-0]`` empty.
    split = len(ikemsg) - icv_len
    data, exp_hmac = ikemsg[:split], ikemsg[split:]
    computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
                                      self.peer_authkey, data)
    self.test.assertEqual(computed_hmac[:icv_len], exp_hmac)
289 def compute_hmac(self, integ, key, data):
290 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data):
    """Decrypt ``data`` with the peer's IKE encryption key."""
    algo = self.ike_crypto_alg
    return algo.decrypt(data, self.peer_cryptokey)
def hmac_and_decrypt(self, ike):
    """Verify the ICV of an encrypted IKE message, then decrypt its payload.

    :param ike: IKEv2 packet that carries an Encrypted payload
    :returns: the result of ``self.decrypt`` on the payload without its ICV
    """
    ep = ike[ikev2.IKEv2_payload_Encrypted]
    self.verify_hmac(raw(ike))
    integ_trunc = self.ike_integ_alg.trunc_len

    # remove ICV and decrypt payload; slice by absolute length because
    # ``load[:-0]`` would be empty when the ICV length is zero
    load = ep.load
    ct = load[:len(load) - integ_trunc]
    return self.decrypt(ct)
306 def generate_ts(self):
307 c = self.child_sas[0]
308 ts1 = ikev2.IPv4TrafficSelector(
310 starting_address_v4=c.local_ts['start_addr'],
311 ending_address_v4=c.local_ts['end_addr'])
312 ts2 = ikev2.IPv4TrafficSelector(
314 starting_address_v4=c.remote_ts['start_addr'],
315 ending_address_v4=c.remote_ts['end_addr'])
316 return ([ts1], [ts2])
318 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
319 if crypto not in CRYPTO_ALGOS:
320 raise TypeError('unsupported encryption algo %r' % crypto)
321 self.ike_crypto = crypto
322 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
323 self.ike_crypto_key_len = crypto_key_len
325 if integ not in AUTH_ALGOS:
326 raise TypeError('unsupported auth algo %r' % integ)
327 self.ike_integ = integ
328 self.ike_integ_alg = AUTH_ALGOS[integ]
330 if prf not in PRF_ALGOS:
331 raise TypeError('unsupported prf algo %r' % prf)
333 self.ike_prf_alg = PRF_ALGOS[prf]
335 self.ike_group = DH[self.ike_dh]
def set_esp_props(self, crypto, crypto_key_len, integ):
    """Record the ESP encryption and integrity algorithms on this SA.

    :param crypto: key into ``CRYPTO_ALGOS``
    :param crypto_key_len: ESP encryption key length, stored as given
    :param integ: key into ``AUTH_ALGOS``
    :raises TypeError: if either algorithm name is unknown
    """
    # the key length is stored before any validation takes place
    self.esp_crypto_key_len = crypto_key_len

    if crypto in CRYPTO_ALGOS:
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
    else:
        raise TypeError('unsupported encryption algo %r' % crypto)

    if integ in AUTH_ALGOS:
        self.esp_integ = integ
        self.esp_integ_alg = AUTH_ALGOS[integ]
    else:
        raise TypeError('unsupported auth algo %r' % integ)
def crypto_attr(self, key_len, crypto=None):
    """Build the key-length attribute for an Encryption transform.

    :param key_len: key length; it is shifted left by 3 (multiplied by 8)
        when packed into the attribute
    :param crypto: name of the cipher the attribute is for; defaults to the
        IKE cipher (``self.ike_crypto``) so existing callers are unaffected
    :returns: ``(attribute, 12)``; callers pass element 0 as the transform's
        ``key_length`` and element 1 as its ``length``
    :raises Exception: if the cipher is not one that takes this attribute
    """
    if crypto is None:
        crypto = self.ike_crypto
    if crypto in ('AES-CBC', 'AES-GCM'):
        # 0x800e in the upper 16 bits, key length times 8 in the lower bits
        return (0x800e << 16 | key_len << 3, 12)
    raise Exception('unsupported attribute type')

def ike_crypto_attr(self):
    """Return the key-length attribute for the IKE SA cipher."""
    return self.crypto_attr(self.ike_crypto_key_len, self.ike_crypto)

def esp_crypto_attr(self):
    """Return the key-length attribute for the ESP cipher."""
    # validate against the ESP cipher rather than the IKE one
    return self.crypto_attr(self.esp_crypto_key_len, self.esp_crypto)
362 class TestResponder(VppTestCase):
363 """ responder test """
367 import scapy.contrib.ikev2 as _ikev2
368 globals()['ikev2'] = _ikev2
369 super(TestResponder, cls).setUpClass()
370 cls.create_pg_interfaces(range(2))
371 for i in cls.pg_interfaces:
377 def tearDownClass(cls):
378 super(TestResponder, cls).tearDownClass()
381 super(TestResponder, self).setUp()
385 self.p = Profile(self, 'pr1')
386 self.p.add_auth(method='shared-key', data=b'$3cr3tpa$$w0rd')
387 self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
388 self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
389 self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
390 self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
391 self.p.add_vpp_config()
393 self.sa = IKEv2SA(self, id=self.p.remote_id['data'], is_initiator=True,
394 auth_data=self.p.auth['data'],
395 id_type=self.p.local_id['id_type'],
396 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
398 self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
399 integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
401 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
402 integ='HMAC-SHA1-96')
403 self.sa.generate_dh_data()
def create_ike_msg(self, src_if, msg, sport=500, dport=500):
    """Wrap ``msg`` in Ether/IP/UDP headers addressed to ``src_if``.

    Source MAC and IPv4 address are the interface's remote ones, the
    destination ones are its local ones; both UDP ports default to 500.
    """
    eth = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
    ip = IP(src=src_if.remote_ip4, dst=src_if.local_ip4)
    udp = UDP(sport=sport, dport=dport)
    return eth / ip / udp / msg
410 def send_sa_init(self):
411 tr_attr = self.sa.ike_crypto_attr()
412 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
413 transform_id=self.sa.ike_crypto, length=tr_attr[1],
414 key_length=tr_attr[0]) /
415 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
416 transform_id=self.sa.ike_integ) /
417 ikev2.IKEv2_payload_Transform(transform_type='PRF',
418 transform_id=self.sa.ike_prf_alg.name) /
419 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
420 transform_id=self.sa.ike_dh))
422 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
423 trans_nb=4, trans=trans))
425 self.sa.init_req_packet = (
426 ikev2.IKEv2(init_SPI=self.sa.ispi,
427 flags='Initiator', exch_type='IKE_SA_INIT') /
428 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
429 ikev2.IKEv2_payload_KE(next_payload='Nonce',
430 group=self.sa.ike_dh,
431 load=self.sa.dh_pub_key()) /
432 ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce))
434 ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet)
435 self.pg0.add_stream(ike_msg)
436 self.pg0.enable_capture()
438 capture = self.pg0.get_capture(1)
439 self.verify_sa_init(capture[0])
441 def send_sa_auth(self):
442 tr_attr = self.sa.esp_crypto_attr()
443 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
444 transform_id=self.sa.esp_crypto, length=tr_attr[1],
445 key_length=tr_attr[0]) /
446 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
447 transform_id=self.sa.esp_integ) /
448 ikev2.IKEv2_payload_Transform(
449 transform_type='Extended Sequence Number',
450 transform_id='No ESN') /
451 ikev2.IKEv2_payload_Transform(
452 transform_type='Extended Sequence Number',
455 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
456 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
458 tsi, tsr = self.sa.generate_ts()
459 plain = (ikev2.IKEv2_payload_IDi(next_payload='AUTH',
460 IDtype=self.sa.id_type, load=self.sa.i_id) /
461 ikev2.IKEv2_payload_AUTH(next_payload='SA',
462 auth_type=2, load=self.sa.auth_data) /
463 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
464 ikev2.IKEv2_payload_TSi(next_payload='TSr',
465 number_of_TSs=len(tsi),
466 traffic_selector=tsi) /
467 ikev2.IKEv2_payload_TSr(next_payload='Notify',
468 number_of_TSs=len(tsr),
469 traffic_selector=tsr) /
470 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
471 encr = self.sa.encrypt(raw(plain))
473 trunc_len = self.sa.ike_integ_alg.trunc_len
474 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
475 tlen = plen + len(ikev2.IKEv2())
477 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
478 length=plen, load=encr)
479 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
480 length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1))
483 integ_data = raw(sa_auth)
484 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
485 self.sa.my_authkey, integ_data)
486 sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
487 assert(len(sa_auth) == tlen)
489 packet = self.create_ike_msg(self.pg0, sa_auth)
490 self.pg0.add_stream(packet)
491 self.pg0.enable_capture()
493 capture = self.pg0.get_capture(1)
494 self.verify_sa_auth(capture[0])
496 def verify_sa_init(self, packet):
497 ih = packet[ikev2.IKEv2]
498 self.assertEqual(ih.exch_type, 34)
499 self.assertTrue('Response' in ih.flags)
500 self.assertEqual(ih.init_SPI, self.sa.ispi)
501 self.assertNotEqual(ih.resp_SPI, 0)
502 self.sa.rspi = ih.resp_SPI
504 sa = ih[ikev2.IKEv2_payload_SA]
505 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
506 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
507 except AttributeError as e:
508 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
510 self.sa.complete_dh_data()
514 def verify_sa_auth(self, packet):
516 ike = packet[ikev2.IKEv2]
517 ep = packet[ikev2.IKEv2_payload_Encrypted]
518 except KeyError as e:
519 self.logger.error("unexpected reply: no IKEv2/Encrypt payload!")
521 plain = self.sa.hmac_and_decrypt(ike)
522 self.sa.calc_child_keys()
524 def verify_child_sas(self):
525 sas = self.vapi.ipsec_sa_dump()
526 self.assertEqual(len(sas), 2)
529 c = self.sa.child_sas[0]
532 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
533 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
534 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
535 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
538 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
539 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
540 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
541 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
543 def test_responder(self):
546 self.verify_child_sas()
# When executed as a script, run the tests through VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)