2 from socket import inet_pton
3 from cryptography import x509
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes, hmac
6 from cryptography.hazmat.primitives.asymmetric import dh, padding
7 from cryptography.hazmat.primitives.serialization import load_pem_private_key
8 from cryptography.hazmat.primitives.ciphers import (
13 from ipaddress import IPv4Address, IPv6Address, ip_address
14 from scapy.layers.ipsec import ESP
15 from scapy.layers.inet import IP, UDP, Ether
16 from scapy.layers.inet6 import IPv6
17 from scapy.packet import raw, Raw
18 from scapy.utils import long_converter
19 from framework import VppTestCase, VppTestRunner
20 from vpp_ikev2 import Profile, IDType, AuthMethod
21 from vpp_papi import VppEnum
28 KEY_PAD = b"Key Pad for IKEv2"
35 # tuple structure is (p, g, key_len)
37 '2048MODPgr': (long_converter("""
38 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
39 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
40 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
41 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
42 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
43 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
44 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
45 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
46 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
47 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
48 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
50 '3072MODPgr': (long_converter("""
51 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
62 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
63 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
64 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
65 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
66 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
70 class CryptoAlgo(object):
71 def __init__(self, name, cipher, mode):
75 if self.cipher is not None:
76 self.bs = self.cipher.block_size // 8
78 if self.name == 'AES-GCM-16ICV':
79 self.iv_len = GCM_IV_SIZE
83 def encrypt(self, data, key, aad=None):
84 iv = os.urandom(self.iv_len)
86 encryptor = Cipher(self.cipher(key), self.mode(iv),
87 default_backend()).encryptor()
88 return iv + encryptor.update(data) + encryptor.finalize()
90 salt = key[-SALT_SIZE:]
92 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
93 default_backend()).encryptor()
94 encryptor.authenticate_additional_data(aad)
95 data = encryptor.update(data) + encryptor.finalize()
96 data += encryptor.tag[:GCM_ICV_SIZE]
99 def decrypt(self, data, key, aad=None, icv=None):
101 iv = data[:self.iv_len]
102 ct = data[self.iv_len:]
103 decryptor = Cipher(algorithms.AES(key),
105 default_backend()).decryptor()
106 return decryptor.update(ct) + decryptor.finalize()
108 salt = key[-SALT_SIZE:]
109 nonce = salt + data[:GCM_IV_SIZE]
110 ct = data[GCM_IV_SIZE:]
111 key = key[:-SALT_SIZE]
112 decryptor = Cipher(algorithms.AES(key),
113 self.mode(nonce, icv, len(icv)),
114 default_backend()).decryptor()
115 decryptor.authenticate_additional_data(aad)
116 return decryptor.update(ct) + decryptor.finalize()
119 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
120 data = data + b'\x00' * (pad_len - 1)
121 return data + bytes([pad_len - 1])
124 class AuthAlgo(object):
125 def __init__(self, name, mac, mod, key_len, trunc_len=None):
129 self.key_len = key_len
130 self.trunc_len = trunc_len or key_len
134 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
135 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
136 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
141 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
142 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
143 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
144 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
145 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
149 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
150 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
167 class IKEv2ChildSA(object):
168 def __init__(self, local_ts, remote_ts, is_initiator):
176 self.local_ts = local_ts
177 self.remote_ts = remote_ts
180 class IKEv2SA(object):
181 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
182 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
183 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
184 auth_method='shared-key', priv_key=None, natt=False,
186 self.udp_encap = udp_encap
195 self.dh_params = None
197 self.priv_key = priv_key
198 self.is_initiator = is_initiator
199 nonce = nonce or os.urandom(32)
200 self.auth_data = auth_data
203 if isinstance(id_type, str):
204 self.id_type = IDType.value(id_type)
206 self.id_type = id_type
207 self.auth_method = auth_method
208 if self.is_initiator:
209 self.rspi = 8 * b'\x00'
214 self.ispi = 8 * b'\x00'
216 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
219 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return this side's DH public value.

    Picks ``i_dh_data`` when acting as initiator, ``r_dh_data`` otherwise.
    """
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the remote side's DH public value.

    Picks ``r_dh_data`` when acting as initiator, ``i_dh_data`` otherwise.
    """
    return self.r_dh_data if self.is_initiator else self.i_dh_data
def compute_secret(self):
    """Compute the Diffie-Hellman shared secret as big-endian bytes.

    Raises the peer's public value to this side's private exponent modulo
    the group prime. The result is serialised to the byte length stored as
    the third element of ``ike_group``.
    """
    exponent_bytes = self.dh_private_key
    peer_bytes = self.peer_dh_pub_key
    # ike_group is unpacked as (prime, generator, key length in bytes);
    # the generator is not needed for this step.
    prime, _generator, secret_len = self.ike_group
    peer_public = int.from_bytes(peer_bytes, 'big')
    exponent = int.from_bytes(exponent_bytes, 'big')
    shared = pow(peer_public, exponent, prime)
    return shared.to_bytes(secret_len, 'big')
242 def generate_dh_data(self):
244 if self.ike_dh not in DH:
245 raise NotImplementedError('%s not in DH group' % self.ike_dh)
247 if self.dh_params is None:
248 dhg = DH[self.ike_dh]
249 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
250 self.dh_params = pn.parameters(default_backend())
252 priv = self.dh_params.generate_private_key()
253 pub = priv.public_key()
254 x = priv.private_numbers().x
255 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
256 y = pub.public_numbers().y
258 if self.is_initiator:
259 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
261 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Compute the DH shared secret and store it in ``dh_shared_secret``."""
    shared = self.compute_secret()
    self.dh_shared_secret = shared
266 def calc_child_keys(self):
267 prf = self.ike_prf_alg.mod()
268 s = self.i_nonce + self.r_nonce
269 c = self.child_sas[0]
271 encr_key_len = self.esp_crypto_key_len
272 integ_key_len = self.esp_integ_alg.key_len
273 salt_len = 0 if integ_key_len else 4
275 l = (integ_key_len * 2 +
278 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
281 c.sk_ei = keymat[pos:pos+encr_key_len]
285 c.sk_ai = keymat[pos:pos+integ_key_len]
288 c.salt_ei = keymat[pos:pos+salt_len]
291 c.sk_er = keymat[pos:pos+encr_key_len]
295 c.sk_ar = keymat[pos:pos+integ_key_len]
298 c.salt_er = keymat[pos:pos+salt_len]
301 def calc_prfplus(self, prf, key, seed, length):
305 while len(r) < length and x < 255:
310 s = s + seed + bytes([x])
311 t = self.calc_prf(prf, key, s)
319 def calc_prf(self, prf, key, data):
320 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
325 prf = self.ike_prf_alg.mod()
326 # SKEYSEED = prf(Ni | Nr, g^ir)
327 s = self.i_nonce + self.r_nonce
328 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
330 # calculate S = Ni | Nr | SPIi SPIr
331 s = s + self.ispi + self.rspi
333 prf_key_trunc = self.ike_prf_alg.trunc_len
334 encr_key_len = self.ike_crypto_key_len
335 tr_prf_key_len = self.ike_prf_alg.key_len
336 integ_key_len = self.ike_integ_alg.key_len
337 if integ_key_len == 0:
347 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
350 self.sk_d = keymat[:pos+prf_key_trunc]
353 self.sk_ai = keymat[pos:pos+integ_key_len]
355 self.sk_ar = keymat[pos:pos+integ_key_len]
358 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
359 pos += encr_key_len + salt_size
360 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
361 pos += encr_key_len + salt_size
363 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
364 pos += tr_prf_key_len
365 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
367 def generate_authmsg(self, prf, packet):
368 if self.is_initiator:
376 data = bytes([self.id_type, 0, 0, 0]) + id
377 id_hash = self.calc_prf(prf, key, data)
378 return packet + nonce + id_hash
381 prf = self.ike_prf_alg.mod()
382 if self.is_initiator:
383 packet = self.init_req_packet
385 packet = self.init_resp_packet
386 authmsg = self.generate_authmsg(prf, raw(packet))
387 if self.auth_method == 'shared-key':
388 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
389 self.auth_data = self.calc_prf(prf, psk, authmsg)
390 elif self.auth_method == 'rsa-sig':
391 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
394 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad ``data`` and encrypt it with this side's IKE encryption key.

    :param data: plaintext bytes to protect
    :param aad: optional additional authenticated data, passed through to
        the cipher unchanged
    :returns: whatever the configured IKE crypto algorithm's ``encrypt``
        returns for the padded data
    """
    padded = self.ike_crypto_alg.pad(data)
    key = self.my_cryptokey
    return self.ike_crypto_alg.encrypt(padded, key, aad)
401 def peer_authkey(self):
402 if self.is_initiator:
407 def my_authkey(self):
408 if self.is_initiator:
413 def my_cryptokey(self):
414 if self.is_initiator:
419 def peer_cryptokey(self):
420 if self.is_initiator:
def concat(self, alg, key_len):
    """Build an ``<alg>-<bits>`` name from an algorithm and a key length.

    :param alg: algorithm name (string)
    :param key_len: key length in bytes; it is converted to bits
    """
    bits = str(key_len * 8)
    return '-'.join((alg, bits))
def vpp_ike_cypto_alg(self):
    """Return the ``<alg>-<bits>`` name of the IKE encryption algorithm."""
    algo_name = self.ike_crypto
    key_len = self.ike_crypto_key_len
    return self.concat(algo_name, key_len)
def vpp_esp_cypto_alg(self):
    """Return the ``<alg>-<bits>`` name of the ESP encryption algorithm."""
    algo_name = self.esp_crypto
    key_len = self.esp_crypto_key_len
    return self.concat(algo_name, key_len)
def verify_hmac(self, ikemsg):
    """Check the integrity checksum that trails a raw IKE message.

    The last ``trunc_len`` bytes of ``ikemsg`` are taken as the received
    checksum. An HMAC is recomputed over everything before them with the
    peer's authentication key, and its truncated value must equal the
    received one (asserted through ``self.test``).
    """
    icv_len = self.ike_integ_alg.trunc_len
    received_icv = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    digest = self.compute_hmac(self.ike_integ_alg.mod(),
                               self.peer_authkey, covered)
    self.test.assertEqual(digest[:icv_len], received_icv)
443 def compute_hmac(self, integ, key, data):
444 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the peer's IKE encryption key.

    :param data: ciphertext bytes, handed to the crypto algorithm as is
    :param aad: optional additional authenticated data
    :param icv: optional integrity check value
    :returns: the result of the configured IKE crypto algorithm's
        ``decrypt``
    """
    key = self.peer_cryptokey
    return self.ike_crypto_alg.decrypt(data, key, aad, icv)
451 def hmac_and_decrypt(self, ike):
452 ep = ike[ikev2.IKEv2_payload_Encrypted]
453 if self.ike_crypto == 'AES-GCM-16ICV':
454 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
455 ct = ep.load[:-GCM_ICV_SIZE]
456 tag = ep.load[-GCM_ICV_SIZE:]
457 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
459 self.verify_hmac(raw(ike))
460 integ_trunc = self.ike_integ_alg.trunc_len
462 # remove ICV and decrypt payload
463 ct = ep.load[:-integ_trunc]
464 plain = self.decrypt(ct)
467 return plain[:-pad_len - 1]
def build_ts_addr(self, ts, version):
    """Map a traffic-selector dict to version-suffixed address keywords.

    :param ts: mapping with ``start_addr`` and ``end_addr`` entries
    :param version: string suffix appended to the keyword names
        (the visible callers pass '4' or '6')
    :returns: dict with ``starting_address_v<version>`` and
        ``ending_address_v<version>`` keys
    """
    start_key = 'starting_address_v' + version
    end_key = 'ending_address_v' + version
    return {start_key: ts['start_addr'], end_key: ts['end_addr']}
473 def generate_ts(self, is_ip4):
474 c = self.child_sas[0]
475 ts_data = {'IP_protocol_ID': 0,
479 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
480 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
481 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
482 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
484 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
485 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
486 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
487 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
489 if self.is_initiator:
490 return ([ts1], [ts2])
491 return ([ts2], [ts1])
493 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
494 if crypto not in CRYPTO_ALGOS:
495 raise TypeError('unsupported encryption algo %r' % crypto)
496 self.ike_crypto = crypto
497 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
498 self.ike_crypto_key_len = crypto_key_len
500 if integ not in AUTH_ALGOS:
501 raise TypeError('unsupported auth algo %r' % integ)
502 self.ike_integ = None if integ == 'NULL' else integ
503 self.ike_integ_alg = AUTH_ALGOS[integ]
505 if prf not in PRF_ALGOS:
506 raise TypeError('unsupported prf algo %r' % prf)
508 self.ike_prf_alg = PRF_ALGOS[prf]
510 self.ike_group = DH[self.ike_dh]
512 def set_esp_props(self, crypto, crypto_key_len, integ):
513 self.esp_crypto_key_len = crypto_key_len
514 if crypto not in CRYPTO_ALGOS:
515 raise TypeError('unsupported encryption algo %r' % crypto)
516 self.esp_crypto = crypto
517 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
519 if integ not in AUTH_ALGOS:
520 raise TypeError('unsupported auth algo %r' % integ)
521 self.esp_integ = None if integ == 'NULL' else integ
522 self.esp_integ_alg = AUTH_ALGOS[integ]
524 def crypto_attr(self, key_len):
525 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
526 return (0x800e << 16 | key_len << 3, 12)
528 raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the IKE encryption key length."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the ESP encryption key length."""
    # NOTE(review): crypto_attr decides on self.ike_crypto, not on the ESP
    # algorithm -- confirm that is intended for the ESP case.
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
536 def compute_nat_sha1(self, ip, port, rspi=None):
539 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
540 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
542 return digest.finalize()
545 class IkePeer(VppTestCase):
546 """ common class for initiator and responder """
550 import scapy.contrib.ikev2 as _ikev2
551 globals()['ikev2'] = _ikev2
552 super(IkePeer, cls).setUpClass()
553 cls.create_pg_interfaces(range(2))
554 for i in cls.pg_interfaces:
def tearDownClass(cls):
    """Delegate class-level teardown to the parent class of ``IkePeer``."""
    parent = super(IkePeer, cls)
    parent.tearDownClass()
566 super(IkePeer, self).tearDown()
567 if self.del_sa_from_responder:
568 self.initiate_del_sa_from_responder()
570 self.initiate_del_sa_from_initiator()
571 r = self.vapi.ikev2_sa_dump()
572 self.assertEqual(len(r), 0)
573 sas = self.vapi.ipsec_sa_dump()
574 self.assertEqual(len(sas), 0)
575 self.p.remove_vpp_config()
576 self.assertIsNone(self.p.query_vpp_config())
579 super(IkePeer, self).setUp()
581 self.p.add_vpp_config()
582 self.assertIsNotNone(self.p.query_vpp_config())
583 if self.sa.is_initiator:
584 self.sa.generate_dh_data()
585 self.vapi.cli('ikev2 set logging level 4')
586 self.vapi.cli('event-lo clear')
587 self.vapi.cli('ikev2 dpd disable')
589 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
592 src_ip = src_if.remote_ip6
593 dst_ip = src_if.local_ip6
596 src_ip = src_if.remote_ip4
597 dst_ip = src_if.local_ip4
599 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
600 ip_layer(src=src_ip, dst=dst_ip) /
601 UDP(sport=sport, dport=dport))
603 # insert non ESP marker
604 res = res / Raw(b'\x00' * 4)
def verify_udp(self, udp):
    """Assert that a UDP header carries the ports recorded on ``self.sa``.

    The source port is checked first, then the destination port.
    """
    for field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, field), getattr(self.sa, field))
