2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
12 from ipaddress import IPv4Address, IPv6Address, ip_address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.layers.inet6 import IPv6
16 from scapy.packet import raw, Raw
17 from scapy.utils import long_converter
18 from framework import VppTestCase, VppTestRunner
19 from vpp_ikev2 import Profile, IDType, AuthMethod
20 from vpp_papi import VppEnum
27 KEY_PAD = b"Key Pad for IKEv2"
34 # tuple structure is (p, g, key_len)
36 '2048MODPgr': (long_converter("""
37 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
38 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
39 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
40 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
41 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
42 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
43 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
44 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
45 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
46 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
47 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
49 '3072MODPgr': (long_converter("""
50 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
51 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
52 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
53 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
54 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
55 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
56 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
57 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
58 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
59 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
60 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
61 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
62 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
63 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
64 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
65 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
69 class CryptoAlgo(object):
70 def __init__(self, name, cipher, mode):
74 if self.cipher is not None:
75 self.bs = self.cipher.block_size // 8
77 if self.name == 'AES-GCM-16ICV':
78 self.iv_len = GCM_IV_SIZE
82 def encrypt(self, data, key, aad=None):
83 iv = os.urandom(self.iv_len)
85 encryptor = Cipher(self.cipher(key), self.mode(iv),
86 default_backend()).encryptor()
87 return iv + encryptor.update(data) + encryptor.finalize()
89 salt = key[-SALT_SIZE:]
91 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
92 default_backend()).encryptor()
93 encryptor.authenticate_additional_data(aad)
94 data = encryptor.update(data) + encryptor.finalize()
95 data += encryptor.tag[:GCM_ICV_SIZE]
98 def decrypt(self, data, key, aad=None, icv=None):
100 iv = data[:self.iv_len]
101 ct = data[self.iv_len:]
102 decryptor = Cipher(algorithms.AES(key),
104 default_backend()).decryptor()
105 return decryptor.update(ct) + decryptor.finalize()
107 salt = key[-SALT_SIZE:]
108 nonce = salt + data[:GCM_IV_SIZE]
109 ct = data[GCM_IV_SIZE:]
110 key = key[:-SALT_SIZE]
111 decryptor = Cipher(algorithms.AES(key),
112 self.mode(nonce, icv, len(icv)),
113 default_backend()).decryptor()
114 decryptor.authenticate_additional_data(aad)
115 pt = decryptor.update(ct) + decryptor.finalize()
120 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
121 data = data + b'\x00' * (pad_len - 1)
122 return data + bytes([pad_len - 1])
125 class AuthAlgo(object):
126 def __init__(self, name, mac, mod, key_len, trunc_len=None):
130 self.key_len = key_len
131 self.trunc_len = trunc_len or key_len
135 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
136 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
137 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
142 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
143 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
144 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
145 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
146 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
150 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
151 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
class IKEv2ChildSA(object):
    """Holds a child SA's SPI together with its local and remote
    traffic selectors."""

    def __init__(self, local_ts, remote_ts, spi=None):
        # a falsy spi (None, empty bytes, ...) is replaced by 4 random bytes
        if not spi:
            spi = os.urandom(4)
        self.spi = spi
        self.local_ts, self.remote_ts = local_ts, remote_ts
163 class IKEv2SA(object):
164 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
165 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
166 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
167 auth_method='shared-key', priv_key=None, natt=False):
176 self.dh_params = None
178 self.priv_key = priv_key
179 self.is_initiator = is_initiator
180 nonce = nonce or os.urandom(32)
181 self.auth_data = auth_data
184 if isinstance(id_type, str):
185 self.id_type = IDType.value(id_type)
187 self.id_type = id_type
188 self.auth_method = auth_method
189 if self.is_initiator:
190 self.rspi = 8 * b'\x00'
195 self.ispi = 8 * b'\x00'
197 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
199 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return i_dh_data when this SA is the initiator, r_dh_data
    otherwise."""
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return r_dh_data when this SA is the initiator, i_dh_data
    otherwise."""
    return self.r_dh_data if self.is_initiator else self.i_dh_data
def compute_secret(self):
    """Compute peer_pub ** priv mod p for the configured group.

    The private key and peer public value are read as big-endian integers;
    the result is returned as big-endian bytes whose length is the third
    element of self.ike_group.
    """
    exponent = int.from_bytes(self.dh_private_key, 'big')
    base = int.from_bytes(self.peer_dh_pub_key, 'big')
    # the generator (second tuple element) is not needed for this step
    modulus, _generator, out_len = self.ike_group
    shared = pow(base, exponent, modulus)
    return shared.to_bytes(out_len, 'big')
