2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
12 from ipaddress import IPv4Address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.packet import raw, Raw
16 from scapy.utils import long_converter
17 from framework import VppTestCase, VppTestRunner
18 from vpp_ikev2 import Profile, IDType, AuthMethod
19 from vpp_papi import VppEnum
# Pad string fed to the PRF together with the pre-shared secret when the
# shared-key AUTH value is computed.
22 KEY_PAD = b"Key Pad for IKEv2"
# MODP Diffie-Hellman groups keyed by group name. The third tuple member is
# used as the byte length of the shared secret (256 bytes for the 2048-bit
# prime, 384 bytes for the 3072-bit prime).
29 # tuple structure is (p, g, key_len)
31 '2048MODPgr': (long_converter("""
32 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
33 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
34 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
35 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
36 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
37 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
38 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
39 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
40 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
41 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
42 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
44 '3072MODPgr': (long_converter("""
45 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
46 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
47 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
48 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
49 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
50 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
51 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
52 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
53 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
54 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
55 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
56 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
57 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
58 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
59 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
60 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
# Wrapper around a cipher/mode pair offering encrypt, decrypt and padding
# helpers for IKE payloads.
64 class CryptoAlgo(object):
# Store the algorithm description and derive block size and IV length.
65 def __init__(self, name, cipher, mode):
69 if self.cipher is not None:
# block_size is reported in bits; keep it in bytes
70 self.bs = self.cipher.block_size // 8
# AES-GCM uses its own IV size instead of a block-sized IV
72 if self.name == 'AES-GCM-16ICV':
73 self.iv_len = GCM_IV_SIZE
# Encrypt data with key using a freshly generated random IV.
# aad is the additional authenticated data used on the AEAD path.
77 def encrypt(self, data, key, aad=None):
78 iv = os.urandom(self.iv_len)
80 encryptor = Cipher(self.cipher(key), self.mode(iv),
81 default_backend()).encryptor()
# the IV is prepended to the ciphertext
82 return iv + encryptor.update(data) + encryptor.finalize()
# AEAD path: the trailing SALT_SIZE bytes of the key material are the salt,
# the remainder is the actual cipher key
84 salt = key[-SALT_SIZE:]
86 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
87 default_backend()).encryptor()
88 encryptor.authenticate_additional_data(aad)
89 data = encryptor.update(data) + encryptor.finalize()
# append the (truncated) authentication tag to the ciphertext
90 data += encryptor.tag[:GCM_ICV_SIZE]
# Decrypt data with key; icv is the authentication tag on the AEAD path.
# NOTE(review): both decrypt paths build the cipher from algorithms.AES
# directly rather than self.cipher as encrypt does - confirm intended.
93 def decrypt(self, data, key, aad=None, icv=None):
# the IV is carried in front of the ciphertext
95 iv = data[:self.iv_len]
96 ct = data[self.iv_len:]
97 decryptor = Cipher(algorithms.AES(key),
99 default_backend()).decryptor()
100 return decryptor.update(ct) + decryptor.finalize()
# AEAD path: nonce is salt (tail of the key material) followed by the IV
102 salt = key[-SALT_SIZE:]
103 nonce = salt + data[:GCM_IV_SIZE]
104 ct = data[GCM_IV_SIZE:]
105 key = key[:-SALT_SIZE]
106 decryptor = Cipher(algorithms.AES(key),
107 self.mode(nonce, icv, len(icv)),
108 default_backend()).decryptor()
109 decryptor.authenticate_additional_data(aad)
110 pt = decryptor.update(ct) + decryptor.finalize()
# Padding: extend data to the next multiple of the block size (always adding
# at least one byte); filler bytes are zero and the final byte holds pad_len.
115 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
116 data = data + b'\x00' * (pad_len - 1)
117 return data + bytes([pad_len])
# Description of an integrity / PRF algorithm: MAC constructor, hash module,
# key length and truncated output length.
120 class AuthAlgo(object):
121 def __init__(self, name, mac, mod, key_len, trunc_len=None):
125 self.key_len = key_len
# a missing (or zero) trunc_len falls back to key_len
126 self.trunc_len = trunc_len or key_len
# Entries of the algorithm lookup tables (cipher, integrity and PRF), keyed
# by the algorithm name used throughout the tests.
130 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
131 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
132 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
# integrity entries: (name, mac, hash, key_len, trunc_len), lengths in bytes
137 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
138 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
139 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
# NOTE(review): the next two entries pass hashes.SHA256 although their names
# and key lengths (48 / 64) suggest hashes.SHA384 / hashes.SHA512 were
# intended - confirm and fix if these are ever used for an actual HMAC.
140 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
141 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
# PRF entries
145 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
146 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
class IKEv2ChildSA(object):
    """Child SA state: its SPI and the local/remote traffic selectors."""

    def __init__(self, local_ts, remote_ts, spi=None):
        """Remember the traffic selectors and pick an SPI.

        A falsy ``spi`` (the default ``None`` included) is replaced by
        four random bytes.
        """
        self.local_ts, self.remote_ts = local_ts, remote_ts
        if spi:
            self.spi = spi
        else:
            self.spi = os.urandom(4)
# State of one IKEv2 security association as seen by the test peer:
# identities, nonces, DH data, negotiated algorithms and derived keys.
158 class IKEv2SA(object):
# test: the test case object (its assert helpers are used by verify_hmac).
# id_type may be given either as a string name or as an already resolved
# value. nonce defaults to 32 random bytes.
159 def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
160 i_id=None, r_id=None, id_type='fqdn', nonce=None,
161 auth_data=None, local_ts=None, remote_ts=None,
162 auth_method='shared-key', priv_key=None, natt=False):
# DH parameters are created lazily in generate_dh_data()
170 self.dh_params = None
172 self.priv_key = priv_key
173 self.is_initiator = is_initiator
174 nonce = nonce or os.urandom(32)
175 self.auth_data = auth_data
# string id types are translated through IDType.value()
178 if isinstance(id_type, str):
179 self.id_type = IDType.value(id_type)
181 self.id_type = id_type
182 self.auth_method = auth_method
183 if self.is_initiator:
# a single child SA is tracked per IKE SA
191 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
def dh_pub_key(self):
    """Return the initiator DH public value stored in ``i_dh_data``."""
    public_value = self.i_dh_data
    return public_value
