2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
12 from ipaddress import IPv4Address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.packet import raw, Raw
16 from scapy.utils import long_converter
17 from framework import VppTestCase, VppTestRunner
18 from vpp_ikev2 import Profile, IDType, AuthMethod
19 from vpp_papi import VppEnum
22 KEY_PAD = b"Key Pad for IKEv2"
29 # tuple structure is (p, g, key_len)
31 '2048MODPgr': (long_converter("""
32 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
33 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
34 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
35 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
36 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
37 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
38 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
39 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
40 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
41 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
42 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
44 '3072MODPgr': (long_converter("""
45 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
46 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
47 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
48 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
49 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
50 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
51 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
52 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
53 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
54 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
55 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
56 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
57 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
58 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
59 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
60 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
64 class CryptoAlgo(object):
65 def __init__(self, name, cipher, mode):
69 if self.cipher is not None:
70 self.bs = self.cipher.block_size // 8
72 if self.name == 'AES-GCM-16ICV':
73 self.iv_len = GCM_IV_SIZE
77 def encrypt(self, data, key, aad=None):
78 iv = os.urandom(self.iv_len)
80 encryptor = Cipher(self.cipher(key), self.mode(iv),
81 default_backend()).encryptor()
82 return iv + encryptor.update(data) + encryptor.finalize()
84 salt = key[-SALT_SIZE:]
86 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
87 default_backend()).encryptor()
88 encryptor.authenticate_additional_data(aad)
89 data = encryptor.update(data) + encryptor.finalize()
90 data += encryptor.tag[:GCM_ICV_SIZE]
93 def decrypt(self, data, key, aad=None, icv=None):
95 iv = data[:self.iv_len]
96 ct = data[self.iv_len:]
97 decryptor = Cipher(algorithms.AES(key),
99 default_backend()).decryptor()
100 return decryptor.update(ct) + decryptor.finalize()
102 salt = key[-SALT_SIZE:]
103 nonce = salt + data[:GCM_IV_SIZE]
104 ct = data[GCM_IV_SIZE:]
105 key = key[:-SALT_SIZE]
106 decryptor = Cipher(algorithms.AES(key),
107 self.mode(nonce, icv, len(icv)),
108 default_backend()).decryptor()
109 decryptor.authenticate_additional_data(aad)
110 pt = decryptor.update(ct) + decryptor.finalize()
115 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
116 data = data + b'\x00' * (pad_len - 1)
117 return data + bytes([pad_len - 1])
120 class AuthAlgo(object):
121 def __init__(self, name, mac, mod, key_len, trunc_len=None):
125 self.key_len = key_len
126 self.trunc_len = trunc_len or key_len
130 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
131 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
132 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
137 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
138 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
139 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
140 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
141 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
145 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
146 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
class IKEv2ChildSA(object):
    """Child SA record: an SPI together with local/remote traffic selectors."""

    def __init__(self, local_ts, remote_ts, spi=None):
        """Store the traffic selectors and choose the SPI.

        :param local_ts: local traffic selector, stored unchanged
        :param remote_ts: remote traffic selector, stored unchanged
        :param spi: SPI to use; any falsy value is replaced by 4 random bytes
        """
        self.local_ts, self.remote_ts = local_ts, remote_ts
        # no (or falsy) SPI supplied -> draw a random 4-byte one
        self.spi = spi if spi else os.urandom(4)
158 class IKEv2SA(object):
159 def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
160 i_id=None, r_id=None, id_type='fqdn', nonce=None,
161 auth_data=None, local_ts=None, remote_ts=None,
162 auth_method='shared-key', priv_key=None, natt=False):
171 self.dh_params = None
173 self.priv_key = priv_key
174 self.is_initiator = is_initiator
175 nonce = nonce or os.urandom(32)
176 self.auth_data = auth_data
179 if isinstance(id_type, str):
180 self.id_type = IDType.value(id_type)
182 self.id_type = id_type
183 self.auth_method = auth_method
184 if self.is_initiator:
192 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
194 def new_msg_id(self):
def dh_pub_key(self):
    """Return the DH public value held in ``i_dh_data``."""
    public_value = self.i_dh_data
    return public_value
