2 from socket import inet_pton
3 from cryptography import x509
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes, hmac
6 from cryptography.hazmat.primitives.asymmetric import dh, padding
7 from cryptography.hazmat.primitives.serialization import load_pem_private_key
8 from cryptography.hazmat.primitives.ciphers import (
13 from ipaddress import IPv4Address, IPv6Address, ip_address
14 from scapy.layers.ipsec import ESP
15 from scapy.layers.inet import IP, UDP, Ether
16 from scapy.layers.inet6 import IPv6
17 from scapy.packet import raw, Raw
18 from scapy.utils import long_converter
19 from framework import VppTestCase, VppTestRunner
20 from vpp_ikev2 import Profile, IDType, AuthMethod
21 from vpp_papi import VppEnum
28 KEY_PAD = b"Key Pad for IKEv2"
35 # tuple structure is (p, g, key_len)
37 '2048MODPgr': (long_converter("""
38 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
39 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
40 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
41 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
42 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
43 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
44 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
45 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
46 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
47 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
48 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
50 '3072MODPgr': (long_converter("""
51 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
62 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
63 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
64 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
65 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
66 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
70 class CryptoAlgo(object):
71 def __init__(self, name, cipher, mode):
75 if self.cipher is not None:
76 self.bs = self.cipher.block_size // 8
78 if self.name == 'AES-GCM-16ICV':
79 self.iv_len = GCM_IV_SIZE
83 def encrypt(self, data, key, aad=None):
84 iv = os.urandom(self.iv_len)
86 encryptor = Cipher(self.cipher(key), self.mode(iv),
87 default_backend()).encryptor()
88 return iv + encryptor.update(data) + encryptor.finalize()
90 salt = key[-SALT_SIZE:]
92 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
93 default_backend()).encryptor()
94 encryptor.authenticate_additional_data(aad)
95 data = encryptor.update(data) + encryptor.finalize()
96 data += encryptor.tag[:GCM_ICV_SIZE]
99 def decrypt(self, data, key, aad=None, icv=None):
101 iv = data[:self.iv_len]
102 ct = data[self.iv_len:]
103 decryptor = Cipher(algorithms.AES(key),
105 default_backend()).decryptor()
106 return decryptor.update(ct) + decryptor.finalize()
108 salt = key[-SALT_SIZE:]
109 nonce = salt + data[:GCM_IV_SIZE]
110 ct = data[GCM_IV_SIZE:]
111 key = key[:-SALT_SIZE]
112 decryptor = Cipher(algorithms.AES(key),
113 self.mode(nonce, icv, len(icv)),
114 default_backend()).decryptor()
115 decryptor.authenticate_additional_data(aad)
116 return decryptor.update(ct) + decryptor.finalize()
119 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
120 data = data + b'\x00' * (pad_len - 1)
121 return data + bytes([pad_len - 1])
124 class AuthAlgo(object):
125 def __init__(self, name, mac, mod, key_len, trunc_len=None):
129 self.key_len = key_len
130 self.trunc_len = trunc_len or key_len
134 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
135 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
136 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
141 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
142 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
143 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
144 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
145 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
149 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
150 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
167 class IKEv2ChildSA(object):
168 def __init__(self, local_ts, remote_ts, is_initiator):
176 self.local_ts = local_ts
177 self.remote_ts = remote_ts
180 class IKEv2SA(object):
181 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
182 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
183 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
184 auth_method='shared-key', priv_key=None, natt=False):
193 self.dh_params = None
195 self.priv_key = priv_key
196 self.is_initiator = is_initiator
197 nonce = nonce or os.urandom(32)
198 self.auth_data = auth_data
201 if isinstance(id_type, str):
202 self.id_type = IDType.value(id_type)
204 self.id_type = id_type
205 self.auth_method = auth_method
206 if self.is_initiator:
207 self.rspi = 8 * b'\x00'
212 self.ispi = 8 * b'\x00'
214 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
217 def new_msg_id(self):
222 def my_dh_pub_key(self):
223 if self.is_initiator:
224 return self.i_dh_data
225 return self.r_dh_data
228 def peer_dh_pub_key(self):
229 if self.is_initiator:
230 return self.r_dh_data
231 return self.i_dh_data
233 def compute_secret(self):
234 priv = self.dh_private_key
235 peer = self.peer_dh_pub_key
236 p, g, l = self.ike_group
237 return pow(int.from_bytes(peer, 'big'),
238 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
240 def generate_dh_data(self):
242 if self.ike_dh not in DH:
243 raise NotImplementedError('%s not in DH group' % self.ike_dh)
245 if self.dh_params is None:
246 dhg = DH[self.ike_dh]
247 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
248 self.dh_params = pn.parameters(default_backend())
250 priv = self.dh_params.generate_private_key()
251 pub = priv.public_key()
252 x = priv.private_numbers().x
253 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
254 y = pub.public_numbers().y
256 if self.is_initiator:
257 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
259 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
261 def complete_dh_data(self):
262 self.dh_shared_secret = self.compute_secret()
264 def calc_child_keys(self):
265 prf = self.ike_prf_alg.mod()
266 s = self.i_nonce + self.r_nonce
267 c = self.child_sas[0]
269 encr_key_len = self.esp_crypto_key_len
270 integ_key_len = self.esp_integ_alg.key_len
271 salt_len = 0 if integ_key_len else 4
273 l = (integ_key_len * 2 +
276 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
279 c.sk_ei = keymat[pos:pos+encr_key_len]
283 c.sk_ai = keymat[pos:pos+integ_key_len]
286 c.salt_ei = keymat[pos:pos+salt_len]
289 c.sk_er = keymat[pos:pos+encr_key_len]
293 c.sk_ar = keymat[pos:pos+integ_key_len]
296 c.salt_er = keymat[pos:pos+salt_len]
299 def calc_prfplus(self, prf, key, seed, length):
303 while len(r) < length and x < 255:
308 s = s + seed + bytes([x])
309 t = self.calc_prf(prf, key, s)
317 def calc_prf(self, prf, key, data):
318 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
323 prf = self.ike_prf_alg.mod()
324 # SKEYSEED = prf(Ni | Nr, g^ir)
325 s = self.i_nonce + self.r_nonce
326 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
328 # calculate S = Ni | Nr | SPIi SPIr
329 s = s + self.ispi + self.rspi
331 prf_key_trunc = self.ike_prf_alg.trunc_len
332 encr_key_len = self.ike_crypto_key_len
333 tr_prf_key_len = self.ike_prf_alg.key_len
334 integ_key_len = self.ike_integ_alg.key_len
335 if integ_key_len == 0:
345 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
348 self.sk_d = keymat[:pos+prf_key_trunc]
351 self.sk_ai = keymat[pos:pos+integ_key_len]
353 self.sk_ar = keymat[pos:pos+integ_key_len]
356 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
357 pos += encr_key_len + salt_size
358 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
359 pos += encr_key_len + salt_size
361 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
362 pos += tr_prf_key_len
363 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
365 def generate_authmsg(self, prf, packet):
366 if self.is_initiator:
374 data = bytes([self.id_type, 0, 0, 0]) + id
375 id_hash = self.calc_prf(prf, key, data)
376 return packet + nonce + id_hash
379 prf = self.ike_prf_alg.mod()
380 if self.is_initiator:
381 packet = self.init_req_packet
383 packet = self.init_resp_packet
384 authmsg = self.generate_authmsg(prf, raw(packet))
385 if self.auth_method == 'shared-key':
386 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
387 self.auth_data = self.calc_prf(prf, psk, authmsg)
388 elif self.auth_method == 'rsa-sig':
389 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
392 raise TypeError('unknown auth method type!')
394 def encrypt(self, data, aad=None):
395 data = self.ike_crypto_alg.pad(data)
396 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
399 def peer_authkey(self):
400 if self.is_initiator:
405 def my_authkey(self):
406 if self.is_initiator:
411 def my_cryptokey(self):
412 if self.is_initiator:
417 def peer_cryptokey(self):
418 if self.is_initiator:
422 def concat(self, alg, key_len):
423 return alg + '-' + str(key_len * 8)
426 def vpp_ike_cypto_alg(self):
427 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
430 def vpp_esp_cypto_alg(self):
431 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
433 def verify_hmac(self, ikemsg):
434 integ_trunc = self.ike_integ_alg.trunc_len
435 exp_hmac = ikemsg[-integ_trunc:]
436 data = ikemsg[:-integ_trunc]
437 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
438 self.peer_authkey, data)
439 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
441 def compute_hmac(self, integ, key, data):
442 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
446 def decrypt(self, data, aad=None, icv=None):
447 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
449 def hmac_and_decrypt(self, ike):
450 ep = ike[ikev2.IKEv2_payload_Encrypted]
451 if self.ike_crypto == 'AES-GCM-16ICV':
452 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
453 ct = ep.load[:-GCM_ICV_SIZE]
454 tag = ep.load[-GCM_ICV_SIZE:]
455 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
457 self.verify_hmac(raw(ike))
458 integ_trunc = self.ike_integ_alg.trunc_len
460 # remove ICV and decrypt payload
461 ct = ep.load[:-integ_trunc]
462 plain = self.decrypt(ct)
465 return plain[:-pad_len - 1]
467 def build_ts_addr(self, ts, version):
468 return {'starting_address_v' + version: ts['start_addr'],
469 'ending_address_v' + version: ts['end_addr']}
471 def generate_ts(self, is_ip4):
472 c = self.child_sas[0]
473 ts_data = {'IP_protocol_ID': 0,
477 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
478 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
479 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
480 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
482 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
483 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
484 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
485 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
487 if self.is_initiator:
488 return ([ts1], [ts2])
489 return ([ts2], [ts1])
491 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
492 if crypto not in CRYPTO_ALGOS:
493 raise TypeError('unsupported encryption algo %r' % crypto)
494 self.ike_crypto = crypto
495 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
496 self.ike_crypto_key_len = crypto_key_len
498 if integ not in AUTH_ALGOS:
499 raise TypeError('unsupported auth algo %r' % integ)
500 self.ike_integ = None if integ == 'NULL' else integ
501 self.ike_integ_alg = AUTH_ALGOS[integ]
503 if prf not in PRF_ALGOS:
504 raise TypeError('unsupported prf algo %r' % prf)
506 self.ike_prf_alg = PRF_ALGOS[prf]
508 self.ike_group = DH[self.ike_dh]
510 def set_esp_props(self, crypto, crypto_key_len, integ):
511 self.esp_crypto_key_len = crypto_key_len
512 if crypto not in CRYPTO_ALGOS:
513 raise TypeError('unsupported encryption algo %r' % crypto)
514 self.esp_crypto = crypto
515 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
517 if integ not in AUTH_ALGOS:
518 raise TypeError('unsupported auth algo %r' % integ)
519 self.esp_integ = None if integ == 'NULL' else integ
520 self.esp_integ_alg = AUTH_ALGOS[integ]
522 def crypto_attr(self, key_len):
523 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
524 return (0x800e << 16 | key_len << 3, 12)
526 raise Exception('unsupported attribute type')
528 def ike_crypto_attr(self):
529 return self.crypto_attr(self.ike_crypto_key_len)
531 def esp_crypto_attr(self):
532 return self.crypto_attr(self.esp_crypto_key_len)
534 def compute_nat_sha1(self, ip, port, rspi=None):
537 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
538 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
540 return digest.finalize()
543 class IkePeer(VppTestCase):
544 """ common class for initiator and responder """
548 import scapy.contrib.ikev2 as _ikev2
549 globals()['ikev2'] = _ikev2
550 super(IkePeer, cls).setUpClass()
551 cls.create_pg_interfaces(range(2))
552 for i in cls.pg_interfaces:
560 def tearDownClass(cls):
561 super(IkePeer, cls).tearDownClass()
564 super(IkePeer, self).tearDown()
565 if self.del_sa_from_responder:
566 self.initiate_del_sa_from_responder()
568 self.initiate_del_sa_from_initiator()
569 r = self.vapi.ikev2_sa_dump()
570 self.assertEqual(len(r), 0)
571 sas = self.vapi.ipsec_sa_dump()
572 self.assertEqual(len(sas), 0)
573 self.p.remove_vpp_config()
574 self.assertIsNone(self.p.query_vpp_config())
577 super(IkePeer, self).setUp()
579 self.p.add_vpp_config()
580 self.assertIsNotNone(self.p.query_vpp_config())
581 if self.sa.is_initiator:
582 self.sa.generate_dh_data()
583 self.vapi.cli('ikev2 set logging level 4')
584 self.vapi.cli('event-lo clear')
585 self.vapi.cli('ikev2 dpd disable')
587 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
590 src_ip = src_if.remote_ip6
591 dst_ip = src_if.local_ip6
594 src_ip = src_if.remote_ip4
595 dst_ip = src_if.local_ip4
597 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
598 ip_layer(src=src_ip, dst=dst_ip) /
599 UDP(sport=sport, dport=dport))
601 # insert non ESP marker
602 res = res / Raw(b'\x00' * 4)
605 def verify_udp(self, udp):
606 self.assertEqual(udp.sport, self.sa.sport)
607 self.assertEqual(udp.dport, self.sa.dport)
609 def get_ike_header(self, packet):
611 ih = packet[ikev2.IKEv2]
612 except IndexError as e:
613 # this is a workaround for getting IKEv2 layer as both ikev2 and
614 # ipsec register for port 4500
616 ih = self.verify_and_remove_non_esp_marker(esp)
617 self.assertEqual(ih.version, 0x20)
618 self.assertNotIn('Version', ih.flags)
621 def verify_and_remove_non_esp_marker(self, packet):
623 # if we are in nat traversal mode check for non esp marker
626 self.assertEqual(data[:4], b'\x00' * 4)
627 return ikev2.IKEv2(data[4:])
631 def encrypt_ike_msg(self, header, plain, first_payload):
632 if self.sa.ike_crypto == 'AES-GCM-16ICV':
633 data = self.sa.ike_crypto_alg.pad(raw(plain))
634 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
635 len(ikev2.IKEv2_payload_Encrypted())
636 tlen = plen + len(ikev2.IKEv2())
639 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
643 encr = self.sa.encrypt(raw(plain), raw(res))
644 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
645 length=plen, load=encr)
648 encr = self.sa.encrypt(raw(plain))
649 trunc_len = self.sa.ike_integ_alg.trunc_len
650 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
651 tlen = plen + len(ikev2.IKEv2())
653 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
654 length=plen, load=encr)
658 integ_data = raw(res)
659 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
660 self.sa.my_authkey, integ_data)
661 res = res / Raw(hmac_data[:trunc_len])
662 assert(len(res) == tlen)
665 def verify_ipsec_sas(self, is_rekey=False):
666 sas = self.vapi.ipsec_sa_dump()
668 # after rekey there is a short period of time in which old
669 # inbound SA is still present
673 self.assertEqual(len(sas), sa_count)
674 e = VppEnum.vl_api_ipsec_sad_flags_t
675 if self.sa.is_initiator:
690 c = self.sa.child_sas[0]
692 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
693 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
694 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
696 if self.sa.esp_integ is None:
699 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
700 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
701 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
704 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
705 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
706 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
707 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
711 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
712 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
713 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
714 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
716 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
717 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
719 def verify_keymat(self, api_keys, keys, name):
720 km = getattr(keys, name)
721 api_km = getattr(api_keys, name)
722 api_km_len = getattr(api_keys, name + '_len')
723 self.assertEqual(len(km), api_km_len)
724 self.assertEqual(km, api_km[:api_km_len])
726 def verify_id(self, api_id, exp_id):
727 self.assertEqual(api_id.type, IDType.value(exp_id.type))
728 self.assertEqual(api_id.data_len, exp_id.data_len)
729 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
731 def verify_ike_sas(self):
732 r = self.vapi.ikev2_sa_dump()
733 self.assertEqual(len(r), 1)
735 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
736 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
738 if self.sa.is_initiator:
739 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
740 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
742 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
743 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
745 if self.sa.is_initiator:
746 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
747 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
749 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
750 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
751 self.verify_keymat(sa.keys, self.sa, 'sk_d')
752 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
753 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
754 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
755 self.verify_keymat(sa.keys, self.sa, 'sk_er')
756 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
757 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
759 self.assertEqual(sa.i_id.type, self.sa.id_type)
760 self.assertEqual(sa.r_id.type, self.sa.id_type)
761 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
762 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
763 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
764 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
766 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
767 self.assertEqual(len(r), 1)
769 self.assertEqual(csa.sa_index, sa.sa_index)
770 c = self.sa.child_sas[0]
771 if hasattr(c, 'sk_ai'):
772 self.verify_keymat(csa.keys, c, 'sk_ai')
773 self.verify_keymat(csa.keys, c, 'sk_ar')
774 self.verify_keymat(csa.keys, c, 'sk_ei')
775 self.verify_keymat(csa.keys, c, 'sk_er')
776 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
777 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
779 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
782 r = self.vapi.ikev2_traffic_selector_dump(
783 is_initiator=True, sa_index=sa.sa_index,
784 child_sa_index=csa.child_sa_index)
785 self.assertEqual(len(r), 1)
787 self.verify_ts(r[0].ts, tsi[0], True)
789 r = self.vapi.ikev2_traffic_selector_dump(
790 is_initiator=False, sa_index=sa.sa_index,
791 child_sa_index=csa.child_sa_index)
792 self.assertEqual(len(r), 1)
793 self.verify_ts(r[0].ts, tsr[0], False)
795 n = self.vapi.ikev2_nonce_get(is_initiator=True,
796 sa_index=sa.sa_index)
797 self.verify_nonce(n, self.sa.i_nonce)
798 n = self.vapi.ikev2_nonce_get(is_initiator=False,
799 sa_index=sa.sa_index)
800 self.verify_nonce(n, self.sa.r_nonce)
802 def verify_nonce(self, api_nonce, nonce):
803 self.assertEqual(api_nonce.data_len, len(nonce))
804 self.assertEqual(api_nonce.nonce, nonce)
806 def verify_ts(self, api_ts, ts, is_initiator):
808 self.assertTrue(api_ts.is_local)
810 self.assertFalse(api_ts.is_local)
813 self.assertEqual(api_ts.start_addr,
814 IPv4Address(ts.starting_address_v4))
815 self.assertEqual(api_ts.end_addr,
816 IPv4Address(ts.ending_address_v4))
818 self.assertEqual(api_ts.start_addr,
819 IPv6Address(ts.starting_address_v6))
820 self.assertEqual(api_ts.end_addr,
821 IPv6Address(ts.ending_address_v6))
822 self.assertEqual(api_ts.start_port, ts.start_port)
823 self.assertEqual(api_ts.end_port, ts.end_port)
824 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
827 class TemplateInitiator(IkePeer):
828 """ initiator test template """
830 def initiate_del_sa_from_initiator(self):
831 ispi = int.from_bytes(self.sa.ispi, 'little')
832 self.pg0.enable_capture()
834 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
835 capture = self.pg0.get_capture(1)
836 ih = self.get_ike_header(capture[0])
837 self.assertNotIn('Response', ih.flags)
838 self.assertIn('Initiator', ih.flags)
839 self.assertEqual(ih.init_SPI, self.sa.ispi)
840 self.assertEqual(ih.resp_SPI, self.sa.rspi)
841 plain = self.sa.hmac_and_decrypt(ih)
842 d = ikev2.IKEv2_payload_Delete(plain)
843 self.assertEqual(d.proto, 1) # proto=IKEv2
844 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
845 flags='Response', exch_type='INFORMATIONAL',
846 id=ih.id, next_payload='Encrypted')
847 resp = self.encrypt_ike_msg(header, b'', None)
848 self.send_and_assert_no_replies(self.pg0, resp)
850 def verify_del_sa(self, packet):
851 ih = self.get_ike_header(packet)
852 self.assertEqual(ih.id, self.sa.msg_id)
853 self.assertEqual(ih.exch_type, 37) # exchange informational
854 self.assertIn('Response', ih.flags)
855 self.assertIn('Initiator', ih.flags)
856 plain = self.sa.hmac_and_decrypt(ih)
857 self.assertEqual(plain, b'')
859 def initiate_del_sa_from_responder(self):
860 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
861 exch_type='INFORMATIONAL',
862 id=self.sa.new_msg_id())
863 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
864 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
865 packet = self.create_packet(self.pg0, ike_msg,
866 self.sa.sport, self.sa.dport,
867 self.sa.natt, self.ip6)
868 self.pg0.add_stream(packet)
869 self.pg0.enable_capture()
871 capture = self.pg0.get_capture(1)
872 self.verify_del_sa(capture[0])
875 def find_notify_payload(packet, notify_type):
876 n = packet[ikev2.IKEv2_payload_Notify]
878 if n.type == notify_type:
883 def verify_nat_detection(self, packet):
890 # NAT_DETECTION_SOURCE_IP
891 s = self.find_notify_payload(packet, 16388)
892 self.assertIsNotNone(s)
893 src_sha = self.sa.compute_nat_sha1(
894 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
895 self.assertEqual(s.load, src_sha)
897 # NAT_DETECTION_DESTINATION_IP
898 s = self.find_notify_payload(packet, 16389)
899 self.assertIsNotNone(s)
900 dst_sha = self.sa.compute_nat_sha1(
901 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
902 self.assertEqual(s.load, dst_sha)
904 def verify_sa_init_request(self, packet):
905 ih = packet[ikev2.IKEv2]
906 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
907 self.assertEqual(ih.exch_type, 34) # SA_INIT
908 self.sa.ispi = ih.init_SPI
909 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
910 self.assertIn('Initiator', ih.flags)
911 self.assertNotIn('Response', ih.flags)
912 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
913 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
915 prop = packet[ikev2.IKEv2_payload_Proposal]
916 self.assertEqual(prop.proto, 1) # proto = ikev2
917 self.assertEqual(prop.proposal, 1)
918 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
919 self.assertEqual(prop.trans[0].transform_id,
920 self.p.ike_transforms['crypto_alg'])
921 self.assertEqual(prop.trans[1].transform_type, 2) # prf
922 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
923 self.assertEqual(prop.trans[2].transform_type, 4) # dh
924 self.assertEqual(prop.trans[2].transform_id,
925 self.p.ike_transforms['dh_group'])
927 self.verify_nat_detection(packet)
928 self.sa.set_ike_props(
929 crypto='AES-GCM-16ICV', crypto_key_len=32,
930 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
931 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
932 integ='SHA2-256-128')
933 self.sa.generate_dh_data()
934 self.sa.complete_dh_data()
937 def update_esp_transforms(self, trans, sa):
939 if trans.transform_type == 1: # ecryption
940 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
941 elif trans.transform_type == 3: # integrity
942 sa.esp_integ = INTEG_IDS[trans.transform_id]
943 trans = trans.payload
945 def verify_sa_auth_req(self, packet):
946 ih = self.get_ike_header(packet)
947 self.assertEqual(ih.resp_SPI, self.sa.rspi)
948 self.assertEqual(ih.init_SPI, self.sa.ispi)
949 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
950 self.assertIn('Initiator', ih.flags)
951 self.assertNotIn('Response', ih.flags)
955 self.assertEqual(ih.id, self.sa.msg_id + 1)
957 plain = self.sa.hmac_and_decrypt(ih)
958 idi = ikev2.IKEv2_payload_IDi(plain)
959 idr = ikev2.IKEv2_payload_IDr(idi.payload)
960 self.assertEqual(idi.load, self.sa.i_id)
961 self.assertEqual(idr.load, self.sa.r_id)
962 prop = idi[ikev2.IKEv2_payload_Proposal]
963 c = self.sa.child_sas[0]
965 self.update_esp_transforms(
966 prop[ikev2.IKEv2_payload_Transform], self.sa)
968 def send_init_response(self):
969 tr_attr = self.sa.ike_crypto_attr()
970 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
971 transform_id=self.sa.ike_crypto, length=tr_attr[1],
972 key_length=tr_attr[0]) /
973 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
974 transform_id=self.sa.ike_integ) /
975 ikev2.IKEv2_payload_Transform(transform_type='PRF',
976 transform_id=self.sa.ike_prf_alg.name) /
977 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
978 transform_id=self.sa.ike_dh))
979 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
980 trans_nb=4, trans=trans))
981 self.sa.init_resp_packet = (
982 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
983 exch_type='IKE_SA_INIT', flags='Response') /
984 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
985 ikev2.IKEv2_payload_KE(next_payload='Nonce',
986 group=self.sa.ike_dh,
987 load=self.sa.my_dh_pub_key) /
988 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
990 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
991 self.sa.sport, self.sa.dport,
992 self.sa.natt, self.ip6)
993 self.pg_send(self.pg0, ike_msg)
994 capture = self.pg0.get_capture(1)
995 self.verify_sa_auth_req(capture[0])
997 def initiate_sa_init(self):
998 self.pg0.enable_capture()
1000 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1002 capture = self.pg0.get_capture(1)
1003 self.verify_sa_init_request(capture[0])
1004 self.send_init_response()
1006 def send_auth_response(self):
1007 tr_attr = self.sa.esp_crypto_attr()
1008 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1009 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1010 key_length=tr_attr[0]) /
1011 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1012 transform_id=self.sa.esp_integ) /
1013 ikev2.IKEv2_payload_Transform(
1014 transform_type='Extended Sequence Number',
1015 transform_id='No ESN') /
1016 ikev2.IKEv2_payload_Transform(
1017 transform_type='Extended Sequence Number',
1018 transform_id='ESN'))
1020 c = self.sa.child_sas[0]
1021 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1022 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1024 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1025 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1026 IDtype=self.sa.id_type, load=self.sa.i_id) /
1027 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1028 IDtype=self.sa.id_type, load=self.sa.r_id) /
1029 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1030 auth_type=AuthMethod.value(self.sa.auth_method),
1031 load=self.sa.auth_data) /
1032 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1033 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1034 number_of_TSs=len(tsi),
1035 traffic_selector=tsi) /
1036 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1037 number_of_TSs=len(tsr),
1038 traffic_selector=tsr) /
1039 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1041 header = ikev2.IKEv2(
1042 init_SPI=self.sa.ispi,
1043 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1044 flags='Response', exch_type='IKE_AUTH')
1046 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1047 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1048 self.sa.dport, self.sa.natt, self.ip6)
1049 self.pg_send(self.pg0, packet)
1051 def test_initiator(self):
1052 self.initiate_sa_init()
1054 self.sa.calc_child_keys()
1055 self.send_auth_response()
1056 self.verify_ike_sas()
1059 class TemplateResponder(IkePeer):
1060 """ responder test template """
1062 def initiate_del_sa_from_responder(self):
1063 self.pg0.enable_capture()
1065 self.vapi.ikev2_initiate_del_ike_sa(
1066 ispi=int.from_bytes(self.sa.ispi, 'little'))
1067 capture = self.pg0.get_capture(1)
1068 ih = self.get_ike_header(capture[0])
1069 self.assertNotIn('Response', ih.flags)
1070 self.assertNotIn('Initiator', ih.flags)
1071 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1072 plain = self.sa.hmac_and_decrypt(ih)
1073 d = ikev2.IKEv2_payload_Delete(plain)
1074 self.assertEqual(d.proto, 1) # proto=IKEv2
1075 self.assertEqual(ih.init_SPI, self.sa.ispi)
1076 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1077 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1078 flags='Initiator+Response',
1079 exch_type='INFORMATIONAL',
1080 id=ih.id, next_payload='Encrypted')
1081 resp = self.encrypt_ike_msg(header, b'', None)
1082 self.send_and_assert_no_replies(self.pg0, resp)
1084 def verify_del_sa(self, packet):
1085 ih = self.get_ike_header(packet)
1086 self.assertEqual(ih.id, self.sa.msg_id)
1087 self.assertEqual(ih.exch_type, 37) # exchange informational
1088 self.assertIn('Response', ih.flags)
1089 self.assertNotIn('Initiator', ih.flags)
1090 self.assertEqual(ih.next_payload, 46) # Encrypted
1091 self.assertEqual(ih.init_SPI, self.sa.ispi)
1092 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1093 plain = self.sa.hmac_and_decrypt(ih)
1094 self.assertEqual(plain, b'')
1096 def initiate_del_sa_from_initiator(self):
1097 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1098 flags='Initiator', exch_type='INFORMATIONAL',
1099 id=self.sa.new_msg_id())
1100 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1101 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1102 packet = self.create_packet(self.pg0, ike_msg,
1103 self.sa.sport, self.sa.dport,
1104 self.sa.natt, self.ip6)
1105 self.pg0.add_stream(packet)
1106 self.pg0.enable_capture()
1108 capture = self.pg0.get_capture(1)
1109 self.verify_del_sa(capture[0])
1111 def send_sa_init_req(self, behind_nat=False):
1112 tr_attr = self.sa.ike_crypto_attr()
1113 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1114 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1115 key_length=tr_attr[0]) /
1116 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1117 transform_id=self.sa.ike_integ) /
1118 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1119 transform_id=self.sa.ike_prf_alg.name) /
1120 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1121 transform_id=self.sa.ike_dh))
1123 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1124 trans_nb=4, trans=trans))
1126 self.sa.init_req_packet = (
1127 ikev2.IKEv2(init_SPI=self.sa.ispi,
1128 flags='Initiator', exch_type='IKE_SA_INIT') /
1129 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1130 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1131 group=self.sa.ike_dh,
1132 load=self.sa.my_dh_pub_key) /
1133 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1134 load=self.sa.i_nonce))
1137 src_address = b'\x0a\x0a\x0a\x01'
1139 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1141 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1142 dst_nat = self.sa.compute_nat_sha1(
1143 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1145 nat_src_detection = ikev2.IKEv2_payload_Notify(
1146 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1147 next_payload='Notify')
1148 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1149 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1150 self.sa.init_req_packet = (self.sa.init_req_packet /
1154 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1155 self.sa.sport, self.sa.dport,
1156 self.sa.natt, self.ip6)
1157 self.pg0.add_stream(ike_msg)
1158 self.pg0.enable_capture()
1160 capture = self.pg0.get_capture(1)
1161 self.verify_sa_init(capture[0])
1163 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1164 tr_attr = self.sa.esp_crypto_attr()
1165 last_payload = last_payload or 'Notify'
1166 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1167 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1168 key_length=tr_attr[0]) /
1169 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1170 transform_id=self.sa.esp_integ) /
1171 ikev2.IKEv2_payload_Transform(
1172 transform_type='Extended Sequence Number',
1173 transform_id='No ESN') /
1174 ikev2.IKEv2_payload_Transform(
1175 transform_type='Extended Sequence Number',
1176 transform_id='ESN'))
1178 c = self.sa.child_sas[0]
1179 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1180 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1182 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1183 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1184 auth_type=AuthMethod.value(self.sa.auth_method),
1185 load=self.sa.auth_data) /
1186 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1187 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1188 number_of_TSs=len(tsi), traffic_selector=tsi) /
1189 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1190 number_of_TSs=len(tsr), traffic_selector=tsr))
1193 first_payload = 'Nonce'
1194 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1195 next_payload='SA') / plain /
1196 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1197 proto='ESP', SPI=c.ispi))
1199 first_payload = 'IDi'
1200 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1201 IDtype=self.sa.id_type, load=self.sa.i_id) /
1202 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1203 IDtype=self.sa.id_type, load=self.sa.r_id))
1205 return plain, first_payload
1207 def send_sa_auth(self):
1208 plain, first_payload = self.generate_auth_payload(
1209 last_payload='Notify')
1210 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1211 header = ikev2.IKEv2(
1212 init_SPI=self.sa.ispi,
1213 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1214 flags='Initiator', exch_type='IKE_AUTH')
1216 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1217 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1218 self.sa.dport, self.sa.natt, self.ip6)
1219 self.pg0.add_stream(packet)
1220 self.pg0.enable_capture()
1222 capture = self.pg0.get_capture(1)
1223 self.verify_sa_auth_resp(capture[0])
1225 def verify_sa_init(self, packet):
1226 ih = self.get_ike_header(packet)
1228 self.assertEqual(ih.id, self.sa.msg_id)
1229 self.assertEqual(ih.exch_type, 34)
1230 self.assertIn('Response', ih.flags)
1231 self.assertEqual(ih.init_SPI, self.sa.ispi)
1232 self.assertNotEqual(ih.resp_SPI, 0)
1233 self.sa.rspi = ih.resp_SPI
1235 sa = ih[ikev2.IKEv2_payload_SA]
1236 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1237 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1238 except IndexError as e:
1239 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1240 self.logger.error(ih.show())
1242 self.sa.complete_dh_data()
1246 def verify_sa_auth_resp(self, packet):
1247 ike = self.get_ike_header(packet)
1249 self.verify_udp(udp)
1250 self.assertEqual(ike.id, self.sa.msg_id)
1251 plain = self.sa.hmac_and_decrypt(ike)
1252 idr = ikev2.IKEv2_payload_IDr(plain)
1253 prop = idr[ikev2.IKEv2_payload_Proposal]
1254 self.assertEqual(prop.SPIsize, 4)
1255 self.sa.child_sas[0].rspi = prop.SPI
1256 self.sa.calc_child_keys()
1258 def test_responder(self):
1259 self.send_sa_init_req(self.sa.natt)
1261 self.verify_ipsec_sas()
1262 self.verify_ike_sas()
1265 class Ikev2Params(object):
1266 def config_params(self, params={}):
1267 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1268 ei = VppEnum.vl_api_ipsec_integ_alg_t
1270 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1271 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1272 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1273 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1274 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1275 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1277 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1278 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1279 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1280 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1282 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1283 not in params else params['del_sa_from_responder']
1284 is_natt = 'natt' in params and params['natt'] or False
1285 self.p = Profile(self, 'pr1')
1286 self.ip6 = False if 'ip6' not in params else params['ip6']
1288 if 'auth' in params and params['auth'] == 'rsa-sig':
1289 auth_method = 'rsa-sig'
1290 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1291 self.vapi.ikev2_set_local_key(
1292 key_file=work_dir + params['server-key'])
1294 client_file = work_dir + params['client-cert']
1295 server_pem = open(work_dir + params['server-cert']).read()
1296 client_priv = open(work_dir + params['client-key']).read()
1297 client_priv = load_pem_private_key(str.encode(client_priv), None,
1299 self.peer_cert = x509.load_pem_x509_certificate(
1300 str.encode(server_pem),
1302 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1305 auth_data = b'$3cr3tpa$$w0rd'
1306 self.p.add_auth(method='shared-key', data=auth_data)
1307 auth_method = 'shared-key'
1310 is_init = True if 'is_initiator' not in params else\
1311 params['is_initiator']
1313 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1314 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1316 self.p.add_local_id(**idr)
1317 self.p.add_remote_id(**idi)
1319 self.p.add_local_id(**idi)
1320 self.p.add_remote_id(**idr)
1322 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1323 'loc_ts' not in params else params['loc_ts']
1324 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1325 'rem_ts' not in params else params['rem_ts']
1326 self.p.add_local_ts(**loc_ts)
1327 self.p.add_remote_ts(**rem_ts)
1328 if 'responder' in params:
1329 self.p.add_responder(params['responder'])
1330 if 'ike_transforms' in params:
1331 self.p.add_ike_transforms(params['ike_transforms'])
1332 if 'esp_transforms' in params:
1333 self.p.add_esp_transforms(params['esp_transforms'])
1335 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1336 is_initiator=is_init,
1337 id_type=self.p.local_id['id_type'], natt=is_natt,
1338 priv_key=client_priv, auth_method=auth_method,
1339 auth_data=auth_data,
1340 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1343 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1344 params['ike-crypto']
1345 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1347 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1350 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1351 params['esp-crypto']
1352 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1355 self.sa.set_ike_props(
1356 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1357 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1358 self.sa.set_esp_props(
1359 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1363 class TestApi(VppTestCase):
1364 """ Test IKEV2 API """
1366 def setUpClass(cls):
1367 super(TestApi, cls).setUpClass()
1370 def tearDownClass(cls):
1371 super(TestApi, cls).tearDownClass()
1374 super(TestApi, self).tearDown()
1375 self.p1.remove_vpp_config()
1376 self.p2.remove_vpp_config()
1377 r = self.vapi.ikev2_profile_dump()
1378 self.assertEqual(len(r), 0)
1380 def configure_profile(self, cfg):
1381 p = Profile(self, cfg['name'])
1382 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1383 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1384 p.add_local_ts(**cfg['loc_ts'])
1385 p.add_remote_ts(**cfg['rem_ts'])
1386 p.add_responder(cfg['responder'])
1387 p.add_ike_transforms(cfg['ike_ts'])
1388 p.add_esp_transforms(cfg['esp_ts'])
1389 p.add_auth(**cfg['auth'])
1390 p.set_udp_encap(cfg['udp_encap'])
1391 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1392 if 'lifetime_data' in cfg:
1393 p.set_lifetime_data(cfg['lifetime_data'])
1394 if 'tun_itf' in cfg:
1395 p.set_tunnel_interface(cfg['tun_itf'])
1396 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1401 def test_profile_api(self):
1402 """ test profile dump API """
1407 'start_addr': '3.3.3.2',
1408 'end_addr': '3.3.3.3',
1414 'start_addr': '4.5.76.80',
1415 'end_addr': '2.3.4.6',
1422 'start_addr': 'ab::1',
1423 'end_addr': 'ab::4',
1429 'start_addr': 'cd::12',
1430 'end_addr': 'cd::13',
1436 'natt_disabled': True,
1437 'loc_id': ('fqdn', b'vpp.home'),
1438 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1441 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1444 'crypto_key_size': 32,
1449 'crypto_key_size': 24,
1451 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1453 'ipsec_over_udp_port': 4501,
1456 'lifetime_maxdata': 20192,
1457 'lifetime_jitter': 9,
1462 'loc_id': ('ip4-addr', b'192.168.2.1'),
1463 'rem_id': ('ip6-addr', b'abcd::1'),
1466 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1469 'crypto_key_size': 16,
1474 'crypto_key_size': 24,
1476 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1478 'ipsec_over_udp_port': 4600,
1481 self.p1 = self.configure_profile(conf['p1'])
1482 self.p2 = self.configure_profile(conf['p2'])
1484 r = self.vapi.ikev2_profile_dump()
1485 self.assertEqual(len(r), 2)
1486 self.verify_profile(r[0].profile, conf['p1'])
1487 self.verify_profile(r[1].profile, conf['p2'])
1489 def verify_id(self, api_id, cfg_id):
1490 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1491 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1493 def verify_ts(self, api_ts, cfg_ts):
1494 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1495 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1496 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1497 self.assertEqual(api_ts.start_addr,
1498 ip_address(text_type(cfg_ts['start_addr'])))
1499 self.assertEqual(api_ts.end_addr,
1500 ip_address(text_type(cfg_ts['end_addr'])))
1502 def verify_responder(self, api_r, cfg_r):
1503 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1504 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1506 def verify_transforms(self, api_ts, cfg_ts):
1507 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1508 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1509 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1511 def verify_ike_transforms(self, api_ts, cfg_ts):
1512 self.verify_transforms(api_ts, cfg_ts)
1513 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1515 def verify_esp_transforms(self, api_ts, cfg_ts):
1516 self.verify_transforms(api_ts, cfg_ts)
1518 def verify_auth(self, api_auth, cfg_auth):
1519 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1520 self.assertEqual(api_auth.data, cfg_auth['data'])
1521 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1523 def verify_lifetime_data(self, p, ld):
1524 self.assertEqual(p.lifetime, ld['lifetime'])
1525 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1526 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1527 self.assertEqual(p.handover, ld['handover'])
1529 def verify_profile(self, ap, cp):
1530 self.assertEqual(ap.name, cp['name'])
1531 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1532 self.verify_id(ap.loc_id, cp['loc_id'])
1533 self.verify_id(ap.rem_id, cp['rem_id'])
1534 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1535 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1536 self.verify_responder(ap.responder, cp['responder'])
1537 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1538 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1539 self.verify_auth(ap.auth, cp['auth'])
1540 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1541 self.assertTrue(natt_dis == ap.natt_disabled)
1543 if 'lifetime_data' in cp:
1544 self.verify_lifetime_data(ap, cp['lifetime_data'])
1545 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1547 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1549 self.assertEqual(ap.tun_itf, 0xffffffff)
1552 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1553 """ test ikev2 initiator - pre shared key auth """
1555 def config_tc(self):
1556 self.config_params({
1557 'is_initiator': False, # seen from test case perspective
1558 # thus vpp is initiator
1559 'responder': {'sw_if_index': self.pg0.sw_if_index,
1560 'addr': self.pg0.remote_ip4},
1561 'ike-crypto': ('AES-GCM-16ICV', 32),
1562 'ike-integ': 'NULL',
1563 'ike-dh': '3072MODPgr',
1565 'crypto_alg': 20, # "aes-gcm-16"
1566 'crypto_key_size': 256,
1567 'dh_group': 15, # "modp-3072"
1570 'crypto_alg': 12, # "aes-cbc"
1571 'crypto_key_size': 256,
1572 # "hmac-sha2-256-128"
1576 class TestInitiatorRekey(TestInitiatorPsk):
1577 """ test ikev2 initiator - rekey """
1579 def rekey_from_initiator(self):
1580 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1581 self.pg0.enable_capture()
1583 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1584 capture = self.pg0.get_capture(1)
1585 ih = self.get_ike_header(capture[0])
1586 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1587 self.assertNotIn('Response', ih.flags)
1588 self.assertIn('Initiator', ih.flags)
1589 plain = self.sa.hmac_and_decrypt(ih)
1590 sa = ikev2.IKEv2_payload_SA(plain)
1591 prop = sa[ikev2.IKEv2_payload_Proposal]
1592 nonce = sa[ikev2.IKEv2_payload_Nonce]
1593 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1594 self.sa.r_nonce = self.sa.i_nonce
1595 # update new responder SPI
1596 self.sa.child_sas[0].ispi = prop.SPI
1597 self.sa.child_sas[0].rspi = prop.SPI
1598 self.sa.calc_child_keys()
1599 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1600 flags='Response', exch_type=36,
1601 id=ih.id, next_payload='Encrypted')
1602 resp = self.encrypt_ike_msg(header, sa, 'SA')
1603 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1604 self.sa.dport, self.sa.natt, self.ip6)
1605 self.send_and_assert_no_replies(self.pg0, packet)
1607 def test_initiator(self):
1608 super(TestInitiatorRekey, self).test_initiator()
1609 self.rekey_from_initiator()
1610 self.verify_ike_sas()
1611 self.verify_ipsec_sas(is_rekey=True)
1614 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1615 """ test ikev2 initiator - delete IKE SA from responder """
1617 def config_tc(self):
1618 self.config_params({
1619 'del_sa_from_responder': True,
1620 'is_initiator': False, # seen from test case perspective
1621 # thus vpp is initiator
1622 'responder': {'sw_if_index': self.pg0.sw_if_index,
1623 'addr': self.pg0.remote_ip4},
1624 'ike-crypto': ('AES-GCM-16ICV', 32),
1625 'ike-integ': 'NULL',
1626 'ike-dh': '3072MODPgr',
1628 'crypto_alg': 20, # "aes-gcm-16"
1629 'crypto_key_size': 256,
1630 'dh_group': 15, # "modp-3072"
1633 'crypto_alg': 12, # "aes-cbc"
1634 'crypto_key_size': 256,
1635 # "hmac-sha2-256-128"
1639 class TestResponderNATT(TemplateResponder, Ikev2Params):
1640 """ test ikev2 responder - nat traversal """
1641 def config_tc(self):
1646 class TestResponderPsk(TemplateResponder, Ikev2Params):
1647 """ test ikev2 responder - pre shared key auth """
1648 def config_tc(self):
1649 self.config_params()
1652 class TestResponderRekey(TestResponderPsk):
1653 """ test ikev2 responder - rekey """
1655 def rekey_from_initiator(self):
1656 sa, first_payload = self.generate_auth_payload(is_rekey=True)
1657 header = ikev2.IKEv2(
1658 init_SPI=self.sa.ispi,
1659 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1660 flags='Initiator', exch_type='CREATE_CHILD_SA')
1662 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
1663 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1664 self.sa.dport, self.sa.natt, self.ip6)
1665 self.pg0.add_stream(packet)
1666 self.pg0.enable_capture()
1668 capture = self.pg0.get_capture(1)
1669 ih = self.get_ike_header(capture[0])
1670 plain = self.sa.hmac_and_decrypt(ih)
1671 sa = ikev2.IKEv2_payload_SA(plain)
1672 prop = sa[ikev2.IKEv2_payload_Proposal]
1673 nonce = sa[ikev2.IKEv2_payload_Nonce]
1674 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1675 # update new responder SPI
1676 self.sa.child_sas[0].rspi = prop.SPI
1678 def test_responder(self):
1679 super(TestResponderRekey, self).test_responder()
1680 self.rekey_from_initiator()
1681 self.sa.calc_child_keys()
1682 self.verify_ike_sas()
1683 self.verify_ipsec_sas(is_rekey=True)
1686 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1687 """ test ikev2 responder - cert based auth """
1688 def config_tc(self):
1689 self.config_params({
1691 'server-key': 'server-key.pem',
1692 'client-key': 'client-key.pem',
1693 'client-cert': 'client-cert.pem',
1694 'server-cert': 'server-cert.pem'})
1697 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1698 (TemplateResponder, Ikev2Params):
1700 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1702 def config_tc(self):
1703 self.config_params({
1704 'ike-crypto': ('AES-CBC', 16),
1705 'ike-integ': 'SHA2-256-128',
1706 'esp-crypto': ('AES-CBC', 24),
1707 'esp-integ': 'SHA2-384-192',
1708 'ike-dh': '2048MODPgr'})
1711 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1712 (TemplateResponder, Ikev2Params):
1714 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1716 def config_tc(self):
1717 self.config_params({
1718 'ike-crypto': ('AES-CBC', 32),
1719 'ike-integ': 'SHA2-256-128',
1720 'esp-crypto': ('AES-GCM-16ICV', 32),
1721 'esp-integ': 'NULL',
1722 'ike-dh': '3072MODPgr'})
1725 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1729 def config_tc(self):
1730 self.config_params({
1731 'del_sa_from_responder': True,
1734 'ike-crypto': ('AES-GCM-16ICV', 32),
1735 'ike-integ': 'NULL',
1736 'ike-dh': '2048MODPgr',
1737 'loc_ts': {'start_addr': 'ab:cd::0',
1738 'end_addr': 'ab:cd::10'},
1739 'rem_ts': {'start_addr': '11::0',
1740 'end_addr': '11::100'}})
1743 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1744 """ malformed packet test """
1749 def config_tc(self):
1750 self.config_params()
1752 def assert_counter(self, count, name, version='ip4'):
1753 node_name = '/err/ikev2-%s/' % version + name
1754 self.assertEqual(count, self.statistics.get_err_counter(node_name))
1756 def create_ike_init_msg(self, length=None, payload=None):
1757 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1758 flags='Initiator', exch_type='IKE_SA_INIT')
1759 if payload is not None:
1761 return self.create_packet(self.pg0, msg, self.sa.sport,
1764 def verify_bad_packet_length(self):
1765 ike_msg = self.create_ike_init_msg(length=0xdead)
1766 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1767 self.assert_counter(self.pkt_count, 'Bad packet length')
1769 def verify_bad_sa_payload_length(self):
1770 p = ikev2.IKEv2_payload_SA(length=0xdead)
1771 ike_msg = self.create_ike_init_msg(payload=p)
1772 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1773 self.assert_counter(self.pkt_count, 'Malformed packet')
1775 def test_responder(self):
1776 self.pkt_count = 254
1777 self.verify_bad_packet_length()
1778 self.verify_bad_sa_payload_length()
1781 if __name__ == '__main__':
1782 unittest.main(testRunner=VppTestRunner)