2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
12 from ipaddress import IPv4Address, IPv6Address, ip_address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.layers.inet6 import IPv6
16 from scapy.packet import raw, Raw
17 from scapy.utils import long_converter
18 from framework import VppTestCase, VppTestRunner
19 from vpp_ikev2 import Profile, IDType, AuthMethod
20 from vpp_papi import VppEnum
27 KEY_PAD = b"Key Pad for IKEv2"
34 # tuple structure is (p, g, key_len)
36 '2048MODPgr': (long_converter("""
37 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
38 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
39 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
40 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
41 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
42 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
43 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
44 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
45 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
46 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
47 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
49 '3072MODPgr': (long_converter("""
50 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
51 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
52 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
53 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
54 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
55 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
56 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
57 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
58 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
59 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
60 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
61 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
62 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
63 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
64 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
65 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
69 class CryptoAlgo(object):
70 def __init__(self, name, cipher, mode):
74 if self.cipher is not None:
75 self.bs = self.cipher.block_size // 8
77 if self.name == 'AES-GCM-16ICV':
78 self.iv_len = GCM_IV_SIZE
82 def encrypt(self, data, key, aad=None):
83 iv = os.urandom(self.iv_len)
85 encryptor = Cipher(self.cipher(key), self.mode(iv),
86 default_backend()).encryptor()
87 return iv + encryptor.update(data) + encryptor.finalize()
89 salt = key[-SALT_SIZE:]
91 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
92 default_backend()).encryptor()
93 encryptor.authenticate_additional_data(aad)
94 data = encryptor.update(data) + encryptor.finalize()
95 data += encryptor.tag[:GCM_ICV_SIZE]
98 def decrypt(self, data, key, aad=None, icv=None):
100 iv = data[:self.iv_len]
101 ct = data[self.iv_len:]
102 decryptor = Cipher(algorithms.AES(key),
104 default_backend()).decryptor()
105 return decryptor.update(ct) + decryptor.finalize()
107 salt = key[-SALT_SIZE:]
108 nonce = salt + data[:GCM_IV_SIZE]
109 ct = data[GCM_IV_SIZE:]
110 key = key[:-SALT_SIZE]
111 decryptor = Cipher(algorithms.AES(key),
112 self.mode(nonce, icv, len(icv)),
113 default_backend()).decryptor()
114 decryptor.authenticate_additional_data(aad)
115 pt = decryptor.update(ct) + decryptor.finalize()
120 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
121 data = data + b'\x00' * (pad_len - 1)
122 return data + bytes([pad_len - 1])
125 class AuthAlgo(object):
126 def __init__(self, name, mac, mod, key_len, trunc_len=None):
130 self.key_len = key_len
131 self.trunc_len = trunc_len or key_len
135 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
136 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
137 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
142 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
143 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
144 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
145 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
146 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
150 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
151 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
class IKEv2ChildSA(object):
    """State kept for one child SA: its SPI and both traffic selectors."""

    def __init__(self, local_ts, remote_ts, spi=None):
        """Store the selectors; draw a random 4-byte SPI when none is given.

        Any falsy ``spi`` (``None``, ``b''``) is replaced by random bytes.
        """
        if not spi:
            spi = os.urandom(4)
        self.spi = spi
        self.local_ts = local_ts
        self.remote_ts = remote_ts
163 class IKEv2SA(object):
164 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
165 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
166 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
167 auth_method='shared-key', priv_key=None, natt=False):
176 self.dh_params = None
178 self.priv_key = priv_key
179 self.is_initiator = is_initiator
180 nonce = nonce or os.urandom(32)
181 self.auth_data = auth_data
184 if isinstance(id_type, str):
185 self.id_type = IDType.value(id_type)
187 self.id_type = id_type
188 self.auth_method = auth_method
189 if self.is_initiator:
190 self.rspi = 8 * b'\x00'
195 self.ispi = 8 * b'\x00'
197 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
199 def new_msg_id(self):
def dh_pub_key(self):
    """Return the DH public value held in ``self.i_dh_data``."""
    public_value = self.i_dh_data
    return public_value
def compute_secret(self):
    """Return the Diffie-Hellman shared secret as fixed-width big-endian bytes.

    Raises the peer's public value (``self.r_dh_data``) to our private
    exponent (``self.dh_private_key``) modulo the group prime, then encodes
    the result on exactly ``key_len`` bytes, where ``self.ike_group`` is the
    ``(p, g, key_len)`` tuple.
    """
    exponent_bytes = self.dh_private_key
    base_bytes = self.r_dh_data
    prime, _generator, width = self.ike_group
    exponent = int.from_bytes(exponent_bytes, 'big')
    base = int.from_bytes(base_bytes, 'big')
    shared = pow(base, exponent, prime)
    return shared.to_bytes(width, 'big')