def compute_secret(self):
    """Return the DH shared secret as big-endian bytes.

    Computes ``peer ** priv mod p`` from the responder public value and
    the local private exponent, sized to the group's key length.
    """
    own_exponent = int.from_bytes(self.dh_private_key, 'big')
    peer_public = int.from_bytes(self.r_dh_data, 'big')
    # ike_group is (p, g, key_len); the generator is not needed here
    modulus, _generator, secret_len = self.ike_group
    shared = pow(peer_public, own_exponent, modulus)
    return shared.to_bytes(secret_len, 'big')
# Generate the local DH key pair for the configured group and store the
# private exponent (dh_private_key) and public value (i_dh_data) as
# big-endian bytes.
# Raises NotImplementedError when the configured group is not in DH.
203 def generate_dh_data(self):
205 if self.is_initiator:
206 if self.ike_dh not in DH:
207 raise NotImplementedError('%s not in DH group' % self.ike_dh)
# build the DH parameters only once and reuse them afterwards
208 if self.dh_params is None:
209 dhg = DH[self.ike_dh]
210 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
211 self.dh_params = pn.parameters(default_backend())
212 priv = self.dh_params.generate_private_key()
213 pub = priv.public_key()
214 x = priv.private_numbers().x
# key_size is in bits; serialize to the full group width in bytes
215 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
216 y = pub.public_numbers().y
217 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Compute the DH shared secret and keep it in ``dh_shared_secret``."""
    shared = self.compute_secret()
    self.dh_shared_secret = shared
# Derive the child SA keys from sk_d and both nonces and store them on the
# first child SA (sk_ei/sk_ai/salt_ei for the initiator direction and
# sk_er/sk_ar/salt_er for the responder direction).
222 def calc_child_keys(self):
223 prf = self.ike_prf_alg.mod()
# seed for prf+ is Ni | Nr
224 s = self.i_nonce + self.r_nonce
225 c = self.child_sas[0]
227 encr_key_len = self.esp_crypto_key_len
228 integ_key_len = self.esp_integ_alg.key_len
# a 4-byte salt is taken only when there is no integrity key
229 salt_len = 0 if integ_key_len else 4
231 l = (integ_key_len * 2 +
234 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
# slice the key material in order: initiator keys first, then responder
237 c.sk_ei = keymat[pos:pos+encr_key_len]
241 c.sk_ai = keymat[pos:pos+integ_key_len]
244 c.salt_ei = keymat[pos:pos+salt_len]
247 c.sk_er = keymat[pos:pos+encr_key_len]
251 c.sk_ar = keymat[pos:pos+integ_key_len]
254 c.salt_er = keymat[pos:pos+salt_len]
# Iterated PRF expansion: keep producing PRF blocks until at least `length`
# bytes are available (the round counter is bounded below 255).
257 def calc_prfplus(self, prf, key, seed, length):
261 while len(r) < length and x < 255:
# each round feeds the running input, the seed and the round counter byte
266 s = s + seed + bytes([x])
267 t = self.calc_prf(prf, key, s)
# Apply the negotiated PRF: build the MAC object of the PRF algorithm with
# the given key and hash instance.
275 def calc_prf(self, prf, key, data):
276 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
# IKE SA key derivation: compute SKEYSEED, expand it with prf+ and slice the
# result into sk_d, sk_ai/sk_ar, sk_ei/sk_er and sk_pi/sk_pr.
281 prf = self.ike_prf_alg.mod()
282 # SKEYSEED = prf(Ni | Nr, g^ir)
283 s = self.i_nonce + self.r_nonce
284 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
286 # calculate S = Ni | Nr | SPIi SPIr
287 s = s + self.ispi + self.rspi
289 prf_key_trunc = self.ike_prf_alg.trunc_len
290 encr_key_len = self.ike_crypto_key_len
291 tr_prf_key_len = self.ike_prf_alg.key_len
292 integ_key_len = self.ike_integ_alg.key_len
293 if integ_key_len == 0:
303 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
# NOTE(review): this slice starts at 0 rather than at pos; presumably pos is
# 0 at this point - confirm against the missing lines.
306 self.sk_d = keymat[:pos+prf_key_trunc]
309 self.sk_ai = keymat[pos:pos+integ_key_len]
311 self.sk_ar = keymat[pos:pos+integ_key_len]
# encryption keys carry salt_size extra bytes of salt at their tail
314 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
315 pos += encr_key_len + salt_size
316 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
317 pos += encr_key_len + salt_size
319 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
320 pos += tr_prf_key_len
321 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
# Build the octets that get signed / MACed for authentication:
# packet | nonce | prf(key, id-payload-body).
323 def generate_authmsg(self, prf, packet):
324 if self.is_initiator:
# id payload body: one byte id type, three zero bytes, then the id
328 data = bytes([self.id_type, 0, 0, 0]) + id
329 id_hash = self.calc_prf(prf, key, data)
330 return packet + nonce + id_hash
# Compute the AUTH value over the IKE_SA_INIT request and store it in
# self.auth_data (overwriting the configured secret on the shared-key path).
# Raises TypeError for an unknown auth method.
333 prf = self.ike_prf_alg.mod()
334 authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
335 if self.auth_method == 'shared-key':
# shared key: AUTH = prf(prf(secret, KEY_PAD), authmsg)
336 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
337 self.auth_data = self.calc_prf(prf, psk, authmsg)
338 elif self.auth_method == 'rsa-sig':
# rsa-sig: sign the auth message with the private key using PKCS#1 v1.5
339 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
342 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad ``data`` and encrypt it with this side's IKE encryption key.

    ``aad`` is handed through to the cipher as additional authenticated
    data.
    """
    padded = self.ike_crypto_alg.pad(data)
    do_encrypt = self.ike_crypto_alg.encrypt
    return do_encrypt(padded, self.my_cryptokey, aad)
