3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import VppTestCase, VppTestRunner
22 from vpp_ikev2 import Profile, IDType, AuthMethod
23 from vpp_papi import VppEnum
30 KEY_PAD = b"Key Pad for IKEv2"
37 # tuple structure is (p, g, key_len)
39 '2048MODPgr': (long_converter("""
40 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
41 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
42 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
43 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
44 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
45 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
46 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
47 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
48 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
49 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
50 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
52 '3072MODPgr': (long_converter("""
53 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
54 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
55 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
56 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
57 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
58 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
59 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
60 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
61 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
62 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
63 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
64 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
65 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
66 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
67 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
68 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
72 class CryptoAlgo(object):
73 def __init__(self, name, cipher, mode):
77 if self.cipher is not None:
78 self.bs = self.cipher.block_size // 8
80 if self.name == 'AES-GCM-16ICV':
81 self.iv_len = GCM_IV_SIZE
85 def encrypt(self, data, key, aad=None):
86 iv = os.urandom(self.iv_len)
88 encryptor = Cipher(self.cipher(key), self.mode(iv),
89 default_backend()).encryptor()
90 return iv + encryptor.update(data) + encryptor.finalize()
92 salt = key[-SALT_SIZE:]
94 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
95 default_backend()).encryptor()
96 encryptor.authenticate_additional_data(aad)
97 data = encryptor.update(data) + encryptor.finalize()
98 data += encryptor.tag[:GCM_ICV_SIZE]
101 def decrypt(self, data, key, aad=None, icv=None):
103 iv = data[:self.iv_len]
104 ct = data[self.iv_len:]
105 decryptor = Cipher(algorithms.AES(key),
107 default_backend()).decryptor()
108 return decryptor.update(ct) + decryptor.finalize()
110 salt = key[-SALT_SIZE:]
111 nonce = salt + data[:GCM_IV_SIZE]
112 ct = data[GCM_IV_SIZE:]
113 key = key[:-SALT_SIZE]
114 decryptor = Cipher(algorithms.AES(key),
115 self.mode(nonce, icv, len(icv)),
116 default_backend()).decryptor()
117 decryptor.authenticate_additional_data(aad)
118 return decryptor.update(ct) + decryptor.finalize()
121 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122 data = data + b'\x00' * (pad_len - 1)
123 return data + bytes([pad_len - 1])
126 class AuthAlgo(object):
127 def __init__(self, name, mac, mod, key_len, trunc_len=None):
131 self.key_len = key_len
132 self.trunc_len = trunc_len or key_len
136 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
143 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
151 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
169 class IKEv2ChildSA(object):
170 def __init__(self, local_ts, remote_ts, is_initiator):
178 self.local_ts = local_ts
179 self.remote_ts = remote_ts
182 class IKEv2SA(object):
183 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
184 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
185 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
186 auth_method='shared-key', priv_key=None, natt=False,
188 self.udp_encap = udp_encap
197 self.dh_params = None
199 self.priv_key = priv_key
200 self.is_initiator = is_initiator
201 nonce = nonce or os.urandom(32)
202 self.auth_data = auth_data
205 if isinstance(id_type, str):
206 self.id_type = IDType.value(id_type)
208 self.id_type = id_type
209 self.auth_method = auth_method
210 if self.is_initiator:
211 self.rspi = 8 * b'\x00'
216 self.ispi = 8 * b'\x00'
218 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
221 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return the DH public value that belongs to this endpoint.

    ``i_dh_data`` is returned when ``is_initiator`` is truthy, otherwise
    ``r_dh_data``.
    """
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the DH public value that belongs to the remote endpoint.

    Mirror image of ``my_dh_pub_key``: ``r_dh_data`` when ``is_initiator``
    is truthy, otherwise ``i_dh_data``.
    """
    return self.r_dh_data if self.is_initiator else self.i_dh_data
def compute_secret(self):
    """Compute the Diffie-Hellman shared secret as big-endian bytes.

    Raises the peer's public value to the local private exponent modulo
    the group prime and serialises the result to the byte length stored
    as the third element of ``ike_group``.
    """
    private_bytes = self.dh_private_key
    peer_bytes = self.peer_dh_pub_key
    # ike_group is (p, g, key_len); the generator is not needed here.
    modulus, _generator, secret_len = self.ike_group
    base = int.from_bytes(peer_bytes, 'big')
    exponent = int.from_bytes(private_bytes, 'big')
    shared = pow(base, exponent, modulus)
    return shared.to_bytes(secret_len, 'big')
244 def generate_dh_data(self):
246 if self.ike_dh not in DH:
247 raise NotImplementedError('%s not in DH group' % self.ike_dh)
249 if self.dh_params is None:
250 dhg = DH[self.ike_dh]
251 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
252 self.dh_params = pn.parameters(default_backend())
254 priv = self.dh_params.generate_private_key()
255 pub = priv.public_key()
256 x = priv.private_numbers().x
257 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
258 y = pub.public_numbers().y
260 if self.is_initiator:
261 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
263 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Compute the DH shared secret and store it in ``dh_shared_secret``."""
    shared_secret = self.compute_secret()
    self.dh_shared_secret = shared_secret
268 def calc_child_keys(self):
269 prf = self.ike_prf_alg.mod()
270 s = self.i_nonce + self.r_nonce
271 c = self.child_sas[0]
273 encr_key_len = self.esp_crypto_key_len
274 integ_key_len = self.esp_integ_alg.key_len
275 salt_len = 0 if integ_key_len else 4
277 l = (integ_key_len * 2 +
280 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
283 c.sk_ei = keymat[pos:pos+encr_key_len]
287 c.sk_ai = keymat[pos:pos+integ_key_len]
290 c.salt_ei = keymat[pos:pos+salt_len]
293 c.sk_er = keymat[pos:pos+encr_key_len]
297 c.sk_ar = keymat[pos:pos+integ_key_len]
300 c.salt_er = keymat[pos:pos+salt_len]
303 def calc_prfplus(self, prf, key, seed, length):
307 while len(r) < length and x < 255:
312 s = s + seed + bytes([x])
313 t = self.calc_prf(prf, key, s)
321 def calc_prf(self, prf, key, data):
322 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
327 prf = self.ike_prf_alg.mod()
328 # SKEYSEED = prf(Ni | Nr, g^ir)
329 s = self.i_nonce + self.r_nonce
330 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
332 # calculate S = Ni | Nr | SPIi SPIr
333 s = s + self.ispi + self.rspi
335 prf_key_trunc = self.ike_prf_alg.trunc_len
336 encr_key_len = self.ike_crypto_key_len
337 tr_prf_key_len = self.ike_prf_alg.key_len
338 integ_key_len = self.ike_integ_alg.key_len
339 if integ_key_len == 0:
349 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
352 self.sk_d = keymat[:pos+prf_key_trunc]
355 self.sk_ai = keymat[pos:pos+integ_key_len]
357 self.sk_ar = keymat[pos:pos+integ_key_len]
360 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
361 pos += encr_key_len + salt_size
362 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
363 pos += encr_key_len + salt_size
365 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
366 pos += tr_prf_key_len
367 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
369 def generate_authmsg(self, prf, packet):
370 if self.is_initiator:
378 data = bytes([self.id_type, 0, 0, 0]) + id
379 id_hash = self.calc_prf(prf, key, data)
380 return packet + nonce + id_hash
383 prf = self.ike_prf_alg.mod()
384 if self.is_initiator:
385 packet = self.init_req_packet
387 packet = self.init_resp_packet
388 authmsg = self.generate_authmsg(prf, raw(packet))
389 if self.auth_method == 'shared-key':
390 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
391 self.auth_data = self.calc_prf(prf, psk, authmsg)
392 elif self.auth_method == 'rsa-sig':
393 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
396 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad ``data`` and encrypt it with this side's IKE encryption key.

    Padding and encryption are both delegated to ``ike_crypto_alg``;
    ``aad`` is passed through to the cipher unchanged.
    """
    padded = self.ike_crypto_alg.pad(data)
    cipher = self.ike_crypto_alg
    return cipher.encrypt(padded, self.my_cryptokey, aad)
403 def peer_authkey(self):
404 if self.is_initiator:
409 def my_authkey(self):
410 if self.is_initiator:
415 def my_cryptokey(self):
416 if self.is_initiator:
421 def peer_cryptokey(self):
422 if self.is_initiator:
def concat(self, alg, key_len):
    """Join an algorithm name and its key size in bits with a dash.

    ``key_len`` is given in bytes, so ``('AES-CBC', 32)`` yields
    ``'AES-CBC-256'``.
    """
    key_bits = key_len * 8
    return '-'.join((alg, str(key_bits)))
def vpp_ike_cypto_alg(self):
    """Return the IKE cipher name combined with its key length via ``concat``."""
    join = self.concat
    cipher_name = self.ike_crypto
    return join(cipher_name, self.ike_crypto_key_len)
def vpp_esp_cypto_alg(self):
    """Return the ESP cipher name combined with its key length via ``concat``."""
    join = self.concat
    cipher_name = self.esp_crypto
    return join(cipher_name, self.esp_crypto_key_len)
def verify_hmac(self, ikemsg):
    """Check the integrity checksum appended to a raw IKE message.

    The trailing ``trunc_len`` bytes of ``ikemsg`` are taken as the
    received checksum. An HMAC is computed over the preceding bytes with
    the peer's authentication key, truncated to the same length, and
    compared through the test case's ``assertEqual``.
    """
    icv_len = self.ike_integ_alg.trunc_len
    received = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    digest = self.ike_integ_alg.mod()
    expected = self.compute_hmac(digest, self.peer_authkey, covered)
    self.test.assertEqual(expected[:icv_len], received)
445 def compute_hmac(self, integ, key, data):
446 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the peer's IKE encryption key.

    ``aad`` and ``icv`` are forwarded unchanged to the configured
    ``ike_crypto_alg``.
    """
    cipher = self.ike_crypto_alg
    peer_key = self.peer_cryptokey
    return cipher.decrypt(data, peer_key, aad, icv)
453 def hmac_and_decrypt(self, ike):
454 ep = ike[ikev2.IKEv2_payload_Encrypted]
455 if self.ike_crypto == 'AES-GCM-16ICV':
456 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
457 ct = ep.load[:-GCM_ICV_SIZE]
458 tag = ep.load[-GCM_ICV_SIZE:]
459 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
461 self.verify_hmac(raw(ike))
462 integ_trunc = self.ike_integ_alg.trunc_len
464 # remove ICV and decrypt payload
465 ct = ep.load[:-integ_trunc]
466 plain = self.decrypt(ct)
469 return plain[:-pad_len - 1]
def build_ts_addr(self, ts, version):
    """Map a traffic selector's address range to version-suffixed field names.

    ``ts`` must provide ``'start_addr'`` and ``'end_addr'``; ``version`` is
    the string appended to the ``starting_address_v`` / ``ending_address_v``
    keys of the returned dict.
    """
    start_field = 'starting_address_v' + version
    end_field = 'ending_address_v' + version
    return {start_field: ts['start_addr'], end_field: ts['end_addr']}
475 def generate_ts(self, is_ip4):
476 c = self.child_sas[0]
477 ts_data = {'IP_protocol_ID': 0,
481 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
482 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
483 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
484 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
486 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
487 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
488 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
489 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
491 if self.is_initiator:
492 return ([ts1], [ts2])
493 return ([ts2], [ts1])
495 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
496 if crypto not in CRYPTO_ALGOS:
497 raise TypeError('unsupported encryption algo %r' % crypto)
498 self.ike_crypto = crypto
499 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
500 self.ike_crypto_key_len = crypto_key_len
502 if integ not in AUTH_ALGOS:
503 raise TypeError('unsupported auth algo %r' % integ)
504 self.ike_integ = None if integ == 'NULL' else integ
505 self.ike_integ_alg = AUTH_ALGOS[integ]
507 if prf not in PRF_ALGOS:
508 raise TypeError('unsupported prf algo %r' % prf)
510 self.ike_prf_alg = PRF_ALGOS[prf]
512 self.ike_group = DH[self.ike_dh]
514 def set_esp_props(self, crypto, crypto_key_len, integ):
515 self.esp_crypto_key_len = crypto_key_len
516 if crypto not in CRYPTO_ALGOS:
517 raise TypeError('unsupported encryption algo %r' % crypto)
518 self.esp_crypto = crypto
519 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
521 if integ not in AUTH_ALGOS:
522 raise TypeError('unsupported auth algo %r' % integ)
523 self.esp_integ = None if integ == 'NULL' else integ
524 self.esp_integ_alg = AUTH_ALGOS[integ]
526 def crypto_attr(self, key_len):
527 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
528 return (0x800e << 16 | key_len << 3, 12)
530 raise Exception('unsupported attribute type')
def ike_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the IKE cipher key length."""
    make_attr = self.crypto_attr
    return make_attr(self.ike_crypto_key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the ESP cipher key length."""
    # NOTE(review): crypto_attr checks self.ike_crypto, not self.esp_crypto,
    # when deciding whether the cipher is supported -- confirm this is intended.
    make_attr = self.crypto_attr
    return make_attr(self.esp_crypto_key_len)
538 def compute_nat_sha1(self, ip, port, rspi=None):
541 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
542 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
544 return digest.finalize()
547 class IkePeer(VppTestCase):
548 """ common class for initiator and responder """
552 import scapy.contrib.ikev2 as _ikev2
553 globals()['ikev2'] = _ikev2
554 super(IkePeer, cls).setUpClass()
555 cls.create_pg_interfaces(range(2))
556 for i in cls.pg_interfaces:
564 def tearDownClass(cls):
565 super(IkePeer, cls).tearDownClass()
568 super(IkePeer, self).tearDown()
569 if self.del_sa_from_responder:
570 self.initiate_del_sa_from_responder()
572 self.initiate_del_sa_from_initiator()
573 r = self.vapi.ikev2_sa_dump()
574 self.assertEqual(len(r), 0)
575 sas = self.vapi.ipsec_sa_dump()
576 self.assertEqual(len(sas), 0)
577 self.p.remove_vpp_config()
578 self.assertIsNone(self.p.query_vpp_config())
581 super(IkePeer, self).setUp()
583 self.p.add_vpp_config()
584 self.assertIsNotNone(self.p.query_vpp_config())
585 if self.sa.is_initiator:
586 self.sa.generate_dh_data()
587 self.vapi.cli('ikev2 set logging level 4')
588 self.vapi.cli('event-lo clear')
590 def create_rekey_request(self):
591 sa, first_payload = self.generate_auth_payload(is_rekey=True)
592 header = ikev2.IKEv2(
593 init_SPI=self.sa.ispi,
594 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
595 flags='Initiator', exch_type='CREATE_CHILD_SA')
597 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
598 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
599 self.sa.dport, self.sa.natt, self.ip6)
601 def create_empty_request(self):
602 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
603 id=self.sa.new_msg_id(), flags='Initiator',
604 exch_type='INFORMATIONAL',
605 next_payload='Encrypted')
607 msg = self.encrypt_ike_msg(header, b'', None)
608 return self.create_packet(self.pg0, msg, self.sa.sport,
609 self.sa.dport, self.sa.natt, self.ip6)
611 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
614 src_ip = src_if.remote_ip6
615 dst_ip = src_if.local_ip6
618 src_ip = src_if.remote_ip4
619 dst_ip = src_if.local_ip4
621 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
622 ip_layer(src=src_ip, dst=dst_ip) /
623 UDP(sport=sport, dport=dport))
625 # insert non ESP marker
626 res = res / Raw(b'\x00' * 4)
def verify_udp(self, udp):
    """Assert that the UDP source and destination ports match the SA's."""
    for port_field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, port_field),
                         getattr(self.sa, port_field))
