2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
12 from scapy.layers.ipsec import ESP
13 from scapy.layers.inet import IP, UDP, Ether
14 from scapy.packet import raw, Raw
15 from scapy.utils import long_converter
16 from framework import VppTestCase, VppTestRunner
17 from vpp_ikev2 import Profile, IDType, AuthMethod
18 from vpp_papi import VppEnum
21 KEY_PAD = b"Key Pad for IKEv2"
25 # tuple structure is (p, g, key_len)
27 '2048MODPgr': (long_converter("""
28 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
29 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
30 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
31 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
32 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
33 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
34 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
35 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
36 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
37 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
38 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
40 '3072MODPgr': (long_converter("""
41 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
42 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
43 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
44 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
45 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
46 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
47 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
48 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
49 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
50 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
51 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
52 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
53 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
54 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
55 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
56 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
60 class CryptoAlgo(object):
61 def __init__(self, name, cipher, mode):
65 if self.cipher is not None:
66 self.bs = self.cipher.block_size // 8
def encrypt(self, data, key):
    """Encrypt ``data`` under ``key`` and return IV + ciphertext.

    A fresh random IV of ``self.bs`` bytes is generated on every call and
    prepended to the output so the receiver can recover it.
    """
    init_vector = os.urandom(self.bs)
    cipher_obj = Cipher(self.cipher(key), self.mode(init_vector),
                        default_backend())
    enc_ctx = cipher_obj.encryptor()
    ciphertext = enc_ctx.update(data)
    ciphertext += enc_ctx.finalize()
    return init_vector + ciphertext
74 def decrypt(self, data, key, icv=None):
77 decryptor = Cipher(algorithms.AES(key),
79 default_backend()).decryptor()
80 return decryptor.update(ct) + decryptor.finalize()
83 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
84 data = data + b'\x00' * (pad_len - 1)
85 return data + bytes([pad_len])
88 class AuthAlgo(object):
89 def __init__(self, name, mac, mod, key_len, trunc_len=None):
93 self.key_len = key_len
94 self.trunc_len = trunc_len or key_len
98 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
99 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
100 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
105 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
106 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
107 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
108 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
109 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
113 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
114 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
class IKEv2ChildSA(object):
    """ SPI and traffic selectors of a single child SA """

    def __init__(self, local_ts, remote_ts, spi=None):
        # A missing (or otherwise falsy) SPI is replaced by 4 random bytes.
        if not spi:
            spi = os.urandom(4)
        self.spi = spi
        self.local_ts, self.remote_ts = local_ts, remote_ts
126 class IKEv2SA(object):
127 def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
128 i_id=None, r_id=None, id_type='fqdn', nonce=None,
129 auth_data=None, local_ts=None, remote_ts=None,
130 auth_method='shared-key', priv_key=None, natt=False):
138 self.dh_params = None
140 self.priv_key = priv_key
141 self.is_initiator = is_initiator
142 nonce = nonce or os.urandom(32)
143 self.auth_data = auth_data
146 if isinstance(id_type, str):
147 self.id_type = IDType.value(id_type)
149 self.id_type = id_type
150 self.auth_method = auth_method
151 if self.is_initiator:
159 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
def dh_pub_key(self):
    """Return the DH public value stored in ``i_dh_data``."""
    public_value = self.i_dh_data
    return public_value
def compute_secret(self):
    """Compute the Diffie-Hellman shared secret.

    Raises the peer value (``r_dh_data``) to our private exponent
    (``dh_private_key``) modulo the group prime and returns the result as
    a big-endian byte string whose length is the third element of
    ``ike_group``.
    """
    exponent = int.from_bytes(self.dh_private_key, 'big')
    base = int.from_bytes(self.r_dh_data, 'big')
    # ike_group is a (p, g, key_len) tuple; the generator is not needed here
    prime, _generator, secret_len = self.ike_group
    shared = pow(base, exponent, prime)
    return shared.to_bytes(secret_len, 'big')
171 def generate_dh_data(self):
173 if self.is_initiator:
174 if self.ike_dh not in DH:
175 raise NotImplementedError('%s not in DH group' % self.ike_dh)
176 if self.dh_params is None:
177 dhg = DH[self.ike_dh]
178 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
179 self.dh_params = pn.parameters(default_backend())
180 priv = self.dh_params.generate_private_key()
181 pub = priv.public_key()
182 x = priv.private_numbers().x
183 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
184 y = pub.public_numbers().y
185 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Compute the DH shared secret and keep it in ``dh_shared_secret``."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
190 def calc_child_keys(self):
191 prf = self.ike_prf_alg.mod()
192 s = self.i_nonce + self.r_nonce
193 c = self.child_sas[0]
195 encr_key_len = self.esp_crypto_key_len
196 integ_key_len = self.esp_integ_alg.key_len
197 salt_len = 0 if integ_key_len else 4
199 l = (integ_key_len * 2 +
202 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
205 c.sk_ei = keymat[pos:pos+encr_key_len]
209 c.sk_ai = keymat[pos:pos+integ_key_len]
212 c.salt_ei = keymat[pos:pos+salt_len]
215 c.sk_er = keymat[pos:pos+encr_key_len]
219 c.sk_ar = keymat[pos:pos+integ_key_len]
222 c.salt_er = keymat[pos:pos+salt_len]
225 def calc_prfplus(self, prf, key, seed, length):
229 while len(r) < length and x < 255:
234 s = s + seed + bytes([x])
235 t = self.calc_prf(prf, key, s)
243 def calc_prf(self, prf, key, data):
244 h = self.ike_integ_alg.mac(key, prf, backend=default_backend())
249 prf = self.ike_prf_alg.mod()
250 # SKEYSEED = prf(Ni | Nr, g^ir)
251 s = self.i_nonce + self.r_nonce
252 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
254 # calculate S = Ni | Nr | SPIi SPIr
255 s = s + self.ispi + self.rspi
257 prf_key_trunc = self.ike_prf_alg.trunc_len
258 encr_key_len = self.ike_crypto_key_len
259 tr_prf_key_len = self.ike_prf_alg.key_len
260 integ_key_len = self.ike_integ_alg.key_len
265 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
268 self.sk_d = keymat[:pos+prf_key_trunc]
271 self.sk_ai = keymat[pos:pos+integ_key_len]
273 self.sk_ar = keymat[pos:pos+integ_key_len]
276 self.sk_ei = keymat[pos:pos+encr_key_len]
278 self.sk_er = keymat[pos:pos+encr_key_len]
281 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
282 pos += tr_prf_key_len
283 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
285 def generate_authmsg(self, prf, packet):
286 if self.is_initiator:
290 data = bytes([self.id_type, 0, 0, 0]) + id
291 id_hash = self.calc_prf(prf, key, data)
292 return packet + nonce + id_hash
295 prf = self.ike_prf_alg.mod()
296 authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
297 if self.auth_method == 'shared-key':
298 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
299 self.auth_data = self.calc_prf(prf, psk, authmsg)
300 elif self.auth_method == 'rsa-sig':
301 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
304 raise TypeError('unknown auth method type!')
def encrypt(self, data):
    """Pad ``data`` and encrypt it with this side's IKE encryption key."""
    algo = self.ike_crypto_alg
    padded = algo.pad(data)
    return algo.encrypt(padded, self.my_cryptokey)
311 def peer_authkey(self):
312 if self.is_initiator:
317 def my_authkey(self):
318 if self.is_initiator:
323 def my_cryptokey(self):
324 if self.is_initiator:
329 def peer_cryptokey(self):
330 if self.is_initiator:
def concat(self, alg, key_len):
    """Return ``alg`` followed by '-' and the key size in bits.

    ``key_len`` is given in bytes and is multiplied by 8 for the suffix.
    """
    key_bits = key_len * 8
    return '-'.join((alg, str(key_bits)))
def vpp_ike_cypto_alg(self):
    """Return the IKE cipher name with its key size in bits appended."""
    name, key_len = self.ike_crypto, self.ike_crypto_key_len
    return self.concat(name, key_len)
def vpp_esp_cypto_alg(self):
    """Return the ESP cipher name with its key size in bits appended."""
    name, key_len = self.esp_crypto, self.esp_crypto_key_len
    return self.concat(name, key_len)
def verify_hmac(self, ikemsg):
    """Check the truncated integrity checksum at the end of ``ikemsg``.

    The last ``trunc_len`` bytes of the message are taken as the received
    checksum; a checksum is computed over the preceding bytes with the
    peer's authentication key, truncated to the same length, and the two
    are compared through the test case's ``assertEqual``.
    """
    icv_len = self.ike_integ_alg.trunc_len
    # split the message into the received checksum and the data it covers
    received_icv = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    digest = self.compute_hmac(self.ike_integ_alg.mod(), self.peer_authkey,
                               covered)
    self.test.assertEqual(digest[:icv_len], received_icv)
353 def compute_hmac(self, integ, key, data):
354 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data):
    """Decrypt ``data`` with the peer's IKE encryption key."""
    algo = self.ike_crypto_alg
    return algo.decrypt(data, self.peer_cryptokey)
def hmac_and_decrypt(self, ike):
    """Verify the integrity checksum of ``ike`` and decrypt its payload.

    The encrypted payload is looked up first, the checksum of the whole
    raw message is verified, and the payload (without its trailing
    checksum bytes) is then decrypted and returned.
    """
    encrypted = ike[ikev2.IKEv2_payload_Encrypted]
    self.verify_hmac(raw(ike))

    # remove ICV and decrypt payload
    icv_len = self.ike_integ_alg.trunc_len
    return self.decrypt(encrypted.load[:-icv_len])
370 def generate_ts(self):
371 c = self.child_sas[0]
372 ts1 = ikev2.IPv4TrafficSelector(
374 starting_address_v4=c.local_ts['start_addr'],
375 ending_address_v4=c.local_ts['end_addr'])
376 ts2 = ikev2.IPv4TrafficSelector(
378 starting_address_v4=c.remote_ts['start_addr'],
379 ending_address_v4=c.remote_ts['end_addr'])
380 return ([ts1], [ts2])
382 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
383 if crypto not in CRYPTO_ALGOS:
384 raise TypeError('unsupported encryption algo %r' % crypto)
385 self.ike_crypto = crypto
386 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
387 self.ike_crypto_key_len = crypto_key_len
389 if integ not in AUTH_ALGOS:
390 raise TypeError('unsupported auth algo %r' % integ)
391 self.ike_integ = integ
392 self.ike_integ_alg = AUTH_ALGOS[integ]
394 if prf not in PRF_ALGOS:
395 raise TypeError('unsupported prf algo %r' % prf)
397 self.ike_prf_alg = PRF_ALGOS[prf]
399 self.ike_group = DH[self.ike_dh]
def set_esp_props(self, crypto, crypto_key_len, integ):
    """Record the ESP encryption and integrity algorithms of this SA.

    The key length is stored first, before any validation. ``crypto`` must
    be a key of CRYPTO_ALGOS and ``integ`` a key of AUTH_ALGOS, otherwise
    TypeError is raised. An ``integ`` of 'NULL' is stored as None in
    ``esp_integ`` while ``esp_integ_alg`` still gets the 'NULL' entry.
    """
    self.esp_crypto_key_len = crypto_key_len

    if crypto in CRYPTO_ALGOS:
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
    else:
        raise TypeError('unsupported encryption algo %r' % crypto)

    if integ in AUTH_ALGOS:
        self.esp_integ = integ if integ != 'NULL' else None
        self.esp_integ_alg = AUTH_ALGOS[integ]
    else:
        raise TypeError('unsupported auth algo %r' % integ)
def crypto_attr(self, key_len):
    """Return the key-length transform attribute for the configured cipher.

    :param key_len: key length in bytes (shifted left by 3 to get bits)
    :returns: tuple ``(attribute, length)``; callers pass the first item
        as the transform's ``key_length`` and the second as its
        ``length``. The attribute carries 0x800e in its upper 16 bits
        (presumably the IKEv2 Key Length attribute header, see RFC 7296
        section 3.3.5) and the key size in bits in the lower 16 bits.
    :raises Exception: if ``self.ike_crypto`` is not an AES variant
    """
    # 'AES-GCM-16ICV' is the name under which the GCM cipher is registered
    # in CRYPTO_ALGOS; the bare 'AES-GCM' spelling never matched a
    # configurable cipher and is kept only for backward compatibility.
    if self.ike_crypto in ('AES-CBC', 'AES-GCM', 'AES-GCM-16ICV'):
        return (0x800e << 16 | key_len << 3, 12)
    raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return the key-length attribute tuple for the IKE cipher key size."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return the key-length attribute tuple for the ESP cipher key size."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
425 def compute_nat_sha1(self, ip, port):
426 data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
427 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
429 return digest.finalize()
432 class TemplateResponder(VppTestCase):
433 """ responder test template """
437 import scapy.contrib.ikev2 as _ikev2
438 globals()['ikev2'] = _ikev2
439 super(TemplateResponder, cls).setUpClass()
440 cls.create_pg_interfaces(range(2))
441 for i in cls.pg_interfaces:
447 def tearDownClass(cls):
448 super(TemplateResponder, cls).tearDownClass()
451 super(TemplateResponder, self).setUp()
453 self.p.add_vpp_config()
454 self.sa.generate_dh_data()
456 def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
457 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
458 IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
459 UDP(sport=sport, dport=dport))
461 # insert non ESP marker
462 res = res / Raw(b'\x00' * 4)
465 def send_sa_init(self, behind_nat=False):
466 tr_attr = self.sa.ike_crypto_attr()
467 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
468 transform_id=self.sa.ike_crypto, length=tr_attr[1],
469 key_length=tr_attr[0]) /
470 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
471 transform_id=self.sa.ike_integ) /
472 ikev2.IKEv2_payload_Transform(transform_type='PRF',
473 transform_id=self.sa.ike_prf_alg.name) /
474 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
475 transform_id=self.sa.ike_dh))
477 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
478 trans_nb=4, trans=trans))
481 next_payload = 'Notify'
485 self.sa.init_req_packet = (
486 ikev2.IKEv2(init_SPI=self.sa.ispi,
487 flags='Initiator', exch_type='IKE_SA_INIT') /
488 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
489 ikev2.IKEv2_payload_KE(next_payload='Nonce',
490 group=self.sa.ike_dh,
491 load=self.sa.dh_pub_key()) /
492 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
493 load=self.sa.i_nonce))
496 src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
498 nat_detection = ikev2.IKEv2_payload_Notify(
499 type='NAT_DETECTION_SOURCE_IP',
501 self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
503 ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
504 self.sa.sport, self.sa.dport,
506 self.pg0.add_stream(ike_msg)
507 self.pg0.enable_capture()
509 capture = self.pg0.get_capture(1)
510 self.verify_sa_init(capture[0])
512 def send_sa_auth(self):
513 tr_attr = self.sa.esp_crypto_attr()
514 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
515 transform_id=self.sa.esp_crypto, length=tr_attr[1],
516 key_length=tr_attr[0]) /
517 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
518 transform_id=self.sa.esp_integ) /
519 ikev2.IKEv2_payload_Transform(
520 transform_type='Extended Sequence Number',
521 transform_id='No ESN') /
522 ikev2.IKEv2_payload_Transform(
523 transform_type='Extended Sequence Number',
526 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
527 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
529 tsi, tsr = self.sa.generate_ts()
530 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
531 IDtype=self.sa.id_type, load=self.sa.i_id) /
532 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
533 IDtype=self.sa.id_type, load=self.sa.r_id) /
534 ikev2.IKEv2_payload_AUTH(next_payload='SA',
535 auth_type=AuthMethod.value(self.sa.auth_method),
536 load=self.sa.auth_data) /
537 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
538 ikev2.IKEv2_payload_TSi(next_payload='TSr',
539 number_of_TSs=len(tsi),
540 traffic_selector=tsi) /
541 ikev2.IKEv2_payload_TSr(next_payload='Notify',
542 number_of_TSs=len(tsr),
543 traffic_selector=tsr) /
544 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
545 encr = self.sa.encrypt(raw(plain))
547 trunc_len = self.sa.ike_integ_alg.trunc_len
548 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
549 tlen = plen + len(ikev2.IKEv2())
551 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
552 length=plen, load=encr)
553 sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
554 length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1))
557 integ_data = raw(sa_auth)
558 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
559 self.sa.my_authkey, integ_data)
560 sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
561 assert(len(sa_auth) == tlen)
563 packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
564 self.sa.dport, self.sa.natt)
565 self.pg0.add_stream(packet)
566 self.pg0.enable_capture()
568 capture = self.pg0.get_capture(1)
569 self.verify_sa_auth(capture[0])
571 def get_ike_header(self, packet):
573 ih = packet[ikev2.IKEv2]
574 except IndexError as e:
575 # this is a workaround for getting IKEv2 layer as both ikev2 and
576 # ipsec register for port 4500
578 ih = self.verify_and_remove_non_esp_marker(esp)
581 def verify_sa_init(self, packet):
582 ih = self.get_ike_header(packet)
584 self.assertEqual(ih.exch_type, 34)
585 self.assertTrue('Response' in ih.flags)
586 self.assertEqual(ih.init_SPI, self.sa.ispi)
587 self.assertNotEqual(ih.resp_SPI, 0)
588 self.sa.rspi = ih.resp_SPI
590 sa = ih[ikev2.IKEv2_payload_SA]
591 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
592 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
593 except AttributeError as e:
594 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
596 self.sa.complete_dh_data()
600 def verify_and_remove_non_esp_marker(self, packet):
602 # if we are in nat traversal mode check for non esp marker
605 self.assertEqual(data[:4], b'\x00' * 4)
606 return ikev2.IKEv2(data[4:])
def verify_udp(self, udp):
    """Assert that ``udp`` carries the source and destination port of the SA."""
    sa = self.sa
    # source port is checked first, then destination port
    for field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, field), getattr(sa, field))