# Key selectors: each picks a key depending on whether this side is the
# initiator. The bodies are not fully visible here; presumably the
# initiator/responder variants of sk_a* / sk_e* are returned - TODO confirm.
349 def peer_authkey(self):
350 if self.is_initiator:
355 def my_authkey(self):
356 if self.is_initiator:
361 def my_cryptokey(self):
362 if self.is_initiator:
367 def peer_cryptokey(self):
368 if self.is_initiator:
def concat(self, alg, key_len):
    """Return ``<alg>-<bits>`` where bits is ``key_len`` (bytes) times 8."""
    key_bits = key_len * 8
    return '-'.join((alg, str(key_bits)))
def vpp_ike_cypto_alg(self):
    """Return the IKE cipher name combined with its key size via concat()."""
    algo_name = self.ike_crypto
    key_bytes = self.ike_crypto_key_len
    return self.concat(algo_name, key_bytes)
def vpp_esp_cypto_alg(self):
    """Return the ESP cipher name combined with its key size via concat()."""
    algo_name = self.esp_crypto
    key_bytes = self.esp_crypto_key_len
    return self.concat(algo_name, key_bytes)
def verify_hmac(self, ikemsg):
    """Check the trailing integrity checksum of a raw IKE message.

    The last ``trunc_len`` bytes of ``ikemsg`` are taken as the received
    checksum; the HMAC over the preceding bytes is computed with the
    peer's authentication key and compared (truncated) through the test
    case's ``assertEqual``.
    """
    icv_len = self.ike_integ_alg.trunc_len
    received_icv = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    digest = self.compute_hmac(
        self.ike_integ_alg.mod(), self.peer_authkey, covered)
    self.test.assertEqual(digest[:icv_len], received_icv)
# Build the MAC object of the IKE integrity algorithm with the given key and
# hash instance; the remaining steps are not visible here.
391 def compute_hmac(self, integ, key, data):
392 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the peer's IKE encryption key.

    ``aad`` and ``icv`` are passed through to the cipher unchanged.
    """
    do_decrypt = self.ike_crypto_alg.decrypt
    return do_decrypt(data, self.peer_cryptokey, aad, icv)
# Authenticate and decrypt the Encrypted payload of an IKE message and
# return the plaintext.
399 def hmac_and_decrypt(self, ike):
400 ep = ike[ikev2.IKEv2_payload_Encrypted]
401 if self.ike_crypto == 'AES-GCM-16ICV':
# AEAD: the AAD is the IKE header plus the Encrypted payload header, and
# the tag is the trailing GCM_ICV_SIZE bytes of the payload
402 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
403 ct = ep.load[:-GCM_ICV_SIZE]
404 tag = ep.load[-GCM_ICV_SIZE:]
405 return self.decrypt(ct, raw(ike)[:aad_len], tag)
# non-AEAD: verify the HMAC over the whole message first
407 self.verify_hmac(raw(ike))
408 integ_trunc = self.ike_integ_alg.trunc_len
410 # remove ICV and decrypt payload
411 ct = ep.load[:-integ_trunc]
412 return self.decrypt(ct)
# Build the traffic selector payload entries for the first child SA.
# Returns a pair of single-element lists: (local selectors, remote selectors).
414 def generate_ts(self):
415 c = self.child_sas[0]
416 ts1 = ikev2.IPv4TrafficSelector(
420 starting_address_v4=c.local_ts['start_addr'],
421 ending_address_v4=c.local_ts['end_addr'])
422 ts2 = ikev2.IPv4TrafficSelector(
424 starting_address_v4=c.remote_ts['start_addr'],
425 ending_address_v4=c.remote_ts['end_addr'])
426 return ([ts1], [ts2])
# Select the IKE SA algorithms (cipher, key length, integrity, PRF, DH group)
# by name. Raises TypeError for names missing from the lookup tables.
428 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
429 if crypto not in CRYPTO_ALGOS:
430 raise TypeError('unsupported encryption algo %r' % crypto)
431 self.ike_crypto = crypto
432 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
433 self.ike_crypto_key_len = crypto_key_len
435 if integ not in AUTH_ALGOS:
436 raise TypeError('unsupported auth algo %r' % integ)
# 'NULL' integrity is stored as None, the algorithm object is kept as is
437 self.ike_integ = None if integ == 'NULL' else integ
438 self.ike_integ_alg = AUTH_ALGOS[integ]
440 if prf not in PRF_ALGOS:
441 raise TypeError('unsupported prf algo %r' % prf)
443 self.ike_prf_alg = PRF_ALGOS[prf]
# NOTE(review): this is a plain dict lookup, so an unknown group raises
# KeyError rather than the TypeError used for the other parameters.
445 self.ike_group = DH[self.ike_dh]
def set_esp_props(self, crypto, crypto_key_len, integ):
    """Select the ESP cipher, its key length and the integrity algorithm.

    The key length is stored before any validation happens. Raises
    TypeError when ``crypto`` or ``integ`` is not a known algorithm
    name. A 'NULL' integrity name is stored as ``None`` while the
    algorithm object is still looked up and kept.
    """
    self.esp_crypto_key_len = crypto_key_len

    crypto_known = crypto in CRYPTO_ALGOS
    if not crypto_known:
        raise TypeError('unsupported encryption algo %r' % crypto)
    self.esp_crypto, self.esp_crypto_alg = crypto, CRYPTO_ALGOS[crypto]

    integ_known = integ in AUTH_ALGOS
    if not integ_known:
        raise TypeError('unsupported auth algo %r' % integ)
    if integ == 'NULL':
        self.esp_integ = None
    else:
        self.esp_integ = integ
    self.esp_integ_alg = AUTH_ALGOS[integ]
# Build the key-length transform attribute for the AES ciphers as a
# (value, transform length) pair. key_len is in bytes; shifting by 3 turns
# it into bits in the low half of the attribute word.
# Raises Exception for any other cipher.
# NOTE(review): the check always looks at self.ike_crypto, also when called
# for the ESP key length - confirm intended.
459 def crypto_attr(self, key_len):
460 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
461 return (0x800e << 16 | key_len << 3, 12)
463 raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return crypto_attr() for the IKE cipher key length."""
    key_bytes = self.ike_crypto_key_len
    return self.crypto_attr(key_bytes)