633 def get_ike_header(self, packet):
635 ih = packet[ikev2.IKEv2]
636 ih = self.verify_and_remove_non_esp_marker(ih)
637 except IndexError as e:
638 # this is a workaround for getting IKEv2 layer as both ikev2 and
639 # ipsec register for port 4500
641 ih = self.verify_and_remove_non_esp_marker(esp)
642 self.assertEqual(ih.version, 0x20)
643 self.assertNotIn('Version', ih.flags)
646 def verify_and_remove_non_esp_marker(self, packet):
648 # if we are in nat traversal mode check for non esp marker
651 self.assertEqual(data[:4], b'\x00' * 4)
652 return ikev2.IKEv2(data[4:])
656 def encrypt_ike_msg(self, header, plain, first_payload):
657 if self.sa.ike_crypto == 'AES-GCM-16ICV':
658 data = self.sa.ike_crypto_alg.pad(raw(plain))
659 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
660 len(ikev2.IKEv2_payload_Encrypted())
661 tlen = plen + len(ikev2.IKEv2())
664 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
668 encr = self.sa.encrypt(raw(plain), raw(res))
669 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
670 length=plen, load=encr)
673 encr = self.sa.encrypt(raw(plain))
674 trunc_len = self.sa.ike_integ_alg.trunc_len
675 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
676 tlen = plen + len(ikev2.IKEv2())
678 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
679 length=plen, load=encr)
683 integ_data = raw(res)
684 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
685 self.sa.my_authkey, integ_data)
686 res = res / Raw(hmac_data[:trunc_len])
687 assert(len(res) == tlen)
690 def verify_udp_encap(self, ipsec_sa):
691 e = VppEnum.vl_api_ipsec_sad_flags_t
692 if self.sa.udp_encap or self.sa.natt:
693 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
695 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
697 def verify_ipsec_sas(self, is_rekey=False):
698 sas = self.vapi.ipsec_sa_dump()
700 # after rekey there is a short period of time in which old
701 # inbound SA is still present
705 self.assertEqual(len(sas), sa_count)
706 if self.sa.is_initiator:
721 c = self.sa.child_sas[0]
723 self.verify_udp_encap(sa0)
724 self.verify_udp_encap(sa1)
725 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
726 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
727 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
729 if self.sa.esp_integ is None:
732 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
733 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
734 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
737 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
738 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
739 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
740 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
744 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
745 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
746 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
747 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
749 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
750 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one piece of key material reported by the API with ours.

    ``name`` selects the attribute on both objects. The API side also
    exposes ``<name>_len``, which must equal the length of the expected
    key; only that many leading bytes of the API buffer are compared.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    # The API buffer may be longer than the key; ignore the tail.
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Compare an identity reported by the API with the expected one.

    Checks the type (the expected type is translated with
    ``IDType.value``), the data length, and the ASCII-encoded data.
    """
    reported_type = api_id.type
    expected_type = IDType.value(exp_id.type)
    self.assertEqual(reported_type, expected_type)
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # NOTE(review): the data bytes are compared against exp_id.type; this
    # looks like it was meant to be the expected identity data -- confirm.
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, exp_id.type)
764 def verify_ike_sas(self):
765 r = self.vapi.ikev2_sa_dump()
766 self.assertEqual(len(r), 1)
768 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
769 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
771 if self.sa.is_initiator:
772 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
773 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
775 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
776 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
778 if self.sa.is_initiator:
779 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
780 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
782 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
783 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
784 self.verify_keymat(sa.keys, self.sa, 'sk_d')
785 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
786 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
787 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
788 self.verify_keymat(sa.keys, self.sa, 'sk_er')
789 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
790 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
792 self.assertEqual(sa.i_id.type, self.sa.id_type)
793 self.assertEqual(sa.r_id.type, self.sa.id_type)
794 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
795 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
796 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
797 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
799 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
800 self.assertEqual(len(r), 1)
802 self.assertEqual(csa.sa_index, sa.sa_index)
803 c = self.sa.child_sas[0]
804 if hasattr(c, 'sk_ai'):
805 self.verify_keymat(csa.keys, c, 'sk_ai')
806 self.verify_keymat(csa.keys, c, 'sk_ar')
807 self.verify_keymat(csa.keys, c, 'sk_ei')
808 self.verify_keymat(csa.keys, c, 'sk_er')
809 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
810 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
812 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
815 r = self.vapi.ikev2_traffic_selector_dump(
816 is_initiator=True, sa_index=sa.sa_index,
817 child_sa_index=csa.child_sa_index)
818 self.assertEqual(len(r), 1)
820 self.verify_ts(r[0].ts, tsi[0], True)
822 r = self.vapi.ikev2_traffic_selector_dump(
823 is_initiator=False, sa_index=sa.sa_index,
824 child_sa_index=csa.child_sa_index)
825 self.assertEqual(len(r), 1)
826 self.verify_ts(r[0].ts, tsr[0], False)
828 n = self.vapi.ikev2_nonce_get(is_initiator=True,
829 sa_index=sa.sa_index)
830 self.verify_nonce(n, self.sa.i_nonce)
831 n = self.vapi.ikev2_nonce_get(is_initiator=False,
832 sa_index=sa.sa_index)
833 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API-reported nonce has the expected length and value."""
    expected_len = len(nonce)
    self.assertEqual(api_nonce.data_len, expected_len)
    self.assertEqual(api_nonce.nonce, nonce)