611 def get_ike_header(self, packet):
613 ih = packet[ikev2.IKEv2]
614 except IndexError as e:
615 # this is a workaround for getting IKEv2 layer as both ikev2 and
616 # ipsec register for port 4500
618 ih = self.verify_and_remove_non_esp_marker(esp)
619 self.assertEqual(ih.version, 0x20)
620 self.assertNotIn('Version', ih.flags)
623 def verify_and_remove_non_esp_marker(self, packet):
625 # if we are in nat traversal mode check for non esp marker
628 self.assertEqual(data[:4], b'\x00' * 4)
629 return ikev2.IKEv2(data[4:])
633 def encrypt_ike_msg(self, header, plain, first_payload):
634 if self.sa.ike_crypto == 'AES-GCM-16ICV':
635 data = self.sa.ike_crypto_alg.pad(raw(plain))
636 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
637 len(ikev2.IKEv2_payload_Encrypted())
638 tlen = plen + len(ikev2.IKEv2())
641 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
645 encr = self.sa.encrypt(raw(plain), raw(res))
646 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
647 length=plen, load=encr)
650 encr = self.sa.encrypt(raw(plain))
651 trunc_len = self.sa.ike_integ_alg.trunc_len
652 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
653 tlen = plen + len(ikev2.IKEv2())
655 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
656 length=plen, load=encr)
660 integ_data = raw(res)
661 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
662 self.sa.my_authkey, integ_data)
663 res = res / Raw(hmac_data[:trunc_len])
664 assert(len(res) == tlen)
667 def verify_udp_encap(self, ipsec_sa):
668 e = VppEnum.vl_api_ipsec_sad_flags_t
669 if self.sa.udp_encap or self.sa.natt:
670 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
672 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
674 def verify_ipsec_sas(self, is_rekey=False):
675 sas = self.vapi.ipsec_sa_dump()
677 # after rekey there is a short period of time in which old
678 # inbound SA is still present
682 self.assertEqual(len(sas), sa_count)
683 if self.sa.is_initiator:
698 c = self.sa.child_sas[0]
700 self.verify_udp_encap(sa0)
701 self.verify_udp_encap(sa1)
702 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
703 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
704 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
706 if self.sa.esp_integ is None:
709 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
710 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
711 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
714 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
715 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
716 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
717 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
721 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
722 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
723 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
724 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
726 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
727 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Assert that one piece of key material matches what the API reports.

    :param api_keys: object exposing ``<name>`` and ``<name>_len``
    :param keys: object holding the expected key under ``<name>``
    :param name: attribute name of the key to compare (e.g. 'sk_d')
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    # only the first reported_len bytes of the API buffer are compared
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Assert that an identity reported by the API matches the expected one.

    Three things are compared: the ID type (the expected type is translated
    with ``IDType.value``), the data length, and the identity data itself.

    :param api_id: identity as returned by the API (``type``, ``data_len``,
        ``data``)
    :param exp_id: expected identity
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The identity bytes must be checked against the expected identity data.
    # Comparing them with exp_id.type (already verified above) could never
    # validate the payload.
    # NOTE(review): assumes exp_id exposes the identity bytes as ``data``,
    # mirroring api_id -- confirm against the caller.
    self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)