def esp_crypto_attr(self):
    """Return crypto_attr() for the ESP cipher key length."""
    key_bytes = self.esp_crypto_key_len
    return self.crypto_attr(key_bytes)
# Compute the SHA1 digest used for NAT detection over
# initiator SPI | 8 zero bytes | ip | port (port as 2 big-endian bytes).
# ip is expected as raw bytes since it is concatenated directly.
471 def compute_nat_sha1(self, ip, port):
472 data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
473 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
475 return digest.finalize()
# Base test case driving VPP as IKEv2 responder; concrete tests supply the
# configuration.
478 class TemplateResponder(VppTestCase):
479 """ responder test template """
# Class setup: scapy's ikev2 contrib module is imported lazily and published
# as the module-level name `ikev2`, then two pg interfaces are created.
483 import scapy.contrib.ikev2 as _ikev2
484 globals()['ikev2'] = _ikev2
485 super(TemplateResponder, cls).setUpClass()
486 cls.create_pg_interfaces(range(2))
487 for i in cls.pg_interfaces:
493 def tearDownClass(cls):
494 super(TemplateResponder, cls).tearDownClass()
# Per-test setup: push the profile to VPP, check it is present and generate
# the local DH key pair.
497 super(TemplateResponder, self).setUp()
499 self.p.add_vpp_config()
500 self.assertIsNotNone(self.p.query_vpp_config())
501 self.sa.generate_dh_data()
# Per-test teardown: remove the profile again and check it is gone.
504 super(TemplateResponder, self).tearDown()
505 self.p.remove_vpp_config()
506 self.assertIsNone(self.p.query_vpp_config())
# Wrap an IKE message in Ether/IP/UDP addressed from the remote side of
# src_if to its local (VPP) side.
508 def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
509 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
510 IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
511 UDP(sport=sport, dport=dport))
513 # insert non ESP marker
# the marker is four zero bytes placed in front of the IKE message
514 res = res / Raw(b'\x00' * 4)
# Build and send the IKE_SA_INIT request (SA proposal, KE, Nonce and
# optionally a NAT detection notify), then verify the captured reply.
517 def send_sa_init(self, behind_nat=False):
518 tr_attr = self.sa.ike_crypto_attr()
# four transforms: encryption, integrity, PRF and DH group
519 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
520 transform_id=self.sa.ike_crypto, length=tr_attr[1],
521 key_length=tr_attr[0]) /
522 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
523 transform_id=self.sa.ike_integ) /
524 ikev2.IKEv2_payload_Transform(transform_type='PRF',
525 transform_id=self.sa.ike_prf_alg.name) /
526 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
527 transform_id=self.sa.ike_dh))
529 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
530 trans_nb=4, trans=trans))
533 next_payload = 'Notify'
# the request packet is kept on the SA because it is later used as input to
# the AUTH computation
537 self.sa.init_req_packet = (
538 ikev2.IKEv2(init_SPI=self.sa.ispi,
539 flags='Initiator', exch_type='IKE_SA_INIT') /
540 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
541 ikev2.IKEv2_payload_KE(next_payload='Nonce',
542 group=self.sa.ike_dh,
543 load=self.sa.dh_pub_key()) /
544 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
545 load=self.sa.i_nonce))
# NAT detection: hash of a fixed source address (10.10.10.1) and port
548 src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
550 nat_detection = ikev2.IKEv2_payload_Notify(
551 type='NAT_DETECTION_SOURCE_IP',
553 self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
555 ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
556 self.sa.sport, self.sa.dport,
558 self.pg0.add_stream(ike_msg)
559 self.pg0.enable_capture()
# exactly one reply packet is expected
561 capture = self.pg0.get_capture(1)
562 self.verify_sa_init(capture[0])
# Build, encrypt and send the IKE_AUTH request (IDi, IDr, AUTH, child SA
# proposal, traffic selectors, INITIAL_CONTACT), then verify the reply.
564 def send_sa_auth(self):
565 tr_attr = self.sa.esp_crypto_attr()
566 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
567 transform_id=self.sa.esp_crypto, length=tr_attr[1],
568 key_length=tr_attr[0]) /
569 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
570 transform_id=self.sa.esp_integ) /
571 ikev2.IKEv2_payload_Transform(
572 transform_type='Extended Sequence Number',
573 transform_id='No ESN') /
574 ikev2.IKEv2_payload_Transform(
575 transform_type='Extended Sequence Number',
# the ESP proposal carries a random 4-byte SPI
578 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
579 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
581 tsi, tsr = self.sa.generate_ts()
# plaintext payload chain that goes inside the Encrypted payload
582 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
583 IDtype=self.sa.id_type, load=self.sa.i_id) /
584 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
585 IDtype=self.sa.id_type, load=self.sa.r_id) /
586 ikev2.IKEv2_payload_AUTH(next_payload='SA',
587 auth_type=AuthMethod.value(self.sa.auth_method),
588 load=self.sa.auth_data) /
589 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
590 ikev2.IKEv2_payload_TSi(next_payload='TSr',
591 number_of_TSs=len(tsi),
592 traffic_selector=tsi) /
593 ikev2.IKEv2_payload_TSr(next_payload='Notify',
594 number_of_TSs=len(tsr),
595 traffic_selector=tsr) /
596 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
# AEAD case: the lengths must be known before encrypting because the
# headers are part of the additional authenticated data
598 if self.sa.ike_crypto == 'AES-GCM-16ICV':
599 data = self.sa.ike_crypto_alg.pad(raw(plain))
600 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
601 len(ikev2.IKEv2_payload_Encrypted())
602 tlen = plen + len(ikev2.IKEv2())
605 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
607 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
608 resp_SPI=self.sa.rspi, id=1,
609 length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
612 encr = self.sa.encrypt(raw(plain), raw(sa_auth))
613 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
614 length=plen, load=encr)
615 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
616 resp_SPI=self.sa.rspi, id=1,
617 length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
# non-AEAD case: encrypt first, then append the truncated HMAC
620 encr = self.sa.encrypt(raw(plain))
621 trunc_len = self.sa.ike_integ_alg.trunc_len
622 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
623 tlen = plen + len(ikev2.IKEv2())
625 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
626 length=plen, load=encr)
627 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
628 resp_SPI=self.sa.rspi, id=1,
629 length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
632 integ_data = raw(sa_auth)
633 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
634 self.sa.my_authkey, integ_data)
635 sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
# NOTE(review): a bare assert is stripped under `python -O`; a test
# assertion helper would keep this check active.
637 assert(len(sa_auth) == tlen)
638 packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
639 self.sa.dport, self.sa.natt)
640 self.pg0.add_stream(packet)
641 self.pg0.enable_capture()
643 capture = self.pg0.get_capture(1)
644 self.verify_sa_auth(capture[0])
# Extract the IKEv2 layer from a captured packet, falling back to manual
# parsing when scapy did not dissect it as IKEv2.
646 def get_ike_header(self, packet):
648 ih = packet[ikev2.IKEv2]
649 except IndexError as e:
650 # this is a workaround for getting IKEv2 layer as both ikev2 and
651 # ipsec register for port 4500
653 ih = self.verify_and_remove_non_esp_marker(esp)
# Validate the IKE_SA_INIT response and record the responder SPI, nonce and
# DH public value, then complete the DH exchange.
656 def verify_sa_init(self, packet):
657 ih = self.get_ike_header(packet)
# the exchange type is compared against the literal 34
659 self.assertEqual(ih.exch_type, 34)
660 self.assertTrue('Response' in ih.flags)
661 self.assertEqual(ih.init_SPI, self.sa.ispi)
662 self.assertNotEqual(ih.resp_SPI, 0)
663 self.sa.rspi = ih.resp_SPI
665 sa = ih[ikev2.IKEv2_payload_SA]
666 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
667 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
668 except IndexError as e:
# NOTE(review): this branch handles a MISSING payload, so the message
# presumably should read "... payload not found!" - confirm.
669 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
670 self.logger.error(ih.show())
672 self.sa.complete_dh_data()
# Check that the payload starts with the four zero bytes of the non-ESP
# marker and parse the remainder as an IKEv2 message.
676 def verify_and_remove_non_esp_marker(self, packet):
678 # if we are in nat traversal mode check for non esp marker
681 self.assertEqual(data[:4], b'\x00' * 4)
682 return ikev2.IKEv2(data[4:])
def verify_udp(self, udp):
    """Assert that the UDP source and destination ports match the SA's."""
    expected = self.sa
    for port_field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, port_field),
                         getattr(expected, port_field))