222 def generate_dh_data(self):
224 if self.ike_dh not in DH:
225 raise NotImplementedError('%s not in DH group' % self.ike_dh)
227 if self.dh_params is None:
228 dhg = DH[self.ike_dh]
229 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
230 self.dh_params = pn.parameters(default_backend())
232 priv = self.dh_params.generate_private_key()
233 pub = priv.public_key()
234 x = priv.private_numbers().x
235 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
236 y = pub.public_numbers().y
238 if self.is_initiator:
239 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
241 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Store the result of compute_secret() in self.dh_shared_secret."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
246 def calc_child_keys(self):
247 prf = self.ike_prf_alg.mod()
248 s = self.i_nonce + self.r_nonce
249 c = self.child_sas[0]
251 encr_key_len = self.esp_crypto_key_len
252 integ_key_len = self.esp_integ_alg.key_len
253 salt_len = 0 if integ_key_len else 4
255 l = (integ_key_len * 2 +
258 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
261 c.sk_ei = keymat[pos:pos+encr_key_len]
265 c.sk_ai = keymat[pos:pos+integ_key_len]
268 c.salt_ei = keymat[pos:pos+salt_len]
271 c.sk_er = keymat[pos:pos+encr_key_len]
275 c.sk_ar = keymat[pos:pos+integ_key_len]
278 c.salt_er = keymat[pos:pos+salt_len]
281 def calc_prfplus(self, prf, key, seed, length):
285 while len(r) < length and x < 255:
290 s = s + seed + bytes([x])
291 t = self.calc_prf(prf, key, s)
299 def calc_prf(self, prf, key, data):
300 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
305 prf = self.ike_prf_alg.mod()
306 # SKEYSEED = prf(Ni | Nr, g^ir)
307 s = self.i_nonce + self.r_nonce
308 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
# calculate S = Ni | Nr | SPIi | SPIr
311 s = s + self.ispi + self.rspi
313 prf_key_trunc = self.ike_prf_alg.trunc_len
314 encr_key_len = self.ike_crypto_key_len
315 tr_prf_key_len = self.ike_prf_alg.key_len
316 integ_key_len = self.ike_integ_alg.key_len
317 if integ_key_len == 0:
327 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
330 self.sk_d = keymat[:pos+prf_key_trunc]
333 self.sk_ai = keymat[pos:pos+integ_key_len]
335 self.sk_ar = keymat[pos:pos+integ_key_len]
338 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
339 pos += encr_key_len + salt_size
340 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
341 pos += encr_key_len + salt_size
343 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
344 pos += tr_prf_key_len
345 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
347 def generate_authmsg(self, prf, packet):
348 if self.is_initiator:
356 data = bytes([self.id_type, 0, 0, 0]) + id
357 id_hash = self.calc_prf(prf, key, data)
358 return packet + nonce + id_hash
361 prf = self.ike_prf_alg.mod()
362 if self.is_initiator:
363 packet = self.init_req_packet
365 packet = self.init_resp_packet
366 authmsg = self.generate_authmsg(prf, raw(packet))
367 if self.auth_method == 'shared-key':
368 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
369 self.auth_data = self.calc_prf(prf, psk, authmsg)
370 elif self.auth_method == 'rsa-sig':
371 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
374 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad data with the IKE crypto algorithm's pad() and encrypt the
    result with my_cryptokey, passing aad through unchanged."""
    padded = self.ike_crypto_alg.pad(data)
    cipher = self.ike_crypto_alg
    return cipher.encrypt(padded, self.my_cryptokey, aad)
381 def peer_authkey(self):
382 if self.is_initiator:
387 def my_authkey(self):
388 if self.is_initiator:
393 def my_cryptokey(self):
394 if self.is_initiator:
399 def peer_cryptokey(self):
400 if self.is_initiator:
def concat(self, alg, key_len):
    """Return '<alg>-<n>' where n is key_len multiplied by 8."""
    bits = key_len * 8
    return '-'.join((alg, str(bits)))
def vpp_ike_cypto_alg(self):
    """Return concat() of the IKE crypto name and IKE crypto key length."""
    name, key_len = self.ike_crypto, self.ike_crypto_key_len
    return self.concat(name, key_len)
def vpp_esp_cypto_alg(self):
    """Return concat() of the ESP crypto name and ESP crypto key length."""
    name, key_len = self.esp_crypto, self.esp_crypto_key_len
    return self.concat(name, key_len)
def verify_hmac(self, ikemsg):
    """Assert that the trailing bytes of ikemsg equal the truncated HMAC
    computed over the rest of the message with peer_authkey.

    The number of trailing bytes is the integrity algorithm's trunc_len.
    """
    icv_len = self.ike_integ_alg.trunc_len
    received = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    hash_alg = self.ike_integ_alg.mod()
    calculated = self.compute_hmac(hash_alg, self.peer_authkey, covered)
    self.test.assertEqual(calculated[:icv_len], received)
423 def compute_hmac(self, integ, key, data):
424 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt data with the IKE crypto algorithm using peer_cryptokey;
    aad and icv are forwarded unchanged."""
    cipher = self.ike_crypto_alg
    return cipher.decrypt(data, self.peer_cryptokey, aad, icv)
431 def hmac_and_decrypt(self, ike):
432 ep = ike[ikev2.IKEv2_payload_Encrypted]
433 if self.ike_crypto == 'AES-GCM-16ICV':
434 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
435 ct = ep.load[:-GCM_ICV_SIZE]
436 tag = ep.load[-GCM_ICV_SIZE:]
437 return self.decrypt(ct, raw(ike)[:aad_len], tag)
439 self.verify_hmac(raw(ike))
440 integ_trunc = self.ike_integ_alg.trunc_len
442 # remove ICV and decrypt payload
443 ct = ep.load[:-integ_trunc]
444 return self.decrypt(ct)
def build_ts_addr(self, ts, version):
    """Map ts['start_addr'] / ts['end_addr'] onto the keys
    'starting_address_v<version>' / 'ending_address_v<version>'."""
    field_map = (('starting_address_v', 'start_addr'),
                 ('ending_address_v', 'end_addr'))
    return {prefix + version: ts[src] for prefix, src in field_map}
450 def generate_ts(self, is_ip4):
451 c = self.child_sas[0]
452 ts_data = {'IP_protocol_ID': 0,
456 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
457 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
458 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
459 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
461 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
462 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
463 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
464 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
466 if self.is_initiator:
467 return ([ts1], [ts2])
468 return ([ts2], [ts1])
470 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
471 if crypto not in CRYPTO_ALGOS:
472 raise TypeError('unsupported encryption algo %r' % crypto)
473 self.ike_crypto = crypto
474 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
475 self.ike_crypto_key_len = crypto_key_len
477 if integ not in AUTH_ALGOS:
478 raise TypeError('unsupported auth algo %r' % integ)
479 self.ike_integ = None if integ == 'NULL' else integ
480 self.ike_integ_alg = AUTH_ALGOS[integ]
482 if prf not in PRF_ALGOS:
483 raise TypeError('unsupported prf algo %r' % prf)
485 self.ike_prf_alg = PRF_ALGOS[prf]
487 self.ike_group = DH[self.ike_dh]
489 def set_esp_props(self, crypto, crypto_key_len, integ):
490 self.esp_crypto_key_len = crypto_key_len
491 if crypto not in CRYPTO_ALGOS:
492 raise TypeError('unsupported encryption algo %r' % crypto)
493 self.esp_crypto = crypto
494 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
496 if integ not in AUTH_ALGOS:
497 raise TypeError('unsupported auth algo %r' % integ)
498 self.esp_integ = None if integ == 'NULL' else integ
499 self.esp_integ_alg = AUTH_ALGOS[integ]
501 def crypto_attr(self, key_len):
502 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
503 return (0x800e << 16 | key_len << 3, 12)
505 raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return crypto_attr() evaluated for the IKE crypto key length."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return crypto_attr() evaluated for the ESP crypto key length."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
513 def compute_nat_sha1(self, ip, port):
514 data = self.ispi + self.rspi + ip + (port).to_bytes(2, 'big')
515 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
517 return digest.finalize()
520 class IkePeer(VppTestCase):
521 """ common class for initiator and responder """
525 import scapy.contrib.ikev2 as _ikev2
526 globals()['ikev2'] = _ikev2
527 super(IkePeer, cls).setUpClass()
528 cls.create_pg_interfaces(range(2))
529 for i in cls.pg_interfaces:
def tearDownClass(cls):
    """Delegate class-level teardown to the parent class."""
    super(IkePeer, cls).tearDownClass()
541 super(IkePeer, self).setUp()
543 self.p.add_vpp_config()
544 self.assertIsNotNone(self.p.query_vpp_config())
545 self.sa.generate_dh_data()
546 self.vapi.cli('ikev2 set logging level 4')
547 self.vapi.cli('event-lo clear')
549 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
552 src_ip = src_if.remote_ip6
553 dst_ip = src_if.local_ip6
556 src_ip = src_if.remote_ip4
557 dst_ip = src_if.local_ip4
559 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
560 ip_layer(src=src_ip, dst=dst_ip) /
561 UDP(sport=sport, dport=dport))
563 # insert non ESP marker
564 res = res / Raw(b'\x00' * 4)
def verify_udp(self, udp):
    """Assert that udp.sport and udp.dport equal the ports stored on
    self.sa."""
    for field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, field), getattr(self.sa, field))
