2 from socket import inet_pton
3 from cryptography import x509
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes, hmac
6 from cryptography.hazmat.primitives.asymmetric import dh, padding
7 from cryptography.hazmat.primitives.serialization import load_pem_private_key
8 from cryptography.hazmat.primitives.ciphers import (
13 from ipaddress import IPv4Address, IPv6Address, ip_address
14 from scapy.layers.ipsec import ESP
15 from scapy.layers.inet import IP, UDP, Ether
16 from scapy.layers.inet6 import IPv6
17 from scapy.packet import raw, Raw
18 from scapy.utils import long_converter
19 from framework import VppTestCase, VppTestRunner
20 from vpp_ikev2 import Profile, IDType, AuthMethod
21 from vpp_papi import VppEnum
# Constant fed to the PRF together with the shared secret when the
# shared-key authentication data is computed.
28 KEY_PAD = b"Key Pad for IKEv2"
35 # tuple structure is (p, g, key_len)
# NOTE(review): the line that opens this table is not visible in this
# excerpt; other code looks entries up as DH[self.ike_dh].
# Prime of the '2048MODPgr' group, generator 2, key_len 256 (key_len is used
# as a byte count when the shared secret is serialised).
37 '2048MODPgr': (long_converter("""
38 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
39 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
40 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
41 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
42 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
43 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
44 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
45 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
46 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
47 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
48 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
# Prime of the '3072MODPgr' group, generator 2, key_len 384.
50 '3072MODPgr': (long_converter("""
51 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
62 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
63 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
64 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
65 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
66 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
# Wrapper around one encryption transform: it bundles a cipher class and a
# mode class and offers encrypt / decrypt / pad helpers.
# NOTE(review): several short lines of this class (attribute assignments,
# branch headers, the def line of the padding helper) are not visible in
# this excerpt; only the visible statements are described.
70 class CryptoAlgo(object):
71 def __init__(self, name, cipher, mode):
# Block size is cipher.block_size divided by 8 (presumably bits -> bytes).
75 if self.cipher is not None:
76 self.bs = self.cipher.block_size // 8
# The 'AES-GCM-16ICV' transform uses GCM_IV_SIZE as its IV length.
78 if self.name == 'AES-GCM-16ICV':
79 self.iv_len = GCM_IV_SIZE
# Encrypt data with key; aad is passed as additional authenticated data on
# the path that produces a tag.
83 def encrypt(self, data, key, aad=None):
84 iv = os.urandom(self.iv_len)
# First visible path: the result is the random IV followed by the ciphertext.
86 encryptor = Cipher(self.cipher(key), self.mode(iv),
87 default_backend()).encryptor()
88 return iv + encryptor.update(data) + encryptor.finalize()
# Second visible path: the trailing SALT_SIZE bytes of key are a salt, the
# rest is the cipher key; the tag (cut to GCM_ICV_SIZE) is appended to the
# ciphertext.
90 salt = key[-SALT_SIZE:]
92 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
93 default_backend()).encryptor()
94 encryptor.authenticate_additional_data(aad)
95 data = encryptor.update(data) + encryptor.finalize()
96 data += encryptor.tag[:GCM_ICV_SIZE]
# Decrypt data with key; icv is the tag handed to the mode on the
# authenticated path.
# NOTE(review): both visible decrypt paths construct algorithms.AES directly
# while encrypt() uses self.cipher -- confirm this asymmetry is intended.
99 def decrypt(self, data, key, aad=None, icv=None):
# First visible path: leading iv_len bytes are the IV, the rest is ciphertext.
101 iv = data[:self.iv_len]
102 ct = data[self.iv_len:]
103 decryptor = Cipher(algorithms.AES(key),
105 default_backend()).decryptor()
106 return decryptor.update(ct) + decryptor.finalize()
# Second visible path: nonce = salt (tail of key) + first GCM_IV_SIZE bytes
# of data; the remaining bytes are the ciphertext.
108 salt = key[-SALT_SIZE:]
109 nonce = salt + data[:GCM_IV_SIZE]
110 ct = data[GCM_IV_SIZE:]
111 key = key[:-SALT_SIZE]
112 decryptor = Cipher(algorithms.AES(key),
113 self.mode(nonce, icv, len(icv)),
114 default_backend()).decryptor()
115 decryptor.authenticate_additional_data(aad)
116 return decryptor.update(ct) + decryptor.finalize()
# Padding helper (its def line is elided in this excerpt): pads data up to
# the next multiple of self.bs with zero bytes; the final byte stores the
# number of zero bytes that precede it.
119 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
120 data = data + b'\x00' * (pad_len - 1)
121 return data + bytes([pad_len - 1])
# Describes one integrity / PRF transform: a name, a MAC constructor (mac),
# a hash class (mod), the key length and the truncated output length.
# NOTE(review): the assignments of name, mac and mod are not visible in this
# excerpt.
124 class AuthAlgo(object):
125 def __init__(self, name, mac, mod, key_len, trunc_len=None):
129 self.key_len = key_len
# A falsy trunc_len (None or 0) falls back to key_len.
130 self.trunc_len = trunc_len or key_len
134 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
135 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
136 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
141 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
142 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
143 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
144 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
145 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
149 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
150 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
class IKEv2ChildSA(object):
    """State of one child SA: its SPI and the two traffic selectors."""

    def __init__(self, local_ts, remote_ts, spi=None):
        """Store the selectors; a falsy spi is replaced by 4 random bytes."""
        if not spi:
            spi = os.urandom(4)
        self.spi = spi
        self.local_ts, self.remote_ts = local_ts, remote_ts
162 class IKEv2SA(object):
# Set up the test-side IKE SA state.
# NOTE(review): a number of short assignment / branch lines are elided in
# this excerpt, so only the visible statements are described.
163 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
164 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
165 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
166 auth_method='shared-key', priv_key=None, natt=False):
# DH parameters are created later, on first use in generate_dh_data().
175 self.dh_params = None
177 self.priv_key = priv_key
178 self.is_initiator = is_initiator
# A missing (falsy) nonce is replaced by 32 random bytes.
179 nonce = nonce or os.urandom(32)
180 self.auth_data = auth_data
# id_type given as a string is resolved through IDType.value(); any other
# value is stored as-is.
183 if isinstance(id_type, str):
184 self.id_type = IDType.value(id_type)
186 self.id_type = id_type
187 self.auth_method = auth_method
# Depending on the role, one of the two SPIs starts out as eight zero bytes.
188 if self.is_initiator:
189 self.rspi = 8 * b'\x00'
194 self.ispi = 8 * b'\x00'
# Exactly one child SA is created up front from the given traffic selectors.
196 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
198 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return this side's DH public value, chosen by the SA's role."""
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the other side's DH public value, chosen by the SA's role."""
    return self.r_dh_data if self.is_initiator else self.i_dh_data
def compute_secret(self):
    """Return the DH shared secret as a fixed-length big-endian byte string.

    Computes peer_public ** private mod p, with p and the output length
    taken from self.ike_group (a (p, g, key_len) tuple).
    """
    exponent = int.from_bytes(self.dh_private_key, 'big')
    base = int.from_bytes(self.peer_dh_pub_key, 'big')
    # The generator is part of the tuple but not needed for this step.
    modulus, _generator, out_len = self.ike_group
    shared = pow(base, exponent, modulus)
    return shared.to_bytes(out_len, 'big')
# Generate this side's DH key pair for the configured group and store the
# private exponent and the public value as big-endian byte strings.
# Raises NotImplementedError when self.ike_dh is not a key of DH.
221 def generate_dh_data(self):
223 if self.ike_dh not in DH:
224 raise NotImplementedError('%s not in DH group' % self.ike_dh)
# The DH parameters object is built once from (p, g) and cached on the SA.
226 if self.dh_params is None:
227 dhg = DH[self.ike_dh]
228 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
229 self.dh_params = pn.parameters(default_backend())
231 priv = self.dh_params.generate_private_key()
232 pub = priv.public_key()
233 x = priv.private_numbers().x
234 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
235 y = pub.public_numbers().y
# The public value is stored in i_dh_data or r_dh_data depending on the role
# (the branch header between the two assignments is elided in this excerpt).
237 if self.is_initiator:
238 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
240 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Compute the DH shared secret and keep it in dh_shared_secret."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
# Derive the key material of the first child SA: sk_d is expanded with
# calc_prfplus over Ni | Nr and the result is sliced into encryption keys,
# integrity keys and salts for both directions.
# NOTE(review): the rest of the length expression and the lines advancing
# `pos` are elided in this excerpt.
245 def calc_child_keys(self):
246 prf = self.ike_prf_alg.mod()
247 s = self.i_nonce + self.r_nonce
248 c = self.child_sas[0]
250 encr_key_len = self.esp_crypto_key_len
251 integ_key_len = self.esp_integ_alg.key_len
# A 4-byte salt is only carved out when the integrity key length is zero.
252 salt_len = 0 if integ_key_len else 4
254 l = (integ_key_len * 2 +
257 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
260 c.sk_ei = keymat[pos:pos+encr_key_len]
264 c.sk_ai = keymat[pos:pos+integ_key_len]
267 c.salt_ei = keymat[pos:pos+salt_len]
270 c.sk_er = keymat[pos:pos+encr_key_len]
274 c.sk_ar = keymat[pos:pos+integ_key_len]
277 c.salt_er = keymat[pos:pos+salt_len]
# Expand key material from key and seed by repeatedly calling calc_prf with
# a one-byte counter appended, until `length` bytes exist or the counter
# reaches 255.
# NOTE(review): initialisation, accumulation and the return statement are
# elided in this excerpt.
280 def calc_prfplus(self, prf, key, seed, length):
284 while len(r) < length and x < 255:
289 s = s + seed + bytes([x])
290 t = self.calc_prf(prf, key, s)
# Build the PRF MAC object of the negotiated PRF algorithm keyed with `key`.
# NOTE(review): the lines that feed `data` and return the result are not
# visible in this excerpt.
298 def calc_prf(self, prf, key, data):
299 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
304 prf = self.ike_prf_alg.mod()
305 # SKEYSEED = prf(Ni | Nr, g^ir)
306 s = self.i_nonce + self.r_nonce
307 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
309 # calculate S = Ni | Nr | SPIi SPIr
310 s = s + self.ispi + self.rspi
312 prf_key_trunc = self.ike_prf_alg.trunc_len
313 encr_key_len = self.ike_crypto_key_len
314 tr_prf_key_len = self.ike_prf_alg.key_len
315 integ_key_len = self.ike_integ_alg.key_len
316 if integ_key_len == 0:
326 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
329 self.sk_d = keymat[:pos+prf_key_trunc]
332 self.sk_ai = keymat[pos:pos+integ_key_len]
334 self.sk_ar = keymat[pos:pos+integ_key_len]
337 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
338 pos += encr_key_len + salt_size
339 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
340 pos += encr_key_len + salt_size
342 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
343 pos += tr_prf_key_len
344 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
# Build the octets that are authenticated in the AUTH payload: the given
# packet bytes, a nonce and prf(key, ID data).
# NOTE(review): the role-dependent selection of id, nonce and key is elided
# in this excerpt.
346 def generate_authmsg(self, prf, packet):
347 if self.is_initiator:
# ID data: one ID-type byte, three zero bytes, then the identity itself.
355 data = bytes([self.id_type, 0, 0, 0]) + id
356 id_hash = self.calc_prf(prf, key, data)
357 return packet + nonce + id_hash
360 prf = self.ike_prf_alg.mod()
361 if self.is_initiator:
362 packet = self.init_req_packet
364 packet = self.init_resp_packet
365 authmsg = self.generate_authmsg(prf, raw(packet))
366 if self.auth_method == 'shared-key':
367 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
368 self.auth_data = self.calc_prf(prf, psk, authmsg)
369 elif self.auth_method == 'rsa-sig':
370 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
373 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad data and encrypt it with this side's IKE encryption key.

    aad is handed through unchanged to the crypto algorithm's encrypt().
    """
    algo = self.ike_crypto_alg
    padded = algo.pad(data)
    return algo.encrypt(padded, self.my_cryptokey, aad)
380 def peer_authkey(self):
381 if self.is_initiator:
386 def my_authkey(self):
387 if self.is_initiator:
392 def my_cryptokey(self):
393 if self.is_initiator:
398 def peer_cryptokey(self):
399 if self.is_initiator:
def concat(self, alg, key_len):
    """Return '<alg>-<bits>' where bits is key_len (in bytes) times 8."""
    bits = key_len * 8
    return '-'.join((alg, str(bits)))
def vpp_ike_cypto_alg(self):
    """Return the IKE cipher name combined with its key size via concat()."""
    name, key_len = self.ike_crypto, self.ike_crypto_key_len
    return self.concat(name, key_len)
def vpp_esp_cypto_alg(self):
    """Return the ESP cipher name combined with its key size via concat()."""
    name, key_len = self.esp_crypto, self.esp_crypto_key_len
    return self.concat(name, key_len)
def verify_hmac(self, ikemsg):
    """Assert that the trailing ICV of ikemsg matches a freshly computed HMAC.

    The last trunc_len bytes of ikemsg are the received ICV; the HMAC is
    computed with the peer's integrity key over everything before it and
    truncated to the same length before comparison.
    """
    alg = self.ike_integ_alg
    icv_len = alg.trunc_len
    received = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    calculated = self.compute_hmac(alg.mod(), self.peer_authkey, covered)
    self.test.assertEqual(calculated[:icv_len], received)
# Build the MAC object of the negotiated IKE integrity algorithm keyed with
# `key`, using `integ` as the hash.
# NOTE(review): the lines that feed `data` and return the result are not
# visible in this excerpt.
422 def compute_hmac(self, integ, key, data):
423 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt data with the peer's IKE encryption key.

    aad and icv are handed through unchanged to the crypto algorithm.
    """
    algo = self.ike_crypto_alg
    return algo.decrypt(data, self.peer_cryptokey, aad, icv)
# Authenticate and decrypt the Encrypted payload of an IKE message and
# return the plaintext with its padding removed.
# NOTE(review): the branch header between the two paths and the line that
# sets pad_len are elided in this excerpt.
430 def hmac_and_decrypt(self, ike):
431 ep = ike[ikev2.IKEv2_payload_Encrypted]
# AES-GCM path: the first aad_len bytes of the raw message (length of an
# empty IKEv2 header plus an empty Encrypted payload header) are the AAD and
# the last GCM_ICV_SIZE bytes of the payload are the tag.
432 if self.ike_crypto == 'AES-GCM-16ICV':
433 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
434 ct = ep.load[:-GCM_ICV_SIZE]
435 tag = ep.load[-GCM_ICV_SIZE:]
436 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
# Other path: the HMAC over the raw message is verified before decrypting.
438 self.verify_hmac(raw(ike))
439 integ_trunc = self.ike_integ_alg.trunc_len
441 # remove ICV and decrypt payload
442 ct = ep.load[:-integ_trunc]
443 plain = self.decrypt(ct)
# Strip pad_len padding bytes plus one more trailing byte.
446 return plain[:-pad_len - 1]
def build_ts_addr(self, ts, version):
    """Map a traffic selector's address range to versioned field names.

    version is the suffix ('4' or '6' at the visible call sites) appended to
    the 'starting_address_v' / 'ending_address_v' keys.
    """
    fields = {}
    fields['starting_address_v' + version] = ts['start_addr']
    fields['ending_address_v' + version] = ts['end_addr']
    return fields
# Build the traffic selectors of the first child SA and return them as a
# pair of one-element lists.
# NOTE(review): the rest of the ts_data literal and the branch headers that
# choose between the IPv4 and IPv6 selectors (presumably driven by is_ip4)
# are elided in this excerpt.
452 def generate_ts(self, is_ip4):
453 c = self.child_sas[0]
454 ts_data = {'IP_protocol_ID': 0,
458 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
459 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
460 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
461 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
463 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
464 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
465 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
466 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
# ts1 is built from local_ts and ts2 from remote_ts; an initiator returns
# (ts1, ts2), a responder returns them in the opposite order.
468 if self.is_initiator:
469 return ([ts1], [ts2])
470 return ([ts2], [ts1])
# Select the IKE SA transforms by name and cache the matching algorithm
# objects.  Raises TypeError for a name missing from CRYPTO_ALGOS,
# AUTH_ALGOS or PRF_ALGOS.
# NOTE(review): the assignments of the prf name and of self.ike_dh are not
# visible in this excerpt.
472 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
473 if crypto not in CRYPTO_ALGOS:
474 raise TypeError('unsupported encryption algo %r' % crypto)
475 self.ike_crypto = crypto
476 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
477 self.ike_crypto_key_len = crypto_key_len
479 if integ not in AUTH_ALGOS:
480 raise TypeError('unsupported auth algo %r' % integ)
# The 'NULL' integrity name is stored as None; the algorithm object is kept
# either way.
481 self.ike_integ = None if integ == 'NULL' else integ
482 self.ike_integ_alg = AUTH_ALGOS[integ]
484 if prf not in PRF_ALGOS:
485 raise TypeError('unsupported prf algo %r' % prf)
487 self.ike_prf_alg = PRF_ALGOS[prf]
# Raises KeyError here if self.ike_dh is not a key of DH.
489 self.ike_group = DH[self.ike_dh]
# Select the ESP (child SA) transforms by name and cache the matching
# algorithm objects.  Raises TypeError for a name missing from CRYPTO_ALGOS
# or AUTH_ALGOS.  Note that the key length is stored before the cipher name
# is validated.
491 def set_esp_props(self, crypto, crypto_key_len, integ):
492 self.esp_crypto_key_len = crypto_key_len
493 if crypto not in CRYPTO_ALGOS:
494 raise TypeError('unsupported encryption algo %r' % crypto)
495 self.esp_crypto = crypto
496 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
498 if integ not in AUTH_ALGOS:
499 raise TypeError('unsupported auth algo %r' % integ)
# The 'NULL' integrity name is stored as None; the algorithm object is kept
# either way.
500 self.esp_integ = None if integ == 'NULL' else integ
501 self.esp_integ_alg = AUTH_ALGOS[integ]
# Return a (key_length attribute, transform length) pair for the Encryption
# transform: the attribute packs 0x800e in the upper 16 bits and key_len * 8
# in the lower bits; the second element is the constant 12.
# NOTE(review): the check reads self.ike_crypto even when this is called for
# the ESP key length (see esp_crypto_attr) -- confirm that is intended.  The
# branch header before the raise is elided in this excerpt.
503 def crypto_attr(self, key_len):
504 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
505 return (0x800e << 16 | key_len << 3, 12)
507 raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return crypto_attr() evaluated for the IKE encryption key length."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return crypto_attr() evaluated for the ESP encryption key length."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
# Return a SHA-1 digest for the NAT detection notifies; the hashed input is
# initiator SPI | responder SPI | ip | port (2 bytes, big-endian).
# NOTE(review): the handling of a missing rspi and the line feeding `data`
# into the digest are not visible in this excerpt.
515 def compute_nat_sha1(self, ip, port, rspi=None):
518 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
519 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
521 return digest.finalize()
524 class IkePeer(VppTestCase):
525 """ common class for initiator and responder """
529 import scapy.contrib.ikev2 as _ikev2
530 globals()['ikev2'] = _ikev2
531 super(IkePeer, cls).setUpClass()
532 cls.create_pg_interfaces(range(2))
533 for i in cls.pg_interfaces:
541 def tearDownClass(cls):
542 super(IkePeer, cls).tearDownClass()
545 super(IkePeer, self).tearDown()
546 if self.del_sa_from_responder:
547 self.initiate_del_sa_from_responder()
549 self.initiate_del_sa_from_initiator()
550 r = self.vapi.ikev2_sa_dump()
551 self.assertEqual(len(r), 0)
552 sas = self.vapi.ipsec_sa_dump()
553 self.assertEqual(len(sas), 0)
554 self.p.remove_vpp_config()
555 self.assertIsNone(self.p.query_vpp_config())
558 super(IkePeer, self).setUp()
560 self.p.add_vpp_config()
561 self.assertIsNotNone(self.p.query_vpp_config())
562 self.sa.generate_dh_data()
563 self.vapi.cli('ikev2 set logging level 4')
564 self.vapi.cli('event-lo clear')
# Wrap an IKE message in Ether / IP (v4 or v6) / UDP, addressed from the
# interface's remote address to its local address.
# NOTE(review): the end of the signature, the branch headers choosing the IP
# version and the NAT-T case, and the return statement are elided in this
# excerpt.
566 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
569 src_ip = src_if.remote_ip6
570 dst_ip = src_if.local_ip6
573 src_ip = src_if.remote_ip4
574 dst_ip = src_if.local_ip4
576 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
577 ip_layer(src=src_ip, dst=dst_ip) /
578 UDP(sport=sport, dport=dport))
# The non-ESP marker is four zero bytes placed right after the UDP header.
580 # insert non ESP marker
581 res = res / Raw(b'\x00' * 4)
def verify_udp(self, udp):
    """Assert that the UDP source and destination ports match the SA's."""
    for field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, field), getattr(self.sa, field))