# Authenticate and decrypt the IKE_AUTH response, then derive the child SA
# keys.
690 def verify_sa_auth(self, packet):
691 ike = self.get_ike_header(packet)
694 plain = self.sa.hmac_and_decrypt(ike)
695 self.sa.calc_child_keys()
# Check the two IPsec SAs installed in VPP against the locally derived child
# SA: algorithms, crypto keys, integrity keys and salts.
697 def verify_ipsec_sas(self):
698 sas = self.vapi.ipsec_sa_dump()
699 self.assertEqual(len(sas), 2)
702 c = self.sa.child_sas[0]
704 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
705 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
706 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
708 if self.sa.esp_integ is None:
711 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
712 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
713 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
# sa0 is compared with the responder keys, sa1 with the initiator keys
716 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
717 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
# the API key buffer may be longer than the key, so only the prefix counts
718 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
719 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
723 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
724 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
725 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
726 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
# the salt is reported as an integer; it is compared as 4 little-endian bytes
728 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
729 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one named key between the API dump and local state.

    ``api_keys`` must expose ``<name>`` and ``<name>_len``; only the
    first ``<name>_len`` bytes of the API buffer are compared with the
    key of the same name on ``keys``.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    trimmed = reported[:reported_len]
    self.assertEqual(expected, trimmed)