213 def generate_dh_data(self):
215 if self.is_initiator:
216 if self.ike_dh not in DH:
217 raise NotImplementedError('%s not in DH group' % self.ike_dh)
218 if self.dh_params is None:
219 dhg = DH[self.ike_dh]
220 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
221 self.dh_params = pn.parameters(default_backend())
222 priv = self.dh_params.generate_private_key()
223 pub = priv.public_key()
224 x = priv.private_numbers().x
225 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
226 y = pub.public_numbers().y
227 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Compute the DH shared secret and cache it in ``dh_shared_secret``."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
232 def calc_child_keys(self):
233 prf = self.ike_prf_alg.mod()
234 s = self.i_nonce + self.r_nonce
235 c = self.child_sas[0]
237 encr_key_len = self.esp_crypto_key_len
238 integ_key_len = self.esp_integ_alg.key_len
239 salt_len = 0 if integ_key_len else 4
241 l = (integ_key_len * 2 +
244 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
247 c.sk_ei = keymat[pos:pos+encr_key_len]
251 c.sk_ai = keymat[pos:pos+integ_key_len]
254 c.salt_ei = keymat[pos:pos+salt_len]
257 c.sk_er = keymat[pos:pos+encr_key_len]
261 c.sk_ar = keymat[pos:pos+integ_key_len]
264 c.salt_er = keymat[pos:pos+salt_len]
267 def calc_prfplus(self, prf, key, seed, length):
271 while len(r) < length and x < 255:
276 s = s + seed + bytes([x])
277 t = self.calc_prf(prf, key, s)
285 def calc_prf(self, prf, key, data):
286 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
291 prf = self.ike_prf_alg.mod()
292 # SKEYSEED = prf(Ni | Nr, g^ir)
293 s = self.i_nonce + self.r_nonce
294 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
296 # calculate S = Ni | Nr | SPIi | SPIr
297 s = s + self.ispi + self.rspi
299 prf_key_trunc = self.ike_prf_alg.trunc_len
300 encr_key_len = self.ike_crypto_key_len
301 tr_prf_key_len = self.ike_prf_alg.key_len
302 integ_key_len = self.ike_integ_alg.key_len
303 if integ_key_len == 0:
313 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
316 self.sk_d = keymat[:pos+prf_key_trunc]
319 self.sk_ai = keymat[pos:pos+integ_key_len]
321 self.sk_ar = keymat[pos:pos+integ_key_len]
324 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
325 pos += encr_key_len + salt_size
326 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
327 pos += encr_key_len + salt_size
329 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
330 pos += tr_prf_key_len
331 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
333 def generate_authmsg(self, prf, packet):
334 if self.is_initiator:
338 data = bytes([self.id_type, 0, 0, 0]) + id
339 id_hash = self.calc_prf(prf, key, data)
340 return packet + nonce + id_hash
343 prf = self.ike_prf_alg.mod()
344 authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
345 if self.auth_method == 'shared-key':
346 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
347 self.auth_data = self.calc_prf(prf, psk, authmsg)
348 elif self.auth_method == 'rsa-sig':
349 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
352 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad ``data`` with the IKE cipher's padding and encrypt it with our key.

    ``aad`` is forwarded unchanged to the cipher's ``encrypt``.
    """
    padded = self.ike_crypto_alg.pad(data)
    cipher = self.ike_crypto_alg
    return cipher.encrypt(padded, self.my_cryptokey, aad)
359 def peer_authkey(self):
360 if self.is_initiator:
365 def my_authkey(self):
366 if self.is_initiator:
371 def my_cryptokey(self):
372 if self.is_initiator:
377 def peer_cryptokey(self):
378 if self.is_initiator:
def concat(self, alg, key_len):
    """Build ``'<alg>-<bits>'`` from an algorithm name and a key length in bytes."""
    bits = key_len * 8
    return '-'.join((alg, str(bits)))
386 def vpp_ike_cypto_alg(self):
387 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
390 def vpp_esp_cypto_alg(self):
391 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
def verify_hmac(self, ikemsg):
    """Assert that the ICV trailing ``ikemsg`` matches the HMAC we compute.

    The last ``trunc_len`` bytes of the message are taken as the received
    ICV; everything before them is authenticated with the peer's auth key.
    """
    icv_len = self.ike_integ_alg.trunc_len
    received_icv = ikemsg[-icv_len:]
    authenticated = ikemsg[:-icv_len]
    digest = self.compute_hmac(self.ike_integ_alg.mod(),
                               self.peer_authkey,
                               authenticated)
    self.test.assertEqual(digest[:icv_len], received_icv)
401 def compute_hmac(self, integ, key, data):
402 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the peer's key using the configured IKE cipher.

    ``aad`` and ``icv`` are passed straight through to the cipher.
    """
    cipher = self.ike_crypto_alg
    return cipher.decrypt(data, self.peer_cryptokey, aad, icv)
409 def hmac_and_decrypt(self, ike):
410 ep = ike[ikev2.IKEv2_payload_Encrypted]
411 if self.ike_crypto == 'AES-GCM-16ICV':
412 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
413 ct = ep.load[:-GCM_ICV_SIZE]
414 tag = ep.load[-GCM_ICV_SIZE:]
415 return self.decrypt(ct, raw(ike)[:aad_len], tag)
417 self.verify_hmac(raw(ike))
418 integ_trunc = self.ike_integ_alg.trunc_len
420 # remove ICV and decrypt payload
421 ct = ep.load[:-integ_trunc]
422 return self.decrypt(ct)
def build_ts_addr(self, ts, version):
    """Map a traffic selector's address range onto versioned field names.

    ``version`` is the string suffix (``'4'`` or ``'6'`` at the visible call
    sites) appended to ``starting_address_v`` / ``ending_address_v``.
    """
    suffix = '_address_v' + version
    return {
        'starting' + suffix: ts['start_addr'],
        'ending' + suffix: ts['end_addr'],
    }
428 def generate_ts(self, is_ip4):
429 c = self.child_sas[0]
430 ts_data = {'IP_protocol_ID': 0,
434 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
435 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
436 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
437 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
439 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
440 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
441 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
442 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
443 return ([ts1], [ts2])
445 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
446 if crypto not in CRYPTO_ALGOS:
447 raise TypeError('unsupported encryption algo %r' % crypto)
448 self.ike_crypto = crypto
449 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
450 self.ike_crypto_key_len = crypto_key_len
452 if integ not in AUTH_ALGOS:
453 raise TypeError('unsupported auth algo %r' % integ)
454 self.ike_integ = None if integ == 'NULL' else integ
455 self.ike_integ_alg = AUTH_ALGOS[integ]
457 if prf not in PRF_ALGOS:
458 raise TypeError('unsupported prf algo %r' % prf)
460 self.ike_prf_alg = PRF_ALGOS[prf]
462 self.ike_group = DH[self.ike_dh]
464 def set_esp_props(self, crypto, crypto_key_len, integ):
465 self.esp_crypto_key_len = crypto_key_len
466 if crypto not in CRYPTO_ALGOS:
467 raise TypeError('unsupported encryption algo %r' % crypto)
468 self.esp_crypto = crypto
469 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
471 if integ not in AUTH_ALGOS:
472 raise TypeError('unsupported auth algo %r' % integ)
473 self.esp_integ = None if integ == 'NULL' else integ
474 self.esp_integ_alg = AUTH_ALGOS[integ]
476 def crypto_attr(self, key_len):
477 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
478 return (0x800e << 16 | key_len << 3, 12)
480 raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the IKE cipher key length."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the ESP cipher key length."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
488 def compute_nat_sha1(self, ip, port):
489 data = self.ispi + self.rspi + ip + (port).to_bytes(2, 'big')
490 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
492 return digest.finalize()
495 class TemplateResponder(VppTestCase):
496 """ responder test template """
500 import scapy.contrib.ikev2 as _ikev2
501 globals()['ikev2'] = _ikev2
502 super(TemplateResponder, cls).setUpClass()
503 cls.create_pg_interfaces(range(2))
504 for i in cls.pg_interfaces:
512 def tearDownClass(cls):
513 super(TemplateResponder, cls).tearDownClass()
516 super(TemplateResponder, self).setUp()
518 self.p.add_vpp_config()
519 self.assertIsNotNone(self.p.query_vpp_config())
520 self.sa.generate_dh_data()
521 self.vapi.cli('ikev2 set logging level 4')
522 self.vapi.cli('event-lo clear')
525 super(TemplateResponder, self).tearDown()
526 if self.sa.is_initiator:
527 self.initiate_del_sa()
528 r = self.vapi.ikev2_sa_dump()
529 self.assertEqual(len(r), 0)
531 self.p.remove_vpp_config()
532 self.assertIsNone(self.p.query_vpp_config())
def verify_del_sa(self, packet):
    """Assert ``packet`` answers our delete request.

    The IKE header must carry our current message id and exchange type 37
    (INFORMATIONAL).
    """
    informational = 37
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, informational)
539 def initiate_del_sa(self):
540 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
541 flags='Initiator', exch_type='INFORMATIONAL',
542 id=self.sa.new_msg_id())
543 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
544 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
545 packet = self.create_packet(self.pg0, ike_msg,
546 self.sa.sport, self.sa.dport,
547 self.sa.natt, self.ip6)
548 self.pg0.add_stream(packet)
549 self.pg0.enable_capture()
551 capture = self.pg0.get_capture(1)
552 self.verify_del_sa(capture[0])
554 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
557 src_ip = src_if.remote_ip6
558 dst_ip = src_if.local_ip6
561 src_ip = src_if.remote_ip4
562 dst_ip = src_if.local_ip4
564 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
565 ip_layer(src=src_ip, dst=dst_ip) /
566 UDP(sport=sport, dport=dport))
568 # insert non ESP marker
569 res = res / Raw(b'\x00' * 4)
572 def send_sa_init(self, behind_nat=False):
573 tr_attr = self.sa.ike_crypto_attr()
574 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
575 transform_id=self.sa.ike_crypto, length=tr_attr[1],
576 key_length=tr_attr[0]) /
577 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
578 transform_id=self.sa.ike_integ) /
579 ikev2.IKEv2_payload_Transform(transform_type='PRF',
580 transform_id=self.sa.ike_prf_alg.name) /
581 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
582 transform_id=self.sa.ike_dh))
584 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
585 trans_nb=4, trans=trans))
588 next_payload = 'Notify'
592 self.sa.init_req_packet = (
593 ikev2.IKEv2(init_SPI=self.sa.ispi,
594 flags='Initiator', exch_type='IKE_SA_INIT') /
595 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
596 ikev2.IKEv2_payload_KE(next_payload='Nonce',
597 group=self.sa.ike_dh,
598 load=self.sa.dh_pub_key()) /
599 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
600 load=self.sa.i_nonce))
603 src_address = b'\x0a\x0a\x0a\x01'
605 src_address = bytes(self.pg0.local_ip4, 'ascii')
607 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
608 dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
610 nat_src_detection = ikev2.IKEv2_payload_Notify(
611 type='NAT_DETECTION_SOURCE_IP', load=src_nat)
612 nat_dst_detection = ikev2.IKEv2_payload_Notify(
613 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
614 self.sa.init_req_packet = (self.sa.init_req_packet /
618 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
619 self.sa.sport, self.sa.dport,
620 self.sa.natt, self.ip6)
621 self.pg0.add_stream(ike_msg)
622 self.pg0.enable_capture()
624 capture = self.pg0.get_capture(1)
625 self.verify_sa_init(capture[0])
627 def encrypt_ike_msg(self, header, plain, first_payload):
628 if self.sa.ike_crypto == 'AES-GCM-16ICV':
629 data = self.sa.ike_crypto_alg.pad(raw(plain))
630 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
631 len(ikev2.IKEv2_payload_Encrypted())
632 tlen = plen + len(ikev2.IKEv2())
635 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
639 encr = self.sa.encrypt(raw(plain), raw(res))
640 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
641 length=plen, load=encr)
644 encr = self.sa.encrypt(raw(plain))
645 trunc_len = self.sa.ike_integ_alg.trunc_len
646 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
647 tlen = plen + len(ikev2.IKEv2())
649 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
650 length=plen, load=encr)
654 integ_data = raw(res)
655 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
656 self.sa.my_authkey, integ_data)
657 res = res / Raw(hmac_data[:trunc_len])
658 assert(len(res) == tlen)
661 def send_sa_auth(self):
662 tr_attr = self.sa.esp_crypto_attr()
663 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
664 transform_id=self.sa.esp_crypto, length=tr_attr[1],
665 key_length=tr_attr[0]) /
666 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
667 transform_id=self.sa.esp_integ) /
668 ikev2.IKEv2_payload_Transform(
669 transform_type='Extended Sequence Number',
670 transform_id='No ESN') /
671 ikev2.IKEv2_payload_Transform(
672 transform_type='Extended Sequence Number',
675 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
676 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
678 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
679 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
680 IDtype=self.sa.id_type, load=self.sa.i_id) /
681 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
682 IDtype=self.sa.id_type, load=self.sa.r_id) /
683 ikev2.IKEv2_payload_AUTH(next_payload='SA',
684 auth_type=AuthMethod.value(self.sa.auth_method),
685 load=self.sa.auth_data) /
686 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
687 ikev2.IKEv2_payload_TSi(next_payload='TSr',
688 number_of_TSs=len(tsi),
689 traffic_selector=tsi) /
690 ikev2.IKEv2_payload_TSr(next_payload='Notify',
691 number_of_TSs=len(tsr),
692 traffic_selector=tsr) /
693 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
695 header = ikev2.IKEv2(
696 init_SPI=self.sa.ispi,
697 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
698 flags='Initiator', exch_type='IKE_AUTH')
700 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
701 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
702 self.sa.dport, self.sa.natt, self.ip6)
703 self.pg0.add_stream(packet)
704 self.pg0.enable_capture()
706 capture = self.pg0.get_capture(1)
707 self.verify_sa_auth(capture[0])
709 def get_ike_header(self, packet):
711 ih = packet[ikev2.IKEv2]
712 except IndexError as e:
713 # this is a workaround for getting IKEv2 layer as both ikev2 and
714 # ipsec register for port 4500
716 ih = self.verify_and_remove_non_esp_marker(esp)
718 self.assertEqual(ih.version, 0x20)
721 def verify_sa_init(self, packet):
722 ih = self.get_ike_header(packet)
724 self.assertEqual(ih.id, self.sa.msg_id)
725 self.assertEqual(ih.exch_type, 34)
726 self.assertTrue('Response' in ih.flags)
727 self.assertEqual(ih.init_SPI, self.sa.ispi)
728 self.assertNotEqual(ih.resp_SPI, 0)
729 self.sa.rspi = ih.resp_SPI
731 sa = ih[ikev2.IKEv2_payload_SA]
732 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
733 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
734 except IndexError as e:
735 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
736 self.logger.error(ih.show())
738 self.sa.complete_dh_data()
742 def verify_and_remove_non_esp_marker(self, packet):
744 # if we are in nat traversal mode check for non esp marker
747 self.assertEqual(data[:4], b'\x00' * 4)
748 return ikev2.IKEv2(data[4:])
def verify_udp(self, udp):
    """Assert the UDP header's ports equal the SA's source/destination ports."""
    for port in ('sport', 'dport'):
        self.assertEqual(getattr(udp, port), getattr(self.sa, port))