def compute_secret(self):
    """Compute the DH shared secret as fixed-width big-endian bytes.

    The peer value in ``r_dh_data`` is raised to our private exponent
    (``dh_private_key``) modulo the first element of ``ike_group``; the
    result is serialised to the length given by the group's third element.
    """
    exponent = int.from_bytes(self.dh_private_key, 'big')
    base = int.from_bytes(self.r_dh_data, 'big')
    # group tuple is (p, g, key_len); the generator is not needed here
    modulus, _generator, key_len = self.ike_group
    shared = pow(base, exponent, modulus)
    return shared.to_bytes(key_len, 'big')
208 def generate_dh_data(self):
210 if self.is_initiator:
211 if self.ike_dh not in DH:
212 raise NotImplementedError('%s not in DH group' % self.ike_dh)
213 if self.dh_params is None:
214 dhg = DH[self.ike_dh]
215 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
216 self.dh_params = pn.parameters(default_backend())
217 priv = self.dh_params.generate_private_key()
218 pub = priv.public_key()
219 x = priv.private_numbers().x
220 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
221 y = pub.public_numbers().y
222 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Store the result of compute_secret() in ``dh_shared_secret``."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
227 def calc_child_keys(self):
228 prf = self.ike_prf_alg.mod()
229 s = self.i_nonce + self.r_nonce
230 c = self.child_sas[0]
232 encr_key_len = self.esp_crypto_key_len
233 integ_key_len = self.esp_integ_alg.key_len
234 salt_len = 0 if integ_key_len else 4
236 l = (integ_key_len * 2 +
239 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
242 c.sk_ei = keymat[pos:pos+encr_key_len]
246 c.sk_ai = keymat[pos:pos+integ_key_len]
249 c.salt_ei = keymat[pos:pos+salt_len]
252 c.sk_er = keymat[pos:pos+encr_key_len]
256 c.sk_ar = keymat[pos:pos+integ_key_len]
259 c.salt_er = keymat[pos:pos+salt_len]
262 def calc_prfplus(self, prf, key, seed, length):
266 while len(r) < length and x < 255:
271 s = s + seed + bytes([x])
272 t = self.calc_prf(prf, key, s)
280 def calc_prf(self, prf, key, data):
281 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
286 prf = self.ike_prf_alg.mod()
287 # SKEYSEED = prf(Ni | Nr, g^ir)
288 s = self.i_nonce + self.r_nonce
289 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
291 # calculate S = Ni | Nr | SPIi SPIr
292 s = s + self.ispi + self.rspi
294 prf_key_trunc = self.ike_prf_alg.trunc_len
295 encr_key_len = self.ike_crypto_key_len
296 tr_prf_key_len = self.ike_prf_alg.key_len
297 integ_key_len = self.ike_integ_alg.key_len
298 if integ_key_len == 0:
308 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
311 self.sk_d = keymat[:pos+prf_key_trunc]
314 self.sk_ai = keymat[pos:pos+integ_key_len]
316 self.sk_ar = keymat[pos:pos+integ_key_len]
319 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
320 pos += encr_key_len + salt_size
321 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
322 pos += encr_key_len + salt_size
324 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
325 pos += tr_prf_key_len
326 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
328 def generate_authmsg(self, prf, packet):
329 if self.is_initiator:
333 data = bytes([self.id_type, 0, 0, 0]) + id
334 id_hash = self.calc_prf(prf, key, data)
335 return packet + nonce + id_hash
338 prf = self.ike_prf_alg.mod()
339 authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
340 if self.auth_method == 'shared-key':
341 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
342 self.auth_data = self.calc_prf(prf, psk, authmsg)
343 elif self.auth_method == 'rsa-sig':
344 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
347 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad *data* and encrypt it with the IKE cipher under ``my_cryptokey``.

    :param data: plaintext handed to the cipher's ``pad`` first
    :param aad: passed through unchanged to the cipher's ``encrypt``
    :returns: whatever the cipher's ``encrypt`` returns
    """
    padded = self.ike_crypto_alg.pad(data)
    cipher = self.ike_crypto_alg
    return cipher.encrypt(padded, self.my_cryptokey, aad)
354 def peer_authkey(self):
355 if self.is_initiator:
360 def my_authkey(self):
361 if self.is_initiator:
366 def my_cryptokey(self):
367 if self.is_initiator:
372 def peer_cryptokey(self):
373 if self.is_initiator:
377 def concat(self, alg, key_len):
378 return alg + '-' + str(key_len * 8)
381 def vpp_ike_cypto_alg(self):
382 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
385 def vpp_esp_cypto_alg(self):
386 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
def verify_hmac(self, ikemsg):
    """Check the truncated integrity checksum that trails an IKE message.

    The last ``trunc_len`` bytes of *ikemsg* are taken as the received
    checksum. It is recomputed over the preceding bytes with the peer's
    auth key and compared through the owning test's ``assertEqual``.
    """
    icv_len = self.ike_integ_alg.trunc_len
    covered, received = ikemsg[:-icv_len], ikemsg[-icv_len:]
    digest = self.compute_hmac(self.ike_integ_alg.mod(),
                               self.peer_authkey, covered)
    self.test.assertEqual(digest[:icv_len], received)
396 def compute_hmac(self, integ, key, data):
397 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt *data* with the IKE cipher under ``peer_cryptokey``.

    *aad* and *icv* are forwarded unchanged to the cipher's ``decrypt``.
    """
    cipher = self.ike_crypto_alg
    key = self.peer_cryptokey
    return cipher.decrypt(data, key, aad, icv)
404 def hmac_and_decrypt(self, ike):
405 ep = ike[ikev2.IKEv2_payload_Encrypted]
406 if self.ike_crypto == 'AES-GCM-16ICV':
407 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
408 ct = ep.load[:-GCM_ICV_SIZE]
409 tag = ep.load[-GCM_ICV_SIZE:]
410 return self.decrypt(ct, raw(ike)[:aad_len], tag)
412 self.verify_hmac(raw(ike))
413 integ_trunc = self.ike_integ_alg.trunc_len
415 # remove ICV and decrypt payload
416 ct = ep.load[:-integ_trunc]
417 return self.decrypt(ct)
419 def generate_ts(self):
420 c = self.child_sas[0]
421 ts1 = ikev2.IPv4TrafficSelector(
425 starting_address_v4=c.local_ts['start_addr'],
426 ending_address_v4=c.local_ts['end_addr'])
427 ts2 = ikev2.IPv4TrafficSelector(
429 starting_address_v4=c.remote_ts['start_addr'],
430 ending_address_v4=c.remote_ts['end_addr'])
431 return ([ts1], [ts2])
433 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
434 if crypto not in CRYPTO_ALGOS:
435 raise TypeError('unsupported encryption algo %r' % crypto)
436 self.ike_crypto = crypto
437 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
438 self.ike_crypto_key_len = crypto_key_len
440 if integ not in AUTH_ALGOS:
441 raise TypeError('unsupported auth algo %r' % integ)
442 self.ike_integ = None if integ == 'NULL' else integ
443 self.ike_integ_alg = AUTH_ALGOS[integ]
445 if prf not in PRF_ALGOS:
446 raise TypeError('unsupported prf algo %r' % prf)
448 self.ike_prf_alg = PRF_ALGOS[prf]
450 self.ike_group = DH[self.ike_dh]
452 def set_esp_props(self, crypto, crypto_key_len, integ):
453 self.esp_crypto_key_len = crypto_key_len
454 if crypto not in CRYPTO_ALGOS:
455 raise TypeError('unsupported encryption algo %r' % crypto)
456 self.esp_crypto = crypto
457 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
459 if integ not in AUTH_ALGOS:
460 raise TypeError('unsupported auth algo %r' % integ)
461 self.esp_integ = None if integ == 'NULL' else integ
462 self.esp_integ_alg = AUTH_ALGOS[integ]
464 def crypto_attr(self, key_len):
465 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
466 return (0x800e << 16 | key_len << 3, 12)
468 raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return crypto_attr() evaluated for ``ike_crypto_key_len``."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return crypto_attr() evaluated for ``esp_crypto_key_len``."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
476 def compute_nat_sha1(self, ip, port):
477 data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
478 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
480 return digest.finalize()
483 class TemplateResponder(VppTestCase):
484 """ responder test template """
488 import scapy.contrib.ikev2 as _ikev2
489 globals()['ikev2'] = _ikev2
490 super(TemplateResponder, cls).setUpClass()
491 cls.create_pg_interfaces(range(2))
492 for i in cls.pg_interfaces:
498 def tearDownClass(cls):
499 super(TemplateResponder, cls).tearDownClass()
502 super(TemplateResponder, self).setUp()
504 self.p.add_vpp_config()
505 self.assertIsNotNone(self.p.query_vpp_config())
506 self.sa.generate_dh_data()
509 super(TemplateResponder, self).tearDown()
510 if self.sa.is_initiator:
511 self.initiate_del_sa()
512 r = self.vapi.ikev2_sa_dump()
513 self.assertEqual(len(r), 0)
515 self.p.remove_vpp_config()
516 self.assertIsNone(self.p.query_vpp_config())
def verify_del_sa(self, packet):
    """Assert that *packet* carries the expected reply to an SA delete.

    The IKE header's message id must equal the SA's current ``msg_id`` and
    its exchange type must be 37.
    """
    informational = 37  # exchange informational
    header = self.get_ike_header(packet)
    check = self.assertEqual
    check(header.id, self.sa.msg_id)
    check(header.exch_type, informational)
523 def initiate_del_sa(self):
524 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
525 flags='Initiator', exch_type='INFORMATIONAL',
526 id=self.sa.new_msg_id())
527 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
528 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
529 packet = self.create_packet(self.pg0, ike_msg,
530 self.sa.sport, self.sa.dport,
532 self.pg0.add_stream(packet)
533 self.pg0.enable_capture()
535 capture = self.pg0.get_capture(1)
536 self.verify_del_sa(capture[0])
538 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False):
539 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
540 IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
541 UDP(sport=sport, dport=dport))
543 # insert non ESP marker
544 res = res / Raw(b'\x00' * 4)
547 def send_sa_init(self, behind_nat=False):
548 tr_attr = self.sa.ike_crypto_attr()
549 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
550 transform_id=self.sa.ike_crypto, length=tr_attr[1],
551 key_length=tr_attr[0]) /
552 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
553 transform_id=self.sa.ike_integ) /
554 ikev2.IKEv2_payload_Transform(transform_type='PRF',
555 transform_id=self.sa.ike_prf_alg.name) /
556 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
557 transform_id=self.sa.ike_dh))
559 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
560 trans_nb=4, trans=trans))
563 next_payload = 'Notify'
567 self.sa.init_req_packet = (
568 ikev2.IKEv2(init_SPI=self.sa.ispi,
569 flags='Initiator', exch_type='IKE_SA_INIT') /
570 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
571 ikev2.IKEv2_payload_KE(next_payload='Nonce',
572 group=self.sa.ike_dh,
573 load=self.sa.dh_pub_key()) /
574 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
575 load=self.sa.i_nonce))
578 src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
580 nat_detection = ikev2.IKEv2_payload_Notify(
581 type='NAT_DETECTION_SOURCE_IP',
583 self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
585 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
586 self.sa.sport, self.sa.dport,
588 self.pg0.add_stream(ike_msg)
589 self.pg0.enable_capture()
591 capture = self.pg0.get_capture(1)
592 self.verify_sa_init(capture[0])
594 def encrypt_ike_msg(self, header, plain, first_payload):
595 if self.sa.ike_crypto == 'AES-GCM-16ICV':
596 data = self.sa.ike_crypto_alg.pad(raw(plain))
597 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
598 len(ikev2.IKEv2_payload_Encrypted())
599 tlen = plen + len(ikev2.IKEv2())
602 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
606 encr = self.sa.encrypt(raw(plain), raw(res))
607 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
608 length=plen, load=encr)
611 encr = self.sa.encrypt(raw(plain))
612 trunc_len = self.sa.ike_integ_alg.trunc_len
613 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
614 tlen = plen + len(ikev2.IKEv2())
616 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
617 length=plen, load=encr)
621 integ_data = raw(res)
622 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
623 self.sa.my_authkey, integ_data)
624 res = res / Raw(hmac_data[:trunc_len])
625 assert(len(res) == tlen)
628 def send_sa_auth(self):
629 tr_attr = self.sa.esp_crypto_attr()
630 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
631 transform_id=self.sa.esp_crypto, length=tr_attr[1],
632 key_length=tr_attr[0]) /
633 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
634 transform_id=self.sa.esp_integ) /
635 ikev2.IKEv2_payload_Transform(
636 transform_type='Extended Sequence Number',
637 transform_id='No ESN') /
638 ikev2.IKEv2_payload_Transform(
639 transform_type='Extended Sequence Number',
642 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
643 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
645 tsi, tsr = self.sa.generate_ts()
646 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
647 IDtype=self.sa.id_type, load=self.sa.i_id) /
648 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
649 IDtype=self.sa.id_type, load=self.sa.r_id) /
650 ikev2.IKEv2_payload_AUTH(next_payload='SA',
651 auth_type=AuthMethod.value(self.sa.auth_method),
652 load=self.sa.auth_data) /
653 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
654 ikev2.IKEv2_payload_TSi(next_payload='TSr',
655 number_of_TSs=len(tsi),
656 traffic_selector=tsi) /
657 ikev2.IKEv2_payload_TSr(next_payload='Notify',
658 number_of_TSs=len(tsr),
659 traffic_selector=tsr) /
660 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
662 header = ikev2.IKEv2(
663 init_SPI=self.sa.ispi,
664 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
665 flags='Initiator', exch_type='IKE_AUTH')
667 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
668 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
669 self.sa.dport, self.sa.natt)
670 self.pg0.add_stream(packet)
671 self.pg0.enable_capture()
673 capture = self.pg0.get_capture(1)
674 self.verify_sa_auth(capture[0])
676 def get_ike_header(self, packet):
678 ih = packet[ikev2.IKEv2]
679 except IndexError as e:
680 # this is a workaround for getting IKEv2 layer as both ikev2 and
681 # ipsec register for port 4500
683 ih = self.verify_and_remove_non_esp_marker(esp)
686 def verify_sa_init(self, packet):
687 ih = self.get_ike_header(packet)
689 self.assertEqual(ih.id, self.sa.msg_id)
690 self.assertEqual(ih.exch_type, 34)
691 self.assertTrue('Response' in ih.flags)
692 self.assertEqual(ih.init_SPI, self.sa.ispi)
693 self.assertNotEqual(ih.resp_SPI, 0)
694 self.sa.rspi = ih.resp_SPI
696 sa = ih[ikev2.IKEv2_payload_SA]
697 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
698 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
699 except IndexError as e:
700 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
701 self.logger.error(ih.show())
703 self.sa.complete_dh_data()
707 def verify_and_remove_non_esp_marker(self, packet):
709 # if we are in nat traversal mode check for non esp marker
712 self.assertEqual(data[:4], b'\x00' * 4)
713 return ikev2.IKEv2(data[4:])
def verify_udp(self, udp):
    """Assert that *udp* uses the SA's source and destination ports."""
    for port in ('sport', 'dport'):
        self.assertEqual(getattr(udp, port), getattr(self.sa, port))
