2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
12 from scapy.layers.ipsec import ESP
13 from scapy.layers.inet import IP, UDP, Ether
14 from scapy.packet import raw, Raw
15 from scapy.utils import long_converter
16 from framework import VppTestCase, VppTestRunner
17 from vpp_ikev2 import Profile, IDType, AuthMethod
20 KEY_PAD = b"Key Pad for IKEv2"
24 # tuple structure is (p, g, key_len)
26 '2048MODPgr': (long_converter("""
27 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
28 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
29 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
30 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
31 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
32 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
33 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
34 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
35 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
36 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
37 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256)
41 class CryptoAlgo(object):
42 def __init__(self, name, cipher, mode):
46 if self.cipher is not None:
47 self.bs = self.cipher.block_size // 8
49 def encrypt(self, data, key):
50 iv = os.urandom(self.bs)
51 encryptor = Cipher(self.cipher(key), self.mode(iv),
52 default_backend()).encryptor()
53 return iv + encryptor.update(data) + encryptor.finalize()
55 def decrypt(self, data, key, icv=None):
58 decryptor = Cipher(algorithms.AES(key),
60 default_backend()).decryptor()
61 return decryptor.update(ct) + decryptor.finalize()
64 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
65 data = data + b'\x00' * (pad_len - 1)
66 return data + bytes([pad_len])
69 class AuthAlgo(object):
70 def __init__(self, name, mac, mod, key_len, trunc_len=None):
74 self.key_len = key_len
75 self.trunc_len = trunc_len or key_len
79 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
80 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
84 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
85 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
89 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
90 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
95 class IKEv2ChildSA(object):
96 def __init__(self, local_ts, remote_ts, spi=None):
97 self.spi = spi or os.urandom(4)
98 self.local_ts = local_ts
99 self.remote_ts = remote_ts
102 class IKEv2SA(object):
103 def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
104 i_id=None, r_id=None, id_type='fqdn', nonce=None,
105 auth_data=None, local_ts=None, remote_ts=None,
106 auth_method='shared-key', priv_key=None, natt=False):
114 self.dh_params = None
116 self.priv_key = priv_key
117 self.is_initiator = is_initiator
118 nonce = nonce or os.urandom(32)
119 self.auth_data = auth_data
122 if isinstance(id_type, str):
123 self.id_type = IDType.value(id_type)
125 self.id_type = id_type
126 self.auth_method = auth_method
127 if self.is_initiator:
135 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
137 def dh_pub_key(self):
138 return self.i_dh_data
140 def compute_secret(self):
141 priv = self.dh_private_key
142 peer = self.r_dh_data
143 p, g, l = self.ike_group
144 return pow(int.from_bytes(peer, 'big'),
145 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
147 def generate_dh_data(self):
149 if self.is_initiator:
150 if self.ike_dh not in DH:
151 raise NotImplementedError('%s not in DH group' % self.ike_dh)
152 if self.dh_params is None:
153 dhg = DH[self.ike_dh]
154 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
155 self.dh_params = pn.parameters(default_backend())
156 priv = self.dh_params.generate_private_key()
157 pub = priv.public_key()
158 x = priv.private_numbers().x
159 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
160 y = pub.public_numbers().y
161 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
163 def complete_dh_data(self):
164 self.dh_shared_secret = self.compute_secret()
166 def calc_child_keys(self):
167 prf = self.ike_prf_alg.mod()
168 s = self.i_nonce + self.r_nonce
169 c = self.child_sas[0]
171 encr_key_len = self.esp_crypto_key_len
172 integ_key_len = self.ike_integ_alg.key_len
173 l = (integ_key_len * 2 +
175 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
178 c.sk_ei = keymat[pos:pos+encr_key_len]
181 c.sk_ai = keymat[pos:pos+integ_key_len]
184 c.sk_er = keymat[pos:pos+encr_key_len]
187 c.sk_ar = keymat[pos:pos+integ_key_len]
190 def calc_prfplus(self, prf, key, seed, length):
194 while len(r) < length and x < 255:
199 s = s + seed + bytes([x])
200 t = self.calc_prf(prf, key, s)
208 def calc_prf(self, prf, key, data):
209 h = self.ike_integ_alg.mac(key, prf, backend=default_backend())
214 prf = self.ike_prf_alg.mod()
215 # SKEYSEED = prf(Ni | Nr, g^ir)
216 s = self.i_nonce + self.r_nonce
217 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
219 # calculate S = Ni | Nr | SPIi SPIr
220 s = s + self.ispi + self.rspi
222 prf_key_trunc = self.ike_prf_alg.trunc_len
223 encr_key_len = self.ike_crypto_key_len
224 tr_prf_key_len = self.ike_prf_alg.key_len
225 integ_key_len = self.ike_integ_alg.key_len
230 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
233 self.sk_d = keymat[:pos+prf_key_trunc]
236 self.sk_ai = keymat[pos:pos+integ_key_len]
238 self.sk_ar = keymat[pos:pos+integ_key_len]
241 self.sk_ei = keymat[pos:pos+encr_key_len]
243 self.sk_er = keymat[pos:pos+encr_key_len]
246 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
247 pos += tr_prf_key_len
248 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
250 def generate_authmsg(self, prf, packet):
251 if self.is_initiator:
255 data = bytes([self.id_type, 0, 0, 0]) + id
256 id_hash = self.calc_prf(prf, key, data)
257 return packet + nonce + id_hash
260 prf = self.ike_prf_alg.mod()
261 authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
262 if self.auth_method == 'shared-key':
263 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
264 self.auth_data = self.calc_prf(prf, psk, authmsg)
265 elif self.auth_method == 'rsa-sig':
266 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
269 raise TypeError('unknown auth method type!')
271 def encrypt(self, data):
272 data = self.ike_crypto_alg.pad(data)
273 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey)
276 def peer_authkey(self):
277 if self.is_initiator:
282 def my_authkey(self):
283 if self.is_initiator:
288 def my_cryptokey(self):
289 if self.is_initiator:
294 def peer_cryptokey(self):
295 if self.is_initiator:
299 def verify_hmac(self, ikemsg):
300 integ_trunc = self.ike_integ_alg.trunc_len
301 exp_hmac = ikemsg[-integ_trunc:]
302 data = ikemsg[:-integ_trunc]
303 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
304 self.peer_authkey, data)
305 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
307 def compute_hmac(self, integ, key, data):
308 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
312 def decrypt(self, data):
313 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey)
315 def hmac_and_decrypt(self, ike):
316 ep = ike[ikev2.IKEv2_payload_Encrypted]
317 self.verify_hmac(raw(ike))
318 integ_trunc = self.ike_integ_alg.trunc_len
320 # remove ICV and decrypt payload
321 ct = ep.load[:-integ_trunc]
322 return self.decrypt(ct)
324 def generate_ts(self):
325 c = self.child_sas[0]
326 ts1 = ikev2.IPv4TrafficSelector(
328 starting_address_v4=c.local_ts['start_addr'],
329 ending_address_v4=c.local_ts['end_addr'])
330 ts2 = ikev2.IPv4TrafficSelector(
332 starting_address_v4=c.remote_ts['start_addr'],
333 ending_address_v4=c.remote_ts['end_addr'])
334 return ([ts1], [ts2])
336 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
337 if crypto not in CRYPTO_ALGOS:
338 raise TypeError('unsupported encryption algo %r' % crypto)
339 self.ike_crypto = crypto
340 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
341 self.ike_crypto_key_len = crypto_key_len
343 if integ not in AUTH_ALGOS:
344 raise TypeError('unsupported auth algo %r' % integ)
345 self.ike_integ = integ
346 self.ike_integ_alg = AUTH_ALGOS[integ]
348 if prf not in PRF_ALGOS:
349 raise TypeError('unsupported prf algo %r' % prf)
351 self.ike_prf_alg = PRF_ALGOS[prf]
353 self.ike_group = DH[self.ike_dh]
355 def set_esp_props(self, crypto, crypto_key_len, integ):
356 self.esp_crypto_key_len = crypto_key_len
357 if crypto not in CRYPTO_ALGOS:
358 raise TypeError('unsupported encryption algo %r' % crypto)
359 self.esp_crypto = crypto
360 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
362 if integ not in AUTH_ALGOS:
363 raise TypeError('unsupported auth algo %r' % integ)
364 self.esp_integ = integ
365 self.esp_integ_alg = AUTH_ALGOS[integ]
367 def crypto_attr(self, key_len):
368 if self.ike_crypto in ['AES-CBC', 'AES-GCM']:
369 return (0x800e << 16 | key_len << 3, 12)
371 raise Exception('unsupported attribute type')
373 def ike_crypto_attr(self):
374 return self.crypto_attr(self.ike_crypto_key_len)
376 def esp_crypto_attr(self):
377 return self.crypto_attr(self.esp_crypto_key_len)
379 def compute_nat_sha1(self, ip, port):
380 data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
381 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
383 return digest.finalize()
386 class TemplateResponder(VppTestCase):
387 """ responder test """
391 import scapy.contrib.ikev2 as _ikev2
392 globals()['ikev2'] = _ikev2
393 super(TemplateResponder, cls).setUpClass()
394 cls.create_pg_interfaces(range(2))
395 for i in cls.pg_interfaces:
401 def tearDownClass(cls):
402 super(TemplateResponder, cls).tearDownClass()
405 super(TemplateResponder, self).setUp()
407 self.p.add_vpp_config()
408 self.sa.generate_dh_data()
410 def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
411 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
412 IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
413 UDP(sport=sport, dport=dport))
415 # insert non ESP marker
416 res = res / Raw(b'\x00' * 4)
419 def send_sa_init(self, behind_nat=False):
420 tr_attr = self.sa.ike_crypto_attr()
421 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
422 transform_id=self.sa.ike_crypto, length=tr_attr[1],
423 key_length=tr_attr[0]) /
424 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
425 transform_id=self.sa.ike_integ) /
426 ikev2.IKEv2_payload_Transform(transform_type='PRF',
427 transform_id=self.sa.ike_prf_alg.name) /
428 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
429 transform_id=self.sa.ike_dh))
431 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
432 trans_nb=4, trans=trans))
435 next_payload = 'Notify'
439 self.sa.init_req_packet = (
440 ikev2.IKEv2(init_SPI=self.sa.ispi,
441 flags='Initiator', exch_type='IKE_SA_INIT') /
442 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
443 ikev2.IKEv2_payload_KE(next_payload='Nonce',
444 group=self.sa.ike_dh,
445 load=self.sa.dh_pub_key()) /
446 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
447 load=self.sa.i_nonce))
450 src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
452 nat_detection = ikev2.IKEv2_payload_Notify(
453 type='NAT_DETECTION_SOURCE_IP',
455 self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
457 ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
458 self.sa.sport, self.sa.dport,
460 self.pg0.add_stream(ike_msg)
461 self.pg0.enable_capture()
463 capture = self.pg0.get_capture(1)
464 self.verify_sa_init(capture[0])
466 def send_sa_auth(self):
467 tr_attr = self.sa.esp_crypto_attr()
468 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
469 transform_id=self.sa.esp_crypto, length=tr_attr[1],
470 key_length=tr_attr[0]) /
471 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
472 transform_id=self.sa.esp_integ) /
473 ikev2.IKEv2_payload_Transform(
474 transform_type='Extended Sequence Number',
475 transform_id='No ESN') /
476 ikev2.IKEv2_payload_Transform(
477 transform_type='Extended Sequence Number',
480 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
481 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
483 tsi, tsr = self.sa.generate_ts()
484 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
485 IDtype=self.sa.id_type, load=self.sa.i_id) /
486 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
487 IDtype=self.sa.id_type, load=self.sa.r_id) /
488 ikev2.IKEv2_payload_AUTH(next_payload='SA',
489 auth_type=AuthMethod.value(self.sa.auth_method),
490 load=self.sa.auth_data) /
491 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
492 ikev2.IKEv2_payload_TSi(next_payload='TSr',
493 number_of_TSs=len(tsi),
494 traffic_selector=tsi) /
495 ikev2.IKEv2_payload_TSr(next_payload='Notify',
496 number_of_TSs=len(tsr),
497 traffic_selector=tsr) /
498 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
499 encr = self.sa.encrypt(raw(plain))
501 trunc_len = self.sa.ike_integ_alg.trunc_len
502 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
503 tlen = plen + len(ikev2.IKEv2())
505 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
506 length=plen, load=encr)
507 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
508 length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1))
511 integ_data = raw(sa_auth)
512 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
513 self.sa.my_authkey, integ_data)
514 sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
515 assert(len(sa_auth) == tlen)
517 packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
518 self.sa.dport, self.sa.natt)
519 self.pg0.add_stream(packet)
520 self.pg0.enable_capture()
522 capture = self.pg0.get_capture(1)
523 self.verify_sa_auth(capture[0])
525 def get_ike_header(self, packet):
527 ih = packet[ikev2.IKEv2]
528 except IndexError as e:
529 # this is a workaround for getting IKEv2 layer as both ikev2 and
530 # ipsec register for port 4500
532 ih = self.verify_and_remove_non_esp_marker(esp)
535 def verify_sa_init(self, packet):
536 ih = self.get_ike_header(packet)
538 self.assertEqual(ih.exch_type, 34)
539 self.assertTrue('Response' in ih.flags)
540 self.assertEqual(ih.init_SPI, self.sa.ispi)
541 self.assertNotEqual(ih.resp_SPI, 0)
542 self.sa.rspi = ih.resp_SPI
544 sa = ih[ikev2.IKEv2_payload_SA]
545 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
546 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
547 except AttributeError as e:
548 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
550 self.sa.complete_dh_data()
554 def verify_and_remove_non_esp_marker(self, packet):
556 # if we are in nat traversal mode check for non esp marker
559 self.assertEqual(data[:4], b'\x00' * 4)
560 return ikev2.IKEv2(data[4:])
564 def verify_udp(self, udp):
565 self.assertEqual(udp.sport, self.sa.sport)
566 self.assertEqual(udp.dport, self.sa.dport)
568 def verify_sa_auth(self, packet):
569 ike = self.get_ike_header(packet)
572 plain = self.sa.hmac_and_decrypt(ike)
573 self.sa.calc_child_keys()
575 def verify_child_sas(self):
576 sas = self.vapi.ipsec_sa_dump()
577 self.assertEqual(len(sas), 2)
580 c = self.sa.child_sas[0]
583 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
584 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
585 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
586 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
589 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
590 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
591 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
592 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
594 def test_responder(self):
595 self.send_sa_init(self.sa.natt)
597 self.verify_child_sas()
600 class Ikev2Params(object):
601 def config_params(self, params={}):
602 is_natt = 'natt' in params and params['natt'] or False
603 self.p = Profile(self, 'pr1')
605 if 'auth' in params and params['auth'] == 'rsa-sig':
606 auth_method = 'rsa-sig'
607 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
608 self.vapi.ikev2_set_local_key(
609 key_file=work_dir + params['server-key'])
611 client_file = work_dir + params['client-cert']
612 server_pem = open(work_dir + params['server-cert']).read()
613 client_priv = open(work_dir + params['client-key']).read()
614 client_priv = load_pem_private_key(str.encode(client_priv), None,
616 self.peer_cert = x509.load_pem_x509_certificate(
617 str.encode(server_pem),
619 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
622 auth_data = b'$3cr3tpa$$w0rd'
623 self.p.add_auth(method='shared-key', data=auth_data)
624 auth_method = 'shared-key'
627 self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
628 self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
629 self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
630 self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
632 self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
633 r_id=self.p.local_id['data'],
634 id_type=self.p.local_id['id_type'], natt=is_natt,
635 priv_key=client_priv, auth_method=auth_method,
637 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
639 self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
640 integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
642 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
643 integ='HMAC-SHA1-96')
646 class TestResponderNATT(TemplateResponder, Ikev2Params):
647 """ test ikev2 responder - nat traversal """
653 class TestResponderPsk(TemplateResponder, Ikev2Params):
654 """ test ikev2 responder - pre shared key auth """
659 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
660 """ test ikev2 responder - cert based auth """
664 'server-key': 'server-key.pem',
665 'client-key': 'client-key.pem',
666 'client-cert': 'client-cert.pem',
667 'server-cert': 'server-cert.pem'})
669 if __name__ == '__main__':
670 unittest.main(testRunner=VppTestRunner)