741 def verify_ike_sas(self):
742 r = self.vapi.ikev2_sa_dump()
743 self.assertEqual(len(r), 1)
745 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
746 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
748 if self.sa.is_initiator:
749 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
750 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
752 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
753 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
755 if self.sa.is_initiator:
756 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
757 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
759 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
760 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
761 self.verify_keymat(sa.keys, self.sa, 'sk_d')
762 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
763 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
764 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
765 self.verify_keymat(sa.keys, self.sa, 'sk_er')
766 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
767 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
769 self.assertEqual(sa.i_id.type, self.sa.id_type)
770 self.assertEqual(sa.r_id.type, self.sa.id_type)
771 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
772 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
773 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
774 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
776 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
777 self.assertEqual(len(r), 1)
779 self.assertEqual(csa.sa_index, sa.sa_index)
780 c = self.sa.child_sas[0]
781 if hasattr(c, 'sk_ai'):
782 self.verify_keymat(csa.keys, c, 'sk_ai')
783 self.verify_keymat(csa.keys, c, 'sk_ar')
784 self.verify_keymat(csa.keys, c, 'sk_ei')
785 self.verify_keymat(csa.keys, c, 'sk_er')
786 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
787 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
789 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
792 r = self.vapi.ikev2_traffic_selector_dump(
793 is_initiator=True, sa_index=sa.sa_index,
794 child_sa_index=csa.child_sa_index)
795 self.assertEqual(len(r), 1)
797 self.verify_ts(r[0].ts, tsi[0], True)
799 r = self.vapi.ikev2_traffic_selector_dump(
800 is_initiator=False, sa_index=sa.sa_index,
801 child_sa_index=csa.child_sa_index)
802 self.assertEqual(len(r), 1)
803 self.verify_ts(r[0].ts, tsr[0], False)
805 n = self.vapi.ikev2_nonce_get(is_initiator=True,
806 sa_index=sa.sa_index)
807 self.verify_nonce(n, self.sa.i_nonce)
808 n = self.vapi.ikev2_nonce_get(is_initiator=False,
809 sa_index=sa.sa_index)
810 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that a nonce reported by the API equals the expected nonce.

    Both the reported length (``data_len``) and the value (``nonce``) are
    checked.
    """
    expected_len = len(nonce)
    self.assertEqual(api_nonce.data_len, expected_len)
    self.assertEqual(api_nonce.nonce, nonce)
816 def verify_ts(self, api_ts, ts, is_initiator):
818 self.assertTrue(api_ts.is_local)
820 self.assertFalse(api_ts.is_local)
823 self.assertEqual(api_ts.start_addr,
824 IPv4Address(ts.starting_address_v4))
825 self.assertEqual(api_ts.end_addr,
826 IPv4Address(ts.ending_address_v4))
828 self.assertEqual(api_ts.start_addr,
829 IPv6Address(ts.starting_address_v6))
830 self.assertEqual(api_ts.end_addr,
831 IPv6Address(ts.ending_address_v6))
832 self.assertEqual(api_ts.start_port, ts.start_port)
833 self.assertEqual(api_ts.end_port, ts.end_port)
834 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
837 class TemplateInitiator(IkePeer):
838 """ initiator test template """
840 def initiate_del_sa_from_initiator(self):
841 ispi = int.from_bytes(self.sa.ispi, 'little')
842 self.pg0.enable_capture()
844 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
845 capture = self.pg0.get_capture(1)
846 ih = self.get_ike_header(capture[0])
847 self.assertNotIn('Response', ih.flags)
848 self.assertIn('Initiator', ih.flags)
849 self.assertEqual(ih.init_SPI, self.sa.ispi)
850 self.assertEqual(ih.resp_SPI, self.sa.rspi)
851 plain = self.sa.hmac_and_decrypt(ih)
852 d = ikev2.IKEv2_payload_Delete(plain)
853 self.assertEqual(d.proto, 1) # proto=IKEv2
854 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
855 flags='Response', exch_type='INFORMATIONAL',
856 id=ih.id, next_payload='Encrypted')
857 resp = self.encrypt_ike_msg(header, b'', None)
858 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Validate the reply to a delete-SA request in the initiator template.

    The IKE header must carry the current message id, exchange type 37
    (INFORMATIONAL) and both the 'Response' and 'Initiator' flags. The
    decrypted payload must be empty.
    """
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, 37)  # exchange informational
    for flag in ('Response', 'Initiator'):
        self.assertIn(flag, header.flags)
    self.assertEqual(self.sa.hmac_and_decrypt(header), b'')