721 def verify_sa_auth(self, packet):
722 ike = self.get_ike_header(packet)
725 self.assertEqual(ike.id, self.sa.msg_id)
726 plain = self.sa.hmac_and_decrypt(ike)
727 self.sa.calc_child_keys()
729 def verify_ipsec_sas(self):
730 sas = self.vapi.ipsec_sa_dump()
731 self.assertEqual(len(sas), 2)
734 c = self.sa.child_sas[0]
736 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
737 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
738 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
740 if self.sa.esp_integ is None:
743 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
744 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
745 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
748 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
749 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
750 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
751 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
755 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
756 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
757 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
758 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
760 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
761 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one piece of key material against what the API reported.

    :param api_keys: API object exposing ``<name>`` and ``<name>_len``
    :param keys: local object exposing the expected ``<name>`` value
    :param name: attribute name of the key material to compare
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    # only the first reported_len items of the API value are compared
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Check that an ID reported by the API matches the expected ID.

    :param api_id: ID from the API reply; ``type``, ``data_len`` and
                   ``data`` (a str, encoded as ASCII here) are read
    :param exp_id: expected ID; ``type``, ``data_len`` and ``data`` are read
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The payload must be compared with the expected payload; it used to be
    # compared with exp_id.type, which is the ID type rather than its data.
    # NOTE(review): assumes exp_id exposes a bytes `data` attribute - confirm
    self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)