756 def verify_sa_auth(self, packet):
757 ike = self.get_ike_header(packet)
760 self.assertEqual(ike.id, self.sa.msg_id)
761 plain = self.sa.hmac_and_decrypt(ike)
762 self.sa.calc_child_keys()
764 def verify_ipsec_sas(self):
765 sas = self.vapi.ipsec_sa_dump()
766 self.assertEqual(len(sas), 2)
769 c = self.sa.child_sas[0]
771 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
772 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
773 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
775 if self.sa.esp_integ is None:
778 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
779 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
780 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
783 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
784 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
785 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
786 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
790 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
791 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
792 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
793 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
795 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
796 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare key material ``name`` between an API reply and our own keys.

    The API object exposes the key under ``name`` plus its valid length under
    ``name + '_len'``; only that many leading bytes are compared.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Assert that an identity reported by the API matches the expected one.

    Checks the id type, the data length and the data bytes.

    NOTE(review): assumes ``exp_id`` exposes its payload as ``.data``
    alongside the ``.type`` and ``.data_len`` attributes already used here;
    confirm against the caller.
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The data bytes must be compared with the expected data; comparing them
    # with the id *type* (as before) could never validate the payload.
    self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)
810 def verify_ike_sas(self):
811 r = self.vapi.ikev2_sa_dump()
812 self.assertEqual(len(r), 1)
814 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
815 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
817 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
818 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
820 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
821 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
822 self.verify_keymat(sa.keys, self.sa, 'sk_d')
823 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
824 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
825 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
826 self.verify_keymat(sa.keys, self.sa, 'sk_er')
827 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
828 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
830 self.assertEqual(sa.i_id.type, self.sa.id_type)
831 self.assertEqual(sa.r_id.type, self.sa.id_type)
832 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
833 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
834 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
835 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
837 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
838 self.assertEqual(len(r), 1)
840 self.assertEqual(csa.sa_index, sa.sa_index)
841 c = self.sa.child_sas[0]
842 if hasattr(c, 'sk_ai'):
843 self.verify_keymat(csa.keys, c, 'sk_ai')
844 self.verify_keymat(csa.keys, c, 'sk_ar')
845 self.verify_keymat(csa.keys, c, 'sk_ei')
846 self.verify_keymat(csa.keys, c, 'sk_er')
848 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
851 r = self.vapi.ikev2_traffic_selector_dump(
852 is_initiator=True, sa_index=sa.sa_index,
853 child_sa_index=csa.child_sa_index)
854 self.assertEqual(len(r), 1)
856 self.verify_ts(r[0].ts, tsi[0], True)
858 r = self.vapi.ikev2_traffic_selector_dump(
859 is_initiator=False, sa_index=sa.sa_index,
860 child_sa_index=csa.child_sa_index)
861 self.assertEqual(len(r), 1)
862 self.verify_ts(r[0].ts, tsr[0], False)
864 n = self.vapi.ikev2_nonce_get(is_initiator=True,
865 sa_index=sa.sa_index)
866 self.verify_nonce(n, self.sa.i_nonce)
867 n = self.vapi.ikev2_nonce_get(is_initiator=False,
868 sa_index=sa.sa_index)
869 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert the API-reported nonce has the expected length and content."""
    expected_len = len(nonce)
    self.assertEqual(api_nonce.data_len, expected_len)
    self.assertEqual(api_nonce.nonce, nonce)
875 def verify_ts(self, api_ts, ts, is_initiator):
877 self.assertTrue(api_ts.is_local)
879 self.assertFalse(api_ts.is_local)
882 self.assertEqual(api_ts.start_addr,
883 IPv4Address(ts.starting_address_v4))
884 self.assertEqual(api_ts.end_addr,
885 IPv4Address(ts.ending_address_v4))
887 self.assertEqual(api_ts.start_addr,
888 IPv6Address(ts.starting_address_v6))
889 self.assertEqual(api_ts.end_addr,
890 IPv6Address(ts.ending_address_v6))
891 self.assertEqual(api_ts.start_port, ts.start_port)
892 self.assertEqual(api_ts.end_port, ts.end_port)
893 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
895 def test_responder(self):
896 self.send_sa_init(self.sa.natt)
898 self.verify_ipsec_sas()
899 self.verify_ike_sas()
902 class Ikev2Params(object):
903 def config_params(self, params={}):
904 ec = VppEnum.vl_api_ipsec_crypto_alg_t
905 ei = VppEnum.vl_api_ipsec_integ_alg_t
907 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
908 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
909 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
910 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
911 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
912 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
914 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
915 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
916 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
917 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
919 is_natt = 'natt' in params and params['natt'] or False
920 self.p = Profile(self, 'pr1')
921 self.ip6 = False if 'ip6' not in params else params['ip6']
923 if 'auth' in params and params['auth'] == 'rsa-sig':
924 auth_method = 'rsa-sig'
925 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
926 self.vapi.ikev2_set_local_key(
927 key_file=work_dir + params['server-key'])
929 client_file = work_dir + params['client-cert']
930 server_pem = open(work_dir + params['server-cert']).read()
931 client_priv = open(work_dir + params['client-key']).read()
932 client_priv = load_pem_private_key(str.encode(client_priv), None,
934 self.peer_cert = x509.load_pem_x509_certificate(
935 str.encode(server_pem),
937 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
940 auth_data = b'$3cr3tpa$$w0rd'
941 self.p.add_auth(method='shared-key', data=auth_data)
942 auth_method = 'shared-key'
945 self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
946 self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
947 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
948 'loc_ts' not in params else params['loc_ts']
949 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
950 'rem_ts' not in params else params['rem_ts']
951 self.p.add_local_ts(**loc_ts)
952 self.p.add_remote_ts(**rem_ts)
954 self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
955 r_id=self.p.local_id['data'],
956 id_type=self.p.local_id['id_type'], natt=is_natt,
957 priv_key=client_priv, auth_method=auth_method,
959 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
961 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
963 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
965 ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
967 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
969 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
972 self.sa.set_ike_props(
973 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
974 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
975 self.sa.set_esp_props(
976 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
980 class TestApi(VppTestCase):
981 """ Test IKEV2 API """
984 super(TestApi, cls).setUpClass()
987 def tearDownClass(cls):
988 super(TestApi, cls).tearDownClass()
991 super(TestApi, self).tearDown()
992 self.p1.remove_vpp_config()
993 self.p2.remove_vpp_config()
994 r = self.vapi.ikev2_profile_dump()
995 self.assertEqual(len(r), 0)
997 def configure_profile(self, cfg):
998 p = Profile(self, cfg['name'])
999 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1000 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1001 p.add_local_ts(**cfg['loc_ts'])
1002 p.add_remote_ts(**cfg['rem_ts'])
1003 p.add_responder(cfg['responder'])
1004 p.add_ike_transforms(cfg['ike_ts'])
1005 p.add_esp_transforms(cfg['esp_ts'])
1006 p.add_auth(**cfg['auth'])
1007 p.set_udp_encap(cfg['udp_encap'])
1008 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1009 if 'lifetime_data' in cfg:
1010 p.set_lifetime_data(cfg['lifetime_data'])
1011 if 'tun_itf' in cfg:
1012 p.set_tunnel_interface(cfg['tun_itf'])
1016 def test_profile_api(self):
1017 """ test profile dump API """
1022 'start_addr': '3.3.3.2',
1023 'end_addr': '3.3.3.3',
1029 'start_addr': '4.5.76.80',
1030 'end_addr': '2.3.4.6',
1037 'start_addr': 'ab::1',
1038 'end_addr': 'ab::4',
1044 'start_addr': 'cd::12',
1045 'end_addr': 'cd::13',
1051 'loc_id': ('fqdn', b'vpp.home'),
1052 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1055 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1058 'crypto_key_size': 32,
1063 'crypto_key_size': 24,
1065 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1067 'ipsec_over_udp_port': 4501,
1070 'lifetime_maxdata': 20192,
1071 'lifetime_jitter': 9,
1076 'loc_id': ('ip4-addr', b'192.168.2.1'),
1077 'rem_id': ('ip6-addr', b'abcd::1'),
1080 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1083 'crypto_key_size': 16,
1088 'crypto_key_size': 24,
1090 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1092 'ipsec_over_udp_port': 4600,
1095 self.p1 = self.configure_profile(conf['p1'])
1096 self.p2 = self.configure_profile(conf['p2'])
1098 r = self.vapi.ikev2_profile_dump()
1099 self.assertEqual(len(r), 2)
1100 self.verify_profile(r[0].profile, conf['p1'])
1101 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Assert an API identity matches a configured ``(id_type, data)`` pair."""
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(api_id.type, expected_type)
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Assert an API traffic selector matches its configuration dict.

    Protocol and ports are compared directly; the configured address strings
    are converted with ``ip_address`` before comparison.
    """
    self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
    for port_field in ('start_port', 'end_port'):
        self.assertEqual(getattr(api_ts, port_field), cfg_ts[port_field])
    for addr_field in ('start_addr', 'end_addr'):
        expected_addr = ip_address(text_type(cfg_ts[addr_field]))
        self.assertEqual(getattr(api_ts, addr_field), expected_addr)
def verify_responder(self, api_r, cfg_r):
    """Assert the API responder entry matches the configured interface and address."""
    self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
    expected_addr = ip_address(cfg_r['addr'])
    self.assertEqual(api_r.addr, expected_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Assert the cipher, key size and integrity algorithm match the config."""
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Run the common transform checks, then also compare the DH group."""
    self.verify_transforms(api_ts, cfg_ts)
    expected_group = cfg_ts['dh_group']
    self.assertEqual(api_ts.dh_group, expected_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Assert that an ESP transform set from the API matches the config.

    Only the common transform fields are checked; this delegates entirely
    to ``verify_transforms``.

    :param api_ts: ESP transform set object from the API reply.
    :param cfg_ts: dict accepted by ``verify_transforms``.
    """
    self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Assert that the auth settings reported by the API match the config.

    :param api_auth: auth object from the API reply; ``method``, ``data``
        and ``data_len`` are checked.
    :param cfg_auth: dict with the keys ``method`` (a name that is mapped
        through ``AuthMethod.value``) and ``data``.
    """
    expected_data = cfg_auth['data']
    self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
    self.assertEqual(api_auth.data, expected_data)
    # The API reports the length separately from the data itself.
    self.assertEqual(api_auth.data_len, len(expected_data))
def verify_lifetime_data(self, p, ld):
    """Assert that the lifetime settings of a profile match the config.

    :param p: profile object from the API reply.
    :param ld: dict with the keys ``lifetime``, ``lifetime_maxdata``,
        ``lifetime_jitter`` and ``handover``.
    """
    self.assertEqual(p.lifetime, ld['lifetime'])
    self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
    self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
    self.assertEqual(p.handover, ld['handover'])
1143 def verify_profile(self, ap, cp):
1144 self.assertEqual(ap.name, cp['name'])
1145 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1146 self.verify_id(ap.loc_id, cp['loc_id'])
1147 self.verify_id(ap.rem_id, cp['rem_id'])
1148 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1149 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1150 self.verify_responder(ap.responder, cp['responder'])
1151 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1152 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1153 self.verify_auth(ap.auth, cp['auth'])
1154 if 'lifetime_data' in cp:
1155 self.verify_lifetime_data(ap, cp['lifetime_data'])
1156 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1158 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1160 self.assertEqual(ap.tun_itf, 0xffffffff)
1163 class TestResponderNATT(TemplateResponder, Ikev2Params):
1164 """ test ikev2 responder - nat traversal """
1165 def config_tc(self):
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """

    def config_tc(self):
        """Configure the test case via config_params() with no overrides."""
        self.config_params()