# Extract the IKEv2 header from a captured packet and check its version
# field (0x20) and that the 'Version' flag is not set.
# NOTE(review): the try header, the line obtaining `esp` and the return
# statement are elided in this excerpt.
588 def get_ike_header(self, packet):
590 ih = packet[ikev2.IKEv2]
591 except IndexError as e:
592 # this is a workaround for getting IKEv2 layer as both ikev2 and
593 # ipsec register for port 4500
595 ih = self.verify_and_remove_non_esp_marker(esp)
596 self.assertEqual(ih.version, 0x20)
597 self.assertNotIn('Version', ih.flags)
# Check that the payload starts with the four zero bytes of the non-ESP
# marker and parse the remainder as an IKEv2 message.
# NOTE(review): the condition guarding this path and the line obtaining
# `data` are elided in this excerpt.
600 def verify_and_remove_non_esp_marker(self, packet):
602 # if we are in nat traversal mode check for non esp marker
605 self.assertEqual(data[:4], b'\x00' * 4)
606 return ikev2.IKEv2(data[4:])
# Encrypt `plain` into an Encrypted payload appended to `header`, using
# either the AES-GCM path or the encrypt-then-HMAC path.
# NOTE(review): the lines assembling `res`, the branch header between the
# two paths and the return statement are elided in this excerpt.
610 def encrypt_ike_msg(self, header, plain, first_payload):
# AES-GCM path: payload length = padded data + IV + ICV + payload header;
# the message built so far (raw(res)) is passed as AAD.
611 if self.sa.ike_crypto == 'AES-GCM-16ICV':
612 data = self.sa.ike_crypto_alg.pad(raw(plain))
613 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
614 len(ikev2.IKEv2_payload_Encrypted())
615 tlen = plen + len(ikev2.IKEv2())
618 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
622 encr = self.sa.encrypt(raw(plain), raw(res))
623 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
624 length=plen, load=encr)
# Other path: payload length = ciphertext + payload header + truncated HMAC;
# the HMAC is computed over the raw message and appended truncated.
627 encr = self.sa.encrypt(raw(plain))
628 trunc_len = self.sa.ike_integ_alg.trunc_len
629 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
630 tlen = plen + len(ikev2.IKEv2())
632 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
633 length=plen, load=encr)
637 integ_data = raw(res)
638 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
639 self.sa.my_authkey, integ_data)
640 res = res / Raw(hmac_data[:trunc_len])
# Sanity check: the assembled message has the pre-computed total length.
641 assert(len(res) == tlen)
# Expect exactly two IPsec SAs in the dump and compare their algorithms,
# keys and salts with the key material derived for the first child SA.
# NOTE(review): the lines that pick sa0 / sa1 according to the role and some
# branch headers are elided in this excerpt.
644 def verify_ipsec_sas(self):
645 sas = self.vapi.ipsec_sa_dump()
646 self.assertEqual(len(sas), 2)
647 e = VppEnum.vl_api_ipsec_sad_flags_t
648 if self.sa.is_initiator:
655 c = self.sa.child_sas[0]
657 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
658 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
659 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
661 if self.sa.esp_integ is None:
664 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
665 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
666 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
# sa0 is checked against the responder-direction keys (sk_er / sk_ar) and
# sa1 against the initiator-direction keys (sk_ei / sk_ai).
669 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
670 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
671 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
672 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
676 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
677 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
678 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
679 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
# The salt is reported as an integer; it is compared as 4 little-endian bytes.
681 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
682 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Assert that one key reported by the API equals the expected key.

    The API object carries the key under `name` plus its length under
    `name + '_len'`; only the first that-many bytes are compared.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Check an identity reported by the API against the expected one.

    Compares the ID type (the expected type is resolved through
    IDType.value), the data length, and the identity bytes themselves.
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The identity bytes must be checked against the expected identity data;
    # comparing them with the ID *type* can never match a real identity.
    # NOTE(review): assumes exp_id exposes its identity bytes as `data`,
    # mirroring api_id -- confirm against the caller.
    self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)