775 def verify_ike_sas(self):
776 r = self.vapi.ikev2_sa_dump()
777 self.assertEqual(len(r), 1)
779 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'little'))
780 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
781 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
782 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
783 self.verify_keymat(sa.keys, self.sa, 'sk_d')
784 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
785 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
786 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
787 self.verify_keymat(sa.keys, self.sa, 'sk_er')
788 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
789 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
791 self.assertEqual(sa.i_id.type, self.sa.id_type)
792 self.assertEqual(sa.r_id.type, self.sa.id_type)
793 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
794 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
795 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
796 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
798 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
799 self.assertEqual(len(r), 1)
801 self.assertEqual(csa.sa_index, sa.sa_index)
802 c = self.sa.child_sas[0]
803 if hasattr(c, 'sk_ai'):
804 self.verify_keymat(csa.keys, c, 'sk_ai')
805 self.verify_keymat(csa.keys, c, 'sk_ar')
806 self.verify_keymat(csa.keys, c, 'sk_ei')
807 self.verify_keymat(csa.keys, c, 'sk_er')
809 tsi, tsr = self.sa.generate_ts()
812 r = self.vapi.ikev2_traffic_selector_dump(
813 is_initiator=True, sa_index=sa.sa_index,
814 child_sa_index=csa.child_sa_index)
815 self.assertEqual(len(r), 1)
817 self.verify_ts(r[0].ts, tsi[0], True)
819 r = self.vapi.ikev2_traffic_selector_dump(
820 is_initiator=False, sa_index=sa.sa_index,
821 child_sa_index=csa.child_sa_index)
822 self.assertEqual(len(r), 1)
823 self.verify_ts(r[0].ts, tsr[0], False)
825 n = self.vapi.ikev2_nonce_get(is_initiator=True,
826 sa_index=sa.sa_index)
827 self.verify_nonce(n, self.sa.i_nonce)
828 n = self.vapi.ikev2_nonce_get(is_initiator=False,
829 sa_index=sa.sa_index)
830 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API nonce has the expected length and value."""
    check = self.assertEqual
    check(api_nonce.data_len, len(nonce))
    check(api_nonce.nonce, nonce)
836 def verify_ts(self, api_ts, ts, is_initiator):
838 self.assertTrue(api_ts.is_local)
840 self.assertFalse(api_ts.is_local)
841 self.assertEqual(api_ts.start_addr,
842 IPv4Address(ts.starting_address_v4))
843 self.assertEqual(api_ts.end_addr,
844 IPv4Address(ts.ending_address_v4))
845 self.assertEqual(api_ts.start_port, ts.start_port)
846 self.assertEqual(api_ts.end_port, ts.end_port)
847 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
849 def test_responder(self):
850 self.send_sa_init(self.sa.natt)
852 self.verify_ipsec_sas()
853 self.verify_ike_sas()
856 class Ikev2Params(object):
857 def config_params(self, params={}):
858 ec = VppEnum.vl_api_ipsec_crypto_alg_t
859 ei = VppEnum.vl_api_ipsec_integ_alg_t
861 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
862 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
863 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
864 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
865 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
866 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
868 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
869 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
870 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
871 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
873 is_natt = 'natt' in params and params['natt'] or False
874 self.p = Profile(self, 'pr1')
876 if 'auth' in params and params['auth'] == 'rsa-sig':
877 auth_method = 'rsa-sig'
878 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
879 self.vapi.ikev2_set_local_key(
880 key_file=work_dir + params['server-key'])
882 client_file = work_dir + params['client-cert']
883 server_pem = open(work_dir + params['server-cert']).read()
884 client_priv = open(work_dir + params['client-key']).read()
885 client_priv = load_pem_private_key(str.encode(client_priv), None,
887 self.peer_cert = x509.load_pem_x509_certificate(
888 str.encode(server_pem),
890 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
893 auth_data = b'$3cr3tpa$$w0rd'
894 self.p.add_auth(method='shared-key', data=auth_data)
895 auth_method = 'shared-key'
898 self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
899 self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
900 self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
901 self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')
903 self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
904 r_id=self.p.local_id['data'],
905 id_type=self.p.local_id['id_type'], natt=is_natt,
906 priv_key=client_priv, auth_method=auth_method,
908 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
910 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
912 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
914 ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
916 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
918 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
921 self.sa.set_ike_props(
922 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
923 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
924 self.sa.set_esp_props(
925 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
929 class TestApi(VppTestCase):
930 """ Test IKEV2 API """
933 super(TestApi, cls).setUpClass()
936 def tearDownClass(cls):
937 super(TestApi, cls).tearDownClass()
940 super(TestApi, self).tearDown()
941 self.p1.remove_vpp_config()
942 self.p2.remove_vpp_config()
943 r = self.vapi.ikev2_profile_dump()
944 self.assertEqual(len(r), 0)
946 def configure_profile(self, cfg):
947 p = Profile(self, cfg['name'])
948 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
949 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
950 p.add_local_ts(**cfg['loc_ts'])
951 p.add_remote_ts(**cfg['rem_ts'])
952 p.add_responder(cfg['responder'])
953 p.add_ike_transforms(cfg['ike_ts'])
954 p.add_esp_transforms(cfg['esp_ts'])
955 p.add_auth(**cfg['auth'])
956 p.set_udp_encap(cfg['udp_encap'])
957 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
958 if 'lifetime_data' in cfg:
959 p.set_lifetime_data(cfg['lifetime_data'])
961 p.set_tunnel_interface(cfg['tun_itf'])
965 def test_profile_api(self):
966 """ test profile dump API """
971 'start_addr': '3.3.3.2',
972 'end_addr': '3.3.3.3',
978 'start_addr': '4.5.76.80',
979 'end_addr': '2.3.4.6',
985 'loc_id': ('fqdn', b'vpp.home'),
986 'rem_id': ('fqdn', b'roadwarrior.example.com'),
989 'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
992 'crypto_key_size': 32,
997 'crypto_key_size': 24,
999 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1001 'ipsec_over_udp_port': 4501,
1004 'lifetime_maxdata': 20192,
1005 'lifetime_jitter': 9,
1010 'loc_id': ('ip4-addr', b'192.168.2.1'),
1011 'rem_id': ('ip4-addr', b'192.168.2.2'),
1014 'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
1017 'crypto_key_size': 16,
1022 'crypto_key_size': 24,
1024 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1026 'ipsec_over_udp_port': 4600,
1029 self.p1 = self.configure_profile(conf['p1'])
1030 self.p2 = self.configure_profile(conf['p2'])
1032 r = self.vapi.ikev2_profile_dump()
1033 self.assertEqual(len(r), 2)
1034 self.verify_profile(r[0].profile, conf['p1'])
1035 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Compare an API-reported ID with a configured ``(type, data)`` pair."""
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(api_id.type, expected_type)
    # the API value is converted from str to ASCII bytes before comparing
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare an API traffic selector with its configuration dict.

    Checks the protocol, then both ports, then both addresses (the
    configured addresses are converted to ``IPv4Address`` first).
    """
    check = self.assertEqual
    check(api_ts.protocol_id, cfg_ts['proto'])
    for port in ('start_port', 'end_port'):
        check(getattr(api_ts, port), cfg_ts[port])
    for addr in ('start_addr', 'end_addr'):
        check(getattr(api_ts, addr), IPv4Address(cfg_ts[addr]))
def verify_responder(self, api_r, cfg_r):
    """Compare the API responder (interface index, IPv4) with its config."""
    check = self.assertEqual
    check(api_r.sw_if_index, cfg_r['sw_if_index'])
    expected_ip = IPv4Address(cfg_r['ip4'])
    check(api_r.ip4, expected_ip)
def verify_transforms(self, api_ts, cfg_ts):
    """Compare the transform fields of an API object with its config dict."""
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Run the common transform checks, then compare the DH group."""
    self.verify_transforms(api_ts, cfg_ts)
    expected_group = cfg_ts['dh_group']
    self.assertEqual(api_ts.dh_group, expected_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check ESP transforms; only the common transform checks apply."""
    common_check = self.verify_transforms
    common_check(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Compare the API auth record (method, data, data length) with config."""
    check = self.assertEqual
    check(api_auth.method, AuthMethod.value(cfg_auth['method']))
    expected = cfg_auth['data']
    check(api_auth.data, expected)
    check(api_auth.data_len, len(expected))
def verify_lifetime_data(self, p, ld):
    """Compare a profile's lifetime attributes with the config dict *ld*."""
    fields = ('lifetime', 'lifetime_maxdata', 'lifetime_jitter', 'handover')
    for field in fields:
        self.assertEqual(getattr(p, field), ld[field])
1075 def verify_profile(self, ap, cp):
1076 self.assertEqual(ap.name, cp['name'])
1077 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1078 self.verify_id(ap.loc_id, cp['loc_id'])
1079 self.verify_id(ap.rem_id, cp['rem_id'])
1080 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1081 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1082 self.verify_responder(ap.responder, cp['responder'])
1083 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1084 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1085 self.verify_auth(ap.auth, cp['auth'])
1086 if 'lifetime_data' in cp:
1087 self.verify_lifetime_data(ap, cp['lifetime_data'])
1088 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1090 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1092 self.assertEqual(ap.tun_itf, 0xffffffff)
1095 class TestResponderNATT(TemplateResponder, Ikev2Params):
1096 """ test ikev2 responder - nat traversal """
1097 def config_tc(self):
1102 class TestResponderPsk(TemplateResponder, Ikev2Params):
1103 """ test ikev2 responder - pre shared key auth """
1104 def config_tc(self):
1105 self.config_params()
1108 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1109 """ test ikev2 responder - cert based auth """
1110 def config_tc(self):
1111 self.config_params({
1113 'server-key': 'server-key.pem',
1114 'client-key': 'client-key.pem',
1115 'client-cert': 'client-cert.pem',
1116 'server-cert': 'server-cert.pem'})
1119 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1120 (TemplateResponder, Ikev2Params):
1122 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1124 def config_tc(self):
1125 self.config_params({
1126 'ike-crypto': ('AES-CBC', 16),
1127 'ike-integ': 'SHA2-256-128',
1128 'esp-crypto': ('AES-CBC', 24),
1129 'esp-integ': 'SHA2-384-192',
1130 'ike-dh': '2048MODPgr'})
1133 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1134 (TemplateResponder, Ikev2Params):
1136 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1138 def config_tc(self):
1139 self.config_params({
1140 'ike-crypto': ('AES-CBC', 32),
1141 'ike-integ': 'SHA2-256-128',
1142 'esp-crypto': ('AES-GCM-16ICV', 32),
1143 'esp-integ': 'NULL',
1144 'ike-dh': '3072MODPgr'})
1147 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1151 def config_tc(self):
1152 self.config_params({
1153 'ike-crypto': ('AES-GCM-16ICV', 32),
1154 'ike-integ': 'NULL',
1155 'ike-dh': '2048MODPgr'})
1158 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1159 """ malformed packet test """
1164 def config_tc(self):
1165 self.config_params()
def assert_counter(self, count, name):
    """Assert that the error counter ``/err/ikev2/<name>`` equals *count*."""
    observed = self.statistics.get_err_counter('/err/ikev2/' + name)
    self.assertEqual(count, observed)
1171 def create_ike_init_msg(self, length=None, payload=None):
1172 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1173 flags='Initiator', exch_type='IKE_SA_INIT')
1174 if payload is not None:
1176 return self.create_packet(self.pg0, msg, self.sa.sport,
def verify_bad_packet_length(self):
    """Send init messages with a bogus length and check the error counter.

    ``pkt_count`` copies of a message built with ``length=0xdead`` are sent
    on pg0 expecting no replies; the 'Bad packet length' counter must then
    equal ``pkt_count``.
    """
    bogus = self.create_ike_init_msg(length=0xdead)
    burst = bogus * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Bad packet length')
def verify_bad_sa_payload_length(self):
    """Send init messages with a bogus SA payload length and check the counter.

    ``pkt_count`` copies of a message whose SA payload is built with
    ``length=0xdead`` are sent on pg0 expecting no replies; the
    'Malformed packet' counter must then equal ``pkt_count``.
    """
    bogus = self.create_ike_init_msg(
        payload=ikev2.IKEv2_payload_SA(length=0xdead))
    burst = bogus * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Malformed packet')
1190 def test_responder(self):
1191 self.pkt_count = 254
1192 self.verify_bad_packet_length()
1193 self.verify_bad_sa_payload_length()
1196 if __name__ == '__main__':
1197 unittest.main(testRunner=VppTestRunner)