1176 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1177 """ test ikev2 responder - cert based auth """
1178 def config_tc(self):
1179 self.config_params({
1181 'server-key': 'server-key.pem',
1182 'client-key': 'client-key.pem',
1183 'client-cert': 'client-cert.pem',
1184 'server-cert': 'server-cert.pem'})
1187 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1188 (TemplateResponder, Ikev2Params):
1190 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1192 def config_tc(self):
1193 self.config_params({
1194 'ike-crypto': ('AES-CBC', 16),
1195 'ike-integ': 'SHA2-256-128',
1196 'esp-crypto': ('AES-CBC', 24),
1197 'esp-integ': 'SHA2-384-192',
1198 'ike-dh': '2048MODPgr'})
1201 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1202 (TemplateResponder, Ikev2Params):
1204 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1206 def config_tc(self):
1207 self.config_params({
1208 'ike-crypto': ('AES-CBC', 32),
1209 'ike-integ': 'SHA2-256-128',
1210 'esp-crypto': ('AES-GCM-16ICV', 32),
1211 'esp-integ': 'NULL',
1212 'ike-dh': '3072MODPgr'})
1215 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1219 def config_tc(self):
1220 self.config_params({
1223 'ike-crypto': ('AES-GCM-16ICV', 32),
1224 'ike-integ': 'NULL',
1225 'ike-dh': '2048MODPgr',
1226 'loc_ts': {'start_addr': 'ab:cd::0',
1227 'end_addr': 'ab:cd::10'},
1228 'rem_ts': {'start_addr': '11::0',
1229 'end_addr': '11::100'}})
1232 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1233 """ malformed packet test """
def config_tc(self):
    """Configure the test case via config_params() with no overrides."""
    self.config_params()