# Dump the IKE SA, its child SA, traffic selectors and nonces through the
# API and compare each with the test-side SA state.  Exactly one IKE SA and
# one child SA are expected.
# NOTE(review): the lines binding `sa` and `csa` and the branch headers that
# choose between the IPv6 and IPv4 address checks are elided in this excerpt.
696 def verify_ike_sas(self):
697 r = self.vapi.ikev2_sa_dump()
698 self.assertEqual(len(r), 1)
# The API reports SPIs as integers; they are compared as 8 big-endian bytes.
700 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
701 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
# When the test side is the initiator, the initiator address is pg0's remote
# address and the responder address is pg0's local one; otherwise swapped.
703 if self.sa.is_initiator:
704 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
705 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
707 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
708 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
710 if self.sa.is_initiator:
711 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
712 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
714 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
715 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
# IKE SA key material.
716 self.verify_keymat(sa.keys, self.sa, 'sk_d')
717 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
718 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
719 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
720 self.verify_keymat(sa.keys, self.sa, 'sk_er')
721 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
722 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
# Identities: both sides are expected to use the SA's single id_type.
724 self.assertEqual(sa.i_id.type, self.sa.id_type)
725 self.assertEqual(sa.r_id.type, self.sa.id_type)
726 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
727 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
728 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
729 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
# Child SA.
731 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
732 self.assertEqual(len(r), 1)
734 self.assertEqual(csa.sa_index, sa.sa_index)
735 c = self.sa.child_sas[0]
# Integrity keys are only checked when the child SA has them derived.
736 if hasattr(c, 'sk_ai'):
737 self.verify_keymat(csa.keys, c, 'sk_ai')
738 self.verify_keymat(csa.keys, c, 'sk_ar')
739 self.verify_keymat(csa.keys, c, 'sk_ei')
740 self.verify_keymat(csa.keys, c, 'sk_er')
# Traffic selectors, one per direction.
742 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
745 r = self.vapi.ikev2_traffic_selector_dump(
746 is_initiator=True, sa_index=sa.sa_index,
747 child_sa_index=csa.child_sa_index)
748 self.assertEqual(len(r), 1)
750 self.verify_ts(r[0].ts, tsi[0], True)
752 r = self.vapi.ikev2_traffic_selector_dump(
753 is_initiator=False, sa_index=sa.sa_index,
754 child_sa_index=csa.child_sa_index)
755 self.assertEqual(len(r), 1)
756 self.verify_ts(r[0].ts, tsr[0], False)
# Nonces of both sides.
758 n = self.vapi.ikev2_nonce_get(is_initiator=True,
759 sa_index=sa.sa_index)
760 self.verify_nonce(n, self.sa.i_nonce)
761 n = self.vapi.ikev2_nonce_get(is_initiator=False,
762 sa_index=sa.sa_index)
763 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API-reported nonce has the expected length and bytes."""
    expected_len = len(nonce)
    self.assertEqual(api_nonce.data_len, expected_len)
    self.assertEqual(api_nonce.nonce, nonce)
# Compare a traffic selector reported by the API with the expected scapy
# traffic selector: locality flag, address range, port range and protocol.
# NOTE(review): the conditions selecting the is_local expectation and the
# IPv4 / IPv6 comparison are elided in this excerpt.
769 def verify_ts(self, api_ts, ts, is_initiator):
771 self.assertTrue(api_ts.is_local)
773 self.assertFalse(api_ts.is_local)
776 self.assertEqual(api_ts.start_addr,
777 IPv4Address(ts.starting_address_v4))
778 self.assertEqual(api_ts.end_addr,
779 IPv4Address(ts.ending_address_v4))
781 self.assertEqual(api_ts.start_addr,
782 IPv6Address(ts.starting_address_v6))
783 self.assertEqual(api_ts.end_addr,
784 IPv6Address(ts.ending_address_v6))
785 self.assertEqual(api_ts.start_port, ts.start_port)
786 self.assertEqual(api_ts.end_port, ts.end_port)
787 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
790 class TemplateInitiator(IkePeer):
791 """ initiator test template """
# Trigger deletion of the IKE SA through the API, check the request that is
# captured on pg0 (initiator flag set, not a response, matching SPIs, Delete
# payload for IKE) and answer it with an empty encrypted INFORMATIONAL
# response, after which no further packets are expected.
# NOTE(review): one short line after enable_capture() is elided in this
# excerpt.
793 def initiate_del_sa_from_initiator(self):
# The API takes the SPI as an integer built little-endian from the SPI bytes.
794 ispi = int.from_bytes(self.sa.ispi, 'little')
795 self.pg0.enable_capture()
797 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
798 capture = self.pg0.get_capture(1)
799 ih = self.get_ike_header(capture[0])
800 self.assertNotIn('Response', ih.flags)
801 self.assertIn('Initiator', ih.flags)
802 self.assertEqual(ih.init_SPI, self.sa.ispi)
803 self.assertEqual(ih.resp_SPI, self.sa.rspi)
804 plain = self.sa.hmac_and_decrypt(ih)
805 d = ikev2.IKEv2_payload_Delete(plain)
806 self.assertEqual(d.proto, 1) # proto=IKEv2
# The response reuses the message id of the request.
807 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
808 flags='Response', exch_type='INFORMATIONAL',
809 id=ih.id, next_payload='Encrypted')
810 resp = self.encrypt_ike_msg(header, b'', None)
811 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Check the reply to a delete request sent by the test (responder role).

    The reply must carry the SA's current message id, be an INFORMATIONAL
    exchange with both the Response and Initiator flags set, and decrypt to
    an empty payload.
    """
    hdr = self.get_ike_header(packet)
    self.assertEqual(hdr.id, self.sa.msg_id)
    self.assertEqual(hdr.exch_type, 37)  # exchange informational
    for flag in ('Response', 'Initiator'):
        self.assertIn(flag, hdr.flags)
    decrypted = self.sa.hmac_and_decrypt(hdr)
    self.assertEqual(decrypted, b'')