869 def initiate_del_sa_from_responder(self):
870 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
871 exch_type='INFORMATIONAL',
872 id=self.sa.new_msg_id())
873 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
874 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
875 packet = self.create_packet(self.pg0, ike_msg,
876 self.sa.sport, self.sa.dport,
877 self.sa.natt, self.ip6)
878 self.pg0.add_stream(packet)
879 self.pg0.enable_capture()
881 capture = self.pg0.get_capture(1)
882 self.verify_del_sa(capture[0])
885 def find_notify_payload(packet, notify_type):
886 n = packet[ikev2.IKEv2_payload_Notify]
888 if n.type == notify_type:
893 def verify_nat_detection(self, packet):
900 # NAT_DETECTION_SOURCE_IP
901 s = self.find_notify_payload(packet, 16388)
902 self.assertIsNotNone(s)
903 src_sha = self.sa.compute_nat_sha1(
904 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
905 self.assertEqual(s.load, src_sha)
907 # NAT_DETECTION_DESTINATION_IP
908 s = self.find_notify_payload(packet, 16389)
909 self.assertIsNotNone(s)
910 dst_sha = self.sa.compute_nat_sha1(
911 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
912 self.assertEqual(s.load, dst_sha)
914 def verify_sa_init_request(self, packet):
915 ih = packet[ikev2.IKEv2]
916 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
917 self.assertEqual(ih.exch_type, 34) # SA_INIT
918 self.sa.ispi = ih.init_SPI
919 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
920 self.assertIn('Initiator', ih.flags)
921 self.assertNotIn('Response', ih.flags)
922 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
923 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
925 prop = packet[ikev2.IKEv2_payload_Proposal]
926 self.assertEqual(prop.proto, 1) # proto = ikev2
927 self.assertEqual(prop.proposal, 1)
928 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
929 self.assertEqual(prop.trans[0].transform_id,
930 self.p.ike_transforms['crypto_alg'])
931 self.assertEqual(prop.trans[1].transform_type, 2) # prf
932 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
933 self.assertEqual(prop.trans[2].transform_type, 4) # dh
934 self.assertEqual(prop.trans[2].transform_id,
935 self.p.ike_transforms['dh_group'])
937 self.verify_nat_detection(packet)
938 self.sa.set_ike_props(
939 crypto='AES-GCM-16ICV', crypto_key_len=32,
940 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
941 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
942 integ='SHA2-256-128')
943 self.sa.generate_dh_data()
944 self.sa.complete_dh_data()
947 def update_esp_transforms(self, trans, sa):
949 if trans.transform_type == 1: # ecryption
950 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
951 elif trans.transform_type == 3: # integrity
952 sa.esp_integ = INTEG_IDS[trans.transform_id]
953 trans = trans.payload
955 def verify_sa_auth_req(self, packet):
956 ih = self.get_ike_header(packet)
957 self.assertEqual(ih.resp_SPI, self.sa.rspi)
958 self.assertEqual(ih.init_SPI, self.sa.ispi)
959 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
960 self.assertIn('Initiator', ih.flags)
961 self.assertNotIn('Response', ih.flags)
965 self.assertEqual(ih.id, self.sa.msg_id + 1)
967 plain = self.sa.hmac_and_decrypt(ih)
968 idi = ikev2.IKEv2_payload_IDi(plain)
969 idr = ikev2.IKEv2_payload_IDr(idi.payload)
970 self.assertEqual(idi.load, self.sa.i_id)
971 self.assertEqual(idr.load, self.sa.r_id)
972 prop = idi[ikev2.IKEv2_payload_Proposal]
973 c = self.sa.child_sas[0]
975 self.update_esp_transforms(
976 prop[ikev2.IKEv2_payload_Transform], self.sa)
978 def send_init_response(self):
979 tr_attr = self.sa.ike_crypto_attr()
980 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
981 transform_id=self.sa.ike_crypto, length=tr_attr[1],
982 key_length=tr_attr[0]) /
983 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
984 transform_id=self.sa.ike_integ) /
985 ikev2.IKEv2_payload_Transform(transform_type='PRF',
986 transform_id=self.sa.ike_prf_alg.name) /
987 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
988 transform_id=self.sa.ike_dh))
989 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
990 trans_nb=4, trans=trans))
991 self.sa.init_resp_packet = (
992 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
993 exch_type='IKE_SA_INIT', flags='Response') /
994 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
995 ikev2.IKEv2_payload_KE(next_payload='Nonce',
996 group=self.sa.ike_dh,
997 load=self.sa.my_dh_pub_key) /
998 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
1000 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1001 self.sa.sport, self.sa.dport,
1002 self.sa.natt, self.ip6)
1003 self.pg_send(self.pg0, ike_msg)
1004 capture = self.pg0.get_capture(1)
1005 self.verify_sa_auth_req(capture[0])
1007 def initiate_sa_init(self):
1008 self.pg0.enable_capture()
1010 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1012 capture = self.pg0.get_capture(1)
1013 self.verify_sa_init_request(capture[0])
1014 self.send_init_response()
1016 def send_auth_response(self):
1017 tr_attr = self.sa.esp_crypto_attr()
1018 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1019 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1020 key_length=tr_attr[0]) /
1021 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1022 transform_id=self.sa.esp_integ) /
1023 ikev2.IKEv2_payload_Transform(
1024 transform_type='Extended Sequence Number',
1025 transform_id='No ESN') /
1026 ikev2.IKEv2_payload_Transform(
1027 transform_type='Extended Sequence Number',
1028 transform_id='ESN'))
1030 c = self.sa.child_sas[0]
1031 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1032 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1034 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1035 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1036 IDtype=self.sa.id_type, load=self.sa.i_id) /
1037 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1038 IDtype=self.sa.id_type, load=self.sa.r_id) /
1039 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1040 auth_type=AuthMethod.value(self.sa.auth_method),
1041 load=self.sa.auth_data) /
1042 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1043 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1044 number_of_TSs=len(tsi),
1045 traffic_selector=tsi) /
1046 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1047 number_of_TSs=len(tsr),
1048 traffic_selector=tsr) /
1049 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1051 header = ikev2.IKEv2(
1052 init_SPI=self.sa.ispi,
1053 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1054 flags='Response', exch_type='IKE_AUTH')
1056 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1057 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1058 self.sa.dport, self.sa.natt, self.ip6)
1059 self.pg_send(self.pg0, packet)
1061 def test_initiator(self):
1062 self.initiate_sa_init()
1064 self.sa.calc_child_keys()
1065 self.send_auth_response()
1066 self.verify_ike_sas()
1069 class TemplateResponder(IkePeer):
1070 """ responder test template """
1072 def initiate_del_sa_from_responder(self):
1073 self.pg0.enable_capture()
1075 self.vapi.ikev2_initiate_del_ike_sa(
1076 ispi=int.from_bytes(self.sa.ispi, 'little'))
1077 capture = self.pg0.get_capture(1)
1078 ih = self.get_ike_header(capture[0])
1079 self.assertNotIn('Response', ih.flags)
1080 self.assertNotIn('Initiator', ih.flags)
1081 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1082 plain = self.sa.hmac_and_decrypt(ih)
1083 d = ikev2.IKEv2_payload_Delete(plain)
1084 self.assertEqual(d.proto, 1) # proto=IKEv2
1085 self.assertEqual(ih.init_SPI, self.sa.ispi)
1086 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1087 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1088 flags='Initiator+Response',
1089 exch_type='INFORMATIONAL',
1090 id=ih.id, next_payload='Encrypted')
1091 resp = self.encrypt_ike_msg(header, b'', None)
1092 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Validate the reply to a delete-SA request in the responder template.

    The IKE header must carry the current message id, exchange type 37
    (INFORMATIONAL), the 'Response' flag without the 'Initiator' flag, an
    Encrypted next payload (46) and the SA's SPIs. The decrypted payload
    must be empty.
    """
    header = self.get_ike_header(packet)
    expect_equal = self.assertEqual
    expect_equal(header.id, self.sa.msg_id)
    expect_equal(header.exch_type, 37)  # exchange informational
    self.assertIn('Response', header.flags)
    self.assertNotIn('Initiator', header.flags)
    expect_equal(header.next_payload, 46)  # Encrypted
    expect_equal(header.init_SPI, self.sa.ispi)
    expect_equal(header.resp_SPI, self.sa.rspi)
    expect_equal(self.sa.hmac_and_decrypt(header), b'')
1106 def initiate_del_sa_from_initiator(self):
1107 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1108 flags='Initiator', exch_type='INFORMATIONAL',
1109 id=self.sa.new_msg_id())
1110 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1111 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1112 packet = self.create_packet(self.pg0, ike_msg,
1113 self.sa.sport, self.sa.dport,
1114 self.sa.natt, self.ip6)
1115 self.pg0.add_stream(packet)
1116 self.pg0.enable_capture()
1118 capture = self.pg0.get_capture(1)
1119 self.verify_del_sa(capture[0])
1121 def send_sa_init_req(self, behind_nat=False):
1122 tr_attr = self.sa.ike_crypto_attr()
1123 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1124 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1125 key_length=tr_attr[0]) /
1126 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1127 transform_id=self.sa.ike_integ) /
1128 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1129 transform_id=self.sa.ike_prf_alg.name) /
1130 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1131 transform_id=self.sa.ike_dh))
1133 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1134 trans_nb=4, trans=trans))
1136 self.sa.init_req_packet = (
1137 ikev2.IKEv2(init_SPI=self.sa.ispi,
1138 flags='Initiator', exch_type='IKE_SA_INIT') /
1139 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1140 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1141 group=self.sa.ike_dh,
1142 load=self.sa.my_dh_pub_key) /
1143 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1144 load=self.sa.i_nonce))
1147 src_address = b'\x0a\x0a\x0a\x01'
1149 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1151 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1152 dst_nat = self.sa.compute_nat_sha1(
1153 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1155 nat_src_detection = ikev2.IKEv2_payload_Notify(
1156 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1157 next_payload='Notify')
1158 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1159 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1160 self.sa.init_req_packet = (self.sa.init_req_packet /
1164 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1165 self.sa.sport, self.sa.dport,
1166 self.sa.natt, self.ip6)
1167 self.pg0.add_stream(ike_msg)
1168 self.pg0.enable_capture()
1170 capture = self.pg0.get_capture(1)
1171 self.verify_sa_init(capture[0])
1173 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1174 tr_attr = self.sa.esp_crypto_attr()
1175 last_payload = last_payload or 'Notify'
1176 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1177 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1178 key_length=tr_attr[0]) /
1179 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1180 transform_id=self.sa.esp_integ) /
1181 ikev2.IKEv2_payload_Transform(
1182 transform_type='Extended Sequence Number',
1183 transform_id='No ESN') /
1184 ikev2.IKEv2_payload_Transform(
1185 transform_type='Extended Sequence Number',
1186 transform_id='ESN'))
1188 c = self.sa.child_sas[0]
1189 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1190 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1192 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1193 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1194 auth_type=AuthMethod.value(self.sa.auth_method),
1195 load=self.sa.auth_data) /
1196 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1197 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1198 number_of_TSs=len(tsi), traffic_selector=tsi) /
1199 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1200 number_of_TSs=len(tsr), traffic_selector=tsr))
1203 first_payload = 'Nonce'
1204 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1205 next_payload='SA') / plain /
1206 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1207 proto='ESP', SPI=c.ispi))
1209 first_payload = 'IDi'
1210 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1211 IDtype=self.sa.id_type, load=self.sa.i_id) /
1212 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1213 IDtype=self.sa.id_type, load=self.sa.r_id))
1215 return plain, first_payload
1217 def send_sa_auth(self):
1218 plain, first_payload = self.generate_auth_payload(
1219 last_payload='Notify')
1220 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1221 header = ikev2.IKEv2(
1222 init_SPI=self.sa.ispi,
1223 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1224 flags='Initiator', exch_type='IKE_AUTH')
1226 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1227 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1228 self.sa.dport, self.sa.natt, self.ip6)
1229 self.pg0.add_stream(packet)
1230 self.pg0.enable_capture()
1232 capture = self.pg0.get_capture(1)
1233 self.verify_sa_auth_resp(capture[0])
1235 def verify_sa_init(self, packet):
1236 ih = self.get_ike_header(packet)
1238 self.assertEqual(ih.id, self.sa.msg_id)
1239 self.assertEqual(ih.exch_type, 34)
1240 self.assertIn('Response', ih.flags)
1241 self.assertEqual(ih.init_SPI, self.sa.ispi)
1242 self.assertNotEqual(ih.resp_SPI, 0)
1243 self.sa.rspi = ih.resp_SPI
1245 sa = ih[ikev2.IKEv2_payload_SA]
1246 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1247 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1248 except IndexError as e:
1249 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1250 self.logger.error(ih.show())
1252 self.sa.complete_dh_data()
1256 def verify_sa_auth_resp(self, packet):
1257 ike = self.get_ike_header(packet)
1259 self.verify_udp(udp)
1260 self.assertEqual(ike.id, self.sa.msg_id)
1261 plain = self.sa.hmac_and_decrypt(ike)
1262 idr = ikev2.IKEv2_payload_IDr(plain)
1263 prop = idr[ikev2.IKEv2_payload_Proposal]
1264 self.assertEqual(prop.SPIsize, 4)
1265 self.sa.child_sas[0].rspi = prop.SPI
1266 self.sa.calc_child_keys()
1268 def test_responder(self):
1269 self.send_sa_init_req(self.sa.natt)
1271 self.verify_ipsec_sas()
1272 self.verify_ike_sas()
1275 class Ikev2Params(object):
1276 def config_params(self, params={}):
1277 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1278 ei = VppEnum.vl_api_ipsec_integ_alg_t
1280 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1281 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1282 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1283 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1284 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1285 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1287 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1288 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1289 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1290 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1292 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1293 not in params else params['del_sa_from_responder']
1294 is_natt = 'natt' in params and params['natt'] or False
1295 self.p = Profile(self, 'pr1')
1296 self.ip6 = False if 'ip6' not in params else params['ip6']
1298 if 'auth' in params and params['auth'] == 'rsa-sig':
1299 auth_method = 'rsa-sig'
1300 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1301 self.vapi.ikev2_set_local_key(
1302 key_file=work_dir + params['server-key'])
1304 client_file = work_dir + params['client-cert']
1305 server_pem = open(work_dir + params['server-cert']).read()
1306 client_priv = open(work_dir + params['client-key']).read()
1307 client_priv = load_pem_private_key(str.encode(client_priv), None,
1309 self.peer_cert = x509.load_pem_x509_certificate(
1310 str.encode(server_pem),
1312 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1315 auth_data = b'$3cr3tpa$$w0rd'
1316 self.p.add_auth(method='shared-key', data=auth_data)
1317 auth_method = 'shared-key'
1320 is_init = True if 'is_initiator' not in params else\
1321 params['is_initiator']
1323 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1324 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1326 self.p.add_local_id(**idr)
1327 self.p.add_remote_id(**idi)
1329 self.p.add_local_id(**idi)
1330 self.p.add_remote_id(**idr)
1332 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1333 'loc_ts' not in params else params['loc_ts']
1334 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1335 'rem_ts' not in params else params['rem_ts']
1336 self.p.add_local_ts(**loc_ts)
1337 self.p.add_remote_ts(**rem_ts)
1338 if 'responder' in params:
1339 self.p.add_responder(params['responder'])
1340 if 'ike_transforms' in params:
1341 self.p.add_ike_transforms(params['ike_transforms'])
1342 if 'esp_transforms' in params:
1343 self.p.add_esp_transforms(params['esp_transforms'])
1345 udp_encap = False if 'udp_encap' not in params else\
1348 self.p.set_udp_encap(True)
1350 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1351 is_initiator=is_init,
1352 id_type=self.p.local_id['id_type'], natt=is_natt,
1353 priv_key=client_priv, auth_method=auth_method,
1354 auth_data=auth_data, udp_encap=udp_encap,
1355 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1357 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1358 params['ike-crypto']
1359 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1361 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1364 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1365 params['esp-crypto']
1366 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1369 self.sa.set_ike_props(
1370 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1371 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1372 self.sa.set_esp_props(
1373 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1377 class TestApi(VppTestCase):
1378 """ Test IKEV2 API """
def setUpClass(cls):
    # Nothing specific to TestApi is prepared at class level; the work is
    # delegated entirely to the parent class.
    super(TestApi, cls).setUpClass()
def tearDownClass(cls):
    # Nothing specific to TestApi is released at class level; the work is
    # delegated entirely to the parent class.
    super(TestApi, cls).tearDownClass()
1388 super(TestApi, self).tearDown()
1389 self.p1.remove_vpp_config()
1390 self.p2.remove_vpp_config()
1391 r = self.vapi.ikev2_profile_dump()
1392 self.assertEqual(len(r), 0)
1394 def configure_profile(self, cfg):
1395 p = Profile(self, cfg['name'])
1396 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1397 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1398 p.add_local_ts(**cfg['loc_ts'])
1399 p.add_remote_ts(**cfg['rem_ts'])
1400 p.add_responder(cfg['responder'])
1401 p.add_ike_transforms(cfg['ike_ts'])
1402 p.add_esp_transforms(cfg['esp_ts'])
1403 p.add_auth(**cfg['auth'])
1404 p.set_udp_encap(cfg['udp_encap'])
1405 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1406 if 'lifetime_data' in cfg:
1407 p.set_lifetime_data(cfg['lifetime_data'])
1408 if 'tun_itf' in cfg:
1409 p.set_tunnel_interface(cfg['tun_itf'])
1410 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1415 def test_profile_api(self):
1416 """ test profile dump API """
1421 'start_addr': '3.3.3.2',
1422 'end_addr': '3.3.3.3',
1428 'start_addr': '4.5.76.80',
1429 'end_addr': '2.3.4.6',
1436 'start_addr': 'ab::1',
1437 'end_addr': 'ab::4',
1443 'start_addr': 'cd::12',
1444 'end_addr': 'cd::13',
1450 'natt_disabled': True,
1451 'loc_id': ('fqdn', b'vpp.home'),
1452 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1455 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1458 'crypto_key_size': 32,
1463 'crypto_key_size': 24,
1465 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1467 'ipsec_over_udp_port': 4501,
1470 'lifetime_maxdata': 20192,
1471 'lifetime_jitter': 9,
1476 'loc_id': ('ip4-addr', b'192.168.2.1'),
1477 'rem_id': ('ip6-addr', b'abcd::1'),
1480 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1483 'crypto_key_size': 16,
1488 'crypto_key_size': 24,
1490 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1492 'ipsec_over_udp_port': 4600,
1495 self.p1 = self.configure_profile(conf['p1'])
1496 self.p2 = self.configure_profile(conf['p2'])
1498 r = self.vapi.ikev2_profile_dump()
1499 self.assertEqual(len(r), 2)
1500 self.verify_profile(r[0].profile, conf['p1'])
1501 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Compare an identity reported by the API with its configured value.

    cfg_id is indexed as [0] = id type name and [1] = id data (bytes).
    The type name is translated through IDType.value(); the data
    reported by the API is encoded with the 'ascii' codec before the
    comparison.
    """
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(api_id.type, expected_type)

    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare a traffic selector reported by the API with its config.

    Protocol and port range are compared as they are.  The configured
    start/end addresses are passed through text_type() and
    ip_address() first and then compared with the API values.
    """
    self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])

    for port_field in ('start_port', 'end_port'):
        self.assertEqual(getattr(api_ts, port_field), cfg_ts[port_field])

    for addr_field in ('start_addr', 'end_addr'):
        expected_addr = ip_address(text_type(cfg_ts[addr_field]))
        self.assertEqual(getattr(api_ts, addr_field), expected_addr)
def verify_responder(self, api_r, cfg_r):
    """Compare the responder reported by the API with its configuration.

    The interface index is compared directly; the configured address is
    converted with ip_address() before being compared.
    """
    self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])

    expected_addr = ip_address(cfg_r['addr'])
    self.assertEqual(api_r.addr, expected_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Compare the transform fields shared by the IKE and ESP proposals.

    Checks the crypto algorithm, the crypto key size and the integrity
    algorithm, in that order, between the API structure and the
    configuration dict.
    """
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Compare IKE transforms: the common fields plus the DH group."""
    self.verify_transforms(api_ts, cfg_ts)

    # The DH group is checked here because only the IKE comparison uses it.
    expected_group = cfg_ts['dh_group']
    self.assertEqual(api_ts.dh_group, expected_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Compare ESP transforms; only the common transform fields apply."""
    # No ESP-specific field is checked here: the comparison is fully
    # delegated to verify_transforms().
    self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Compare the authentication settings reported by the API.

    The configured method name is translated through AuthMethod.value();
    the reported data must equal the configured data and the reported
    length must equal the length of that data.
    """
    expected_method = AuthMethod.value(cfg_auth['method'])
    self.assertEqual(api_auth.method, expected_method)

    auth_blob = cfg_auth['data']
    self.assertEqual(api_auth.data, auth_blob)
    self.assertEqual(api_auth.data_len, len(auth_blob))
def verify_lifetime_data(self, p, ld):
    """Compare the lifetime settings of profile p with the dict ld.

    The same four names are read as attributes of p and as keys of ld:
    lifetime, lifetime_maxdata, lifetime_jitter and handover.
    """
    lifetime_fields = (
        'lifetime',
        'lifetime_maxdata',
        'lifetime_jitter',
        'handover',
    )
    for field in lifetime_fields:
        self.assertEqual(getattr(p, field), ld[field])
def verify_profile(self, ap, cp):
    """Check one profile returned by the API against its configuration.

    ap is the profile structure reported by the API and cp is the dict
    the profile was configured from.  The keys 'natt_disabled',
    'lifetime_data' and 'tun_itf' are optional in cp; every other key
    read below is required.
    """
    self.assertEqual(ap.name, cp['name'])
    self.assertEqual(ap.udp_encap, cp['udp_encap'])
    self.verify_id(ap.loc_id, cp['loc_id'])
    self.verify_id(ap.rem_id, cp['rem_id'])
    self.verify_ts(ap.loc_ts, cp['loc_ts'])
    self.verify_ts(ap.rem_ts, cp['rem_ts'])
    self.verify_responder(ap.responder, cp['responder'])
    self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
    self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
    self.verify_auth(ap.auth, cp['auth'])

    # A profile that does not mention 'natt_disabled' is expected to
    # report it as False.  assertEqual is used instead of assertTrue on a
    # comparison so that a failure shows both values.
    self.assertEqual(cp.get('natt_disabled', False), ap.natt_disabled)

    if 'lifetime_data' in cp:
        self.verify_lifetime_data(ap, cp['lifetime_data'])
    self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])

    # NOTE(review): the guard between the two tun_itf checks is read as
    # "'tun_itf' in cp" / "else" - confirm against the full file.
    # 0xffffffff is the value expected when no tunnel interface was
    # configured.
    self.assertEqual(ap.tun_itf, cp.get('tun_itf', 0xffffffff))
1566 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1567 """ test ikev2 initiator - pre shared key auth """
1569 def config_tc(self):
1570 self.config_params({
1571 'is_initiator': False, # seen from test case perspective
1572 # thus vpp is initiator
1573 'responder': {'sw_if_index': self.pg0.sw_if_index,
1574 'addr': self.pg0.remote_ip4},
1575 'ike-crypto': ('AES-GCM-16ICV', 32),
1576 'ike-integ': 'NULL',
1577 'ike-dh': '3072MODPgr',
1579 'crypto_alg': 20, # "aes-gcm-16"
1580 'crypto_key_size': 256,
1581 'dh_group': 15, # "modp-3072"
1584 'crypto_alg': 12, # "aes-cbc"
1585 'crypto_key_size': 256,
1586 # "hmac-sha2-256-128"
1590 class TestInitiatorRekey(TestInitiatorPsk):
1591 """ test ikev2 initiator - rekey """
1593 def rekey_from_initiator(self):
1594 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1595 self.pg0.enable_capture()
1597 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1598 capture = self.pg0.get_capture(1)
1599 ih = self.get_ike_header(capture[0])
1600 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1601 self.assertNotIn('Response', ih.flags)
1602 self.assertIn('Initiator', ih.flags)
1603 plain = self.sa.hmac_and_decrypt(ih)
1604 sa = ikev2.IKEv2_payload_SA(plain)
1605 prop = sa[ikev2.IKEv2_payload_Proposal]
1606 nonce = sa[ikev2.IKEv2_payload_Nonce]
1607 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1608 self.sa.r_nonce = self.sa.i_nonce
1609 # update new responder SPI
1610 self.sa.child_sas[0].ispi = prop.SPI
1611 self.sa.child_sas[0].rspi = prop.SPI
1612 self.sa.calc_child_keys()
1613 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1614 flags='Response', exch_type=36,
1615 id=ih.id, next_payload='Encrypted')
1616 resp = self.encrypt_ike_msg(header, sa, 'SA')
1617 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1618 self.sa.dport, self.sa.natt, self.ip6)
1619 self.send_and_assert_no_replies(self.pg0, packet)
def test_initiator(self):
    # Run the inherited initiator scenario first, then the rekey exchange
    # defined on this class.
    super(TestInitiatorRekey, self).test_initiator()
    self.rekey_from_initiator()
    # Re-check the IKE SAs, then the IPsec SAs in rekey mode.
    self.verify_ike_sas()
    self.verify_ipsec_sas(is_rekey=True)
1628 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1629 """ test ikev2 initiator - delete IKE SA from responder """
1631 def config_tc(self):
1632 self.config_params({
1633 'del_sa_from_responder': True,
1634 'is_initiator': False, # seen from test case perspective
1635 # thus vpp is initiator
1636 'responder': {'sw_if_index': self.pg0.sw_if_index,
1637 'addr': self.pg0.remote_ip4},
1638 'ike-crypto': ('AES-GCM-16ICV', 32),
1639 'ike-integ': 'NULL',
1640 'ike-dh': '3072MODPgr',
1642 'crypto_alg': 20, # "aes-gcm-16"
1643 'crypto_key_size': 256,
1644 'dh_group': 15, # "modp-3072"
1647 'crypto_alg': 12, # "aes-cbc"
1648 'crypto_key_size': 256,
1649 # "hmac-sha2-256-128"
1653 class TestResponderNATT(TemplateResponder, Ikev2Params):
1654 """ test ikev2 responder - nat traversal """
1655 def config_tc(self):
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # No overrides are passed: config_params() runs with its default
        # (empty) parameter dict.
        self.config_params()
1666 class TestResponderRekey(TestResponderPsk):
1667 """ test ikev2 responder - rekey """
1669 def rekey_from_initiator(self):
1670 sa, first_payload = self.generate_auth_payload(is_rekey=True)
1671 header = ikev2.IKEv2(
1672 init_SPI=self.sa.ispi,
1673 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1674 flags='Initiator', exch_type='CREATE_CHILD_SA')
1676 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
1677 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1678 self.sa.dport, self.sa.natt, self.ip6)
1679 self.pg0.add_stream(packet)
1680 self.pg0.enable_capture()
1682 capture = self.pg0.get_capture(1)
1683 ih = self.get_ike_header(capture[0])
1684 plain = self.sa.hmac_and_decrypt(ih)
1685 sa = ikev2.IKEv2_payload_SA(plain)
1686 prop = sa[ikev2.IKEv2_payload_Proposal]
1687 nonce = sa[ikev2.IKEv2_payload_Nonce]
1688 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1689 # update new responder SPI
1690 self.sa.child_sas[0].rspi = prop.SPI
def test_responder(self):
    # Run the inherited responder scenario first, then the rekey exchange
    # defined on this class.
    super(TestResponderRekey, self).test_responder()
    self.rekey_from_initiator()
    # Recompute the child SA keys before re-checking the IKE SAs and the
    # IPsec SAs (the latter in rekey mode).
    self.sa.calc_child_keys()
    self.verify_ike_sas()
    self.verify_ipsec_sas(is_rekey=True)
1700 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1701 """ test ikev2 responder - cert based auth """
1702 def config_tc(self):
1703 self.config_params({
1706 'server-key': 'server-key.pem',
1707 'client-key': 'client-key.pem',
1708 'client-cert': 'client-cert.pem',
1709 'server-cert': 'server-cert.pem'})
1712 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1713 (TemplateResponder, Ikev2Params):
1715 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1717 def config_tc(self):
1718 self.config_params({
1719 'ike-crypto': ('AES-CBC', 16),
1720 'ike-integ': 'SHA2-256-128',
1721 'esp-crypto': ('AES-CBC', 24),
1722 'esp-integ': 'SHA2-384-192',
1723 'ike-dh': '2048MODPgr'})
1726 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1727 (TemplateResponder, Ikev2Params):
1729 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1731 def config_tc(self):
1732 self.config_params({
1733 'ike-crypto': ('AES-CBC', 32),
1734 'ike-integ': 'SHA2-256-128',
1735 'esp-crypto': ('AES-GCM-16ICV', 32),
1736 'esp-integ': 'NULL',
1737 'ike-dh': '3072MODPgr'})
1740 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1744 def config_tc(self):
1745 self.config_params({
1746 'del_sa_from_responder': True,
1749 'ike-crypto': ('AES-GCM-16ICV', 32),
1750 'ike-integ': 'NULL',
1751 'ike-dh': '2048MODPgr',
1752 'loc_ts': {'start_addr': 'ab:cd::0',
1753 'end_addr': 'ab:cd::10'},
1754 'rem_ts': {'start_addr': '11::0',
1755 'end_addr': '11::100'}})
1758 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1759 """ malformed packet test """
1764 def config_tc(self):
1765 self.config_params()
def assert_counter(self, count, name, version='ip4'):
    """Assert that an ikev2 error counter equals count.

    The counter is looked up in self.statistics under
    '/err/ikev2-<version>/<name>'; version defaults to 'ip4'.
    """
    counter_prefix = '/err/ikev2-%s/' % version
    counter_value = self.statistics.get_err_counter(counter_prefix + name)
    self.assertEqual(count, counter_value)
1771 def create_ike_init_msg(self, length=None, payload=None):
1772 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1773 flags='Initiator', exch_type='IKE_SA_INIT')
1774 if payload is not None:
1776 return self.create_packet(self.pg0, msg, self.sa.sport,
def verify_bad_packet_length(self):
    """Send IKE init messages whose header length field is 0xdead.

    self.pkt_count copies are sent on pg0 with no reply expected, after
    which the 'Bad packet length' counter must equal self.pkt_count.
    """
    bad_len_msg = self.create_ike_init_msg(length=0xdead)
    burst = bad_len_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Bad packet length')
def verify_bad_sa_payload_length(self):
    """Send IKE init messages whose SA payload length field is 0xdead.

    self.pkt_count copies are sent on pg0 with no reply expected, after
    which the 'Malformed packet' counter must equal self.pkt_count.
    """
    bogus_sa = ikev2.IKEv2_payload_SA(length=0xdead)
    bad_sa_msg = self.create_ike_init_msg(payload=bogus_sa)
    burst = bad_sa_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Malformed packet')
def test_responder(self):
    # Number of malformed packets sent by each check below; the same
    # value is what each check expects in its error counter.
    self.pkt_count = 254
    self.verify_bad_packet_length()
    self.verify_bad_sa_payload_length()
# When the file is executed directly, run its test cases through unittest
# using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)