def assert_counter(self, count, name, version='ip4'):
    """Assert that an ikev2 error counter holds the expected value.

    :param count: expected counter value.
    :param name: counter name, appended to the node path.
    :param version: suffix of the ``ikev2-<version>`` node name; defaults
        to ``'ip4'``.
    """
    # Build the whole path with one format call instead of relying on
    # ``%`` binding tighter than ``+``.
    node_name = '/err/ikev2-%s/%s' % (version, name)
    self.assertEqual(count, self.statistics.get_err_counter(node_name))
1245 def create_ike_init_msg(self, length=None, payload=None):
1246 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1247 flags='Initiator', exch_type='IKE_SA_INIT')
1248 if payload is not None:
1250 return self.create_packet(self.pg0, msg, self.sa.sport,
def verify_bad_packet_length(self):
    """Send init messages with a bogus length and check they are counted.

    Builds an init message whose ``length`` field is forced to 0xdead,
    sends ``self.pkt_count`` copies on ``self.pg0`` expecting no replies,
    and then checks the 'Bad packet length' error counter against
    ``self.pkt_count``.
    """
    ike_msg = self.create_ike_init_msg(length=0xdead)
    burst = ike_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Bad packet length')
def verify_bad_sa_payload_length(self):
    """Send init messages with a bogus SA payload length and count them.

    Builds an SA payload whose ``length`` field is forced to 0xdead, wraps
    it in an init message, sends ``self.pkt_count`` copies on ``self.pg0``
    expecting no replies, and then checks the 'Malformed packet' error
    counter against ``self.pkt_count``.
    """
    # NOTE(review): ``ikev2`` is presumably scapy's IKEv2 contrib module,
    # imported elsewhere in this file - confirm.
    bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
    ike_msg = self.create_ike_init_msg(payload=bad_sa)
    burst = ike_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Malformed packet')
def test_responder(self):
    """Run the malformed-message checks with 254 packets per check."""
    # Both checks below read self.pkt_count, so set it before calling them.
    self.pkt_count = 254
    self.verify_bad_packet_length()
    self.verify_bad_sa_payload_length()
if __name__ == '__main__':
    # When run as a script, execute the test cases with VppTestRunner.
    unittest.main(testRunner=VppTestRunner)