571 def get_ike_header(self, packet):
573 ih = packet[ikev2.IKEv2]
574 except IndexError as e:
575 # this is a workaround for getting IKEv2 layer as both ikev2 and
576 # ipsec register for port 4500
578 ih = self.verify_and_remove_non_esp_marker(esp)
579 self.assertEqual(ih.version, 0x20)
582 def verify_and_remove_non_esp_marker(self, packet):
584 # if we are in nat traversal mode check for non esp marker
587 self.assertEqual(data[:4], b'\x00' * 4)
588 return ikev2.IKEv2(data[4:])
592 def encrypt_ike_msg(self, header, plain, first_payload):
593 if self.sa.ike_crypto == 'AES-GCM-16ICV':
594 data = self.sa.ike_crypto_alg.pad(raw(plain))
595 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
596 len(ikev2.IKEv2_payload_Encrypted())
597 tlen = plen + len(ikev2.IKEv2())
600 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
604 encr = self.sa.encrypt(raw(plain), raw(res))
605 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
606 length=plen, load=encr)
609 encr = self.sa.encrypt(raw(plain))
610 trunc_len = self.sa.ike_integ_alg.trunc_len
611 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
612 tlen = plen + len(ikev2.IKEv2())
614 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
615 length=plen, load=encr)
619 integ_data = raw(res)
620 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
621 self.sa.my_authkey, integ_data)
622 res = res / Raw(hmac_data[:trunc_len])
623 assert(len(res) == tlen)
626 def verify_ipsec_sas(self):
627 sas = self.vapi.ipsec_sa_dump()
628 self.assertEqual(len(sas), 2)
629 e = VppEnum.vl_api_ipsec_sad_flags_t
630 if self.sa.is_initiator:
637 c = self.sa.child_sas[0]
639 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
640 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
641 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
643 if self.sa.esp_integ is None:
646 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
647 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
648 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
651 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
652 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
653 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
654 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
658 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
659 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
660 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
661 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
663 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
664 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one piece of key material against the API's view of it.

    Reads attribute `name` from both objects and `name + '_len'` from
    api_keys, then asserts that the expected key has that length and
    equals the API buffer truncated to that length.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    # the API buffer may be longer than the key, so compare the prefix only
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Assert that an identity reported by the API matches the expected one.

    Checks, in order: the type (expected type converted through
    IDType.value), the data length, and the data itself (API data encoded
    as ASCII bytes).
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # compare the identity payload with the expected payload, not its type
    self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)