def verify_id(self, api_id, exp_id):
    """Compare an identity reported by the API with the expected one.

    Checks the id type (expected type is translated through
    ``IDType.value``), the payload length and the payload itself.

    The payload check previously compared the reported data against
    ``exp_id.type``, which can never match a real identity; it now
    compares against ``exp_id.data``.
    NOTE(review): assumes the expected id object exposes its payload as
    ``data`` next to ``type`` and ``data_len`` - confirm with callers.
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)
# Check the IKE SA, its child SA, traffic selectors and nonces dumped by
# the VPP API against the locally tracked state.
743 def verify_ike_sas(self):
744 r = self.vapi.ikev2_sa_dump()
745 self.assertEqual(len(r), 1)
# NOTE(review): ispi is converted little-endian while rspi is converted
# big-endian - confirm this asymmetry is intended.
747 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'little'))
748 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
749 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
750 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
# all seven IKE SA keys must match
751 self.verify_keymat(sa.keys, self.sa, 'sk_d')
752 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
753 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
754 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
755 self.verify_keymat(sa.keys, self.sa, 'sk_er')
756 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
757 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
# identities: type, length and payload for both sides
759 self.assertEqual(sa.i_id.type, self.sa.id_type)
760 self.assertEqual(sa.r_id.type, self.sa.id_type)
761 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
762 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
763 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
764 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
766 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
767 self.assertEqual(len(r), 1)
769 self.assertEqual(csa.sa_index, sa.sa_index)
770 c = self.sa.child_sas[0]
# integrity keys are only checked when the child SA has them
771 if hasattr(c, 'sk_ai'):
772 self.verify_keymat(csa.keys, c, 'sk_ai')
773 self.verify_keymat(csa.keys, c, 'sk_ar')
774 self.verify_keymat(csa.keys, c, 'sk_ei')
775 self.verify_keymat(csa.keys, c, 'sk_er')
777 tsi, tsr = self.sa.generate_ts()
780 r = self.vapi.ikev2_traffic_selector_dump(
781 is_initiator=True, sa_index=sa.sa_index,
782 child_sa_index=csa.child_sa_index)
783 self.assertEqual(len(r), 1)
785 self.verify_ts(r[0].ts, tsi[0], True)
787 r = self.vapi.ikev2_traffic_selector_dump(
788 is_initiator=False, sa_index=sa.sa_index,
789 child_sa_index=csa.child_sa_index)
790 self.assertEqual(len(r), 1)
791 self.verify_ts(r[0].ts, tsr[0], False)
793 n = self.vapi.ikev2_nonce_get(is_initiator=True,
794 sa_index=sa.sa_index)
795 self.verify_nonce(n, self.sa.i_nonce)
796 n = self.vapi.ikev2_nonce_get(is_initiator=False,
797 sa_index=sa.sa_index)
798 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API nonce has the expected length and content."""
    expected_len = len(nonce)
    self.assertEqual(api_nonce.data_len, expected_len)
    reported = api_nonce.nonce
    self.assertEqual(reported, nonce)
