2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
12 from ipaddress import IPv4Address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.packet import raw, Raw
16 from scapy.utils import long_converter
17 from framework import VppTestCase, VppTestRunner
18 from vpp_ikev2 import Profile, IDType, AuthMethod
19 from vpp_papi import VppEnum
22 KEY_PAD = b"Key Pad for IKEv2"
29 # tuple structure is (p, g, key_len)
31 '2048MODPgr': (long_converter("""
32 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
33 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
34 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
35 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
36 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
37 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
38 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
39 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
40 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
41 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
42 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
44 '3072MODPgr': (long_converter("""
45 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
46 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
47 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
48 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
49 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
50 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
51 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
52 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
53 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
54 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
55 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
56 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
57 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
58 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
59 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
60 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
64 class CryptoAlgo(object):
65 def __init__(self, name, cipher, mode):
69 if self.cipher is not None:
70 self.bs = self.cipher.block_size // 8
72 if self.name == 'AES-GCM-16ICV':
73 self.iv_len = GCM_IV_SIZE
77 def encrypt(self, data, key, aad=None):
78 iv = os.urandom(self.iv_len)
80 encryptor = Cipher(self.cipher(key), self.mode(iv),
81 default_backend()).encryptor()
82 return iv + encryptor.update(data) + encryptor.finalize()
84 salt = key[-SALT_SIZE:]
86 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
87 default_backend()).encryptor()
88 encryptor.authenticate_additional_data(aad)
89 data = encryptor.update(data) + encryptor.finalize()
90 data += encryptor.tag[:GCM_ICV_SIZE]
93 def decrypt(self, data, key, aad=None, icv=None):
95 iv = data[:self.iv_len]
96 ct = data[self.iv_len:]
97 decryptor = Cipher(algorithms.AES(key),
99 default_backend()).decryptor()
100 return decryptor.update(ct) + decryptor.finalize()
102 salt = key[-SALT_SIZE:]
103 nonce = salt + data[:GCM_IV_SIZE]
104 ct = data[GCM_IV_SIZE:]
105 key = key[:-SALT_SIZE]
106 decryptor = Cipher(algorithms.AES(key),
107 self.mode(nonce, icv, len(icv)),
108 default_backend()).decryptor()
109 decryptor.authenticate_additional_data(aad)
110 pt = decryptor.update(ct) + decryptor.finalize()
115 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
116 data = data + b'\x00' * (pad_len - 1)
117 return data + bytes([pad_len])
120 class AuthAlgo(object):
121 def __init__(self, name, mac, mod, key_len, trunc_len=None):
125 self.key_len = key_len
126 self.trunc_len = trunc_len or key_len
130 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
131 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
132 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
137 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
138 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
139 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
140 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
141 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
145 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
146 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
151 class IKEv2ChildSA(object):
152 def __init__(self, local_ts, remote_ts, spi=None):
153 self.spi = spi or os.urandom(4)
154 self.local_ts = local_ts
155 self.remote_ts = remote_ts
158 class IKEv2SA(object):
159 def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
160 i_id=None, r_id=None, id_type='fqdn', nonce=None,
161 auth_data=None, local_ts=None, remote_ts=None,
162 auth_method='shared-key', priv_key=None, natt=False):
170 self.dh_params = None
172 self.priv_key = priv_key
173 self.is_initiator = is_initiator
174 nonce = nonce or os.urandom(32)
175 self.auth_data = auth_data
178 if isinstance(id_type, str):
179 self.id_type = IDType.value(id_type)
181 self.id_type = id_type
182 self.auth_method = auth_method
183 if self.is_initiator:
191 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
193 def dh_pub_key(self):
194 return self.i_dh_data
196 def compute_secret(self):
197 priv = self.dh_private_key
198 peer = self.r_dh_data
199 p, g, l = self.ike_group
200 return pow(int.from_bytes(peer, 'big'),
201 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
203 def generate_dh_data(self):
205 if self.is_initiator:
206 if self.ike_dh not in DH:
207 raise NotImplementedError('%s not in DH group' % self.ike_dh)
208 if self.dh_params is None:
209 dhg = DH[self.ike_dh]
210 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
211 self.dh_params = pn.parameters(default_backend())
212 priv = self.dh_params.generate_private_key()
213 pub = priv.public_key()
214 x = priv.private_numbers().x
215 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
216 y = pub.public_numbers().y
217 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
219 def complete_dh_data(self):
220 self.dh_shared_secret = self.compute_secret()
222 def calc_child_keys(self):
223 prf = self.ike_prf_alg.mod()
224 s = self.i_nonce + self.r_nonce
225 c = self.child_sas[0]
227 encr_key_len = self.esp_crypto_key_len
228 integ_key_len = self.esp_integ_alg.key_len
229 salt_len = 0 if integ_key_len else 4
231 l = (integ_key_len * 2 +
234 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
237 c.sk_ei = keymat[pos:pos+encr_key_len]
241 c.sk_ai = keymat[pos:pos+integ_key_len]
244 c.salt_ei = keymat[pos:pos+salt_len]
247 c.sk_er = keymat[pos:pos+encr_key_len]
251 c.sk_ar = keymat[pos:pos+integ_key_len]
254 c.salt_er = keymat[pos:pos+salt_len]
257 def calc_prfplus(self, prf, key, seed, length):
261 while len(r) < length and x < 255:
266 s = s + seed + bytes([x])
267 t = self.calc_prf(prf, key, s)
275 def calc_prf(self, prf, key, data):
276 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
281 prf = self.ike_prf_alg.mod()
282 # SKEYSEED = prf(Ni | Nr, g^ir)
283 s = self.i_nonce + self.r_nonce
284 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
286 # calculate S = Ni | Nr | SPIi SPIr
287 s = s + self.ispi + self.rspi
289 prf_key_trunc = self.ike_prf_alg.trunc_len
290 encr_key_len = self.ike_crypto_key_len
291 tr_prf_key_len = self.ike_prf_alg.key_len
292 integ_key_len = self.ike_integ_alg.key_len
293 if integ_key_len == 0:
303 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
306 self.sk_d = keymat[:pos+prf_key_trunc]
309 self.sk_ai = keymat[pos:pos+integ_key_len]
311 self.sk_ar = keymat[pos:pos+integ_key_len]
314 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
315 pos += encr_key_len + salt_size
316 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
317 pos += encr_key_len + salt_size
319 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
320 pos += tr_prf_key_len
321 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
323 def generate_authmsg(self, prf, packet):
324 if self.is_initiator:
328 data = bytes([self.id_type, 0, 0, 0]) + id
329 id_hash = self.calc_prf(prf, key, data)
330 return packet + nonce + id_hash
333 prf = self.ike_prf_alg.mod()
334 authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
335 if self.auth_method == 'shared-key':
336 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
337 self.auth_data = self.calc_prf(prf, psk, authmsg)
338 elif self.auth_method == 'rsa-sig':
339 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
342 raise TypeError('unknown auth method type!')
344 def encrypt(self, data, aad=None):
345 data = self.ike_crypto_alg.pad(data)
346 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
349 def peer_authkey(self):
350 if self.is_initiator:
355 def my_authkey(self):
356 if self.is_initiator:
361 def my_cryptokey(self):
362 if self.is_initiator:
367 def peer_cryptokey(self):
368 if self.is_initiator:
372 def concat(self, alg, key_len):
373 return alg + '-' + str(key_len * 8)
376 def vpp_ike_cypto_alg(self):
377 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
380 def vpp_esp_cypto_alg(self):
381 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
383 def verify_hmac(self, ikemsg):
384 integ_trunc = self.ike_integ_alg.trunc_len
385 exp_hmac = ikemsg[-integ_trunc:]
386 data = ikemsg[:-integ_trunc]
387 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
388 self.peer_authkey, data)
389 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
391 def compute_hmac(self, integ, key, data):
392 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
396 def decrypt(self, data, aad=None, icv=None):
397 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
399 def hmac_and_decrypt(self, ike):
400 ep = ike[ikev2.IKEv2_payload_Encrypted]
401 if self.ike_crypto == 'AES-GCM-16ICV':
402 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
403 ct = ep.load[:-GCM_ICV_SIZE]
404 tag = ep.load[-GCM_ICV_SIZE:]
405 return self.decrypt(ct, raw(ike)[:aad_len], tag)
407 self.verify_hmac(raw(ike))
408 integ_trunc = self.ike_integ_alg.trunc_len
410 # remove ICV and decrypt payload
411 ct = ep.load[:-integ_trunc]
412 return self.decrypt(ct)
414 def generate_ts(self):
415 c = self.child_sas[0]
416 ts1 = ikev2.IPv4TrafficSelector(
418 starting_address_v4=c.local_ts['start_addr'],
419 ending_address_v4=c.local_ts['end_addr'])
420 ts2 = ikev2.IPv4TrafficSelector(
422 starting_address_v4=c.remote_ts['start_addr'],
423 ending_address_v4=c.remote_ts['end_addr'])
424 return ([ts1], [ts2])
426 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
427 if crypto not in CRYPTO_ALGOS:
428 raise TypeError('unsupported encryption algo %r' % crypto)
429 self.ike_crypto = crypto
430 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
431 self.ike_crypto_key_len = crypto_key_len
433 if integ not in AUTH_ALGOS:
434 raise TypeError('unsupported auth algo %r' % integ)
435 self.ike_integ = None if integ == 'NULL' else integ
436 self.ike_integ_alg = AUTH_ALGOS[integ]
438 if prf not in PRF_ALGOS:
439 raise TypeError('unsupported prf algo %r' % prf)
441 self.ike_prf_alg = PRF_ALGOS[prf]
443 self.ike_group = DH[self.ike_dh]
445 def set_esp_props(self, crypto, crypto_key_len, integ):
446 self.esp_crypto_key_len = crypto_key_len
447 if crypto not in CRYPTO_ALGOS:
448 raise TypeError('unsupported encryption algo %r' % crypto)
449 self.esp_crypto = crypto
450 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
452 if integ not in AUTH_ALGOS:
453 raise TypeError('unsupported auth algo %r' % integ)
454 self.esp_integ = None if integ == 'NULL' else integ
455 self.esp_integ_alg = AUTH_ALGOS[integ]
457 def crypto_attr(self, key_len):
458 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
459 return (0x800e << 16 | key_len << 3, 12)
461 raise Exception('unsupported attribute type')
463 def ike_crypto_attr(self):
464 return self.crypto_attr(self.ike_crypto_key_len)
466 def esp_crypto_attr(self):
467 return self.crypto_attr(self.esp_crypto_key_len)
469 def compute_nat_sha1(self, ip, port):
470 data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
471 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
473 return digest.finalize()
476 class TemplateResponder(VppTestCase):
477 """ responder test template """
481 import scapy.contrib.ikev2 as _ikev2
482 globals()['ikev2'] = _ikev2
483 super(TemplateResponder, cls).setUpClass()
484 cls.create_pg_interfaces(range(2))
485 for i in cls.pg_interfaces:
491 def tearDownClass(cls):
492 super(TemplateResponder, cls).tearDownClass()
495 super(TemplateResponder, self).setUp()
497 self.p.add_vpp_config()
498 self.assertIsNotNone(self.p.query_vpp_config())
499 self.sa.generate_dh_data()
502 super(TemplateResponder, self).tearDown()
503 self.p.remove_vpp_config()
504 self.assertIsNone(self.p.query_vpp_config())
506 def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
507 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
508 IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
509 UDP(sport=sport, dport=dport))
511 # insert non ESP marker
512 res = res / Raw(b'\x00' * 4)
515 def send_sa_init(self, behind_nat=False):
516 tr_attr = self.sa.ike_crypto_attr()
517 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
518 transform_id=self.sa.ike_crypto, length=tr_attr[1],
519 key_length=tr_attr[0]) /
520 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
521 transform_id=self.sa.ike_integ) /
522 ikev2.IKEv2_payload_Transform(transform_type='PRF',
523 transform_id=self.sa.ike_prf_alg.name) /
524 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
525 transform_id=self.sa.ike_dh))
527 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
528 trans_nb=4, trans=trans))
531 next_payload = 'Notify'
535 self.sa.init_req_packet = (
536 ikev2.IKEv2(init_SPI=self.sa.ispi,
537 flags='Initiator', exch_type='IKE_SA_INIT') /
538 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
539 ikev2.IKEv2_payload_KE(next_payload='Nonce',
540 group=self.sa.ike_dh,
541 load=self.sa.dh_pub_key()) /
542 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
543 load=self.sa.i_nonce))
546 src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
548 nat_detection = ikev2.IKEv2_payload_Notify(
549 type='NAT_DETECTION_SOURCE_IP',
551 self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
553 ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
554 self.sa.sport, self.sa.dport,
556 self.pg0.add_stream(ike_msg)
557 self.pg0.enable_capture()
559 capture = self.pg0.get_capture(1)
560 self.verify_sa_init(capture[0])
562 def send_sa_auth(self):
563 tr_attr = self.sa.esp_crypto_attr()
564 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
565 transform_id=self.sa.esp_crypto, length=tr_attr[1],
566 key_length=tr_attr[0]) /
567 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
568 transform_id=self.sa.esp_integ) /
569 ikev2.IKEv2_payload_Transform(
570 transform_type='Extended Sequence Number',
571 transform_id='No ESN') /
572 ikev2.IKEv2_payload_Transform(
573 transform_type='Extended Sequence Number',
576 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
577 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
579 tsi, tsr = self.sa.generate_ts()
580 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
581 IDtype=self.sa.id_type, load=self.sa.i_id) /
582 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
583 IDtype=self.sa.id_type, load=self.sa.r_id) /
584 ikev2.IKEv2_payload_AUTH(next_payload='SA',
585 auth_type=AuthMethod.value(self.sa.auth_method),
586 load=self.sa.auth_data) /
587 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
588 ikev2.IKEv2_payload_TSi(next_payload='TSr',
589 number_of_TSs=len(tsi),
590 traffic_selector=tsi) /
591 ikev2.IKEv2_payload_TSr(next_payload='Notify',
592 number_of_TSs=len(tsr),
593 traffic_selector=tsr) /
594 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
596 if self.sa.ike_crypto == 'AES-GCM-16ICV':
597 data = self.sa.ike_crypto_alg.pad(raw(plain))
598 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
599 len(ikev2.IKEv2_payload_Encrypted())
600 tlen = plen + len(ikev2.IKEv2())
603 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
605 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
606 resp_SPI=self.sa.rspi, id=1,
607 length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
610 encr = self.sa.encrypt(raw(plain), raw(sa_auth))
611 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
612 length=plen, load=encr)
613 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
614 resp_SPI=self.sa.rspi, id=1,
615 length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
618 encr = self.sa.encrypt(raw(plain))
619 trunc_len = self.sa.ike_integ_alg.trunc_len
620 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
621 tlen = plen + len(ikev2.IKEv2())
623 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
624 length=plen, load=encr)
625 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
626 resp_SPI=self.sa.rspi, id=1,
627 length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
630 integ_data = raw(sa_auth)
631 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
632 self.sa.my_authkey, integ_data)
633 sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
635 assert(len(sa_auth) == tlen)
636 packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
637 self.sa.dport, self.sa.natt)
638 self.pg0.add_stream(packet)
639 self.pg0.enable_capture()
641 capture = self.pg0.get_capture(1)
642 self.verify_sa_auth(capture[0])
644 def get_ike_header(self, packet):
646 ih = packet[ikev2.IKEv2]
647 except IndexError as e:
648 # this is a workaround for getting IKEv2 layer as both ikev2 and
649 # ipsec register for port 4500
651 ih = self.verify_and_remove_non_esp_marker(esp)
654 def verify_sa_init(self, packet):
655 ih = self.get_ike_header(packet)
657 self.assertEqual(ih.exch_type, 34)
658 self.assertTrue('Response' in ih.flags)
659 self.assertEqual(ih.init_SPI, self.sa.ispi)
660 self.assertNotEqual(ih.resp_SPI, 0)
661 self.sa.rspi = ih.resp_SPI
663 sa = ih[ikev2.IKEv2_payload_SA]
664 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
665 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
666 except IndexError as e:
667 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
668 self.logger.error(ih.show())
670 self.sa.complete_dh_data()
674 def verify_and_remove_non_esp_marker(self, packet):
676 # if we are in nat traversal mode check for non esp marker
679 self.assertEqual(data[:4], b'\x00' * 4)
680 return ikev2.IKEv2(data[4:])
684 def verify_udp(self, udp):
685 self.assertEqual(udp.sport, self.sa.sport)
686 self.assertEqual(udp.dport, self.sa.dport)
688 def verify_sa_auth(self, packet):
689 ike = self.get_ike_header(packet)
692 plain = self.sa.hmac_and_decrypt(ike)
693 self.sa.calc_child_keys()
695 def verify_child_sas(self):
696 sas = self.vapi.ipsec_sa_dump()
697 self.assertEqual(len(sas), 2)
700 c = self.sa.child_sas[0]
702 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
703 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
704 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
706 if self.sa.esp_integ is None:
709 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
710 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
711 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
714 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
715 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
716 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
717 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
721 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
722 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
723 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
724 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
726 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
727 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
729 def test_responder(self):
730 self.send_sa_init(self.sa.natt)
732 self.verify_child_sas()
735 class Ikev2Params(object):
736 def config_params(self, params={}):
737 ec = VppEnum.vl_api_ipsec_crypto_alg_t
738 ei = VppEnum.vl_api_ipsec_integ_alg_t
740 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
741 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
742 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
743 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
744 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
745 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
747 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
748 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
749 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
750 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
752 is_natt = 'natt' in params and params['natt'] or False
753 self.p = Profile(self, 'pr1')
755 if 'auth' in params and params['auth'] == 'rsa-sig':
756 auth_method = 'rsa-sig'
757 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
758 self.vapi.ikev2_set_local_key(
759 key_file=work_dir + params['server-key'])
761 client_file = work_dir + params['client-cert']
762 server_pem = open(work_dir + params['server-cert']).read()
763 client_priv = open(work_dir + params['client-key']).read()
764 client_priv = load_pem_private_key(str.encode(client_priv), None,
766 self.peer_cert = x509.load_pem_x509_certificate(
767 str.encode(server_pem),
769 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
772 auth_data = b'$3cr3tpa$$w0rd'
773 self.p.add_auth(method='shared-key', data=auth_data)
774 auth_method = 'shared-key'
777 self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
778 self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
779 self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
780 self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')
782 self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
783 r_id=self.p.local_id['data'],
784 id_type=self.p.local_id['id_type'], natt=is_natt,
785 priv_key=client_priv, auth_method=auth_method,
787 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
789 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
791 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
793 ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
795 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
797 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
800 self.sa.set_ike_props(
801 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
802 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
803 self.sa.set_esp_props(
804 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
808 class TestApi(VppTestCase):
809 """ Test IKEV2 API """
812 super(TestApi, cls).setUpClass()
815 def tearDownClass(cls):
816 super(TestApi, cls).tearDownClass()
819 super(TestApi, self).tearDown()
820 self.p1.remove_vpp_config()
821 self.p2.remove_vpp_config()
822 r = self.vapi.ikev2_profile_dump()
823 self.assertEqual(len(r), 0)
825 def configure_profile(self, cfg):
826 p = Profile(self, cfg['name'])
827 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
828 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
829 p.add_local_ts(**cfg['loc_ts'])
830 p.add_remote_ts(**cfg['rem_ts'])
831 p.add_responder(cfg['responder'])
832 p.add_ike_transforms(cfg['ike_ts'])
833 p.add_esp_transforms(cfg['esp_ts'])
834 p.add_auth(**cfg['auth'])
835 p.set_udp_encap(cfg['udp_encap'])
836 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
837 if 'lifetime_data' in cfg:
838 p.set_lifetime_data(cfg['lifetime_data'])
840 p.set_tunnel_interface(cfg['tun_itf'])
844 def test_profile_api(self):
845 """ test profile dump API """
850 'start_addr': '3.3.3.2',
851 'end_addr': '3.3.3.3',
857 'start_addr': '4.5.76.80',
858 'end_addr': '2.3.4.6',
864 'loc_id': ('fqdn', b'vpp.home'),
865 'rem_id': ('fqdn', b'roadwarrior.example.com'),
868 'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
871 'crypto_key_size': 32,
876 'crypto_key_size': 24,
878 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
880 'ipsec_over_udp_port': 4501,
883 'lifetime_maxdata': 20192,
884 'lifetime_jitter': 9,
889 'loc_id': ('ip4-addr', b'192.168.2.1'),
890 'rem_id': ('ip4-addr', b'192.168.2.2'),
893 'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
896 'crypto_key_size': 16,
901 'crypto_key_size': 24,
903 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
905 'ipsec_over_udp_port': 4600,
908 self.p1 = self.configure_profile(conf['p1'])
909 self.p2 = self.configure_profile(conf['p2'])
911 r = self.vapi.ikev2_profile_dump()
912 self.assertEqual(len(r), 2)
913 self.verify_profile(r[0].profile, conf['p1'])
914 self.verify_profile(r[1].profile, conf['p2'])
916 def verify_id(self, api_id, cfg_id):
917 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
918 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
920 def verify_ts(self, api_ts, cfg_ts):
921 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
922 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
923 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
924 self.assertEqual(api_ts.start_addr, IPv4Address(cfg_ts['start_addr']))
925 self.assertEqual(api_ts.end_addr, IPv4Address(cfg_ts['end_addr']))
927 def verify_responder(self, api_r, cfg_r):
928 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
929 self.assertEqual(api_r.ip4, IPv4Address(cfg_r['ip4']))
931 def verify_transforms(self, api_ts, cfg_ts):
932 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
933 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
934 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
936 def verify_ike_transforms(self, api_ts, cfg_ts):
937 self.verify_transforms(api_ts, cfg_ts)
938 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
940 def verify_esp_transforms(self, api_ts, cfg_ts):
941 self.verify_transforms(api_ts, cfg_ts)
943 def verify_auth(self, api_auth, cfg_auth):
944 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
945 self.assertEqual(api_auth.data, cfg_auth['data'])
946 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
948 def verify_lifetime_data(self, p, ld):
949 self.assertEqual(p.lifetime, ld['lifetime'])
950 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
951 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
952 self.assertEqual(p.handover, ld['handover'])
954 def verify_profile(self, ap, cp):
955 self.assertEqual(ap.name, cp['name'])
956 self.assertEqual(ap.udp_encap, cp['udp_encap'])
957 self.verify_id(ap.loc_id, cp['loc_id'])
958 self.verify_id(ap.rem_id, cp['rem_id'])
959 self.verify_ts(ap.loc_ts, cp['loc_ts'])
960 self.verify_ts(ap.rem_ts, cp['rem_ts'])
961 self.verify_responder(ap.responder, cp['responder'])
962 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
963 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
964 self.verify_auth(ap.auth, cp['auth'])
965 if 'lifetime_data' in cp:
966 self.verify_lifetime_data(ap, cp['lifetime_data'])
967 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
969 self.assertEqual(ap.tun_itf, cp['tun_itf'])
971 self.assertEqual(ap.tun_itf, 0xffffffff)
974 class TestResponderNATT(TemplateResponder, Ikev2Params):
975 """ test ikev2 responder - nat traversal """
981 class TestResponderPsk(TemplateResponder, Ikev2Params):
982 """ test ikev2 responder - pre shared key auth """
987 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
988 """ test ikev2 responder - cert based auth """
992 'server-key': 'server-key.pem',
993 'client-key': 'client-key.pem',
994 'client-cert': 'client-cert.pem',
995 'server-cert': 'server-cert.pem'})
998 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
999 (TemplateResponder, Ikev2Params):
1001 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1003 def config_tc(self):
1004 self.config_params({
1005 'ike-crypto': ('AES-CBC', 16),
1006 'ike-integ': 'SHA2-256-128',
1007 'esp-crypto': ('AES-CBC', 24),
1008 'esp-integ': 'SHA2-384-192',
1009 'ike-dh': '2048MODPgr'})
1012 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1013 (TemplateResponder, Ikev2Params):
1015 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1017 def config_tc(self):
1018 self.config_params({
1019 'ike-crypto': ('AES-CBC', 32),
1020 'ike-integ': 'SHA2-256-128',
1021 'esp-crypto': ('AES-GCM-16ICV', 32),
1022 'esp-integ': 'NULL',
1023 'ike-dh': '3072MODPgr'})
1026 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1030 def config_tc(self):
1031 self.config_params({
1032 'ike-crypto': ('AES-GCM-16ICV', 32),
1033 'ike-integ': 'NULL',
1034 'ike-dh': '2048MODPgr'})
1037 if __name__ == '__main__':
1038 unittest.main(testRunner=VppTestRunner)