839 def verify_ts(self, api_ts, ts, is_initiator):
841 self.assertTrue(api_ts.is_local)
843 self.assertFalse(api_ts.is_local)
846 self.assertEqual(api_ts.start_addr,
847 IPv4Address(ts.starting_address_v4))
848 self.assertEqual(api_ts.end_addr,
849 IPv4Address(ts.ending_address_v4))
851 self.assertEqual(api_ts.start_addr,
852 IPv6Address(ts.starting_address_v6))
853 self.assertEqual(api_ts.end_addr,
854 IPv6Address(ts.ending_address_v6))
855 self.assertEqual(api_ts.start_port, ts.start_port)
856 self.assertEqual(api_ts.end_port, ts.end_port)
857 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
860 class TemplateInitiator(IkePeer):
861 """ initiator test template """
863 def initiate_del_sa_from_initiator(self):
864 ispi = int.from_bytes(self.sa.ispi, 'little')
865 self.pg0.enable_capture()
867 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
868 capture = self.pg0.get_capture(1)
869 ih = self.get_ike_header(capture[0])
870 self.assertNotIn('Response', ih.flags)
871 self.assertIn('Initiator', ih.flags)
872 self.assertEqual(ih.init_SPI, self.sa.ispi)
873 self.assertEqual(ih.resp_SPI, self.sa.rspi)
874 plain = self.sa.hmac_and_decrypt(ih)
875 d = ikev2.IKEv2_payload_Delete(plain)
876 self.assertEqual(d.proto, 1) # proto=IKEv2
877 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
878 flags='Response', exch_type='INFORMATIONAL',
879 id=ih.id, next_payload='Encrypted')
880 resp = self.encrypt_ike_msg(header, b'', None)
881 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Validate the reply to a delete-SA request sent towards the initiator.

    The reply must reuse the current message id, be an INFORMATIONAL
    exchange with both the Response and Initiator flags set, and carry an
    empty encrypted payload.
    """
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, 37)  # INFORMATIONAL exchange
    for flag in ('Response', 'Initiator'):
        self.assertIn(flag, header.flags)
    payload = self.sa.hmac_and_decrypt(header)
    self.assertEqual(payload, b'')
892 def initiate_del_sa_from_responder(self):
893 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
894 exch_type='INFORMATIONAL',
895 id=self.sa.new_msg_id())
896 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
897 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
898 packet = self.create_packet(self.pg0, ike_msg,
899 self.sa.sport, self.sa.dport,
900 self.sa.natt, self.ip6)
901 self.pg0.add_stream(packet)
902 self.pg0.enable_capture()
904 capture = self.pg0.get_capture(1)
905 self.verify_del_sa(capture[0])
908 def find_notify_payload(packet, notify_type):
909 n = packet[ikev2.IKEv2_payload_Notify]
911 if n.type == notify_type:
916 def verify_nat_detection(self, packet):
923 # NAT_DETECTION_SOURCE_IP
924 s = self.find_notify_payload(packet, 16388)
925 self.assertIsNotNone(s)
926 src_sha = self.sa.compute_nat_sha1(
927 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
928 self.assertEqual(s.load, src_sha)
930 # NAT_DETECTION_DESTINATION_IP
931 s = self.find_notify_payload(packet, 16389)
932 self.assertIsNotNone(s)
933 dst_sha = self.sa.compute_nat_sha1(
934 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
935 self.assertEqual(s.load, dst_sha)
937 def verify_sa_init_request(self, packet):
939 self.sa.dport = udp.sport
940 ih = packet[ikev2.IKEv2]
941 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
942 self.assertEqual(ih.exch_type, 34) # SA_INIT
943 self.sa.ispi = ih.init_SPI
944 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
945 self.assertIn('Initiator', ih.flags)
946 self.assertNotIn('Response', ih.flags)
947 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
948 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
950 prop = packet[ikev2.IKEv2_payload_Proposal]
951 self.assertEqual(prop.proto, 1) # proto = ikev2
952 self.assertEqual(prop.proposal, 1)
953 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
954 self.assertEqual(prop.trans[0].transform_id,
955 self.p.ike_transforms['crypto_alg'])
956 self.assertEqual(prop.trans[1].transform_type, 2) # prf
957 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
958 self.assertEqual(prop.trans[2].transform_type, 4) # dh
959 self.assertEqual(prop.trans[2].transform_id,
960 self.p.ike_transforms['dh_group'])
962 self.verify_nat_detection(packet)
963 self.sa.set_ike_props(
964 crypto='AES-GCM-16ICV', crypto_key_len=32,
965 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
966 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
967 integ='SHA2-256-128')
968 self.sa.generate_dh_data()
969 self.sa.complete_dh_data()
972 def update_esp_transforms(self, trans, sa):
974 if trans.transform_type == 1: # ecryption
975 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
976 elif trans.transform_type == 3: # integrity
977 sa.esp_integ = INTEG_IDS[trans.transform_id]
978 trans = trans.payload
980 def verify_sa_auth_req(self, packet):
982 self.sa.dport = udp.sport
983 ih = self.get_ike_header(packet)
984 self.assertEqual(ih.resp_SPI, self.sa.rspi)
985 self.assertEqual(ih.init_SPI, self.sa.ispi)
986 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
987 self.assertIn('Initiator', ih.flags)
988 self.assertNotIn('Response', ih.flags)
992 self.assertEqual(ih.id, self.sa.msg_id + 1)
994 plain = self.sa.hmac_and_decrypt(ih)
995 idi = ikev2.IKEv2_payload_IDi(plain)
996 idr = ikev2.IKEv2_payload_IDr(idi.payload)
997 self.assertEqual(idi.load, self.sa.i_id)
998 self.assertEqual(idr.load, self.sa.r_id)
999 prop = idi[ikev2.IKEv2_payload_Proposal]
1000 c = self.sa.child_sas[0]
1002 self.update_esp_transforms(
1003 prop[ikev2.IKEv2_payload_Transform], self.sa)
1005 def send_init_response(self):
1006 tr_attr = self.sa.ike_crypto_attr()
1007 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1008 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1009 key_length=tr_attr[0]) /
1010 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1011 transform_id=self.sa.ike_integ) /
1012 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1013 transform_id=self.sa.ike_prf_alg.name) /
1014 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1015 transform_id=self.sa.ike_dh))
1016 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1017 trans_nb=4, trans=trans))
1019 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1021 dst_address = b'\x0a\x0a\x0a\x0a'
1023 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1024 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1025 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1027 self.sa.init_resp_packet = (
1028 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1029 exch_type='IKE_SA_INIT', flags='Response') /
1030 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1031 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1032 group=self.sa.ike_dh,
1033 load=self.sa.my_dh_pub_key) /
1034 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1035 next_payload='Notify') /
1036 ikev2.IKEv2_payload_Notify(
1037 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1038 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1039 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1041 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1042 self.sa.sport, self.sa.dport,
1044 self.pg_send(self.pg0, ike_msg)
1045 capture = self.pg0.get_capture(1)
1046 self.verify_sa_auth_req(capture[0])
1048 def initiate_sa_init(self):
1049 self.pg0.enable_capture()
1051 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1053 capture = self.pg0.get_capture(1)
1054 self.verify_sa_init_request(capture[0])
1055 self.send_init_response()
1057 def send_auth_response(self):
1058 tr_attr = self.sa.esp_crypto_attr()
1059 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1060 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1061 key_length=tr_attr[0]) /
1062 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1063 transform_id=self.sa.esp_integ) /
1064 ikev2.IKEv2_payload_Transform(
1065 transform_type='Extended Sequence Number',
1066 transform_id='No ESN') /
1067 ikev2.IKEv2_payload_Transform(
1068 transform_type='Extended Sequence Number',
1069 transform_id='ESN'))
1071 c = self.sa.child_sas[0]
1072 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1073 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1075 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1076 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1077 IDtype=self.sa.id_type, load=self.sa.i_id) /
1078 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1079 IDtype=self.sa.id_type, load=self.sa.r_id) /
1080 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1081 auth_type=AuthMethod.value(self.sa.auth_method),
1082 load=self.sa.auth_data) /
1083 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1084 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1085 number_of_TSs=len(tsi),
1086 traffic_selector=tsi) /
1087 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1088 number_of_TSs=len(tsr),
1089 traffic_selector=tsr) /
1090 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1092 header = ikev2.IKEv2(
1093 init_SPI=self.sa.ispi,
1094 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1095 flags='Response', exch_type='IKE_AUTH')
1097 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1098 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1099 self.sa.dport, self.sa.natt, self.ip6)
1100 self.pg_send(self.pg0, packet)
1102 def test_initiator(self):
1103 self.initiate_sa_init()
1105 self.sa.calc_child_keys()
1106 self.send_auth_response()
1107 self.verify_ike_sas()
1110 class TemplateResponder(IkePeer):
1111 """ responder test template """
def initiate_del_sa_from_responder(self):
    """Ask VPP to delete the IKE SA and acknowledge its Delete request.

    Triggers the deletion through the API, captures the INFORMATIONAL
    request VPP emits, checks its flags, SPIs and Delete payload, and
    answers with an empty encrypted response that must draw no reply.
    """
    self.pg0.enable_capture()
    sa_ispi = int.from_bytes(self.sa.ispi, 'little')
    self.vapi.ikev2_initiate_del_ike_sa(ispi=sa_ispi)

    request = self.get_ike_header(self.pg0.get_capture(1)[0])
    for flag in ('Response', 'Initiator'):
        self.assertNotIn(flag, request.flags)
    self.assertEqual(request.exch_type, 37)  # INFORMATIONAL

    delete = ikev2.IKEv2_payload_Delete(self.sa.hmac_and_decrypt(request))
    self.assertEqual(delete.proto, 1)  # proto=IKEv2
    self.assertEqual(request.init_SPI, self.sa.ispi)
    self.assertEqual(request.resp_SPI, self.sa.rspi)

    # Reply reuses the request's message id.
    reply_hdr = ikev2.IKEv2(
        init_SPI=self.sa.ispi,
        resp_SPI=self.sa.rspi,
        flags='Initiator+Response',
        exch_type='INFORMATIONAL',
        id=request.id,
        next_payload='Encrypted')
    reply = self.encrypt_ike_msg(reply_hdr, b'', None)
    self.send_and_assert_no_replies(self.pg0, reply)
def verify_del_sa(self, packet):
    """Check that *packet* is the empty INFORMATIONAL response to a delete.

    The header must carry the current message id and both SPIs, have the
    Response flag set and the Initiator flag clear, announce an Encrypted
    payload, and decrypt to an empty plaintext.
    """
    informational = 37  # exchange informational
    encrypted = 46  # Encrypted
    hdr = self.get_ike_header(packet)
    self.assertEqual(hdr.id, self.sa.msg_id)
    self.assertEqual(hdr.exch_type, informational)
    self.assertIn('Response', hdr.flags)
    self.assertNotIn('Initiator', hdr.flags)
    self.assertEqual(hdr.next_payload, encrypted)
    self.assertEqual(hdr.init_SPI, self.sa.ispi)
    self.assertEqual(hdr.resp_SPI, self.sa.rspi)
    decrypted = self.sa.hmac_and_decrypt(hdr)
    self.assertEqual(decrypted, b'')
def initiate_del_sa_from_initiator(self):
    """Send an IKE SA Delete request to VPP and verify its response.

    Builds an encrypted INFORMATIONAL request carrying a Delete payload
    for the IKE SA, injects it on pg0, and passes the single captured
    reply to verify_del_sa().
    """
    header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                         flags='Initiator', exch_type='INFORMATIONAL',
                         id=self.sa.new_msg_id())
    del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
    ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
    packet = self.create_packet(self.pg0, ike_msg,
                                self.sa.sport, self.sa.dport,
                                self.sa.natt, self.ip6)
    self.pg0.add_stream(packet)
    self.pg0.enable_capture()
    # The queued stream is only transmitted once the packet generator is
    # started; without this get_capture() has nothing to wait for.
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_del_sa(capture[0])
1162 def send_sa_init_req(self, behind_nat=False):
1163 tr_attr = self.sa.ike_crypto_attr()
1164 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1165 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1166 key_length=tr_attr[0]) /
1167 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1168 transform_id=self.sa.ike_integ) /
1169 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1170 transform_id=self.sa.ike_prf_alg.name) /
1171 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1172 transform_id=self.sa.ike_dh))
1174 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1175 trans_nb=4, trans=trans))
1177 self.sa.init_req_packet = (
1178 ikev2.IKEv2(init_SPI=self.sa.ispi,
1179 flags='Initiator', exch_type='IKE_SA_INIT') /
1180 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1181 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1182 group=self.sa.ike_dh,
1183 load=self.sa.my_dh_pub_key) /
1184 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1185 load=self.sa.i_nonce))
1188 src_address = b'\x0a\x0a\x0a\x01'
1190 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1192 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1193 dst_nat = self.sa.compute_nat_sha1(
1194 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1196 nat_src_detection = ikev2.IKEv2_payload_Notify(
1197 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1198 next_payload='Notify')
1199 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1200 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1201 self.sa.init_req_packet = (self.sa.init_req_packet /
1205 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1206 self.sa.sport, self.sa.dport,
1207 self.sa.natt, self.ip6)
1208 self.pg0.add_stream(ike_msg)
1209 self.pg0.enable_capture()
1211 capture = self.pg0.get_capture(1)
1212 self.verify_sa_init(capture[0])
1214 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1215 tr_attr = self.sa.esp_crypto_attr()
1216 last_payload = last_payload or 'Notify'
1217 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1218 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1219 key_length=tr_attr[0]) /
1220 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1221 transform_id=self.sa.esp_integ) /
1222 ikev2.IKEv2_payload_Transform(
1223 transform_type='Extended Sequence Number',
1224 transform_id='No ESN') /
1225 ikev2.IKEv2_payload_Transform(
1226 transform_type='Extended Sequence Number',
1227 transform_id='ESN'))
1229 c = self.sa.child_sas[0]
1230 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1231 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1233 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1234 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1235 auth_type=AuthMethod.value(self.sa.auth_method),
1236 load=self.sa.auth_data) /
1237 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1238 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1239 number_of_TSs=len(tsi), traffic_selector=tsi) /
1240 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1241 number_of_TSs=len(tsr), traffic_selector=tsr))
1244 first_payload = 'Nonce'
1245 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1246 next_payload='SA') / plain /
1247 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1248 proto='ESP', SPI=c.ispi))
1250 first_payload = 'IDi'
1251 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1252 IDtype=self.sa.id_type, load=self.sa.i_id) /
1253 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1254 IDtype=self.sa.id_type, load=self.sa.r_id))
1256 return plain, first_payload
def send_sa_auth(self):
    """Send the IKE_AUTH request and verify VPP's response.

    The payload chain from generate_auth_payload() is extended with an
    INITIAL_CONTACT notify, encrypted, injected on pg0, and the single
    captured reply is handed to verify_sa_auth_resp().
    """
    plain, first_payload = self.generate_auth_payload(
        last_payload='Notify')
    plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
    header = ikev2.IKEv2(
        init_SPI=self.sa.ispi,
        resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
        flags='Initiator', exch_type='IKE_AUTH')

    ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
    packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                self.sa.dport, self.sa.natt, self.ip6)
    self.pg0.add_stream(packet)
    self.pg0.enable_capture()
    # Start the packet generator so the queued request is actually sent.
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_sa_auth_resp(capture[0])
1276 def verify_sa_init(self, packet):
1277 ih = self.get_ike_header(packet)
1279 self.assertEqual(ih.id, self.sa.msg_id)
1280 self.assertEqual(ih.exch_type, 34)
1281 self.assertIn('Response', ih.flags)
1282 self.assertEqual(ih.init_SPI, self.sa.ispi)
1283 self.assertNotEqual(ih.resp_SPI, 0)
1284 self.sa.rspi = ih.resp_SPI
1286 sa = ih[ikev2.IKEv2_payload_SA]
1287 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1288 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1289 except IndexError as e:
1290 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1291 self.logger.error(ih.show())
1293 self.sa.complete_dh_data()
def verify_sa_auth_resp(self, packet):
    """Validate the IKE_AUTH response and complete the child SA.

    Checks the UDP layer and the message id, decrypts the payload, reads
    the SPI from the returned ESP proposal (must be 4 bytes long) into
    child SA 0 as the responder SPI, and derives the child keys.
    """
    ike = self.get_ike_header(packet)
    # Bind the UDP layer before checking it; it was previously referenced
    # without ever being assigned.
    udp = packet[UDP]
    self.verify_udp(udp)
    self.assertEqual(ike.id, self.sa.msg_id)
    plain = self.sa.hmac_and_decrypt(ike)
    idr = ikev2.IKEv2_payload_IDr(plain)
    prop = idr[ikev2.IKEv2_payload_Proposal]
    self.assertEqual(prop.SPIsize, 4)
    self.sa.child_sas[0].rspi = prop.SPI
    self.sa.calc_child_keys()
def test_responder(self):
    # Scenario driver for the case where VPP responds: IKE_SA_INIT first,
    # then IKE_AUTH, then check the IPsec and IKE SAs.
    self.send_sa_init_req(self.sa.natt)
    # The IKE_AUTH exchange must run before the SAs are verified: it is
    # what records the responder's child SPI and derives the child keys
    # (see verify_sa_auth_resp).
    self.send_sa_auth()
    self.verify_ipsec_sas()
    self.verify_ike_sas()
1316 class Ikev2Params(object):
1317 def config_params(self, params={}):
1318 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1319 ei = VppEnum.vl_api_ipsec_integ_alg_t
1321 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1322 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1323 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1324 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1325 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1326 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1328 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1329 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1330 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1331 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1333 dpd_disabled = True if 'dpd_disabled' not in params else\
1334 params['dpd_disabled']
1336 self.vapi.cli('ikev2 dpd disable')
1337 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1338 not in params else params['del_sa_from_responder']
1339 is_natt = 'natt' in params and params['natt'] or False
1340 self.p = Profile(self, 'pr1')
1341 self.ip6 = False if 'ip6' not in params else params['ip6']
1343 if 'auth' in params and params['auth'] == 'rsa-sig':
1344 auth_method = 'rsa-sig'
1345 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1346 self.vapi.ikev2_set_local_key(
1347 key_file=work_dir + params['server-key'])
1349 client_file = work_dir + params['client-cert']
1350 server_pem = open(work_dir + params['server-cert']).read()
1351 client_priv = open(work_dir + params['client-key']).read()
1352 client_priv = load_pem_private_key(str.encode(client_priv), None,
1354 self.peer_cert = x509.load_pem_x509_certificate(
1355 str.encode(server_pem),
1357 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1360 auth_data = b'$3cr3tpa$$w0rd'
1361 self.p.add_auth(method='shared-key', data=auth_data)
1362 auth_method = 'shared-key'
1365 is_init = True if 'is_initiator' not in params else\
1366 params['is_initiator']
1368 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1369 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1371 self.p.add_local_id(**idr)
1372 self.p.add_remote_id(**idi)
1374 self.p.add_local_id(**idi)
1375 self.p.add_remote_id(**idr)
1377 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1378 'loc_ts' not in params else params['loc_ts']
1379 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1380 'rem_ts' not in params else params['rem_ts']
1381 self.p.add_local_ts(**loc_ts)
1382 self.p.add_remote_ts(**rem_ts)
1383 if 'responder' in params:
1384 self.p.add_responder(params['responder'])
1385 if 'ike_transforms' in params:
1386 self.p.add_ike_transforms(params['ike_transforms'])
1387 if 'esp_transforms' in params:
1388 self.p.add_esp_transforms(params['esp_transforms'])
1390 udp_encap = False if 'udp_encap' not in params else\
1393 self.p.set_udp_encap(True)
1395 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1396 is_initiator=is_init,
1397 id_type=self.p.local_id['id_type'], natt=is_natt,
1398 priv_key=client_priv, auth_method=auth_method,
1399 auth_data=auth_data, udp_encap=udp_encap,
1400 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1402 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1403 params['ike-crypto']
1404 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1406 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1409 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1410 params['esp-crypto']
1411 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1414 self.sa.set_ike_props(
1415 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1416 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1417 self.sa.set_esp_props(
1418 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1422 class TestApi(VppTestCase):
1423 """ Test IKEV2 API """
@classmethod
def setUpClass(cls):
    # unittest invokes this hook on the class itself, so it has to be a
    # classmethod; it only delegates to the base-class fixture.
    super(TestApi, cls).setUpClass()
@classmethod
def tearDownClass(cls):
    # unittest invokes this hook on the class itself, so it has to be a
    # classmethod; it only delegates to the base-class fixture.
    super(TestApi, cls).tearDownClass()
1433 super(TestApi, self).tearDown()
1434 self.p1.remove_vpp_config()
1435 self.p2.remove_vpp_config()
1436 r = self.vapi.ikev2_profile_dump()
1437 self.assertEqual(len(r), 0)
1439 def configure_profile(self, cfg):
1440 p = Profile(self, cfg['name'])
1441 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1442 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1443 p.add_local_ts(**cfg['loc_ts'])
1444 p.add_remote_ts(**cfg['rem_ts'])
1445 p.add_responder(cfg['responder'])
1446 p.add_ike_transforms(cfg['ike_ts'])
1447 p.add_esp_transforms(cfg['esp_ts'])
1448 p.add_auth(**cfg['auth'])
1449 p.set_udp_encap(cfg['udp_encap'])
1450 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1451 if 'lifetime_data' in cfg:
1452 p.set_lifetime_data(cfg['lifetime_data'])
1453 if 'tun_itf' in cfg:
1454 p.set_tunnel_interface(cfg['tun_itf'])
1455 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1460 def test_profile_api(self):
1461 """ test profile dump API """
1466 'start_addr': '3.3.3.2',
1467 'end_addr': '3.3.3.3',
1473 'start_addr': '4.5.76.80',
1474 'end_addr': '2.3.4.6',
1481 'start_addr': 'ab::1',
1482 'end_addr': 'ab::4',
1488 'start_addr': 'cd::12',
1489 'end_addr': 'cd::13',
1495 'natt_disabled': True,
1496 'loc_id': ('fqdn', b'vpp.home'),
1497 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1500 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1503 'crypto_key_size': 32,
1508 'crypto_key_size': 24,
1510 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1512 'ipsec_over_udp_port': 4501,
1515 'lifetime_maxdata': 20192,
1516 'lifetime_jitter': 9,
1521 'loc_id': ('ip4-addr', b'192.168.2.1'),
1522 'rem_id': ('ip6-addr', b'abcd::1'),
1525 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1528 'crypto_key_size': 16,
1533 'crypto_key_size': 24,
1535 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1537 'ipsec_over_udp_port': 4600,
1540 self.p1 = self.configure_profile(conf['p1'])
1541 self.p2 = self.configure_profile(conf['p2'])
1543 r = self.vapi.ikev2_profile_dump()
1544 self.assertEqual(len(r), 2)
1545 self.verify_profile(r[0].profile, conf['p1'])
1546 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Compare an identity from the API dump with its configured tuple.

    *cfg_id* is a ``(type_name, data_bytes)`` pair; the API data is
    converted to ASCII bytes before comparison.
    """
    reported_type = api_id.type
    self.assertEqual(reported_type, IDType.value(cfg_id[0]))
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare a traffic selector from the API dump with its config dict.

    Protocol and port range are compared as-is; the configured start/end
    addresses are converted with ip_address() before comparison.
    """
    scalar_fields = (('protocol_id', 'proto'),
                     ('start_port', 'start_port'),
                     ('end_port', 'end_port'))
    for api_field, cfg_key in scalar_fields:
        self.assertEqual(getattr(api_ts, api_field), cfg_ts[cfg_key])
    for addr_field in ('start_addr', 'end_addr'):
        self.assertEqual(getattr(api_ts, addr_field),
                         ip_address(text_type(cfg_ts[addr_field])))
def verify_responder(self, api_r, cfg_r):
    """Compare the dumped responder (interface index, address) with config."""
    reported_index = api_r.sw_if_index
    self.assertEqual(reported_index, cfg_r['sw_if_index'])
    reported_addr = api_r.addr
    self.assertEqual(reported_addr, ip_address(cfg_r['addr']))
def verify_transforms(self, api_ts, cfg_ts):
    """Check the transform fields shared by IKE and ESP transform sets."""
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Check an IKE transform set: the common fields plus the DH group."""
    self.verify_transforms(api_ts, cfg_ts)
    reported_group = api_ts.dh_group
    self.assertEqual(reported_group, cfg_ts['dh_group'])
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check an ESP transform set; only the common fields apply."""
    check_common = self.verify_transforms
    check_common(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Compare dumped auth settings (method, data, data length) with config."""
    self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
    reported_data = api_auth.data
    self.assertEqual(reported_data, cfg_auth['data'])
    reported_len = api_auth.data_len
    self.assertEqual(reported_len, len(cfg_auth['data']))