# Compare a traffic selector dumped by the API with the scapy selector that
# was sent: locality flag, address range, port range and protocol.
804 def verify_ts(self, api_ts, ts, is_initiator):
806 self.assertTrue(api_ts.is_local)
808 self.assertFalse(api_ts.is_local)
809 self.assertEqual(api_ts.start_addr,
810 IPv4Address(ts.starting_address_v4))
811 self.assertEqual(api_ts.end_addr,
812 IPv4Address(ts.ending_address_v4))
813 self.assertEqual(api_ts.start_port, ts.start_port)
814 self.assertEqual(api_ts.end_port, ts.end_port)
815 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
# Full responder scenario: run the exchange, then verify the resulting
# IPsec SAs and IKE SAs.
817 def test_responder(self):
818 self.send_sa_init(self.sa.natt)
820 self.verify_ipsec_sas()
821 self.verify_ike_sas()
# Mixin that creates the VPP profile (self.p) and the matching local
# IKEv2SA (self.sa) from a dict of optional test parameters.
824 class Ikev2Params(object):
# Recognised keys in the visible code: 'natt', 'auth', 'server-key',
# 'client-cert', 'server-cert', 'client-key', 'ike-crypto', 'ike-integ',
# 'ike-dh', 'esp-crypto', 'esp-integ'.
# NOTE(review): params={} is a mutable default; the visible code only reads
# it, but a None default would be safer.
825 def config_params(self, params={}):
826 ec = VppEnum.vl_api_ipsec_crypto_alg_t
827 ei = VppEnum.vl_api_ipsec_integ_alg_t
# map of "<algo>-<bits>" / integrity names to the VPP API enum values
829 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
830 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
831 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
832 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
833 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
834 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
836 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
837 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
838 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
839 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
# NOTE(review): `a and b or c` idiom; equivalent to params.get('natt') or
# False here.
841 is_natt = 'natt' in params and params['natt'] or False
842 self.p = Profile(self, 'pr1')
# certificate based authentication
844 if 'auth' in params and params['auth'] == 'rsa-sig':
845 auth_method = 'rsa-sig'
# NOTE(review): os.getenv('BR') returns None when the variable is unset,
# which makes this concatenation raise TypeError.
846 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
847 self.vapi.ikev2_set_local_key(
848 key_file=work_dir + params['server-key'])
850 client_file = work_dir + params['client-cert']
# NOTE(review): these files are opened without being closed; a `with` block
# would release the handles deterministically.
851 server_pem = open(work_dir + params['server-cert']).read()
852 client_priv = open(work_dir + params['client-key']).read()
853 client_priv = load_pem_private_key(str.encode(client_priv), None,
855 self.peer_cert = x509.load_pem_x509_certificate(
856 str.encode(server_pem),
858 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
# pre-shared key authentication
861 auth_data = b'$3cr3tpa$$w0rd'
862 self.p.add_auth(method='shared-key', data=auth_data)
863 auth_method = 'shared-key'
866 self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
867 self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
868 self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
869 self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')
# the test peer mirrors the profile: the profile's remote id/ts are the
# peer's own (initiator / local) values and vice versa
871 self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
872 r_id=self.p.local_id['data'],
873 id_type=self.p.local_id['id_type'], natt=is_natt,
874 priv_key=client_priv, auth_method=auth_method,
876 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
# defaults used when a parameter is not supplied
878 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
880 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
882 ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
884 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
886 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
889 self.sa.set_ike_props(
890 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
891 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
892 self.sa.set_esp_props(
893 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
# Tests of the IKEv2 profile configuration / dump API.
897 class TestApi(VppTestCase):
898 """ Test IKEV2 API """
901 super(TestApi, cls).setUpClass()
904 def tearDownClass(cls):
905 super(TestApi, cls).tearDownClass()
# Per-test teardown: remove both profiles and check that the dump is empty.
908 super(TestApi, self).tearDown()
909 self.p1.remove_vpp_config()
910 self.p2.remove_vpp_config()
911 r = self.vapi.ikev2_profile_dump()
912 self.assertEqual(len(r), 0)
# Create a Profile from a configuration dict. 'lifetime_data' is optional;
# the condition guarding the tunnel interface is not visible here.
914 def configure_profile(self, cfg):
915 p = Profile(self, cfg['name'])
# ids are (id_type, data) pairs
916 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
917 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
918 p.add_local_ts(**cfg['loc_ts'])
919 p.add_remote_ts(**cfg['rem_ts'])
920 p.add_responder(cfg['responder'])
921 p.add_ike_transforms(cfg['ike_ts'])
922 p.add_esp_transforms(cfg['esp_ts'])
923 p.add_auth(**cfg['auth'])
924 p.set_udp_encap(cfg['udp_encap'])
925 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
926 if 'lifetime_data' in cfg:
927 p.set_lifetime_data(cfg['lifetime_data'])
929 p.set_tunnel_interface(cfg['tun_itf'])
# Configure two profiles with different settings and check that the profile
# dump returns both with matching contents, in configuration order.
933 def test_profile_api(self):
934 """ test profile dump API """
939 'start_addr': '3.3.3.2',
940 'end_addr': '3.3.3.3',
# NOTE(review): end_addr is numerically below start_addr in this selector -
# presumably deliberate test data, confirm.
946 'start_addr': '4.5.76.80',
947 'end_addr': '2.3.4.6',
953 'loc_id': ('fqdn', b'vpp.home'),
954 'rem_id': ('fqdn', b'roadwarrior.example.com'),
957 'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
960 'crypto_key_size': 32,
965 'crypto_key_size': 24,
967 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
969 'ipsec_over_udp_port': 4501,
972 'lifetime_maxdata': 20192,
973 'lifetime_jitter': 9,
978 'loc_id': ('ip4-addr', b'192.168.2.1'),
979 'rem_id': ('ip4-addr', b'192.168.2.2'),
982 'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
985 'crypto_key_size': 16,
990 'crypto_key_size': 24,
992 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
994 'ipsec_over_udp_port': 4600,
# the profiles are kept on self so that tearDown can remove them
997 self.p1 = self.configure_profile(conf['p1'])
998 self.p2 = self.configure_profile(conf['p2'])
1000 r = self.vapi.ikev2_profile_dump()
1001 self.assertEqual(len(r), 2)
1002 self.verify_profile(r[0].profile, conf['p1'])
1003 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Compare an API identity with a configured ``(id_type, data)`` pair."""
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(api_id.type, expected_type)
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare an API traffic selector with its configuration dict.

    Protocol and ports are compared directly; the configured address
    strings are converted to ``IPv4Address`` before comparison.
    """
    plain_fields = (('protocol_id', 'proto'),
                    ('start_port', 'start_port'),
                    ('end_port', 'end_port'))
    for api_field, cfg_key in plain_fields:
        self.assertEqual(getattr(api_ts, api_field), cfg_ts[cfg_key])
    for addr_field in ('start_addr', 'end_addr'):
        self.assertEqual(getattr(api_ts, addr_field),
                         IPv4Address(cfg_ts[addr_field]))
def verify_responder(self, api_r, cfg_r):
    """Compare the API responder entry with its configuration dict."""
    self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
    expected_addr = IPv4Address(cfg_r['ip4'])
    self.assertEqual(api_r.ip4, expected_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Compare cipher, key size and integrity algorithm of a transform set."""
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Check the common transform fields plus the IKE-only DH group."""
    self.verify_transforms(api_ts, cfg_ts)
    reported_group = api_ts.dh_group
    self.assertEqual(reported_group, cfg_ts['dh_group'])
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check an ESP transform set; only the common fields apply."""
    check_common = self.verify_transforms
    check_common(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Compare auth method, data and data length with the configuration."""
    expected_method = AuthMethod.value(cfg_auth['method'])
    self.assertEqual(api_auth.method, expected_method)
    expected_data = cfg_auth['data']
    self.assertEqual(api_auth.data, expected_data)
    self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
def verify_lifetime_data(self, p, ld):
    """Compare the lifetime settings of profile ``p`` with dict ``ld``."""
    lifetime_fields = ('lifetime', 'lifetime_maxdata',
                       'lifetime_jitter', 'handover')
    for field in lifetime_fields:
        self.assertEqual(getattr(p, field), ld[field])
# Compare a profile dumped by the API (ap) with its configuration dict (cp)
# field by field, delegating to the specialised verify_* helpers.
1043 def verify_profile(self, ap, cp):
1044 self.assertEqual(ap.name, cp['name'])
1045 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1046 self.verify_id(ap.loc_id, cp['loc_id'])
1047 self.verify_id(ap.rem_id, cp['rem_id'])
1048 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1049 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1050 self.verify_responder(ap.responder, cp['responder'])
1051 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1052 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1053 self.verify_auth(ap.auth, cp['auth'])
# lifetime settings are optional in the configuration
1054 if 'lifetime_data' in cp:
1055 self.verify_lifetime_data(ap, cp['lifetime_data'])
1056 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
# tunnel interface: either the configured one or 0xffffffff, presumably
# the "not set" value when none was configured - TODO confirm
1058 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1060 self.assertEqual(ap.tun_itf, 0xffffffff)
# Responder scenario with NAT traversal; the body of config_tc is not
# visible here.
1063 class TestResponderNATT(TemplateResponder, Ikev2Params):
1064 """ test ikev2 responder - nat traversal """
1065 def config_tc(self):
# Responder scenario using the default parameters of config_params(), i.e.
# pre-shared key authentication.
1070 class TestResponderPsk(TemplateResponder, Ikev2Params):
1071 """ test ikev2 responder - pre shared key auth """
1072 def config_tc(self):
1073 self.config_params()
# Responder scenario with certificate based authentication; the key and
# certificate file names are resolved relative to the certs directory used
# by config_params().
1076 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1077 """ test ikev2 responder - cert based auth """
1078 def config_tc(self):
1079 self.config_params({
1081 'server-key': 'server-key.pem',
1082 'client-key': 'client-key.pem',
1083 'client-cert': 'client-cert.pem',
1084 'server-cert': 'server-cert.pem'})
# Responder scenario with explicit algorithm selection: IKE uses AES-CBC
# with a 16-byte key and SHA2-256-128, ESP uses AES-CBC with a 24-byte key
# and SHA2-384-192, DH group is the 2048-bit MODP group.
1087 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1088 (TemplateResponder, Ikev2Params):
1090 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1092 def config_tc(self):
1093 self.config_params({
1094 'ike-crypto': ('AES-CBC', 16),
1095 'ike-integ': 'SHA2-256-128',
1096 'esp-crypto': ('AES-CBC', 24),
1097 'esp-integ': 'SHA2-384-192',
1098 'ike-dh': '2048MODPgr'})
# Responder scenario with AES-CBC for IKE, AES-GCM for ESP and the 3072-bit
# MODP group.
# NOTE(review): the class name and description say AES_CBC_128 but the IKE
# cipher is configured with a 32-byte (256-bit) key - confirm which one is
# intended.
1101 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1102 (TemplateResponder, Ikev2Params):
1104 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1106 def config_tc(self):
1107 self.config_params({
1108 'ike-crypto': ('AES-CBC', 32),
1109 'ike-integ': 'SHA2-256-128',
1110 'esp-crypto': ('AES-GCM-16ICV', 32),
1111 'esp-integ': 'NULL',
1112 'ike-dh': '3072MODPgr'})
# Responder scenario where the IKE SA itself uses AES-GCM with a 32-byte
# key, so no separate integrity algorithm is configured ('NULL').
1115 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1119 def config_tc(self):
1120 self.config_params({
1121 'ike-crypto': ('AES-GCM-16ICV', 32),
1122 'ike-integ': 'NULL',
1123 'ike-dh': '2048MODPgr'})
# Allow running this module directly; the tests are executed with the VPP
# test runner.
1126 if __name__ == '__main__':
1127 unittest.main(testRunner=VppTestRunner)