614 def verify_sa_auth(self, packet):
615 ike = self.get_ike_header(packet)
618 plain = self.sa.hmac_and_decrypt(ike)
619 self.sa.calc_child_keys()
621 def verify_child_sas(self):
622 sas = self.vapi.ipsec_sa_dump()
623 self.assertEqual(len(sas), 2)
626 c = self.sa.child_sas[0]
628 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
629 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
630 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
632 if self.sa.esp_integ is None:
635 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
636 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
637 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
640 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
641 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
642 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
643 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
647 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
648 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
649 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
650 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
652 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
653 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
655 def test_responder(self):
656 self.send_sa_init(self.sa.natt)
658 self.verify_child_sas()
661 class Ikev2Params(object):
662 def config_params(self, params={}):
663 ec = VppEnum.vl_api_ipsec_crypto_alg_t
664 ei = VppEnum.vl_api_ipsec_integ_alg_t
666 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
667 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
668 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
669 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
670 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
671 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
673 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
674 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
675 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
676 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
678 is_natt = 'natt' in params and params['natt'] or False
679 self.p = Profile(self, 'pr1')
681 if 'auth' in params and params['auth'] == 'rsa-sig':
682 auth_method = 'rsa-sig'
683 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
684 self.vapi.ikev2_set_local_key(
685 key_file=work_dir + params['server-key'])
687 client_file = work_dir + params['client-cert']
688 server_pem = open(work_dir + params['server-cert']).read()
689 client_priv = open(work_dir + params['client-key']).read()
690 client_priv = load_pem_private_key(str.encode(client_priv), None,
692 self.peer_cert = x509.load_pem_x509_certificate(
693 str.encode(server_pem),
695 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
698 auth_data = b'$3cr3tpa$$w0rd'
699 self.p.add_auth(method='shared-key', data=auth_data)
700 auth_method = 'shared-key'
703 self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
704 self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
705 self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
706 self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
708 self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
709 r_id=self.p.local_id['data'],
710 id_type=self.p.local_id['id_type'], natt=is_natt,
711 priv_key=client_priv, auth_method=auth_method,
713 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
715 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
717 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
719 ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
721 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
723 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
726 self.sa.set_ike_props(
727 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
728 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
729 self.sa.set_esp_props(
730 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
734 class TestResponderNATT(TemplateResponder, Ikev2Params):
735 """ test ikev2 responder - nat traversal """
741 class TestResponderPsk(TemplateResponder, Ikev2Params):
742 """ test ikev2 responder - pre shared key auth """
747 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
748 """ test ikev2 responder - cert based auth """
752 'server-key': 'server-key.pem',
753 'client-key': 'client-key.pem',
754 'client-cert': 'client-cert.pem',
755 'server-cert': 'server-cert.pem'})
758 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
759 (TemplateResponder, Ikev2Params):
761 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
765 'ike-crypto': ('AES-CBC', 16),
766 'ike-integ': 'SHA2-256-128',
767 'esp-crypto': ('AES-CBC', 24),
768 'esp-integ': 'SHA2-384-192',
769 'ike-dh': '2048MODPgr'})
772 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
773 (TemplateResponder, Ikev2Params):
775 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
779 'ike-crypto': ('AES-CBC', 32),
780 'ike-integ': 'SHA2-256-128',
781 'esp-crypto': ('AES-GCM-16ICV', 32),
783 'ike-dh': '3072MODPgr'})
786 if __name__ == '__main__':
787 unittest.main(testRunner=VppTestRunner)