def verify_lifetime_data(self, p, ld):
    """Compare the lifetime-related profile fields with the config dict."""
    lifetime_fields = ('lifetime', 'lifetime_maxdata',
                       'lifetime_jitter', 'handover')
    for field in lifetime_fields:
        self.assertEqual(getattr(p, field), ld[field])
def verify_profile(self, ap, cp):
    """Compare one dumped profile *ap* with the config dict *cp*.

    Mandatory keys are compared unconditionally. Optional keys
    ('natt_disabled', 'lifetime_data', 'tun_itf') are compared when
    present; when absent, NAT-T must not be disabled and the tunnel
    interface must be 0xffffffff.
    """
    self.assertEqual(ap.name, cp['name'])
    self.assertEqual(ap.udp_encap, cp['udp_encap'])
    self.verify_id(ap.loc_id, cp['loc_id'])
    self.verify_id(ap.rem_id, cp['rem_id'])
    self.verify_ts(ap.loc_ts, cp['loc_ts'])
    self.verify_ts(ap.rem_ts, cp['rem_ts'])
    self.verify_responder(ap.responder, cp['responder'])
    self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
    self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
    self.verify_auth(ap.auth, cp['auth'])
    natt_dis = cp.get('natt_disabled', False)
    self.assertEqual(natt_dis, ap.natt_disabled)

    if 'lifetime_data' in cp:
        self.verify_lifetime_data(ap, cp['lifetime_data'])
    self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
    # The two tun_itf expectations are mutually exclusive: compare with
    # the configured interface only when one was configured.
    if 'tun_itf' in cp:
        self.assertEqual(ap.tun_itf, cp['tun_itf'])
    else:
        self.assertEqual(ap.tun_itf, 0xffffffff)