678 def verify_ike_sas(self):
679 r = self.vapi.ikev2_sa_dump()
680 self.assertEqual(len(r), 1)
682 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
683 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
685 if self.sa.is_initiator:
686 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
687 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
689 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
690 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
692 if self.sa.is_initiator:
693 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
694 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
696 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
697 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
698 self.verify_keymat(sa.keys, self.sa, 'sk_d')
699 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
700 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
701 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
702 self.verify_keymat(sa.keys, self.sa, 'sk_er')
703 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
704 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
706 self.assertEqual(sa.i_id.type, self.sa.id_type)
707 self.assertEqual(sa.r_id.type, self.sa.id_type)
708 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
709 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
710 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
711 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
713 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
714 self.assertEqual(len(r), 1)
716 self.assertEqual(csa.sa_index, sa.sa_index)
717 c = self.sa.child_sas[0]
718 if hasattr(c, 'sk_ai'):
719 self.verify_keymat(csa.keys, c, 'sk_ai')
720 self.verify_keymat(csa.keys, c, 'sk_ar')
721 self.verify_keymat(csa.keys, c, 'sk_ei')
722 self.verify_keymat(csa.keys, c, 'sk_er')
724 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
727 r = self.vapi.ikev2_traffic_selector_dump(
728 is_initiator=True, sa_index=sa.sa_index,
729 child_sa_index=csa.child_sa_index)
730 self.assertEqual(len(r), 1)
732 self.verify_ts(r[0].ts, tsi[0], True)
734 r = self.vapi.ikev2_traffic_selector_dump(
735 is_initiator=False, sa_index=sa.sa_index,
736 child_sa_index=csa.child_sa_index)
737 self.assertEqual(len(r), 1)
738 self.verify_ts(r[0].ts, tsr[0], False)
740 n = self.vapi.ikev2_nonce_get(is_initiator=True,
741 sa_index=sa.sa_index)
742 self.verify_nonce(n, self.sa.i_nonce)
743 n = self.vapi.ikev2_nonce_get(is_initiator=False,
744 sa_index=sa.sa_index)
745 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API nonce has the expected length and content."""
    expected_len = len(nonce)
    self.assertEqual(api_nonce.data_len, expected_len)
    self.assertEqual(api_nonce.nonce, nonce)
751 def verify_ts(self, api_ts, ts, is_initiator):
753 self.assertTrue(api_ts.is_local)
755 self.assertFalse(api_ts.is_local)
758 self.assertEqual(api_ts.start_addr,
759 IPv4Address(ts.starting_address_v4))
760 self.assertEqual(api_ts.end_addr,
761 IPv4Address(ts.ending_address_v4))
763 self.assertEqual(api_ts.start_addr,
764 IPv6Address(ts.starting_address_v6))
765 self.assertEqual(api_ts.end_addr,
766 IPv6Address(ts.ending_address_v6))
767 self.assertEqual(api_ts.start_port, ts.start_port)
768 self.assertEqual(api_ts.end_port, ts.end_port)
769 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
772 class TemplateInitiator(IkePeer):
773 """ initiator test template """
776 super(TemplateInitiator, self).tearDown()
778 def verify_sa_init_request(self, packet):
779 ih = packet[ikev2.IKEv2]
780 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
781 self.assertEqual(ih.exch_type, 34) # SA_INIT
782 self.sa.ispi = ih.init_SPI
783 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
784 self.assertIn('Initiator', ih.flags)
785 self.assertNotIn('Response', ih.flags)
786 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
787 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
789 prop = packet[ikev2.IKEv2_payload_Proposal]
790 self.assertEqual(prop.proto, 1) # proto = ikev2
791 self.assertEqual(prop.proposal, 1)
792 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
793 self.assertEqual(prop.trans[0].transform_id,
794 self.p.ike_transforms['crypto_alg'])
795 self.assertEqual(prop.trans[1].transform_type, 2) # prf
796 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
797 self.assertEqual(prop.trans[2].transform_type, 4) # dh
798 self.assertEqual(prop.trans[2].transform_id,
799 self.p.ike_transforms['dh_group'])
801 self.sa.complete_dh_data()
804 def verify_sa_auth_req(self, packet):
805 ih = self.get_ike_header(packet)
806 self.assertEqual(ih.resp_SPI, self.sa.rspi)
807 self.assertEqual(ih.init_SPI, self.sa.ispi)
808 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
809 self.assertIn('Initiator', ih.flags)
810 self.assertNotIn('Response', ih.flags)
814 self.assertEqual(ih.id, self.sa.msg_id + 1)
816 plain = self.sa.hmac_and_decrypt(ih)
817 idi = ikev2.IKEv2_payload_IDi(plain)
818 idr = ikev2.IKEv2_payload_IDr(idi.payload)
819 self.assertEqual(idi.load, self.sa.i_id)
820 self.assertEqual(idr.load, self.sa.r_id)
822 def send_init_response(self):
823 tr_attr = self.sa.ike_crypto_attr()
824 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
825 transform_id=self.sa.ike_crypto, length=tr_attr[1],
826 key_length=tr_attr[0]) /
827 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
828 transform_id=self.sa.ike_integ) /
829 ikev2.IKEv2_payload_Transform(transform_type='PRF',
830 transform_id=self.sa.ike_prf_alg.name) /
831 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
832 transform_id=self.sa.ike_dh))
833 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
834 trans_nb=4, trans=trans))
835 self.sa.init_resp_packet = (
836 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
837 exch_type='IKE_SA_INIT', flags='Response') /
838 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
839 ikev2.IKEv2_payload_KE(next_payload='Nonce',
840 group=self.sa.ike_dh,
841 load=self.sa.my_dh_pub_key) /
842 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
844 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
845 self.sa.sport, self.sa.dport,
846 self.sa.natt, self.ip6)
847 self.pg_send(self.pg0, ike_msg)
848 capture = self.pg0.get_capture(1)
849 self.verify_sa_auth_req(capture[0])
851 def initiate_sa_init(self):
852 self.pg0.enable_capture()
854 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
856 capture = self.pg0.get_capture(1)
857 self.verify_sa_init_request(capture[0])
858 self.send_init_response()
860 def send_auth_response(self):
861 tr_attr = self.sa.esp_crypto_attr()
862 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
863 transform_id=self.sa.esp_crypto, length=tr_attr[1],
864 key_length=tr_attr[0]) /
865 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
866 transform_id=self.sa.esp_integ) /
867 ikev2.IKEv2_payload_Transform(
868 transform_type='Extended Sequence Number',
869 transform_id='No ESN') /
870 ikev2.IKEv2_payload_Transform(
871 transform_type='Extended Sequence Number',
874 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
875 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
877 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
878 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
879 IDtype=self.sa.id_type, load=self.sa.i_id) /
880 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
881 IDtype=self.sa.id_type, load=self.sa.r_id) /
882 ikev2.IKEv2_payload_AUTH(next_payload='SA',
883 auth_type=AuthMethod.value(self.sa.auth_method),
884 load=self.sa.auth_data) /
885 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
886 ikev2.IKEv2_payload_TSi(next_payload='TSr',
887 number_of_TSs=len(tsi),
888 traffic_selector=tsi) /
889 ikev2.IKEv2_payload_TSr(next_payload='Notify',
890 number_of_TSs=len(tsr),
891 traffic_selector=tsr) /
892 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
894 header = ikev2.IKEv2(
895 init_SPI=self.sa.ispi,
896 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
897 flags='Response', exch_type='IKE_AUTH')
899 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
900 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
901 self.sa.dport, self.sa.natt, self.ip6)
902 self.pg_send(self.pg0, packet)
904 def test_initiator(self):
905 self.initiate_sa_init()
907 self.sa.calc_child_keys()
908 self.send_auth_response()
909 self.verify_ike_sas()
912 class TemplateResponder(IkePeer):
913 """ responder test template """
916 super(TemplateResponder, self).tearDown()
917 if self.sa.is_initiator:
918 self.initiate_del_sa()
919 r = self.vapi.ikev2_sa_dump()
920 self.assertEqual(len(r), 0)
922 self.p.remove_vpp_config()
923 self.assertIsNone(self.p.query_vpp_config())
def verify_del_sa(self, packet):
    """Assert that packet carries an IKE header with the current message
    id and exchange type 37."""
    informational = 37  # exchange informational
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, informational)
930 def initiate_del_sa(self):
931 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
932 flags='Initiator', exch_type='INFORMATIONAL',
933 id=self.sa.new_msg_id())
934 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
935 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
936 packet = self.create_packet(self.pg0, ike_msg,
937 self.sa.sport, self.sa.dport,
938 self.sa.natt, self.ip6)
939 self.pg0.add_stream(packet)
940 self.pg0.enable_capture()
942 capture = self.pg0.get_capture(1)
943 self.verify_del_sa(capture[0])
945 def send_sa_init_req(self, behind_nat=False):
946 tr_attr = self.sa.ike_crypto_attr()
947 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
948 transform_id=self.sa.ike_crypto, length=tr_attr[1],
949 key_length=tr_attr[0]) /
950 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
951 transform_id=self.sa.ike_integ) /
952 ikev2.IKEv2_payload_Transform(transform_type='PRF',
953 transform_id=self.sa.ike_prf_alg.name) /
954 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
955 transform_id=self.sa.ike_dh))
957 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
958 trans_nb=4, trans=trans))
961 next_payload = 'Notify'
965 self.sa.init_req_packet = (
966 ikev2.IKEv2(init_SPI=self.sa.ispi,
967 flags='Initiator', exch_type='IKE_SA_INIT') /
968 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
969 ikev2.IKEv2_payload_KE(next_payload='Nonce',
970 group=self.sa.ike_dh,
971 load=self.sa.my_dh_pub_key) /
972 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
973 load=self.sa.i_nonce))
976 src_address = b'\x0a\x0a\x0a\x01'
978 src_address = bytes(self.pg0.local_ip4, 'ascii')
980 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
981 dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
983 nat_src_detection = ikev2.IKEv2_payload_Notify(
984 type='NAT_DETECTION_SOURCE_IP', load=src_nat)
985 nat_dst_detection = ikev2.IKEv2_payload_Notify(
986 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
987 self.sa.init_req_packet = (self.sa.init_req_packet /
991 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
992 self.sa.sport, self.sa.dport,
993 self.sa.natt, self.ip6)
994 self.pg0.add_stream(ike_msg)
995 self.pg0.enable_capture()
997 capture = self.pg0.get_capture(1)
998 self.verify_sa_init(capture[0])
1000 def send_sa_auth(self):
1001 tr_attr = self.sa.esp_crypto_attr()
1002 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1003 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1004 key_length=tr_attr[0]) /
1005 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1006 transform_id=self.sa.esp_integ) /
1007 ikev2.IKEv2_payload_Transform(
1008 transform_type='Extended Sequence Number',
1009 transform_id='No ESN') /
1010 ikev2.IKEv2_payload_Transform(
1011 transform_type='Extended Sequence Number',
1012 transform_id='ESN'))
1014 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1015 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
1017 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1018 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1019 IDtype=self.sa.id_type, load=self.sa.i_id) /
1020 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1021 IDtype=self.sa.id_type, load=self.sa.r_id) /
1022 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1023 auth_type=AuthMethod.value(self.sa.auth_method),
1024 load=self.sa.auth_data) /
1025 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1026 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1027 number_of_TSs=len(tsi),
1028 traffic_selector=tsi) /
1029 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1030 number_of_TSs=len(tsr),
1031 traffic_selector=tsr) /
1032 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1034 header = ikev2.IKEv2(
1035 init_SPI=self.sa.ispi,
1036 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1037 flags='Initiator', exch_type='IKE_AUTH')
1039 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1040 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1041 self.sa.dport, self.sa.natt, self.ip6)
1042 self.pg0.add_stream(packet)
1043 self.pg0.enable_capture()
1045 capture = self.pg0.get_capture(1)
1046 self.verify_sa_auth_resp(capture[0])
1048 def verify_sa_init(self, packet):
1049 ih = self.get_ike_header(packet)
1051 self.assertEqual(ih.id, self.sa.msg_id)
1052 self.assertEqual(ih.exch_type, 34)
1053 self.assertTrue('Response' in ih.flags)
1054 self.assertEqual(ih.init_SPI, self.sa.ispi)
1055 self.assertNotEqual(ih.resp_SPI, 0)
1056 self.sa.rspi = ih.resp_SPI
1058 sa = ih[ikev2.IKEv2_payload_SA]
1059 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1060 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1061 except IndexError as e:
1062 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1063 self.logger.error(ih.show())
1065 self.sa.complete_dh_data()
1069 def verify_sa_auth_resp(self, packet):
1070 ike = self.get_ike_header(packet)
1072 self.verify_udp(udp)
1073 self.assertEqual(ike.id, self.sa.msg_id)
1074 plain = self.sa.hmac_and_decrypt(ike)
1075 self.sa.calc_child_keys()
1077 def test_responder(self):
1078 self.send_sa_init_req(self.sa.natt)
1080 self.verify_ipsec_sas()
1081 self.verify_ike_sas()
1084 class Ikev2Params(object):
1085 def config_params(self, params={}):
1086 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1087 ei = VppEnum.vl_api_ipsec_integ_alg_t
1089 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1090 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1091 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1092 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1093 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1094 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1096 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1097 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1098 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1099 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1101 is_natt = 'natt' in params and params['natt'] or False
1102 self.p = Profile(self, 'pr1')
1103 self.ip6 = False if 'ip6' not in params else params['ip6']
1105 if 'auth' in params and params['auth'] == 'rsa-sig':
1106 auth_method = 'rsa-sig'
1107 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1108 self.vapi.ikev2_set_local_key(
1109 key_file=work_dir + params['server-key'])
1111 client_file = work_dir + params['client-cert']
1112 server_pem = open(work_dir + params['server-cert']).read()
1113 client_priv = open(work_dir + params['client-key']).read()
1114 client_priv = load_pem_private_key(str.encode(client_priv), None,
1116 self.peer_cert = x509.load_pem_x509_certificate(
1117 str.encode(server_pem),
1119 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1122 auth_data = b'$3cr3tpa$$w0rd'
1123 self.p.add_auth(method='shared-key', data=auth_data)
1124 auth_method = 'shared-key'
1127 is_init = True if 'is_initiator' not in params else\
1128 params['is_initiator']
1130 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1131 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1133 self.p.add_local_id(**idr)
1134 self.p.add_remote_id(**idi)
1136 self.p.add_local_id(**idi)
1137 self.p.add_remote_id(**idr)
1139 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1140 'loc_ts' not in params else params['loc_ts']
1141 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1142 'rem_ts' not in params else params['rem_ts']
1143 self.p.add_local_ts(**loc_ts)
1144 self.p.add_remote_ts(**rem_ts)
1145 if 'responder' in params:
1146 self.p.add_responder(params['responder'])
1147 if 'ike_transforms' in params:
1148 self.p.add_ike_transforms(params['ike_transforms'])
1149 if 'esp_transforms' in params:
1150 self.p.add_esp_transforms(params['esp_transforms'])
1152 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1153 is_initiator=is_init,
1154 id_type=self.p.local_id['id_type'], natt=is_natt,
1155 priv_key=client_priv, auth_method=auth_method,
1156 auth_data=auth_data,
1157 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1159 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1160 params['ike-crypto']
1161 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1163 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1166 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1167 params['esp-crypto']
1168 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1171 self.sa.set_ike_props(
1172 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1173 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1174 self.sa.set_esp_props(
1175 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1179 class TestApi(VppTestCase):
1180 """ Test IKEV2 API """
1182 def setUpClass(cls):
1183 super(TestApi, cls).setUpClass()
1186 def tearDownClass(cls):
1187 super(TestApi, cls).tearDownClass()
1190 super(TestApi, self).tearDown()
1191 self.p1.remove_vpp_config()
1192 self.p2.remove_vpp_config()
1193 r = self.vapi.ikev2_profile_dump()
1194 self.assertEqual(len(r), 0)
1196 def configure_profile(self, cfg):
1197 p = Profile(self, cfg['name'])
1198 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1199 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1200 p.add_local_ts(**cfg['loc_ts'])
1201 p.add_remote_ts(**cfg['rem_ts'])
1202 p.add_responder(cfg['responder'])
1203 p.add_ike_transforms(cfg['ike_ts'])
1204 p.add_esp_transforms(cfg['esp_ts'])
1205 p.add_auth(**cfg['auth'])
1206 p.set_udp_encap(cfg['udp_encap'])
1207 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1208 if 'lifetime_data' in cfg:
1209 p.set_lifetime_data(cfg['lifetime_data'])
1210 if 'tun_itf' in cfg:
1211 p.set_tunnel_interface(cfg['tun_itf'])
1215 def test_profile_api(self):
1216 """ test profile dump API """
1221 'start_addr': '3.3.3.2',
1222 'end_addr': '3.3.3.3',
1228 'start_addr': '4.5.76.80',
1229 'end_addr': '2.3.4.6',
1236 'start_addr': 'ab::1',
1237 'end_addr': 'ab::4',
1243 'start_addr': 'cd::12',
1244 'end_addr': 'cd::13',
1250 'loc_id': ('fqdn', b'vpp.home'),
1251 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1254 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1257 'crypto_key_size': 32,
1262 'crypto_key_size': 24,
1264 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1266 'ipsec_over_udp_port': 4501,
1269 'lifetime_maxdata': 20192,
1270 'lifetime_jitter': 9,
1275 'loc_id': ('ip4-addr', b'192.168.2.1'),
1276 'rem_id': ('ip6-addr', b'abcd::1'),
1279 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1282 'crypto_key_size': 16,
1287 'crypto_key_size': 24,
1289 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1291 'ipsec_over_udp_port': 4600,
1294 self.p1 = self.configure_profile(conf['p1'])
1295 self.p2 = self.configure_profile(conf['p2'])
1297 r = self.vapi.ikev2_profile_dump()
1298 self.assertEqual(len(r), 2)
1299 self.verify_profile(r[0].profile, conf['p1'])
1300 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Check an identity reported by the API against its configuration.

    :param api_id: object exposing ``type`` and ``data`` attributes
    :param cfg_id: indexable pair ``(id type name, id data as bytes)``
    """
    reported_type, wanted_type = api_id.type, IDType.value(cfg_id[0])
    self.assertEqual(reported_type, wanted_type)
    # api_id.data is encoded to ASCII bytes before it is compared with
    # the configured value.
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Check a traffic selector reported by the API against its config.

    The protocol id, the port range and the address range are compared
    in that order.

    :param api_ts: object exposing ``protocol_id``, ``start_port``,
        ``end_port``, ``start_addr`` and ``end_addr``
    :param cfg_ts: dict with the keys ``proto``, ``start_port``,
        ``end_port``, ``start_addr`` and ``end_addr``; the addresses are
        textual and are parsed with ``ip_address`` before the comparison
    """
    self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
    self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
    self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
    # ip_address() takes a str directly, so no text_type() conversion is
    # needed; this is the same call form verify_responder uses.
    self.assertEqual(api_ts.start_addr, ip_address(cfg_ts['start_addr']))
    self.assertEqual(api_ts.end_addr, ip_address(cfg_ts['end_addr']))
def verify_responder(self, api_r, cfg_r):
    """Check the responder reported by the API against its configuration.

    :param api_r: object exposing ``sw_if_index`` and ``addr``
    :param cfg_r: dict with the keys ``sw_if_index`` and ``addr``; the
        address is textual and is parsed with ``ip_address``
    """
    got_index, want_index = api_r.sw_if_index, cfg_r['sw_if_index']
    self.assertEqual(got_index, want_index)
    got_addr, want_addr = api_r.addr, ip_address(cfg_r['addr'])
    self.assertEqual(got_addr, want_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Check the transform fields shared by IKE and ESP transform sets.

    Each listed field is read as an attribute of ``api_ts`` and compared
    with the entry of the same name in the ``cfg_ts`` dict.
    """
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Check an IKE transform set: the common fields plus the DH group.

    :param api_ts: object exposing the common transform attributes and
        ``dh_group``
    :param cfg_ts: dict with the common transform keys and ``dh_group``
    """
    self.verify_transforms(api_ts, cfg_ts)
    got_group, want_group = api_ts.dh_group, cfg_ts['dh_group']
    self.assertEqual(got_group, want_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check an ESP transform set.

    Only the fields handled by ``verify_transforms`` are compared.
    """
    check_common = self.verify_transforms
    check_common(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Check the authentication settings reported by the API.

    :param api_auth: object exposing ``method``, ``data`` and ``data_len``
    :param cfg_auth: dict with the keys ``method`` (a method name that is
        translated with ``AuthMethod.value``) and ``data``
    """
    got_method, want_method = (api_auth.method,
                               AuthMethod.value(cfg_auth['method']))
    self.assertEqual(got_method, want_method)
    self.assertEqual(api_auth.data, cfg_auth['data'])
    # The reported length must equal the length of the configured data.
    want_len = len(cfg_auth['data'])
    self.assertEqual(api_auth.data_len, want_len)
def verify_lifetime_data(self, p, ld):
    """Check the lifetime settings of a dumped profile.

    Each listed field is read as an attribute of ``p`` and compared with
    the entry of the same name in the ``ld`` dict.
    """
    lifetime_fields = ('lifetime', 'lifetime_maxdata',
                       'lifetime_jitter', 'handover')
    for field in lifetime_fields:
        self.assertEqual(getattr(p, field), ld[field])
1342 def verify_profile(self, ap, cp):
1343 self.assertEqual(ap.name, cp['name'])
1344 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1345 self.verify_id(ap.loc_id, cp['loc_id'])
1346 self.verify_id(ap.rem_id, cp['rem_id'])
1347 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1348 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1349 self.verify_responder(ap.responder, cp['responder'])
1350 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1351 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1352 self.verify_auth(ap.auth, cp['auth'])
1353 if 'lifetime_data' in cp:
1354 self.verify_lifetime_data(ap, cp['lifetime_data'])
1355 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1357 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1359 self.assertEqual(ap.tun_itf, 0xffffffff)
1362 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1363 """ test ikev2 initiator - pre shared key auth """
1364 def config_tc(self):
1365 self.config_params({
1366 'is_initiator': False, # seen from test case perspective
1367 # thus vpp is initiator
1368 'responder': {'sw_if_index': self.pg0.sw_if_index,
1369 'addr': self.pg0.remote_ip4},
1370 'ike-crypto': ('AES-GCM-16ICV', 32),
1371 'ike-integ': 'NULL',
1372 'ike-dh': '3072MODPgr',
1374 'crypto_alg': 20, # "aes-gcm-16"
1375 'crypto_key_size': 256,
1376 'dh_group': 15, # "modp-3072"
1379 'crypto_alg': 12, # "aes-cbc"
1380 'crypto_key_size': 256,
1381 # "hmac-sha2-256-128"
1385 class TestResponderNATT(TemplateResponder, Ikev2Params):
1386 """ test ikev2 responder - nat traversal """
1387 def config_tc(self):
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # No overrides are passed: config_params() runs with its default
        # parameter dict, so every setting takes its built-in default.
        self.config_params()
1398 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1399 """ test ikev2 responder - cert based auth """
1400 def config_tc(self):
1401 self.config_params({
1403 'server-key': 'server-key.pem',
1404 'client-key': 'client-key.pem',
1405 'client-cert': 'client-cert.pem',
1406 'server-cert': 'server-cert.pem'})
1409 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1410 (TemplateResponder, Ikev2Params):
1412 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
def config_tc(self):
    """Select the algorithm overrides for this test case.

    IKE uses AES-CBC with a 16 byte key, SHA2-256-128 and the 2048 bit
    MODP group; ESP uses AES-CBC with a 24 byte key and SHA2-384-192.
    """
    overrides = {
        'ike-crypto': ('AES-CBC', 16),
        'ike-integ': 'SHA2-256-128',
        'esp-crypto': ('AES-CBC', 24),
        'esp-integ': 'SHA2-384-192',
        'ike-dh': '2048MODPgr',
    }
    self.config_params(overrides)
1423 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1424 (TemplateResponder, Ikev2Params):
1426 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
def config_tc(self):
    """Select the algorithm overrides for this test case.

    IKE uses AES-CBC with a 128 bit key, SHA2-256-128 and the 3072 bit
    MODP group; ESP uses AES-GCM-16ICV with a 32 byte key and no separate
    integrity algorithm.
    """
    self.config_params({
        # 16 (not 32) is the key size that matches the AES_CBC_128 in
        # this test case's name and description; the sibling AES_CBC_128
        # test case passes the same value.
        'ike-crypto': ('AES-CBC', 16),
        'ike-integ': 'SHA2-256-128',
        'esp-crypto': ('AES-GCM-16ICV', 32),
        'esp-integ': 'NULL',
        'ike-dh': '3072MODPgr'})
1437 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1441 def config_tc(self):
1442 self.config_params({
1445 'ike-crypto': ('AES-GCM-16ICV', 32),
1446 'ike-integ': 'NULL',
1447 'ike-dh': '2048MODPgr',
1448 'loc_ts': {'start_addr': 'ab:cd::0',
1449 'end_addr': 'ab:cd::10'},
1450 'rem_ts': {'start_addr': '11::0',
1451 'end_addr': '11::100'}})
1454 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1455 """ malformed packet test """
def config_tc(self):
    # No overrides are passed: config_params() runs with its default
    # parameter dict.
    self.config_params()
def assert_counter(self, count, name, version='ip4'):
    """Assert that an ikev2 error counter holds the expected value.

    :param count: expected counter value
    :param name: counter name, appended to the node path
    :param version: suffix of the ``ikev2-<version>`` node name
    """
    node_prefix = '/err/ikev2-%s/' % version
    counter_path = node_prefix + name
    observed = self.statistics.get_err_counter(counter_path)
    self.assertEqual(count, observed)
1467 def create_ike_init_msg(self, length=None, payload=None):
1468 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1469 flags='Initiator', exch_type='IKE_SA_INIT')
1470 if payload is not None:
1472 return self.create_packet(self.pg0, msg, self.sa.sport,
def verify_bad_packet_length(self):
    """Send IKE_SA_INIT messages carrying a bogus header length.

    ``pkt_count`` copies of the message are passed to
    ``send_and_assert_no_replies`` on ``pg0``; afterwards the
    'Bad packet length' counter is expected to equal ``pkt_count``.
    """
    malformed = self.create_ike_init_msg(length=0xdead)
    burst = malformed * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Bad packet length')
def verify_bad_sa_payload_length(self):
    """Send IKE_SA_INIT messages whose SA payload has a bogus length.

    ``pkt_count`` copies of the message are passed to
    ``send_and_assert_no_replies`` on ``pg0``; afterwards the
    'Malformed packet' counter is expected to equal ``pkt_count``.
    """
    bogus_sa = ikev2.IKEv2_payload_SA(length=0xdead)
    malformed = self.create_ike_init_msg(payload=bogus_sa)
    burst = malformed * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Malformed packet')
def test_responder(self):
    # Kept docstring-free on purpose: unittest uses a test method's
    # docstring as its reported description.
    # Every malformed-message check sends this many packets.
    self.pkt_count = 254
    malformed_checks = (self.verify_bad_packet_length,
                        self.verify_bad_sa_payload_length)
    for run_check in malformed_checks:
        run_check()
# When this module is executed as a script, run its tests through
# unittest with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)