# Send an encrypted INFORMATIONAL request carrying a Delete payload for the
# IKE SA from the test side and verify the single reply captured on pg0.
# NOTE(review): one short line between enable_capture() and get_capture()
# is elided in this excerpt.
822 def initiate_del_sa_from_responder(self):
823 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
824 exch_type='INFORMATIONAL',
825 id=self.sa.new_msg_id())
826 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
827 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
828 packet = self.create_packet(self.pg0, ike_msg,
829 self.sa.sport, self.sa.dport,
830 self.sa.natt, self.ip6)
831 self.pg0.add_stream(packet)
832 self.pg0.enable_capture()
834 capture = self.pg0.get_capture(1)
835 self.verify_del_sa(capture[0])
# Look for a Notify payload of the given type, starting at the first Notify
# payload of the packet.
# NOTE(review): the loop header, the return statements and any decorator are
# elided in this excerpt; it is declared without `self` but called as
# self.find_notify_payload(...) elsewhere.
838 def find_notify_payload(packet, notify_type):
839 n = packet[ikev2.IKEv2_payload_Notify]
841 if n.type == notify_type:
# Check that the packet carries both NAT detection notifies and that each
# payload equals the SHA-1 computed from the packet's own IPv4 address and
# UDP port, with a responder SPI of eight zero bytes.
# NOTE(review): the lines binding `iph` and `udp` are elided in this excerpt.
846 def verify_nat_detection(self, packet):
853 # NAT_DETECTION_SOURCE_IP
854 s = self.find_notify_payload(packet, 16388)
855 self.assertIsNotNone(s)
856 src_sha = self.sa.compute_nat_sha1(
857 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
858 self.assertEqual(s.load, src_sha)
860 # NAT_DETECTION_DESTINATION_IP
861 s = self.find_notify_payload(packet, 16389)
862 self.assertIsNotNone(s)
863 dst_sha = self.sa.compute_nat_sha1(
864 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
865 self.assertEqual(s.load, dst_sha)
# Validate a captured IKE_SA_INIT request and record the initiator SPI,
# nonce and DH public value on the SA, then complete the DH exchange.
# NOTE(review): a short line after complete_dh_data() may be elided in this
# excerpt.
867 def verify_sa_init_request(self, packet):
868 ih = packet[ikev2.IKEv2]
869 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
870 self.assertEqual(ih.exch_type, 34) # SA_INIT
871 self.sa.ispi = ih.init_SPI
872 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
873 self.assertIn('Initiator', ih.flags)
874 self.assertNotIn('Response', ih.flags)
875 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
876 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
# The first proposal must list encryption, PRF and DH transforms in this
# order, matching the profile's configured transforms.
878 prop = packet[ikev2.IKEv2_payload_Proposal]
879 self.assertEqual(prop.proto, 1) # proto = ikev2
880 self.assertEqual(prop.proposal, 1)
881 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
882 self.assertEqual(prop.trans[0].transform_id,
883 self.p.ike_transforms['crypto_alg'])
884 self.assertEqual(prop.trans[1].transform_type, 2) # prf
885 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
886 self.assertEqual(prop.trans[2].transform_type, 4) # dh
887 self.assertEqual(prop.trans[2].transform_id,
888 self.p.ike_transforms['dh_group'])
890 self.verify_nat_detection(packet)
891 self.sa.complete_dh_data()
# Validate a captured IKE_AUTH request: SPIs, exchange type, flags, message
# id, and the IDi / IDr payloads found in the decrypted content.
# NOTE(review): a few short lines before the message-id check are elided in
# this excerpt.
894 def verify_sa_auth_req(self, packet):
895 ih = self.get_ike_header(packet)
896 self.assertEqual(ih.resp_SPI, self.sa.rspi)
897 self.assertEqual(ih.init_SPI, self.sa.ispi)
898 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
899 self.assertIn('Initiator', ih.flags)
900 self.assertNotIn('Response', ih.flags)
904 self.assertEqual(ih.id, self.sa.msg_id + 1)
# The decrypted content is parsed as IDi, whose payload is parsed as IDr.
906 plain = self.sa.hmac_and_decrypt(ih)
907 idi = ikev2.IKEv2_payload_IDi(plain)
908 idr = ikev2.IKEv2_payload_IDr(idi.payload)
909 self.assertEqual(idi.load, self.sa.i_id)
910 self.assertEqual(idr.load, self.sa.r_id)
# Build and send the IKE_SA_INIT response (SA proposal with four transforms,
# KE and Nonce payloads), remember it on the SA as init_resp_packet, and
# verify the IKE_AUTH request captured in return.
912 def send_init_response(self):
913 tr_attr = self.sa.ike_crypto_attr()
914 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
915 transform_id=self.sa.ike_crypto, length=tr_attr[1],
916 key_length=tr_attr[0]) /
917 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
918 transform_id=self.sa.ike_integ) /
919 ikev2.IKEv2_payload_Transform(transform_type='PRF',
920 transform_id=self.sa.ike_prf_alg.name) /
921 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
922 transform_id=self.sa.ike_dh))
923 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
924 trans_nb=4, trans=trans))
925 self.sa.init_resp_packet = (
926 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
927 exch_type='IKE_SA_INIT', flags='Response') /
928 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
929 ikev2.IKEv2_payload_KE(next_payload='Nonce',
930 group=self.sa.ike_dh,
931 load=self.sa.my_dh_pub_key) /
932 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
934 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
935 self.sa.sport, self.sa.dport,
936 self.sa.natt, self.ip6)
937 self.pg_send(self.pg0, ike_msg)
938 capture = self.pg0.get_capture(1)
939 self.verify_sa_auth_req(capture[0])
# Ask for an IKE_SA_INIT exchange on the profile through the API, verify the
# captured request and send the response.
# NOTE(review): one short line after enable_capture() is elided in this
# excerpt.
941 def initiate_sa_init(self):
942 self.pg0.enable_capture()
944 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
946 capture = self.pg0.get_capture(1)
947 self.verify_sa_init_request(capture[0])
948 self.send_init_response()
# Build the IKE_AUTH response (IDi, IDr, AUTH, ESP SA proposal, TSi, TSr and
# an INITIAL_CONTACT notify), encrypt it and send it on pg0.
# NOTE(review): the tail of the transform list is elided in this excerpt.
950 def send_auth_response(self):
951 tr_attr = self.sa.esp_crypto_attr()
952 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
953 transform_id=self.sa.esp_crypto, length=tr_attr[1],
954 key_length=tr_attr[0]) /
955 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
956 transform_id=self.sa.esp_integ) /
957 ikev2.IKEv2_payload_Transform(
958 transform_type='Extended Sequence Number',
959 transform_id='No ESN') /
960 ikev2.IKEv2_payload_Transform(
961 transform_type='Extended Sequence Number',
964 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
965 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
967 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
968 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
969 IDtype=self.sa.id_type, load=self.sa.i_id) /
970 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
971 IDtype=self.sa.id_type, load=self.sa.r_id) /
972 ikev2.IKEv2_payload_AUTH(next_payload='SA',
973 auth_type=AuthMethod.value(self.sa.auth_method),
974 load=self.sa.auth_data) /
975 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
976 ikev2.IKEv2_payload_TSi(next_payload='TSr',
977 number_of_TSs=len(tsi),
978 traffic_selector=tsi) /
979 ikev2.IKEv2_payload_TSr(next_payload='Notify',
980 number_of_TSs=len(tsr),
981 traffic_selector=tsr) /
982 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
# The header takes a fresh message id from the SA.
984 header = ikev2.IKEv2(
985 init_SPI=self.sa.ispi,
986 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
987 flags='Response', exch_type='IKE_AUTH')
989 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
990 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
991 self.sa.dport, self.sa.natt, self.ip6)
992 self.pg_send(self.pg0, packet)
# Initiator scenario: run the SA_INIT exchange, derive the child SA keys,
# send the IKE_AUTH response and verify the resulting IKE SA state.
# NOTE(review): one short line between initiate_sa_init() and
# calc_child_keys() is elided in this excerpt.
994 def test_initiator(self):
995 self.initiate_sa_init()
997 self.sa.calc_child_keys()
998 self.send_auth_response()
999 self.verify_ike_sas()
1002 class TemplateResponder(IkePeer):
1003 """ responder test template """
# Trigger deletion of the IKE SA through the API, check the request that is
# captured on pg0 (neither Response nor Initiator flag set, INFORMATIONAL,
# Delete payload for IKE, matching SPIs) and answer it with an empty
# encrypted response, after which no further packets are expected.
# NOTE(review): one short line after enable_capture() is elided in this
# excerpt.
1005 def initiate_del_sa_from_responder(self):
1006 self.pg0.enable_capture()
# The API takes the SPI as an integer built little-endian from the SPI bytes.
1008 self.vapi.ikev2_initiate_del_ike_sa(
1009 ispi=int.from_bytes(self.sa.ispi, 'little'))
1010 capture = self.pg0.get_capture(1)
1011 ih = self.get_ike_header(capture[0])
1012 self.assertNotIn('Response', ih.flags)
1013 self.assertNotIn('Initiator', ih.flags)
1014 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1015 plain = self.sa.hmac_and_decrypt(ih)
1016 d = ikev2.IKEv2_payload_Delete(plain)
1017 self.assertEqual(d.proto, 1) # proto=IKEv2
1018 self.assertEqual(ih.init_SPI, self.sa.ispi)
1019 self.assertEqual(ih.resp_SPI, self.sa.rspi)
# The response reuses the message id of the request.
1020 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1021 flags='Initiator+Response',
1022 exch_type='INFORMATIONAL',
1023 id=ih.id, next_payload='Encrypted')
1024 resp = self.encrypt_ike_msg(header, b'', None)
1025 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Check the reply to a delete request sent by the test (initiator role).

    The reply must carry the SA's current message id, be an INFORMATIONAL
    response without the Initiator flag, start with an Encrypted payload,
    carry the SA's SPIs, and decrypt to an empty payload.
    """
    hdr = self.get_ike_header(packet)
    sa = self.sa
    check = self.assertEqual
    check(hdr.id, sa.msg_id)
    check(hdr.exch_type, 37)  # exchange informational
    self.assertIn('Response', hdr.flags)
    self.assertNotIn('Initiator', hdr.flags)
    check(hdr.next_payload, 46)  # Encrypted
    check(hdr.init_SPI, sa.ispi)
    check(hdr.resp_SPI, sa.rspi)
    decrypted = sa.hmac_and_decrypt(hdr)
    check(decrypted, b'')
# Send an encrypted INFORMATIONAL request with the Initiator flag carrying a
# Delete payload for the IKE SA, and verify the single reply captured on pg0.
# NOTE(review): one short line between enable_capture() and get_capture()
# is elided in this excerpt.
1039 def initiate_del_sa_from_initiator(self):
1040 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1041 flags='Initiator', exch_type='INFORMATIONAL',
1042 id=self.sa.new_msg_id())
1043 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1044 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1045 packet = self.create_packet(self.pg0, ike_msg,
1046 self.sa.sport, self.sa.dport,
1047 self.sa.natt, self.ip6)
1048 self.pg0.add_stream(packet)
1049 self.pg0.enable_capture()
1051 capture = self.pg0.get_capture(1)
1052 self.verify_del_sa(capture[0])
# Build and send the IKE_SA_INIT request (SA proposal with four transforms,
# KE, Nonce and the two NAT detection notifies), remember it on the SA as
# init_req_packet, and verify the single reply captured on pg0.
# NOTE(review): the branch headers choosing src_address (presumably driven
# by behind_nat), the tail of two multi-line expressions and one short line
# before get_capture() are elided in this excerpt.
1054 def send_sa_init_req(self, behind_nat=False):
1055 tr_attr = self.sa.ike_crypto_attr()
1056 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1057 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1058 key_length=tr_attr[0]) /
1059 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1060 transform_id=self.sa.ike_integ) /
1061 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1062 transform_id=self.sa.ike_prf_alg.name) /
1063 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1064 transform_id=self.sa.ike_dh))
1066 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1067 trans_nb=4, trans=trans))
1069 self.sa.init_req_packet = (
1070 ikev2.IKEv2(init_SPI=self.sa.ispi,
1071 flags='Initiator', exch_type='IKE_SA_INIT') /
1072 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1073 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1074 group=self.sa.ike_dh,
1075 load=self.sa.my_dh_pub_key) /
1076 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1077 load=self.sa.i_nonce))
# Source address used for the NAT detection hash: either the fixed bytes
# 0a 0a 0a 01 or pg0's remote IPv4 address.
1080 src_address = b'\x0a\x0a\x0a\x01'
1082 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1084 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1085 dst_nat = self.sa.compute_nat_sha1(
1086 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1088 nat_src_detection = ikev2.IKEv2_payload_Notify(
1089 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1090 next_payload='Notify')
1091 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1092 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1093 self.sa.init_req_packet = (self.sa.init_req_packet /
1097 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1098 self.sa.sport, self.sa.dport,
1099 self.sa.natt, self.ip6)
1100 self.pg0.add_stream(ike_msg)
1101 self.pg0.enable_capture()
1103 capture = self.pg0.get_capture(1)
1104 self.verify_sa_init(capture[0])
1106 def send_sa_auth(self):
1107 tr_attr = self.sa.esp_crypto_attr()
1108 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1109 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1110 key_length=tr_attr[0]) /
1111 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1112 transform_id=self.sa.esp_integ) /
1113 ikev2.IKEv2_payload_Transform(
1114 transform_type='Extended Sequence Number',
1115 transform_id='No ESN') /
1116 ikev2.IKEv2_payload_Transform(
1117 transform_type='Extended Sequence Number',
1118 transform_id='ESN'))
1120 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1121 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
1123 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1124 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1125 IDtype=self.sa.id_type, load=self.sa.i_id) /
1126 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1127 IDtype=self.sa.id_type, load=self.sa.r_id) /
1128 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1129 auth_type=AuthMethod.value(self.sa.auth_method),
1130 load=self.sa.auth_data) /
1131 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1132 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1133 number_of_TSs=len(tsi),
1134 traffic_selector=tsi) /
1135 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1136 number_of_TSs=len(tsr),
1137 traffic_selector=tsr) /
1138 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1140 header = ikev2.IKEv2(
1141 init_SPI=self.sa.ispi,
1142 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1143 flags='Initiator', exch_type='IKE_AUTH')
1145 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1146 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1147 self.sa.dport, self.sa.natt, self.ip6)
1148 self.pg0.add_stream(packet)
1149 self.pg0.enable_capture()
1151 capture = self.pg0.get_capture(1)
1152 self.verify_sa_auth_resp(capture[0])
1154 def verify_sa_init(self, packet):
1155 ih = self.get_ike_header(packet)
1157 self.assertEqual(ih.id, self.sa.msg_id)
1158 self.assertEqual(ih.exch_type, 34)
1159 self.assertIn('Response', ih.flags)
1160 self.assertEqual(ih.init_SPI, self.sa.ispi)
1161 self.assertNotEqual(ih.resp_SPI, 0)
1162 self.sa.rspi = ih.resp_SPI
1164 sa = ih[ikev2.IKEv2_payload_SA]
1165 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1166 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1167 except IndexError as e:
1168 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1169 self.logger.error(ih.show())
1171 self.sa.complete_dh_data()
1175 def verify_sa_auth_resp(self, packet):
1176 ike = self.get_ike_header(packet)
1178 self.verify_udp(udp)
1179 self.assertEqual(ike.id, self.sa.msg_id)
1180 plain = self.sa.hmac_and_decrypt(ike)
1181 self.sa.calc_child_keys()
1183 def test_responder(self):
1184 self.send_sa_init_req(self.sa.natt)
1186 self.verify_ipsec_sas()
1187 self.verify_ike_sas()
1190 class Ikev2Params(object):
1191 def config_params(self, params={}):
1192 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1193 ei = VppEnum.vl_api_ipsec_integ_alg_t
1195 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1196 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1197 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1198 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1199 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1200 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1202 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1203 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1204 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1205 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1207 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1208 not in params else params['del_sa_from_responder']
1209 is_natt = 'natt' in params and params['natt'] or False
1210 self.p = Profile(self, 'pr1')
1211 self.ip6 = False if 'ip6' not in params else params['ip6']
1213 if 'auth' in params and params['auth'] == 'rsa-sig':
1214 auth_method = 'rsa-sig'
1215 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1216 self.vapi.ikev2_set_local_key(
1217 key_file=work_dir + params['server-key'])
1219 client_file = work_dir + params['client-cert']
1220 server_pem = open(work_dir + params['server-cert']).read()
1221 client_priv = open(work_dir + params['client-key']).read()
1222 client_priv = load_pem_private_key(str.encode(client_priv), None,
1224 self.peer_cert = x509.load_pem_x509_certificate(
1225 str.encode(server_pem),
1227 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1230 auth_data = b'$3cr3tpa$$w0rd'
1231 self.p.add_auth(method='shared-key', data=auth_data)
1232 auth_method = 'shared-key'
1235 is_init = True if 'is_initiator' not in params else\
1236 params['is_initiator']
1238 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1239 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1241 self.p.add_local_id(**idr)
1242 self.p.add_remote_id(**idi)
1244 self.p.add_local_id(**idi)
1245 self.p.add_remote_id(**idr)
1247 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1248 'loc_ts' not in params else params['loc_ts']
1249 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1250 'rem_ts' not in params else params['rem_ts']
1251 self.p.add_local_ts(**loc_ts)
1252 self.p.add_remote_ts(**rem_ts)
1253 if 'responder' in params:
1254 self.p.add_responder(params['responder'])
1255 if 'ike_transforms' in params:
1256 self.p.add_ike_transforms(params['ike_transforms'])
1257 if 'esp_transforms' in params:
1258 self.p.add_esp_transforms(params['esp_transforms'])
1260 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1261 is_initiator=is_init,
1262 id_type=self.p.local_id['id_type'], natt=is_natt,
1263 priv_key=client_priv, auth_method=auth_method,
1264 auth_data=auth_data,
1265 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1267 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1268 params['ike-crypto']
1269 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1271 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1274 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1275 params['esp-crypto']
1276 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1279 self.sa.set_ike_props(
1280 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1281 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1282 self.sa.set_esp_props(
1283 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1287 class TestApi(VppTestCase):
1288 """ Test IKEV2 API """
1290 def setUpClass(cls):
1291 super(TestApi, cls).setUpClass()
1294 def tearDownClass(cls):
1295 super(TestApi, cls).tearDownClass()
1298 super(TestApi, self).tearDown()
1299 self.p1.remove_vpp_config()
1300 self.p2.remove_vpp_config()
1301 r = self.vapi.ikev2_profile_dump()
1302 self.assertEqual(len(r), 0)
1304 def configure_profile(self, cfg):
1305 p = Profile(self, cfg['name'])
1306 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1307 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1308 p.add_local_ts(**cfg['loc_ts'])
1309 p.add_remote_ts(**cfg['rem_ts'])
1310 p.add_responder(cfg['responder'])
1311 p.add_ike_transforms(cfg['ike_ts'])
1312 p.add_esp_transforms(cfg['esp_ts'])
1313 p.add_auth(**cfg['auth'])
1314 p.set_udp_encap(cfg['udp_encap'])
1315 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1316 if 'lifetime_data' in cfg:
1317 p.set_lifetime_data(cfg['lifetime_data'])
1318 if 'tun_itf' in cfg:
1319 p.set_tunnel_interface(cfg['tun_itf'])
1323 def test_profile_api(self):
1324 """ test profile dump API """
1329 'start_addr': '3.3.3.2',
1330 'end_addr': '3.3.3.3',
1336 'start_addr': '4.5.76.80',
1337 'end_addr': '2.3.4.6',
1344 'start_addr': 'ab::1',
1345 'end_addr': 'ab::4',
1351 'start_addr': 'cd::12',
1352 'end_addr': 'cd::13',
1358 'loc_id': ('fqdn', b'vpp.home'),
1359 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1362 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1365 'crypto_key_size': 32,
1370 'crypto_key_size': 24,
1372 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1374 'ipsec_over_udp_port': 4501,
1377 'lifetime_maxdata': 20192,
1378 'lifetime_jitter': 9,
1383 'loc_id': ('ip4-addr', b'192.168.2.1'),
1384 'rem_id': ('ip6-addr', b'abcd::1'),
1387 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1390 'crypto_key_size': 16,
1395 'crypto_key_size': 24,
1397 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1399 'ipsec_over_udp_port': 4600,
1402 self.p1 = self.configure_profile(conf['p1'])
1403 self.p2 = self.configure_profile(conf['p2'])
1405 r = self.vapi.ikev2_profile_dump()
1406 self.assertEqual(len(r), 2)
1407 self.verify_profile(r[0].profile, conf['p1'])
1408 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Check an identity from the profile dump against its configuration.

    api_id -- identity object returned by the API (reads .type and .data)
    cfg_id -- indexable pair: [0] is the ID type name, [1] the ID data
              as bytes
    """
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(api_id.type, expected_type)
    # The reported data is converted to ASCII bytes so that it can be
    # compared with the bytes value held in the configuration.
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Check a traffic selector from the API against its configuration.

    api_ts -- traffic selector object returned by the API
    cfg_ts -- dict with 'proto', 'start_port', 'end_port', 'start_addr'
              and 'end_addr' entries
    """
    # Scalar fields: (attribute on the API object, key in the config).
    scalar_fields = (
        ('protocol_id', 'proto'),
        ('start_port', 'start_port'),
        ('end_port', 'end_port'),
    )
    for attr, key in scalar_fields:
        self.assertEqual(getattr(api_ts, attr), cfg_ts[key])

    # Addresses are configured as text and compared as ip_address objects.
    for field in ('start_addr', 'end_addr'):
        reported = getattr(api_ts, field)
        configured = ip_address(text_type(cfg_ts[field]))
        self.assertEqual(reported, configured)
def verify_responder(self, api_r, cfg_r):
    """Check the responder reported by the API against its configuration.

    api_r -- responder object returned by the API (reads .sw_if_index
             and .addr)
    cfg_r -- dict with 'sw_if_index' and a textual 'addr'
    """
    self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
    # The configured address is text; compare it as an ip_address object.
    configured_addr = ip_address(cfg_r['addr'])
    self.assertEqual(api_r.addr, configured_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Compare the transform fields common to IKE and ESP transform sets.

    api_ts -- transform set returned by the API
    cfg_ts -- dict holding the configured values under the same names
    """
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Check an IKE transform set: the common fields plus the DH group.

    api_ts -- IKE transform set returned by the API
    cfg_ts -- dict with the configured transform values
    """
    # Fields shared with ESP transforms are checked first.
    self.verify_transforms(api_ts, cfg_ts)
    reported_group = api_ts.dh_group
    configured_group = cfg_ts['dh_group']
    self.assertEqual(reported_group, configured_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check an ESP transform set against its configuration.

    Delegates entirely to verify_transforms(); no ESP-specific field is
    compared here.
    """
    self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Check the authentication settings reported by the API.

    api_auth -- auth object returned by the API (reads .method, .data
                and .data_len)
    cfg_auth -- dict with the 'method' name and the auth 'data'
    """
    expected_method = AuthMethod.value(cfg_auth['method'])
    self.assertEqual(api_auth.method, expected_method)

    # Both the payload and its reported length must match the config.
    configured_data = cfg_auth['data']
    self.assertEqual(api_auth.data, configured_data)
    self.assertEqual(api_auth.data_len, len(configured_data))
def verify_lifetime_data(self, p, ld):
    """Check the lifetime fields of a dumped profile.

    p  -- profile object returned by the API
    ld -- dict of configured lifetime values, keyed by the same names
          as the profile attributes
    """
    lifetime_fields = (
        'lifetime',
        'lifetime_maxdata',
        'lifetime_jitter',
        'handover',
    )
    for field in lifetime_fields:
        self.assertEqual(getattr(p, field), ld[field])
1450 def verify_profile(self, ap, cp):
1451 self.assertEqual(ap.name, cp['name'])
1452 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1453 self.verify_id(ap.loc_id, cp['loc_id'])
1454 self.verify_id(ap.rem_id, cp['rem_id'])
1455 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1456 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1457 self.verify_responder(ap.responder, cp['responder'])
1458 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1459 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1460 self.verify_auth(ap.auth, cp['auth'])
1461 if 'lifetime_data' in cp:
1462 self.verify_lifetime_data(ap, cp['lifetime_data'])
1463 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1465 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1467 self.assertEqual(ap.tun_itf, 0xffffffff)
1470 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1471 """ test ikev2 initiator - pre shared key auth """
1473 def config_tc(self):
1474 self.config_params({
1475 'is_initiator': False, # seen from test case perspective
1476 # thus vpp is initiator
1477 'responder': {'sw_if_index': self.pg0.sw_if_index,
1478 'addr': self.pg0.remote_ip4},
1479 'ike-crypto': ('AES-GCM-16ICV', 32),
1480 'ike-integ': 'NULL',
1481 'ike-dh': '3072MODPgr',
1483 'crypto_alg': 20, # "aes-gcm-16"
1484 'crypto_key_size': 256,
1485 'dh_group': 15, # "modp-3072"
1488 'crypto_alg': 12, # "aes-cbc"
1489 'crypto_key_size': 256,
1490 # "hmac-sha2-256-128"
1494 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1495 """ test ikev2 initiator - delete IKE SA from responder """
1497 def config_tc(self):
1498 self.config_params({
1499 'del_sa_from_responder': True,
1500 'is_initiator': False, # seen from test case perspective
1501 # thus vpp is initiator
1502 'responder': {'sw_if_index': self.pg0.sw_if_index,
1503 'addr': self.pg0.remote_ip4},
1504 'ike-crypto': ('AES-GCM-16ICV', 32),
1505 'ike-integ': 'NULL',
1506 'ike-dh': '3072MODPgr',
1508 'crypto_alg': 20, # "aes-gcm-16"
1509 'crypto_key_size': 256,
1510 'dh_group': 15, # "modp-3072"
1513 'crypto_alg': 12, # "aes-cbc"
1514 'crypto_key_size': 256,
1515 # "hmac-sha2-256-128"
1519 class TestResponderNATT(TemplateResponder, Ikev2Params):
1520 """ test ikev2 responder - nat traversal """
1521 def config_tc(self):
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # No overrides are passed, so every setting comes from the
        # defaults chosen inside Ikev2Params.config_params().
        self.config_params()
1532 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1533 """ test ikev2 responder - cert based auth """
1534 def config_tc(self):
1535 self.config_params({
1537 'server-key': 'server-key.pem',
1538 'client-key': 'client-key.pem',
1539 'client-cert': 'client-cert.pem',
1540 'server-cert': 'server-cert.pem'})
1543 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1544 (TemplateResponder, Ikev2Params):
1546 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1548 def config_tc(self):
1549 self.config_params({
1550 'ike-crypto': ('AES-CBC', 16),
1551 'ike-integ': 'SHA2-256-128',
1552 'esp-crypto': ('AES-CBC', 24),
1553 'esp-integ': 'SHA2-384-192',
1554 'ike-dh': '2048MODPgr'})
1557 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1558 (TemplateResponder, Ikev2Params):
1560 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1562 def config_tc(self):
1563 self.config_params({
1564 'ike-crypto': ('AES-CBC', 32),
1565 'ike-integ': 'SHA2-256-128',
1566 'esp-crypto': ('AES-GCM-16ICV', 32),
1567 'esp-integ': 'NULL',
1568 'ike-dh': '3072MODPgr'})
1571 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1575 def config_tc(self):
1576 self.config_params({
1577 'del_sa_from_responder': True,
1580 'ike-crypto': ('AES-GCM-16ICV', 32),
1581 'ike-integ': 'NULL',
1582 'ike-dh': '2048MODPgr',
1583 'loc_ts': {'start_addr': 'ab:cd::0',
1584 'end_addr': 'ab:cd::10'},
1585 'rem_ts': {'start_addr': '11::0',
1586 'end_addr': '11::100'}})
1589 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1590 """ malformed packet test """
def config_tc(self):
    """Configure the test case with the default parameter set."""
    self.config_params()
def assert_counter(self, count, name, version='ip4'):
    """Assert that an ikev2 node error counter holds the expected value.

    count   -- expected counter value
    name    -- counter name appended to the node path
    version -- selects the 'ikev2-<version>' node; defaults to 'ip4'
    """
    node_prefix = '/err/ikev2-%s/' % version
    counter_path = node_prefix + name
    observed = self.statistics.get_err_counter(counter_path)
    self.assertEqual(count, observed)
1602 def create_ike_init_msg(self, length=None, payload=None):
1603 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1604 flags='Initiator', exch_type='IKE_SA_INIT')
1605 if payload is not None:
1607 return self.create_packet(self.pg0, msg, self.sa.sport,
def verify_bad_packet_length(self):
    """Send IKE init messages with a bogus header length.

    pkt_count copies of a message whose IKE length field is forced to
    0xdead are sent on pg0; no reply is expected and the
    'Bad packet length' counter must equal pkt_count.
    """
    bad_length_msg = self.create_ike_init_msg(length=0xdead)
    burst = bad_length_msg * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Bad packet length')
def verify_bad_sa_payload_length(self):
    """Send IKE init messages carrying an SA payload with a bogus length.

    pkt_count copies of a message whose SA payload length is forced to
    0xdead are sent on pg0; no reply is expected and the
    'Malformed packet' counter must equal pkt_count.
    """
    bogus_sa = ikev2.IKEv2_payload_SA(length=0xdead)
    burst = self.create_ike_init_msg(payload=bogus_sa) * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Malformed packet')
def test_responder(self):
    # Number of copies of each malformed message to send; the verify_*
    # helpers below read it both for the burst size and for the
    # expected error-counter value, so it must be set first.
    self.pkt_count = 254
    self.verify_bad_packet_length()
    self.verify_bad_sa_payload_length()
# Run the tests through the VPP test runner when executed as a script.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)