2 from socket import inet_pton
3 from cryptography import x509
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes, hmac
6 from cryptography.hazmat.primitives.asymmetric import dh, padding
7 from cryptography.hazmat.primitives.serialization import load_pem_private_key
8 from cryptography.hazmat.primitives.ciphers import (
13 from ipaddress import IPv4Address, IPv6Address, ip_address
14 from scapy.layers.ipsec import ESP
15 from scapy.layers.inet import IP, UDP, Ether
16 from scapy.layers.inet6 import IPv6
17 from scapy.packet import raw, Raw
18 from scapy.utils import long_converter
19 from framework import VppTestCase, VppTestRunner
20 from vpp_ikev2 import Profile, IDType, AuthMethod
21 from vpp_papi import VppEnum
28 KEY_PAD = b"Key Pad for IKEv2"
35 # tuple structure is (p, g, key_len)
37 '2048MODPgr': (long_converter("""
38 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
39 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
40 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
41 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
42 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
43 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
44 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
45 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
46 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
47 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
48 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
50 '3072MODPgr': (long_converter("""
51 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
62 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
63 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
64 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
65 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
66 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
70 class CryptoAlgo(object):
71 def __init__(self, name, cipher, mode):
75 if self.cipher is not None:
76 self.bs = self.cipher.block_size // 8
78 if self.name == 'AES-GCM-16ICV':
79 self.iv_len = GCM_IV_SIZE
83 def encrypt(self, data, key, aad=None):
84 iv = os.urandom(self.iv_len)
86 encryptor = Cipher(self.cipher(key), self.mode(iv),
87 default_backend()).encryptor()
88 return iv + encryptor.update(data) + encryptor.finalize()
90 salt = key[-SALT_SIZE:]
92 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
93 default_backend()).encryptor()
94 encryptor.authenticate_additional_data(aad)
95 data = encryptor.update(data) + encryptor.finalize()
96 data += encryptor.tag[:GCM_ICV_SIZE]
99 def decrypt(self, data, key, aad=None, icv=None):
101 iv = data[:self.iv_len]
102 ct = data[self.iv_len:]
103 decryptor = Cipher(algorithms.AES(key),
105 default_backend()).decryptor()
106 return decryptor.update(ct) + decryptor.finalize()
108 salt = key[-SALT_SIZE:]
109 nonce = salt + data[:GCM_IV_SIZE]
110 ct = data[GCM_IV_SIZE:]
111 key = key[:-SALT_SIZE]
112 decryptor = Cipher(algorithms.AES(key),
113 self.mode(nonce, icv, len(icv)),
114 default_backend()).decryptor()
115 decryptor.authenticate_additional_data(aad)
116 pt = decryptor.update(ct) + decryptor.finalize()
121 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122 data = data + b'\x00' * (pad_len - 1)
123 return data + bytes([pad_len - 1])
126 class AuthAlgo(object):
127 def __init__(self, name, mac, mod, key_len, trunc_len=None):
131 self.key_len = key_len
132 self.trunc_len = trunc_len or key_len
136 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
143 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
151 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
class IKEv2ChildSA(object):
    """State kept for one child SA: its SPI and both traffic selectors."""

    def __init__(self, local_ts, remote_ts, spi=None):
        """Remember the traffic selectors and choose an SPI.

        :param local_ts: local traffic selector, stored unchanged
        :param remote_ts: remote traffic selector, stored unchanged
        :param spi: SPI to use; a falsy value is replaced by 4 random bytes
        """
        self.local_ts, self.remote_ts = local_ts, remote_ts
        # any falsy spi (None, b'', 0) falls back to a freshly drawn one
        self.spi = spi if spi else os.urandom(4)
164 class IKEv2SA(object):
165 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
166 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
167 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
168 auth_method='shared-key', priv_key=None, natt=False):
177 self.dh_params = None
179 self.priv_key = priv_key
180 self.is_initiator = is_initiator
181 nonce = nonce or os.urandom(32)
182 self.auth_data = auth_data
185 if isinstance(id_type, str):
186 self.id_type = IDType.value(id_type)
188 self.id_type = id_type
189 self.auth_method = auth_method
190 if self.is_initiator:
191 self.rspi = 8 * b'\x00'
196 self.ispi = 8 * b'\x00'
198 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
200 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return the DH public value belonging to this side of the exchange.

    That is ``i_dh_data`` when acting as initiator and ``r_dh_data``
    otherwise.  Other code in this file reads it without calling it
    (``load=self.sa.my_dh_pub_key``), so it is presumably decorated as a
    property on the line above -- confirm before calling it directly.
    """
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the DH public value received from the other side.

    That is ``r_dh_data`` when acting as initiator and ``i_dh_data``
    otherwise.  ``compute_secret`` reads it as a plain attribute, so it is
    presumably decorated as a property on the line above -- confirm.
    """
    return self.r_dh_data if self.is_initiator else self.i_dh_data
def compute_secret(self):
    """Compute the Diffie-Hellman shared secret.

    Raises the peer's public value to our private exponent modulo the
    group prime.  ``ike_group`` is unpacked as ``(p, g, key_len)``; the
    generator is not needed here.

    :returns: the shared secret as ``key_len`` big-endian bytes
    """
    private_bytes = self.dh_private_key
    peer_bytes = self.peer_dh_pub_key
    prime, _generator, secret_len = self.ike_group
    base = int.from_bytes(peer_bytes, 'big')
    exponent = int.from_bytes(private_bytes, 'big')
    shared = pow(base, exponent, prime)
    # fixed-width encoding keeps leading zero bytes of the secret
    return shared.to_bytes(secret_len, 'big')
223 def generate_dh_data(self):
225 if self.ike_dh not in DH:
226 raise NotImplementedError('%s not in DH group' % self.ike_dh)
228 if self.dh_params is None:
229 dhg = DH[self.ike_dh]
230 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
231 self.dh_params = pn.parameters(default_backend())
233 priv = self.dh_params.generate_private_key()
234 pub = priv.public_key()
235 x = priv.private_numbers().x
236 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
237 y = pub.public_numbers().y
239 if self.is_initiator:
240 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
242 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Finish the DH exchange by caching the shared secret.

    Stores the result of ``compute_secret()`` on ``dh_shared_secret``.
    """
    secret = self.compute_secret()
    self.dh_shared_secret = secret
247 def calc_child_keys(self):
248 prf = self.ike_prf_alg.mod()
249 s = self.i_nonce + self.r_nonce
250 c = self.child_sas[0]
252 encr_key_len = self.esp_crypto_key_len
253 integ_key_len = self.esp_integ_alg.key_len
254 salt_len = 0 if integ_key_len else 4
256 l = (integ_key_len * 2 +
259 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
262 c.sk_ei = keymat[pos:pos+encr_key_len]
266 c.sk_ai = keymat[pos:pos+integ_key_len]
269 c.salt_ei = keymat[pos:pos+salt_len]
272 c.sk_er = keymat[pos:pos+encr_key_len]
276 c.sk_ar = keymat[pos:pos+integ_key_len]
279 c.salt_er = keymat[pos:pos+salt_len]
282 def calc_prfplus(self, prf, key, seed, length):
286 while len(r) < length and x < 255:
291 s = s + seed + bytes([x])
292 t = self.calc_prf(prf, key, s)
300 def calc_prf(self, prf, key, data):
301 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
306 prf = self.ike_prf_alg.mod()
307 # SKEYSEED = prf(Ni | Nr, g^ir)
308 s = self.i_nonce + self.r_nonce
309 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
311 # calculate S = Ni | Nr | SPIi SPIr
312 s = s + self.ispi + self.rspi
314 prf_key_trunc = self.ike_prf_alg.trunc_len
315 encr_key_len = self.ike_crypto_key_len
316 tr_prf_key_len = self.ike_prf_alg.key_len
317 integ_key_len = self.ike_integ_alg.key_len
318 if integ_key_len == 0:
328 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
331 self.sk_d = keymat[:pos+prf_key_trunc]
334 self.sk_ai = keymat[pos:pos+integ_key_len]
336 self.sk_ar = keymat[pos:pos+integ_key_len]
339 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
340 pos += encr_key_len + salt_size
341 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
342 pos += encr_key_len + salt_size
344 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
345 pos += tr_prf_key_len
346 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
348 def generate_authmsg(self, prf, packet):
349 if self.is_initiator:
357 data = bytes([self.id_type, 0, 0, 0]) + id
358 id_hash = self.calc_prf(prf, key, data)
359 return packet + nonce + id_hash
362 prf = self.ike_prf_alg.mod()
363 if self.is_initiator:
364 packet = self.init_req_packet
366 packet = self.init_resp_packet
367 authmsg = self.generate_authmsg(prf, raw(packet))
368 if self.auth_method == 'shared-key':
369 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
370 self.auth_data = self.calc_prf(prf, psk, authmsg)
371 elif self.auth_method == 'rsa-sig':
372 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
375 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad ``data`` and encrypt it with the IKE cipher under our own key.

    :param data: plaintext bytes
    :param aad: additional authenticated data, handed to the cipher as is
    :returns: whatever the cipher object's ``encrypt`` returns
    """
    cipher = self.ike_crypto_alg
    padded = cipher.pad(data)
    return cipher.encrypt(padded, self.my_cryptokey, aad)
382 def peer_authkey(self):
383 if self.is_initiator:
388 def my_authkey(self):
389 if self.is_initiator:
394 def my_cryptokey(self):
395 if self.is_initiator:
400 def peer_cryptokey(self):
401 if self.is_initiator:
def concat(self, alg, key_len):
    """Build an ``<alg>-<bits>`` name from an algorithm and a key length.

    :param alg: algorithm name (string)
    :param key_len: key length in bytes; it is multiplied by 8 for the name
    """
    bits = key_len * 8
    return '-'.join((alg, str(bits)))
def vpp_ike_cypto_alg(self):
    """Return the IKE cipher name combined with its key size via ``concat``.

    Presumably exposed as a property (decorator on the line above, not
    shown here) -- confirm.
    """
    name, key_len = self.ike_crypto, self.ike_crypto_key_len
    return self.concat(name, key_len)
def vpp_esp_cypto_alg(self):
    """Return the ESP cipher name combined with its key size via ``concat``.

    ``verify_ipsec_sas`` reads it as an attribute to index ``vpp_enums``,
    so it is presumably decorated as a property on the line above.
    """
    name, key_len = self.esp_crypto, self.esp_crypto_key_len
    return self.concat(name, key_len)
def verify_hmac(self, ikemsg):
    """Assert that the trailing checksum of a raw IKE message is valid.

    The last ``trunc_len`` bytes of ``ikemsg`` are taken as the received
    checksum; an HMAC over everything before them is computed with the
    peer's authentication key and its truncated form is compared against
    the received value through the owning test case.

    :param ikemsg: raw bytes of the complete IKE message
    """
    icv_len = self.ike_integ_alg.trunc_len
    received = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    digest = self.ike_integ_alg.mod()
    computed = self.compute_hmac(digest, self.peer_authkey, covered)
    self.test.assertEqual(computed[:icv_len], received)
424 def compute_hmac(self, integ, key, data):
425 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the IKE cipher under the peer's key.

    :param data: bytes handed to the cipher's ``decrypt``
    :param aad: additional authenticated data, passed through unchanged
    :param icv: integrity check value, passed through unchanged
    :returns: whatever the cipher object's ``decrypt`` returns
    """
    cipher = self.ike_crypto_alg
    peer_key = self.peer_cryptokey
    return cipher.decrypt(data, peer_key, aad, icv)
432 def hmac_and_decrypt(self, ike):
433 ep = ike[ikev2.IKEv2_payload_Encrypted]
434 if self.ike_crypto == 'AES-GCM-16ICV':
435 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
436 ct = ep.load[:-GCM_ICV_SIZE]
437 tag = ep.load[-GCM_ICV_SIZE:]
438 return self.decrypt(ct, raw(ike)[:aad_len], tag)
440 self.verify_hmac(raw(ike))
441 integ_trunc = self.ike_integ_alg.trunc_len
443 # remove ICV and decrypt payload
444 ct = ep.load[:-integ_trunc]
445 return self.decrypt(ct)
def build_ts_addr(self, ts, version):
    """Map a traffic selector's address range onto versioned field names.

    :param ts: mapping providing ``'start_addr'`` and ``'end_addr'``
    :param version: string appended to the field names (``'4'`` or ``'6'``
        at the call sites in this file)
    :returns: dict with ``starting_address_v<version>`` and
        ``ending_address_v<version>`` keys
    """
    field_sources = (('starting_address_v', 'start_addr'),
                     ('ending_address_v', 'end_addr'))
    return {prefix + version: ts[source] for prefix, source in field_sources}
451 def generate_ts(self, is_ip4):
452 c = self.child_sas[0]
453 ts_data = {'IP_protocol_ID': 0,
457 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
458 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
459 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
460 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
462 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
463 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
464 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
465 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
467 if self.is_initiator:
468 return ([ts1], [ts2])
469 return ([ts2], [ts1])
471 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
472 if crypto not in CRYPTO_ALGOS:
473 raise TypeError('unsupported encryption algo %r' % crypto)
474 self.ike_crypto = crypto
475 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
476 self.ike_crypto_key_len = crypto_key_len
478 if integ not in AUTH_ALGOS:
479 raise TypeError('unsupported auth algo %r' % integ)
480 self.ike_integ = None if integ == 'NULL' else integ
481 self.ike_integ_alg = AUTH_ALGOS[integ]
483 if prf not in PRF_ALGOS:
484 raise TypeError('unsupported prf algo %r' % prf)
486 self.ike_prf_alg = PRF_ALGOS[prf]
488 self.ike_group = DH[self.ike_dh]
def set_esp_props(self, crypto, crypto_key_len, integ):
    """Record the ESP cipher and integrity algorithm to negotiate.

    :param crypto: key into ``CRYPTO_ALGOS``
    :param crypto_key_len: cipher key length, stored as given
    :param integ: key into ``AUTH_ALGOS``; ``'NULL'`` is remembered as
        ``None`` in ``esp_integ``
    :raises TypeError: if either algorithm name is not in its table
    """
    # the key length is stored before any validation takes place
    self.esp_crypto_key_len = crypto_key_len

    crypto_known = crypto in CRYPTO_ALGOS
    if not crypto_known:
        raise TypeError('unsupported encryption algo %r' % crypto)
    self.esp_crypto, self.esp_crypto_alg = crypto, CRYPTO_ALGOS[crypto]

    integ_known = integ in AUTH_ALGOS
    if not integ_known:
        raise TypeError('unsupported auth algo %r' % integ)
    if integ == 'NULL':
        self.esp_integ = None
    else:
        self.esp_integ = integ
    self.esp_integ_alg = AUTH_ALGOS[integ]
502 def crypto_attr(self, key_len):
503 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
504 return (0x800e << 16 | key_len << 3, 12)
506 raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the IKE cipher key length.

    Callers in this file use element 0 of the result as the transform's
    ``key_length`` and element 1 as its ``length``.
    """
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the ESP cipher key length.

    Callers in this file use element 0 of the result as the transform's
    ``key_length`` and element 1 as its ``length``.
    """
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
514 def compute_nat_sha1(self, ip, port, rspi=None):
517 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
518 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
520 return digest.finalize()
523 class IkePeer(VppTestCase):
524 """ common class for initiator and responder """
528 import scapy.contrib.ikev2 as _ikev2
529 globals()['ikev2'] = _ikev2
530 super(IkePeer, cls).setUpClass()
531 cls.create_pg_interfaces(range(2))
532 for i in cls.pg_interfaces:
540 def tearDownClass(cls):
541 super(IkePeer, cls).tearDownClass()
544 super(IkePeer, self).setUp()
546 self.p.add_vpp_config()
547 self.assertIsNotNone(self.p.query_vpp_config())
548 self.sa.generate_dh_data()
549 self.vapi.cli('ikev2 set logging level 4')
550 self.vapi.cli('event-lo clear')
552 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
555 src_ip = src_if.remote_ip6
556 dst_ip = src_if.local_ip6
559 src_ip = src_if.remote_ip4
560 dst_ip = src_if.local_ip4
562 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
563 ip_layer(src=src_ip, dst=dst_ip) /
564 UDP(sport=sport, dport=dport))
566 # insert non ESP marker
567 res = res / Raw(b'\x00' * 4)
def verify_udp(self, udp):
    """Assert that a UDP header carries the ports recorded on the SA.

    :param udp: object exposing ``sport`` and ``dport``
    """
    # source port first, then destination port
    for port_field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, port_field),
                         getattr(self.sa, port_field))
574 def get_ike_header(self, packet):
576 ih = packet[ikev2.IKEv2]
577 except IndexError as e:
578 # this is a workaround for getting IKEv2 layer as both ikev2 and
579 # ipsec register for port 4500
581 ih = self.verify_and_remove_non_esp_marker(esp)
582 self.assertEqual(ih.version, 0x20)
585 def verify_and_remove_non_esp_marker(self, packet):
587 # if we are in nat traversal mode check for non esp marker
590 self.assertEqual(data[:4], b'\x00' * 4)
591 return ikev2.IKEv2(data[4:])
595 def encrypt_ike_msg(self, header, plain, first_payload):
596 if self.sa.ike_crypto == 'AES-GCM-16ICV':
597 data = self.sa.ike_crypto_alg.pad(raw(plain))
598 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
599 len(ikev2.IKEv2_payload_Encrypted())
600 tlen = plen + len(ikev2.IKEv2())
603 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
607 encr = self.sa.encrypt(raw(plain), raw(res))
608 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
609 length=plen, load=encr)
612 encr = self.sa.encrypt(raw(plain))
613 trunc_len = self.sa.ike_integ_alg.trunc_len
614 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
615 tlen = plen + len(ikev2.IKEv2())
617 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
618 length=plen, load=encr)
622 integ_data = raw(res)
623 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
624 self.sa.my_authkey, integ_data)
625 res = res / Raw(hmac_data[:trunc_len])
626 assert(len(res) == tlen)
629 def verify_ipsec_sas(self):
630 sas = self.vapi.ipsec_sa_dump()
631 self.assertEqual(len(sas), 2)
632 e = VppEnum.vl_api_ipsec_sad_flags_t
633 if self.sa.is_initiator:
640 c = self.sa.child_sas[0]
642 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
643 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
644 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
646 if self.sa.esp_integ is None:
649 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
650 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
651 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
654 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
655 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
656 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
657 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
661 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
662 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
663 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
664 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
666 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
667 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Assert that one key reported through the API matches ours.

    :param api_keys: object holding ``<name>`` and ``<name>_len`` attributes
    :param keys: object holding the expected key under ``<name>``
    :param name: attribute name of the key to compare, e.g. ``'sk_d'``
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    # only the first reported_len bytes of the API field are compared
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Assert that an identity reported through the API matches ``exp_id``.

    Compares the type (after mapping ``exp_id.type`` through
    ``IDType.value``), the data length and the ASCII-encoded data.

    :param api_id: identity object returned by the API
    :param exp_id: expected identity
    """
    reported_type = api_id.type
    expected_type = IDType.value(exp_id.type)
    self.assertEqual(reported_type, expected_type)
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # NOTE(review): the identity bytes are compared against exp_id.type,
    # not against an identity payload -- this looks unintended; confirm
    # which attribute of exp_id holds the data before relying on it.
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, exp_id.type)
681 def verify_ike_sas(self):
682 r = self.vapi.ikev2_sa_dump()
683 self.assertEqual(len(r), 1)
685 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
686 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
688 if self.sa.is_initiator:
689 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
690 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
692 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
693 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
695 if self.sa.is_initiator:
696 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
697 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
699 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
700 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
701 self.verify_keymat(sa.keys, self.sa, 'sk_d')
702 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
703 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
704 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
705 self.verify_keymat(sa.keys, self.sa, 'sk_er')
706 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
707 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
709 self.assertEqual(sa.i_id.type, self.sa.id_type)
710 self.assertEqual(sa.r_id.type, self.sa.id_type)
711 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
712 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
713 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
714 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
716 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
717 self.assertEqual(len(r), 1)
719 self.assertEqual(csa.sa_index, sa.sa_index)
720 c = self.sa.child_sas[0]
721 if hasattr(c, 'sk_ai'):
722 self.verify_keymat(csa.keys, c, 'sk_ai')
723 self.verify_keymat(csa.keys, c, 'sk_ar')
724 self.verify_keymat(csa.keys, c, 'sk_ei')
725 self.verify_keymat(csa.keys, c, 'sk_er')
727 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
730 r = self.vapi.ikev2_traffic_selector_dump(
731 is_initiator=True, sa_index=sa.sa_index,
732 child_sa_index=csa.child_sa_index)
733 self.assertEqual(len(r), 1)
735 self.verify_ts(r[0].ts, tsi[0], True)
737 r = self.vapi.ikev2_traffic_selector_dump(
738 is_initiator=False, sa_index=sa.sa_index,
739 child_sa_index=csa.child_sa_index)
740 self.assertEqual(len(r), 1)
741 self.verify_ts(r[0].ts, tsr[0], False)
743 n = self.vapi.ikev2_nonce_get(is_initiator=True,
744 sa_index=sa.sa_index)
745 self.verify_nonce(n, self.sa.i_nonce)
746 n = self.vapi.ikev2_nonce_get(is_initiator=False,
747 sa_index=sa.sa_index)
748 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that a nonce reported through the API equals ``nonce``.

    :param api_nonce: object exposing ``data_len`` and ``nonce``
    :param nonce: expected nonce
    """
    reported_len = api_nonce.data_len
    self.assertEqual(reported_len, len(nonce))
    reported = api_nonce.nonce
    self.assertEqual(reported, nonce)
754 def verify_ts(self, api_ts, ts, is_initiator):
756 self.assertTrue(api_ts.is_local)
758 self.assertFalse(api_ts.is_local)
761 self.assertEqual(api_ts.start_addr,
762 IPv4Address(ts.starting_address_v4))
763 self.assertEqual(api_ts.end_addr,
764 IPv4Address(ts.ending_address_v4))
766 self.assertEqual(api_ts.start_addr,
767 IPv6Address(ts.starting_address_v6))
768 self.assertEqual(api_ts.end_addr,
769 IPv6Address(ts.ending_address_v6))
770 self.assertEqual(api_ts.start_port, ts.start_port)
771 self.assertEqual(api_ts.end_port, ts.end_port)
772 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
775 class TemplateInitiator(IkePeer):
776 """ initiator test template """
779 super(TemplateInitiator, self).tearDown()
782 def find_notify_payload(packet, notify_type):
783 n = packet[ikev2.IKEv2_payload_Notify]
785 if n.type == notify_type:
790 def verify_nat_detection(self, packet):
797 # NAT_DETECTION_SOURCE_IP
798 s = self.find_notify_payload(packet, 16388)
799 self.assertIsNotNone(s)
800 src_sha = self.sa.compute_nat_sha1(
801 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
802 self.assertEqual(s.load, src_sha)
804 # NAT_DETECTION_DESTINATION_IP
805 s = self.find_notify_payload(packet, 16389)
806 self.assertIsNotNone(s)
807 dst_sha = self.sa.compute_nat_sha1(
808 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
809 self.assertEqual(s.load, dst_sha)
811 def verify_sa_init_request(self, packet):
812 ih = packet[ikev2.IKEv2]
813 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
814 self.assertEqual(ih.exch_type, 34) # SA_INIT
815 self.sa.ispi = ih.init_SPI
816 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
817 self.assertIn('Initiator', ih.flags)
818 self.assertNotIn('Response', ih.flags)
819 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
820 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
822 prop = packet[ikev2.IKEv2_payload_Proposal]
823 self.assertEqual(prop.proto, 1) # proto = ikev2
824 self.assertEqual(prop.proposal, 1)
825 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
826 self.assertEqual(prop.trans[0].transform_id,
827 self.p.ike_transforms['crypto_alg'])
828 self.assertEqual(prop.trans[1].transform_type, 2) # prf
829 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
830 self.assertEqual(prop.trans[2].transform_type, 4) # dh
831 self.assertEqual(prop.trans[2].transform_id,
832 self.p.ike_transforms['dh_group'])
834 self.verify_nat_detection(packet)
835 self.sa.complete_dh_data()
838 def verify_sa_auth_req(self, packet):
839 ih = self.get_ike_header(packet)
840 self.assertEqual(ih.resp_SPI, self.sa.rspi)
841 self.assertEqual(ih.init_SPI, self.sa.ispi)
842 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
843 self.assertIn('Initiator', ih.flags)
844 self.assertNotIn('Response', ih.flags)
848 self.assertEqual(ih.id, self.sa.msg_id + 1)
850 plain = self.sa.hmac_and_decrypt(ih)
851 idi = ikev2.IKEv2_payload_IDi(plain)
852 idr = ikev2.IKEv2_payload_IDr(idi.payload)
853 self.assertEqual(idi.load, self.sa.i_id)
854 self.assertEqual(idr.load, self.sa.r_id)
def send_init_response(self):
    """Send our IKE_SA_INIT response and check the packet that comes back.

    Builds one IKEv2 proposal with four transforms (encryption, integrity,
    PRF, DH group) from the SA's properties, combines it with our KE and
    nonce payloads into a message flagged as a response, stores that
    message on ``self.sa.init_resp_packet``, sends it on pg0 and passes the
    single captured packet to ``verify_sa_auth_req``.
    """
    sa = self.sa
    # crypto_attr yields (key length attribute, transform length)
    key_length_attr, transform_len = sa.ike_crypto_attr()

    encr_tr = ikev2.IKEv2_payload_Transform(
        transform_type='Encryption', transform_id=sa.ike_crypto,
        length=transform_len, key_length=key_length_attr)
    integ_tr = ikev2.IKEv2_payload_Transform(
        transform_type='Integrity', transform_id=sa.ike_integ)
    prf_tr = ikev2.IKEv2_payload_Transform(
        transform_type='PRF', transform_id=sa.ike_prf_alg.name)
    group_tr = ikev2.IKEv2_payload_Transform(
        transform_type='GroupDesc', transform_id=sa.ike_dh)

    proposal = ikev2.IKEv2_payload_Proposal(
        proposal=1, proto='IKEv2', trans_nb=4,
        trans=encr_tr / integ_tr / prf_tr / group_tr)

    ike_hdr = ikev2.IKEv2(init_SPI=sa.ispi, resp_SPI=sa.rspi,
                          exch_type='IKE_SA_INIT', flags='Response')
    sa_payload = ikev2.IKEv2_payload_SA(next_payload='KE', prop=proposal)
    ke_payload = ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                        group=sa.ike_dh,
                                        load=sa.my_dh_pub_key)
    nonce_payload = ikev2.IKEv2_payload_Nonce(load=sa.r_nonce)
    sa.init_resp_packet = ike_hdr / sa_payload / ke_payload / nonce_payload

    reply = self.create_packet(self.pg0, sa.init_resp_packet,
                               sa.sport, sa.dport, sa.natt, self.ip6)
    self.pg_send(self.pg0, reply)
    # exactly one packet is expected back; it is checked as the auth request
    auth_request = self.pg0.get_capture(1)[0]
    self.verify_sa_auth_req(auth_request)
885 def initiate_sa_init(self):
886 self.pg0.enable_capture()
888 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
890 capture = self.pg0.get_capture(1)
891 self.verify_sa_init_request(capture[0])
892 self.send_init_response()
894 def send_auth_response(self):
895 tr_attr = self.sa.esp_crypto_attr()
896 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
897 transform_id=self.sa.esp_crypto, length=tr_attr[1],
898 key_length=tr_attr[0]) /
899 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
900 transform_id=self.sa.esp_integ) /
901 ikev2.IKEv2_payload_Transform(
902 transform_type='Extended Sequence Number',
903 transform_id='No ESN') /
904 ikev2.IKEv2_payload_Transform(
905 transform_type='Extended Sequence Number',
908 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
909 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
911 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
912 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
913 IDtype=self.sa.id_type, load=self.sa.i_id) /
914 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
915 IDtype=self.sa.id_type, load=self.sa.r_id) /
916 ikev2.IKEv2_payload_AUTH(next_payload='SA',
917 auth_type=AuthMethod.value(self.sa.auth_method),
918 load=self.sa.auth_data) /
919 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
920 ikev2.IKEv2_payload_TSi(next_payload='TSr',
921 number_of_TSs=len(tsi),
922 traffic_selector=tsi) /
923 ikev2.IKEv2_payload_TSr(next_payload='Notify',
924 number_of_TSs=len(tsr),
925 traffic_selector=tsr) /
926 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
928 header = ikev2.IKEv2(
929 init_SPI=self.sa.ispi,
930 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
931 flags='Response', exch_type='IKE_AUTH')
933 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
934 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
935 self.sa.dport, self.sa.natt, self.ip6)
936 self.pg_send(self.pg0, packet)
938 def test_initiator(self):
939 self.initiate_sa_init()
941 self.sa.calc_child_keys()
942 self.send_auth_response()
943 self.verify_ike_sas()
946 class TemplateResponder(IkePeer):
947 """ responder test template """
950 super(TemplateResponder, self).tearDown()
951 if self.sa.is_initiator:
952 self.initiate_del_sa()
953 r = self.vapi.ikev2_sa_dump()
954 self.assertEqual(len(r), 0)
956 self.p.remove_vpp_config()
957 self.assertIsNone(self.p.query_vpp_config())
def verify_del_sa(self, packet):
    """Assert that ``packet`` is the expected reply to our SA delete.

    The IKE header must carry the SA's current message id and exchange
    type 37 (informational).
    """
    informational_exchange = 37
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, informational_exchange)
964 def initiate_del_sa(self):
965 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
966 flags='Initiator', exch_type='INFORMATIONAL',
967 id=self.sa.new_msg_id())
968 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
969 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
970 packet = self.create_packet(self.pg0, ike_msg,
971 self.sa.sport, self.sa.dport,
972 self.sa.natt, self.ip6)
973 self.pg0.add_stream(packet)
974 self.pg0.enable_capture()
976 capture = self.pg0.get_capture(1)
977 self.verify_del_sa(capture[0])
979 def send_sa_init_req(self, behind_nat=False):
980 tr_attr = self.sa.ike_crypto_attr()
981 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
982 transform_id=self.sa.ike_crypto, length=tr_attr[1],
983 key_length=tr_attr[0]) /
984 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
985 transform_id=self.sa.ike_integ) /
986 ikev2.IKEv2_payload_Transform(transform_type='PRF',
987 transform_id=self.sa.ike_prf_alg.name) /
988 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
989 transform_id=self.sa.ike_dh))
991 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
992 trans_nb=4, trans=trans))
994 self.sa.init_req_packet = (
995 ikev2.IKEv2(init_SPI=self.sa.ispi,
996 flags='Initiator', exch_type='IKE_SA_INIT') /
997 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
998 ikev2.IKEv2_payload_KE(next_payload='Nonce',
999 group=self.sa.ike_dh,
1000 load=self.sa.my_dh_pub_key) /
1001 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1002 load=self.sa.i_nonce))
1005 src_address = b'\x0a\x0a\x0a\x01'
1007 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1009 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1010 dst_nat = self.sa.compute_nat_sha1(
1011 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1013 nat_src_detection = ikev2.IKEv2_payload_Notify(
1014 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1015 next_payload='Notify')
1016 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1017 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1018 self.sa.init_req_packet = (self.sa.init_req_packet /
1022 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1023 self.sa.sport, self.sa.dport,
1024 self.sa.natt, self.ip6)
1025 self.pg0.add_stream(ike_msg)
1026 self.pg0.enable_capture()
1028 capture = self.pg0.get_capture(1)
1029 self.verify_sa_init(capture[0])
1031 def send_sa_auth(self):
1032 tr_attr = self.sa.esp_crypto_attr()
1033 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1034 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1035 key_length=tr_attr[0]) /
1036 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1037 transform_id=self.sa.esp_integ) /
1038 ikev2.IKEv2_payload_Transform(
1039 transform_type='Extended Sequence Number',
1040 transform_id='No ESN') /
1041 ikev2.IKEv2_payload_Transform(
1042 transform_type='Extended Sequence Number',
1043 transform_id='ESN'))
1045 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1046 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
1048 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1049 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1050 IDtype=self.sa.id_type, load=self.sa.i_id) /
1051 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1052 IDtype=self.sa.id_type, load=self.sa.r_id) /
1053 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1054 auth_type=AuthMethod.value(self.sa.auth_method),
1055 load=self.sa.auth_data) /
1056 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1057 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1058 number_of_TSs=len(tsi),
1059 traffic_selector=tsi) /
1060 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1061 number_of_TSs=len(tsr),
1062 traffic_selector=tsr) /
1063 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1065 header = ikev2.IKEv2(
1066 init_SPI=self.sa.ispi,
1067 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1068 flags='Initiator', exch_type='IKE_AUTH')
1070 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1071 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1072 self.sa.dport, self.sa.natt, self.ip6)
1073 self.pg0.add_stream(packet)
1074 self.pg0.enable_capture()
1076 capture = self.pg0.get_capture(1)
1077 self.verify_sa_auth_resp(capture[0])
1079 def verify_sa_init(self, packet):
1080 ih = self.get_ike_header(packet)
1082 self.assertEqual(ih.id, self.sa.msg_id)
1083 self.assertEqual(ih.exch_type, 34)
1084 self.assertTrue('Response' in ih.flags)
1085 self.assertEqual(ih.init_SPI, self.sa.ispi)
1086 self.assertNotEqual(ih.resp_SPI, 0)
1087 self.sa.rspi = ih.resp_SPI
1089 sa = ih[ikev2.IKEv2_payload_SA]
1090 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1091 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1092 except IndexError as e:
1093 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1094 self.logger.error(ih.show())
1096 self.sa.complete_dh_data()
1100 def verify_sa_auth_resp(self, packet):
1101 ike = self.get_ike_header(packet)
1103 self.verify_udp(udp)
1104 self.assertEqual(ike.id, self.sa.msg_id)
1105 plain = self.sa.hmac_and_decrypt(ike)
1106 self.sa.calc_child_keys()
1108 def test_responder(self):
1109 self.send_sa_init_req(self.sa.natt)
1111 self.verify_ipsec_sas()
1112 self.verify_ike_sas()
1115 class Ikev2Params(object):
1116 def config_params(self, params={}):
1117 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1118 ei = VppEnum.vl_api_ipsec_integ_alg_t
1120 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1121 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1122 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1123 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1124 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1125 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1127 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1128 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1129 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1130 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1132 is_natt = 'natt' in params and params['natt'] or False
1133 self.p = Profile(self, 'pr1')
1134 self.ip6 = False if 'ip6' not in params else params['ip6']
1136 if 'auth' in params and params['auth'] == 'rsa-sig':
1137 auth_method = 'rsa-sig'
1138 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1139 self.vapi.ikev2_set_local_key(
1140 key_file=work_dir + params['server-key'])
1142 client_file = work_dir + params['client-cert']
1143 server_pem = open(work_dir + params['server-cert']).read()
1144 client_priv = open(work_dir + params['client-key']).read()
1145 client_priv = load_pem_private_key(str.encode(client_priv), None,
1147 self.peer_cert = x509.load_pem_x509_certificate(
1148 str.encode(server_pem),
1150 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1153 auth_data = b'$3cr3tpa$$w0rd'
1154 self.p.add_auth(method='shared-key', data=auth_data)
1155 auth_method = 'shared-key'
1158 is_init = True if 'is_initiator' not in params else\
1159 params['is_initiator']
1161 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1162 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1164 self.p.add_local_id(**idr)
1165 self.p.add_remote_id(**idi)
1167 self.p.add_local_id(**idi)
1168 self.p.add_remote_id(**idr)
1170 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1171 'loc_ts' not in params else params['loc_ts']
1172 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1173 'rem_ts' not in params else params['rem_ts']
1174 self.p.add_local_ts(**loc_ts)
1175 self.p.add_remote_ts(**rem_ts)
1176 if 'responder' in params:
1177 self.p.add_responder(params['responder'])
1178 if 'ike_transforms' in params:
1179 self.p.add_ike_transforms(params['ike_transforms'])
1180 if 'esp_transforms' in params:
1181 self.p.add_esp_transforms(params['esp_transforms'])
1183 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1184 is_initiator=is_init,
1185 id_type=self.p.local_id['id_type'], natt=is_natt,
1186 priv_key=client_priv, auth_method=auth_method,
1187 auth_data=auth_data,
1188 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1190 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1191 params['ike-crypto']
1192 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1194 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1197 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1198 params['esp-crypto']
1199 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1202 self.sa.set_ike_props(
1203 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1204 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1205 self.sa.set_esp_props(
1206 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1210 class TestApi(VppTestCase):
1211 """ Test IKEV2 API """
1213 def setUpClass(cls):
1214 super(TestApi, cls).setUpClass()
1217 def tearDownClass(cls):
1218 super(TestApi, cls).tearDownClass()
1221 super(TestApi, self).tearDown()
1222 self.p1.remove_vpp_config()
1223 self.p2.remove_vpp_config()
1224 r = self.vapi.ikev2_profile_dump()
1225 self.assertEqual(len(r), 0)
1227 def configure_profile(self, cfg):
1228 p = Profile(self, cfg['name'])
1229 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1230 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1231 p.add_local_ts(**cfg['loc_ts'])
1232 p.add_remote_ts(**cfg['rem_ts'])
1233 p.add_responder(cfg['responder'])
1234 p.add_ike_transforms(cfg['ike_ts'])
1235 p.add_esp_transforms(cfg['esp_ts'])
1236 p.add_auth(**cfg['auth'])
1237 p.set_udp_encap(cfg['udp_encap'])
1238 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1239 if 'lifetime_data' in cfg:
1240 p.set_lifetime_data(cfg['lifetime_data'])
1241 if 'tun_itf' in cfg:
1242 p.set_tunnel_interface(cfg['tun_itf'])
1246 def test_profile_api(self):
1247 """ test profile dump API """
1252 'start_addr': '3.3.3.2',
1253 'end_addr': '3.3.3.3',
1259 'start_addr': '4.5.76.80',
1260 'end_addr': '2.3.4.6',
1267 'start_addr': 'ab::1',
1268 'end_addr': 'ab::4',
1274 'start_addr': 'cd::12',
1275 'end_addr': 'cd::13',
1281 'loc_id': ('fqdn', b'vpp.home'),
1282 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1285 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1288 'crypto_key_size': 32,
1293 'crypto_key_size': 24,
1295 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1297 'ipsec_over_udp_port': 4501,
1300 'lifetime_maxdata': 20192,
1301 'lifetime_jitter': 9,
1306 'loc_id': ('ip4-addr', b'192.168.2.1'),
1307 'rem_id': ('ip6-addr', b'abcd::1'),
1310 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1313 'crypto_key_size': 16,
1318 'crypto_key_size': 24,
1320 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1322 'ipsec_over_udp_port': 4600,
1325 self.p1 = self.configure_profile(conf['p1'])
1326 self.p2 = self.configure_profile(conf['p2'])
1328 r = self.vapi.ikev2_profile_dump()
1329 self.assertEqual(len(r), 2)
1330 self.verify_profile(r[0].profile, conf['p1'])
1331 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """ check an identity returned by the API against its configuration

    api_id is the identity object from the API reply (its type and data
    attributes are read); cfg_id is an (id type name, data bytes) pair.
    """
    reported_type = api_id.type
    self.assertEqual(reported_type, IDType.value(cfg_id[0]))
    # the API data is text, the configured data is bytes: encode the
    # former as ASCII so that both sides compare as bytes
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """ check a traffic selector returned by the API against its config

    api_ts is the traffic selector object from the API reply; cfg_ts is
    the dict it was configured from ('proto', 'start_port', 'end_port',
    'start_addr' and 'end_addr' keys are read).
    """
    self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
    for port_field in ('start_port', 'end_port'):
        self.assertEqual(getattr(api_ts, port_field), cfg_ts[port_field])
    for addr_field in ('start_addr', 'end_addr'):
        reported = getattr(api_ts, addr_field)
        # configured addresses are converted to address objects before
        # being compared with what the API reported
        expected = ip_address(text_type(cfg_ts[addr_field]))
        self.assertEqual(reported, expected)
def verify_responder(self, api_r, cfg_r):
    """ check the responder returned by the API against its config

    api_r is the responder object from the API reply; cfg_r is a dict
    with 'sw_if_index' and 'addr' keys.
    """
    reported_index = api_r.sw_if_index
    self.assertEqual(reported_index, cfg_r['sw_if_index'])
    reported_addr = api_r.addr
    # the configured address is turned into an address object first
    self.assertEqual(reported_addr, ip_address(cfg_r['addr']))
def verify_transforms(self, api_ts, cfg_ts):
    """ check the transform fields shared by IKE and ESP transform sets

    Every listed field is read as an attribute of api_ts and as a key of
    cfg_ts, and the two values must be equal.
    """
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """ check IKE transforms: the common fields plus the DH group """
    self.verify_transforms(api_ts, cfg_ts)
    reported_group = api_ts.dh_group
    self.assertEqual(reported_group, cfg_ts['dh_group'])
def verify_esp_transforms(self, api_ts, cfg_ts):
    """ check ESP transforms; only the common transform fields apply """
    # unlike verify_ike_transforms, no DH group is compared here
    self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """ check the auth settings returned by the API against their config

    api_auth is the auth object from the API reply (method, data and
    data_len attributes are read); cfg_auth is a dict with 'method' and
    'data' keys.
    """
    reported_method = api_auth.method
    self.assertEqual(reported_method, AuthMethod.value(cfg_auth['method']))
    reported_data = api_auth.data
    self.assertEqual(reported_data, cfg_auth['data'])
    # the reported length must match the length of the configured data
    reported_len = api_auth.data_len
    self.assertEqual(reported_len, len(cfg_auth['data']))
def verify_lifetime_data(self, p, ld):
    """ check the lifetime settings of profile p against the dict ld

    Every listed field is read as an attribute of p and as a key of ld,
    and the two values must be equal.
    """
    lifetime_fields = ('lifetime', 'lifetime_maxdata', 'lifetime_jitter',
                       'handover')
    for field in lifetime_fields:
        self.assertEqual(getattr(p, field), ld[field])
1373 def verify_profile(self, ap, cp):
1374 self.assertEqual(ap.name, cp['name'])
1375 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1376 self.verify_id(ap.loc_id, cp['loc_id'])
1377 self.verify_id(ap.rem_id, cp['rem_id'])
1378 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1379 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1380 self.verify_responder(ap.responder, cp['responder'])
1381 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1382 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1383 self.verify_auth(ap.auth, cp['auth'])
1384 if 'lifetime_data' in cp:
1385 self.verify_lifetime_data(ap, cp['lifetime_data'])
1386 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1388 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1390 self.assertEqual(ap.tun_itf, 0xffffffff)
1393 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1394 """ test ikev2 initiator - pre shared key auth """
1395 def config_tc(self):
1396 self.config_params({
1397 'is_initiator': False, # seen from test case perspective
1398 # thus vpp is initiator
1399 'responder': {'sw_if_index': self.pg0.sw_if_index,
1400 'addr': self.pg0.remote_ip4},
1401 'ike-crypto': ('AES-GCM-16ICV', 32),
1402 'ike-integ': 'NULL',
1403 'ike-dh': '3072MODPgr',
1405 'crypto_alg': 20, # "aes-gcm-16"
1406 'crypto_key_size': 256,
1407 'dh_group': 15, # "modp-3072"
1410 'crypto_alg': 12, # "aes-cbc"
1411 'crypto_key_size': 256,
1412 # "hmac-sha2-256-128"
1416 class TestResponderNATT(TemplateResponder, Ikev2Params):
1417 """ test ikev2 responder - nat traversal """
1418 def config_tc(self):
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # no overrides are passed, so config_params() falls back to its
        # own defaults for every setting
        self.config_params()
1429 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1430 """ test ikev2 responder - cert based auth """
1431 def config_tc(self):
1432 self.config_params({
1434 'server-key': 'server-key.pem',
1435 'client-key': 'client-key.pem',
1436 'client-cert': 'client-cert.pem',
1437 'server-cert': 'server-cert.pem'})
1440 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1441 (TemplateResponder, Ikev2Params):
1443 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1445 def config_tc(self):
1446 self.config_params({
1447 'ike-crypto': ('AES-CBC', 16),
1448 'ike-integ': 'SHA2-256-128',
1449 'esp-crypto': ('AES-CBC', 24),
1450 'esp-integ': 'SHA2-384-192',
1451 'ike-dh': '2048MODPgr'})
1454 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1455 (TemplateResponder, Ikev2Params):
1457 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1459 def config_tc(self):
1460 self.config_params({
1461 'ike-crypto': ('AES-CBC', 32),
1462 'ike-integ': 'SHA2-256-128',
1463 'esp-crypto': ('AES-GCM-16ICV', 32),
1464 'esp-integ': 'NULL',
1465 'ike-dh': '3072MODPgr'})
1468 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1472 def config_tc(self):
1473 self.config_params({
1476 'ike-crypto': ('AES-GCM-16ICV', 32),
1477 'ike-integ': 'NULL',
1478 'ike-dh': '2048MODPgr',
1479 'loc_ts': {'start_addr': 'ab:cd::0',
1480 'end_addr': 'ab:cd::10'},
1481 'rem_ts': {'start_addr': '11::0',
1482 'end_addr': '11::100'}})
1485 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1486 """ malformed packet test """
def config_tc(self):
    # no overrides are passed, so config_params() falls back to its own
    # defaults for every setting
    self.config_params()
def assert_counter(self, count, name, version='ip4'):
    """ check that an ikev2 error counter holds the expected value

    The counter is looked up in self.statistics under the path
    '/err/ikev2-<version>/<name>' and must be equal to count.
    """
    counter_path = ('/err/ikev2-%s/' % version) + name
    reported = self.statistics.get_err_counter(counter_path)
    self.assertEqual(count, reported)
1498 def create_ike_init_msg(self, length=None, payload=None):
1499 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1500 flags='Initiator', exch_type='IKE_SA_INIT')
1501 if payload is not None:
1503 return self.create_packet(self.pg0, msg, self.sa.sport,
def verify_bad_packet_length(self):
    """ send init messages with a bogus IKE length, expect them dropped

    The message is built with length=0xdead and repeated self.pkt_count
    times; no reply may come back on pg0 and the 'Bad packet length'
    error counter must be equal to self.pkt_count afterwards.
    """
    malformed = self.create_ike_init_msg(length=0xdead)
    burst = malformed * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Bad packet length')
def verify_bad_sa_payload_length(self):
    """ send init messages with a bogus SA payload length, expect drops

    The message carries an SA payload built with length=0xdead and is
    repeated self.pkt_count times; no reply may come back on pg0 and the
    'Malformed packet' error counter must be equal to self.pkt_count
    afterwards.
    """
    bogus_sa = ikev2.IKEv2_payload_SA(length=0xdead)
    malformed = self.create_ike_init_msg(payload=bogus_sa)
    burst = malformed * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Malformed packet')
def test_responder(self):
    # every malformed-message check below sends this many packets and
    # expects the matching error counter to be equal to it
    self.pkt_count = 254
    malformed_checks = (self.verify_bad_packet_length,
                        self.verify_bad_sa_payload_length)
    for run_check in malformed_checks:
        run_check()
# run the tests of this module with the VPP test runner when the file is
# executed directly
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)