1611 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1612 """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1614 def config_tc(self):
1615 self.config_params({
1617 'is_initiator': False, # seen from test case perspective
1618 # thus vpp is initiator
1619 'responder': {'sw_if_index': self.pg0.sw_if_index,
1620 'addr': self.pg0.remote_ip4},
1621 'ike-crypto': ('AES-GCM-16ICV', 32),
1622 'ike-integ': 'NULL',
1623 'ike-dh': '3072MODPgr',
1625 'crypto_alg': 20, # "aes-gcm-16"
1626 'crypto_key_size': 256,
1627 'dh_group': 15, # "modp-3072"
1630 'crypto_alg': 12, # "aes-cbc"
1631 'crypto_key_size': 256,
1632 # "hmac-sha2-256-128"
1636 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1637 """ test ikev2 initiator - pre shared key auth """
1639 def config_tc(self):
1640 self.config_params({
1641 'is_initiator': False, # seen from test case perspective
1642 # thus vpp is initiator
1643 'responder': {'sw_if_index': self.pg0.sw_if_index,
1644 'addr': self.pg0.remote_ip4},
1645 'ike-crypto': ('AES-GCM-16ICV', 32),
1646 'ike-integ': 'NULL',
1647 'ike-dh': '3072MODPgr',
1649 'crypto_alg': 20, # "aes-gcm-16"
1650 'crypto_key_size': 256,
1651 'dh_group': 15, # "modp-3072"
1654 'crypto_alg': 12, # "aes-cbc"
1655 'crypto_key_size': 256,
1656 # "hmac-sha2-256-128"
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Answer one captured rekey request by echoing its SA payload.

        When *update_child_sa_data* is true, the nonce and SPI from the
        request are first stored on the local SA (for both directions)
        and the child keys are recomputed.
        """
        request = self.get_ike_header(req)
        sa_payload = ikev2.IKEv2_payload_SA(
            self.sa.hmac_and_decrypt(request))
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.i_nonce = nonce
            self.sa.r_nonce = nonce
            child = self.sa.child_sas[0]
            child.ispi = proposal.SPI
            child.rspi = proposal.SPI
            self.sa.calc_child_keys()

        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags='Response',
            exch_type=36,
            id=request.id,
            next_payload='Encrypted')
        reply = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        packet = self.create_packet(self.pg0, reply, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, packet)

    def test_initiator(self):
        # Establish the SA, then trigger two rekey requests for the same
        # child SA and answer them out of order.
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        requests = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(requests[1], True)
        self.rekey_respond(requests[0], False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Trigger a child SA rekey in VPP and answer its request.

        Captures the request, checks exchange type and flags, adopts the
        nonce and SPI it carries for both directions, recomputes the
        child keys and replies by echoing the SA payload.
        """
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        request = self.get_ike_header(self.pg0.get_capture(1)[0])
        self.assertEqual(request.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', request.flags)
        self.assertIn('Initiator', request.flags)

        sa_payload = ikev2.IKEv2_payload_SA(
            self.sa.hmac_and_decrypt(request))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_nonce = nonce
        self.sa.r_nonce = nonce
        # update new responder SPI
        child = self.sa.child_sas[0]
        child.ispi = proposal.SPI
        child.rspi = proposal.SPI
        self.sa.calc_child_keys()

        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags='Response',
            exch_type=36,
            id=request.id,
            next_payload='Encrypted')
        reply = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        packet = self.create_packet(self.pg0, reply, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, packet)

    def test_initiator(self):
        # Establish the SA, rekey the child SA, then re-check both SA sets.
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1738 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1739 """ test ikev2 initiator - delete IKE SA from responder """
1741 def config_tc(self):
1742 self.config_params({
1743 'del_sa_from_responder': True,
1744 'is_initiator': False, # seen from test case perspective
1745 # thus vpp is initiator
1746 'responder': {'sw_if_index': self.pg0.sw_if_index,
1747 'addr': self.pg0.remote_ip4},
1748 'ike-crypto': ('AES-GCM-16ICV', 32),
1749 'ike-integ': 'NULL',
1750 'ike-dh': '3072MODPgr',
1752 'crypto_alg': 20, # "aes-gcm-16"
1753 'crypto_key_size': 256,
1754 'dh_group': 15, # "modp-3072"
1757 'crypto_alg': 12, # "aes-cbc"
1758 'crypto_key_size': 256,
1759 # "hmac-sha2-256-128"
1763 class TestResponderNATT(TemplateResponder, Ikev2Params):
1764 """ test ikev2 responder - nat traversal """
1765 def config_tc(self):
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # No overrides are passed, so config_params() uses its own
        # fallback values for every setting.
        self.config_params()
1776 class TestResponderDpd(TestResponderPsk):
1778 Dead peer detection test
1780 def config_tc(self):
1781 self.config_params({'dpd_disabled': False})
def test_responder(self):
    # Dead peer detection: after the SA is up, VPP must probe the silent
    # peer with an empty INFORMATIONAL request and, with no answer,
    # remove both the IKE SA and the IPsec SAs.
    import time

    period = 2
    max_retries = 1
    self.vapi.ikev2_profile_set_liveness(period=period,
                                         max_retries=max_retries)
    super(TestResponderDpd, self).test_responder()
    self.pg0.enable_capture()
    # capture empty request but don't reply
    capture = self.pg0.get_capture(expected_count=1, timeout=5)
    ih = self.get_ike_header(capture[0])
    self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
    plain = self.sa.hmac_and_decrypt(ih)
    self.assertEqual(plain, b'')
    # wait for SA expiration
    # NOTE(review): the wait covers the configured retry period plus one
    # second of margin -- confirm against VPP's DPD expiry timing.
    time.sleep(period * max_retries + 1)
    ike_sas = self.vapi.ikev2_sa_dump()
    self.assertEqual(len(ike_sas), 0)
    ipsec_sas = self.vapi.ipsec_sa_dump()
    self.assertEqual(len(ipsec_sas), 0)
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Send a rekey request to VPP and record its answer.

        The responder nonce and the SPI from the returned proposal are
        stored on the local SA; key derivation is left to the caller.
        """
        packet = self.create_rekey_request()
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        # Start the packet generator so the queued request is actually
        # sent; otherwise get_capture() has nothing to wait for.
        self.pg_start()
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        plain = self.sa.hmac_and_decrypt(ih)
        sa = ikev2.IKEv2_payload_SA(plain)
        prop = sa[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = prop.SPI

    def test_responder(self):
        # Establish the SA, rekey the child SA, derive the new child keys
        # and re-check both SA sets.
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        # config_params() only reads the key/certificate entries when
        # 'auth' is 'rsa-sig', so it must be requested explicitly here.
        self.config_params({
            'auth': 'rsa-sig',
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem'})
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        # The second tuple element is the key length passed on as
        # crypto_key_len (16 and 24 here, matching the 128/192 in the name).
        overrides = {}
        overrides['ike-crypto'] = ('AES-CBC', 16)
        overrides['ike-integ'] = 'SHA2-256-128'
        overrides['esp-crypto'] = ('AES-CBC', 24)
        overrides['esp-integ'] = 'SHA2-384-192'
        overrides['ike-dh'] = '2048MODPgr'
        self.config_params(overrides)
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        # NOTE(review): the class name says AES_CBC_128 but the IKE key
        # length configured below is 32 -- confirm which one is intended.
        overrides = {}
        overrides['ike-crypto'] = ('AES-CBC', 32)
        overrides['ike-integ'] = 'SHA2-256-128'
        overrides['esp-crypto'] = ('AES-GCM-16ICV', 32)
        overrides['esp-integ'] = 'NULL'
        overrides['ike-dh'] = '3072MODPgr'
        self.config_params(overrides)
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """ IKE:AES_GCM_16_256 """
    def config_tc(self):
        # Responder scenario using AES-GCM for IKE, IPv6 traffic
        # selectors, and SA deletion triggered from the responder side.
        local_selector = {'start_addr': 'ab:cd::0',
                          'end_addr': 'ab:cd::10'}
        remote_selector = {'start_addr': '11::0',
                           'end_addr': '11::100'}
        overrides = {}
        overrides['del_sa_from_responder'] = True
        overrides['ike-crypto'] = ('AES-GCM-16ICV', 32)
        overrides['ike-integ'] = 'NULL'
        overrides['ike-dh'] = '2048MODPgr'
        overrides['loc_ts'] = local_selector
        overrides['rem_ts'] = remote_selector
        self.config_params(overrides)
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request to VPP and expect an empty response.

        The reply must carry the current message id and decrypt to an
        empty plaintext.
        """
        packet = self.create_empty_request()
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        # Start the packet generator so the queued request is actually
        # sent; otherwise get_capture() has nothing to wait for.
        self.pg_start()
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertEqual(ih.id, self.sa.msg_id)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def test_initiator(self):
        # Establish the SA first, then exercise the keepalive exchange.
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
1909 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1910 """ malformed packet test """
def config_tc(self):
    # No overrides are passed, so config_params() uses its own fallback
    # values for every setting.
    self.config_params()
def assert_counter(self, count, name, version='ip4'):
    """Assert that the ikev2 error counter *name* equals *count*.

    The counter is looked up as ``/err/ikev2-<version>/<name>`` in the
    statistics object.
    """
    counter_path = '/err/ikev2-%s/' % version + name
    observed = self.statistics.get_err_counter(counter_path)
    self.assertEqual(count, observed)
1922 def create_ike_init_msg(self, length=None, payload=None):
1923 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1924 flags='Initiator', exch_type='IKE_SA_INIT')
1925 if payload is not None:
1927 return self.create_packet(self.pg0, msg, self.sa.sport,
def verify_bad_packet_length(self):
    """Send pkt_count SA_INIT messages with a bogus IKE length field.

    No replies are expected, and the 'Bad packet length' counter must
    equal the number of packets sent.
    """
    bogus = self.create_ike_init_msg(length=0xdead)
    burst = bogus * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Bad packet length')
def verify_bad_sa_payload_length(self):
    """Send pkt_count SA_INIT messages whose SA payload length is bogus.

    No replies are expected, and the 'Malformed packet' counter must
    equal the number of packets sent.
    """
    bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
    bogus = self.create_ike_init_msg(payload=bad_sa)
    burst = bogus * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'Malformed packet')
def test_responder(self):
    # Number of malformed packets sent per check; each check expects its
    # error counter to equal this value.
    self.pkt_count = 254
    for check in (self.verify_bad_packet_length,
                  self.verify_bad_sa_payload_length):
        check()
# When executed as a script, run the tests in this module with the VPP
# test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)