3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import VppTestCase, VppTestRunner
22 from vpp_ikev2 import Profile, IDType, AuthMethod
23 from vpp_papi import VppEnum
30 KEY_PAD = b"Key Pad for IKEv2"
37 # tuple structure is (p, g, key_len)
39 '2048MODPgr': (long_converter("""
40 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
41 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
42 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
43 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
44 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
45 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
46 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
47 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
48 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
49 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
50 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
52 '3072MODPgr': (long_converter("""
53 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
54 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
55 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
56 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
57 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
58 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
59 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
60 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
61 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
62 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
63 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
64 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
65 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
66 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
67 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
68 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
72 class CryptoAlgo(object):
73 def __init__(self, name, cipher, mode):
77 if self.cipher is not None:
78 self.bs = self.cipher.block_size // 8
80 if self.name == 'AES-GCM-16ICV':
81 self.iv_len = GCM_IV_SIZE
85 def encrypt(self, data, key, aad=None):
86 iv = os.urandom(self.iv_len)
88 encryptor = Cipher(self.cipher(key), self.mode(iv),
89 default_backend()).encryptor()
90 return iv + encryptor.update(data) + encryptor.finalize()
92 salt = key[-SALT_SIZE:]
94 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
95 default_backend()).encryptor()
96 encryptor.authenticate_additional_data(aad)
97 data = encryptor.update(data) + encryptor.finalize()
98 data += encryptor.tag[:GCM_ICV_SIZE]
101 def decrypt(self, data, key, aad=None, icv=None):
103 iv = data[:self.iv_len]
104 ct = data[self.iv_len:]
105 decryptor = Cipher(algorithms.AES(key),
107 default_backend()).decryptor()
108 return decryptor.update(ct) + decryptor.finalize()
110 salt = key[-SALT_SIZE:]
111 nonce = salt + data[:GCM_IV_SIZE]
112 ct = data[GCM_IV_SIZE:]
113 key = key[:-SALT_SIZE]
114 decryptor = Cipher(algorithms.AES(key),
115 self.mode(nonce, icv, len(icv)),
116 default_backend()).decryptor()
117 decryptor.authenticate_additional_data(aad)
118 return decryptor.update(ct) + decryptor.finalize()
121 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122 data = data + b'\x00' * (pad_len - 1)
123 return data + bytes([pad_len - 1])
126 class AuthAlgo(object):
127 def __init__(self, name, mac, mod, key_len, trunc_len=None):
131 self.key_len = key_len
132 self.trunc_len = trunc_len or key_len
136 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
143 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
151 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
169 class IKEv2ChildSA(object):
170 def __init__(self, local_ts, remote_ts, is_initiator):
178 self.local_ts = local_ts
179 self.remote_ts = remote_ts
182 class IKEv2SA(object):
183 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
184 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
185 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
186 auth_method='shared-key', priv_key=None, i_natt=False,
187 r_natt=False, udp_encap=False):
188 self.udp_encap = udp_encap
198 self.dh_params = None
200 self.priv_key = priv_key
201 self.is_initiator = is_initiator
202 nonce = nonce or os.urandom(32)
203 self.auth_data = auth_data
206 if isinstance(id_type, str):
207 self.id_type = IDType.value(id_type)
209 self.id_type = id_type
210 self.auth_method = auth_method
211 if self.is_initiator:
212 self.rspi = 8 * b'\x00'
217 self.ispi = 8 * b'\x00'
219 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
222 def new_msg_id(self):
227 def my_dh_pub_key(self):
228 if self.is_initiator:
229 return self.i_dh_data
230 return self.r_dh_data
233 def peer_dh_pub_key(self):
234 if self.is_initiator:
235 return self.r_dh_data
236 return self.i_dh_data
240 return self.i_natt or self.r_natt
242 def compute_secret(self):
243 priv = self.dh_private_key
244 peer = self.peer_dh_pub_key
245 p, g, l = self.ike_group
246 return pow(int.from_bytes(peer, 'big'),
247 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
249 def generate_dh_data(self):
251 if self.ike_dh not in DH:
252 raise NotImplementedError('%s not in DH group' % self.ike_dh)
254 if self.dh_params is None:
255 dhg = DH[self.ike_dh]
256 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
257 self.dh_params = pn.parameters(default_backend())
259 priv = self.dh_params.generate_private_key()
260 pub = priv.public_key()
261 x = priv.private_numbers().x
262 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
263 y = pub.public_numbers().y
265 if self.is_initiator:
266 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
268 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
270 def complete_dh_data(self):
271 self.dh_shared_secret = self.compute_secret()
273 def calc_child_keys(self):
274 prf = self.ike_prf_alg.mod()
275 s = self.i_nonce + self.r_nonce
276 c = self.child_sas[0]
278 encr_key_len = self.esp_crypto_key_len
279 integ_key_len = self.esp_integ_alg.key_len
280 salt_len = 0 if integ_key_len else 4
282 l = (integ_key_len * 2 +
285 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
288 c.sk_ei = keymat[pos:pos+encr_key_len]
292 c.sk_ai = keymat[pos:pos+integ_key_len]
295 c.salt_ei = keymat[pos:pos+salt_len]
298 c.sk_er = keymat[pos:pos+encr_key_len]
302 c.sk_ar = keymat[pos:pos+integ_key_len]
305 c.salt_er = keymat[pos:pos+salt_len]
308 def calc_prfplus(self, prf, key, seed, length):
312 while len(r) < length and x < 255:
317 s = s + seed + bytes([x])
318 t = self.calc_prf(prf, key, s)
326 def calc_prf(self, prf, key, data):
327 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
332 prf = self.ike_prf_alg.mod()
333 # SKEYSEED = prf(Ni | Nr, g^ir)
334 s = self.i_nonce + self.r_nonce
335 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
337 # calculate S = Ni | Nr | SPIi SPIr
338 s = s + self.ispi + self.rspi
340 prf_key_trunc = self.ike_prf_alg.trunc_len
341 encr_key_len = self.ike_crypto_key_len
342 tr_prf_key_len = self.ike_prf_alg.key_len
343 integ_key_len = self.ike_integ_alg.key_len
344 if integ_key_len == 0:
354 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
357 self.sk_d = keymat[:pos+prf_key_trunc]
360 self.sk_ai = keymat[pos:pos+integ_key_len]
362 self.sk_ar = keymat[pos:pos+integ_key_len]
365 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
366 pos += encr_key_len + salt_size
367 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
368 pos += encr_key_len + salt_size
370 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
371 pos += tr_prf_key_len
372 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
374 def generate_authmsg(self, prf, packet):
375 if self.is_initiator:
383 data = bytes([self.id_type, 0, 0, 0]) + id
384 id_hash = self.calc_prf(prf, key, data)
385 return packet + nonce + id_hash
388 prf = self.ike_prf_alg.mod()
389 if self.is_initiator:
390 packet = self.init_req_packet
392 packet = self.init_resp_packet
393 authmsg = self.generate_authmsg(prf, raw(packet))
394 if self.auth_method == 'shared-key':
395 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
396 self.auth_data = self.calc_prf(prf, psk, authmsg)
397 elif self.auth_method == 'rsa-sig':
398 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
401 raise TypeError('unknown auth method type!')
403 def encrypt(self, data, aad=None):
404 data = self.ike_crypto_alg.pad(data)
405 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
408 def peer_authkey(self):
409 if self.is_initiator:
414 def my_authkey(self):
415 if self.is_initiator:
420 def my_cryptokey(self):
421 if self.is_initiator:
426 def peer_cryptokey(self):
427 if self.is_initiator:
431 def concat(self, alg, key_len):
432 return alg + '-' + str(key_len * 8)
435 def vpp_ike_cypto_alg(self):
436 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
439 def vpp_esp_cypto_alg(self):
440 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
442 def verify_hmac(self, ikemsg):
443 integ_trunc = self.ike_integ_alg.trunc_len
444 exp_hmac = ikemsg[-integ_trunc:]
445 data = ikemsg[:-integ_trunc]
446 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
447 self.peer_authkey, data)
448 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
450 def compute_hmac(self, integ, key, data):
451 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
455 def decrypt(self, data, aad=None, icv=None):
456 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
458 def hmac_and_decrypt(self, ike):
459 ep = ike[ikev2.IKEv2_payload_Encrypted]
460 if self.ike_crypto == 'AES-GCM-16ICV':
461 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
462 ct = ep.load[:-GCM_ICV_SIZE]
463 tag = ep.load[-GCM_ICV_SIZE:]
464 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
466 self.verify_hmac(raw(ike))
467 integ_trunc = self.ike_integ_alg.trunc_len
469 # remove ICV and decrypt payload
470 ct = ep.load[:-integ_trunc]
471 plain = self.decrypt(ct)
474 return plain[:-pad_len - 1]
476 def build_ts_addr(self, ts, version):
477 return {'starting_address_v' + version: ts['start_addr'],
478 'ending_address_v' + version: ts['end_addr']}
480 def generate_ts(self, is_ip4):
481 c = self.child_sas[0]
482 ts_data = {'IP_protocol_ID': 0,
486 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
487 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
488 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
489 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
491 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
492 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
493 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
494 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
496 if self.is_initiator:
497 return ([ts1], [ts2])
498 return ([ts2], [ts1])
500 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
501 if crypto not in CRYPTO_ALGOS:
502 raise TypeError('unsupported encryption algo %r' % crypto)
503 self.ike_crypto = crypto
504 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
505 self.ike_crypto_key_len = crypto_key_len
507 if integ not in AUTH_ALGOS:
508 raise TypeError('unsupported auth algo %r' % integ)
509 self.ike_integ = None if integ == 'NULL' else integ
510 self.ike_integ_alg = AUTH_ALGOS[integ]
512 if prf not in PRF_ALGOS:
513 raise TypeError('unsupported prf algo %r' % prf)
515 self.ike_prf_alg = PRF_ALGOS[prf]
517 self.ike_group = DH[self.ike_dh]
519 def set_esp_props(self, crypto, crypto_key_len, integ):
520 self.esp_crypto_key_len = crypto_key_len
521 if crypto not in CRYPTO_ALGOS:
522 raise TypeError('unsupported encryption algo %r' % crypto)
523 self.esp_crypto = crypto
524 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
526 if integ not in AUTH_ALGOS:
527 raise TypeError('unsupported auth algo %r' % integ)
528 self.esp_integ = None if integ == 'NULL' else integ
529 self.esp_integ_alg = AUTH_ALGOS[integ]
531 def crypto_attr(self, key_len):
532 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
533 return (0x800e << 16 | key_len << 3, 12)
535 raise Exception('unsupported attribute type')
537 def ike_crypto_attr(self):
538 return self.crypto_attr(self.ike_crypto_key_len)
540 def esp_crypto_attr(self):
541 return self.crypto_attr(self.esp_crypto_key_len)
543 def compute_nat_sha1(self, ip, port, rspi=None):
546 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
547 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
549 return digest.finalize()
552 class IkePeer(VppTestCase):
553 """ common class for initiator and responder """
557 import scapy.contrib.ikev2 as _ikev2
558 globals()['ikev2'] = _ikev2
559 super(IkePeer, cls).setUpClass()
560 cls.create_pg_interfaces(range(2))
561 for i in cls.pg_interfaces:
569 def tearDownClass(cls):
570 super(IkePeer, cls).tearDownClass()
573 super(IkePeer, self).tearDown()
574 if self.del_sa_from_responder:
575 self.initiate_del_sa_from_responder()
577 self.initiate_del_sa_from_initiator()
578 r = self.vapi.ikev2_sa_dump()
579 self.assertEqual(len(r), 0)
580 sas = self.vapi.ipsec_sa_dump()
581 self.assertEqual(len(sas), 0)
582 self.p.remove_vpp_config()
583 self.assertIsNone(self.p.query_vpp_config())
586 super(IkePeer, self).setUp()
588 self.p.add_vpp_config()
589 self.assertIsNotNone(self.p.query_vpp_config())
590 if self.sa.is_initiator:
591 self.sa.generate_dh_data()
592 self.vapi.cli('ikev2 set logging level 4')
593 self.vapi.cli('event-lo clear')
595 def assert_counter(self, count, name, version='ip4'):
596 node_name = '/err/ikev2-%s/' % version + name
597 self.assertEqual(count, self.statistics.get_err_counter(node_name))
599 def create_rekey_request(self):
600 sa, first_payload = self.generate_auth_payload(is_rekey=True)
601 header = ikev2.IKEv2(
602 init_SPI=self.sa.ispi,
603 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
604 flags='Initiator', exch_type='CREATE_CHILD_SA')
606 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
607 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
608 self.sa.dport, self.sa.natt, self.ip6)
610 def create_empty_request(self):
611 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
612 id=self.sa.new_msg_id(), flags='Initiator',
613 exch_type='INFORMATIONAL',
614 next_payload='Encrypted')
616 msg = self.encrypt_ike_msg(header, b'', None)
617 return self.create_packet(self.pg0, msg, self.sa.sport,
618 self.sa.dport, self.sa.natt, self.ip6)
620 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
623 src_ip = src_if.remote_ip6
624 dst_ip = src_if.local_ip6
627 src_ip = src_if.remote_ip4
628 dst_ip = src_if.local_ip4
630 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
631 ip_layer(src=src_ip, dst=dst_ip) /
632 UDP(sport=sport, dport=dport))
634 # insert non ESP marker
635 res = res / Raw(b'\x00' * 4)
638 def verify_udp(self, udp):
639 self.assertEqual(udp.sport, self.sa.sport)
640 self.assertEqual(udp.dport, self.sa.dport)
642 def get_ike_header(self, packet):
644 ih = packet[ikev2.IKEv2]
645 ih = self.verify_and_remove_non_esp_marker(ih)
646 except IndexError as e:
647 # this is a workaround for getting IKEv2 layer as both ikev2 and
648 # ipsec register for port 4500
650 ih = self.verify_and_remove_non_esp_marker(esp)
651 self.assertEqual(ih.version, 0x20)
652 self.assertNotIn('Version', ih.flags)
655 def verify_and_remove_non_esp_marker(self, packet):
657 # if we are in nat traversal mode check for non esp marker
660 self.assertEqual(data[:4], b'\x00' * 4)
661 return ikev2.IKEv2(data[4:])
665 def encrypt_ike_msg(self, header, plain, first_payload):
666 if self.sa.ike_crypto == 'AES-GCM-16ICV':
667 data = self.sa.ike_crypto_alg.pad(raw(plain))
668 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
669 len(ikev2.IKEv2_payload_Encrypted())
670 tlen = plen + len(ikev2.IKEv2())
673 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
677 encr = self.sa.encrypt(raw(plain), raw(res))
678 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
679 length=plen, load=encr)
682 encr = self.sa.encrypt(raw(plain))
683 trunc_len = self.sa.ike_integ_alg.trunc_len
684 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
685 tlen = plen + len(ikev2.IKEv2())
687 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
688 length=plen, load=encr)
692 integ_data = raw(res)
693 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
694 self.sa.my_authkey, integ_data)
695 res = res / Raw(hmac_data[:trunc_len])
696 assert(len(res) == tlen)
699 def verify_udp_encap(self, ipsec_sa):
700 e = VppEnum.vl_api_ipsec_sad_flags_t
701 if self.sa.udp_encap or self.sa.natt:
702 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
704 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
706 def verify_ipsec_sas(self, is_rekey=False):
707 sas = self.vapi.ipsec_sa_dump()
709 # after rekey there is a short period of time in which old
710 # inbound SA is still present
714 self.assertEqual(len(sas), sa_count)
715 if self.sa.is_initiator:
730 c = self.sa.child_sas[0]
732 self.verify_udp_encap(sa0)
733 self.verify_udp_encap(sa1)
734 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
735 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
736 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
738 if self.sa.esp_integ is None:
741 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
742 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
743 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
746 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
747 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
748 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
749 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
753 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
754 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
755 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
756 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
758 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
759 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
761 def verify_keymat(self, api_keys, keys, name):
762 km = getattr(keys, name)
763 api_km = getattr(api_keys, name)
764 api_km_len = getattr(api_keys, name + '_len')
765 self.assertEqual(len(km), api_km_len)
766 self.assertEqual(km, api_km[:api_km_len])
768 def verify_id(self, api_id, exp_id):
769 self.assertEqual(api_id.type, IDType.value(exp_id.type))
770 self.assertEqual(api_id.data_len, exp_id.data_len)
771 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
773 def verify_ike_sas(self):
774 r = self.vapi.ikev2_sa_dump()
775 self.assertEqual(len(r), 1)
777 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
778 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
780 if self.sa.is_initiator:
781 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
782 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
784 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
785 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
787 if self.sa.is_initiator:
788 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
789 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
791 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
792 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
793 self.verify_keymat(sa.keys, self.sa, 'sk_d')
794 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
795 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
796 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
797 self.verify_keymat(sa.keys, self.sa, 'sk_er')
798 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
799 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
801 self.assertEqual(sa.i_id.type, self.sa.id_type)
802 self.assertEqual(sa.r_id.type, self.sa.id_type)
803 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
804 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
805 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
806 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
808 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
809 self.assertEqual(len(r), 1)
811 self.assertEqual(csa.sa_index, sa.sa_index)
812 c = self.sa.child_sas[0]
813 if hasattr(c, 'sk_ai'):
814 self.verify_keymat(csa.keys, c, 'sk_ai')
815 self.verify_keymat(csa.keys, c, 'sk_ar')
816 self.verify_keymat(csa.keys, c, 'sk_ei')
817 self.verify_keymat(csa.keys, c, 'sk_er')
818 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
819 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
821 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
824 r = self.vapi.ikev2_traffic_selector_dump(
825 is_initiator=True, sa_index=sa.sa_index,
826 child_sa_index=csa.child_sa_index)
827 self.assertEqual(len(r), 1)
829 self.verify_ts(r[0].ts, tsi[0], True)
831 r = self.vapi.ikev2_traffic_selector_dump(
832 is_initiator=False, sa_index=sa.sa_index,
833 child_sa_index=csa.child_sa_index)
834 self.assertEqual(len(r), 1)
835 self.verify_ts(r[0].ts, tsr[0], False)
837 n = self.vapi.ikev2_nonce_get(is_initiator=True,
838 sa_index=sa.sa_index)
839 self.verify_nonce(n, self.sa.i_nonce)
840 n = self.vapi.ikev2_nonce_get(is_initiator=False,
841 sa_index=sa.sa_index)
842 self.verify_nonce(n, self.sa.r_nonce)
844 def verify_nonce(self, api_nonce, nonce):
845 self.assertEqual(api_nonce.data_len, len(nonce))
846 self.assertEqual(api_nonce.nonce, nonce)
848 def verify_ts(self, api_ts, ts, is_initiator):
850 self.assertTrue(api_ts.is_local)
852 self.assertFalse(api_ts.is_local)
855 self.assertEqual(api_ts.start_addr,
856 IPv4Address(ts.starting_address_v4))
857 self.assertEqual(api_ts.end_addr,
858 IPv4Address(ts.ending_address_v4))
860 self.assertEqual(api_ts.start_addr,
861 IPv6Address(ts.starting_address_v6))
862 self.assertEqual(api_ts.end_addr,
863 IPv6Address(ts.ending_address_v6))
864 self.assertEqual(api_ts.start_port, ts.start_port)
865 self.assertEqual(api_ts.end_port, ts.end_port)
866 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
869 class TemplateInitiator(IkePeer):
870 """ initiator test template """
872 def initiate_del_sa_from_initiator(self):
873 ispi = int.from_bytes(self.sa.ispi, 'little')
874 self.pg0.enable_capture()
876 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
877 capture = self.pg0.get_capture(1)
878 ih = self.get_ike_header(capture[0])
879 self.assertNotIn('Response', ih.flags)
880 self.assertIn('Initiator', ih.flags)
881 self.assertEqual(ih.init_SPI, self.sa.ispi)
882 self.assertEqual(ih.resp_SPI, self.sa.rspi)
883 plain = self.sa.hmac_and_decrypt(ih)
884 d = ikev2.IKEv2_payload_Delete(plain)
885 self.assertEqual(d.proto, 1) # proto=IKEv2
886 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
887 flags='Response', exch_type='INFORMATIONAL',
888 id=ih.id, next_payload='Encrypted')
889 resp = self.encrypt_ike_msg(header, b'', None)
890 self.send_and_assert_no_replies(self.pg0, resp)
892 def verify_del_sa(self, packet):
893 ih = self.get_ike_header(packet)
894 self.assertEqual(ih.id, self.sa.msg_id)
895 self.assertEqual(ih.exch_type, 37) # exchange informational
896 self.assertIn('Response', ih.flags)
897 self.assertIn('Initiator', ih.flags)
898 plain = self.sa.hmac_and_decrypt(ih)
899 self.assertEqual(plain, b'')
901 def initiate_del_sa_from_responder(self):
902 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
903 exch_type='INFORMATIONAL',
904 id=self.sa.new_msg_id())
905 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
906 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
907 packet = self.create_packet(self.pg0, ike_msg,
908 self.sa.sport, self.sa.dport,
909 self.sa.natt, self.ip6)
910 self.pg0.add_stream(packet)
911 self.pg0.enable_capture()
913 capture = self.pg0.get_capture(1)
914 self.verify_del_sa(capture[0])
917 def find_notify_payload(packet, notify_type):
918 n = packet[ikev2.IKEv2_payload_Notify]
920 if n.type == notify_type:
925 def verify_nat_detection(self, packet):
932 # NAT_DETECTION_SOURCE_IP
933 s = self.find_notify_payload(packet, 16388)
934 self.assertIsNotNone(s)
935 src_sha = self.sa.compute_nat_sha1(
936 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
937 self.assertEqual(s.load, src_sha)
939 # NAT_DETECTION_DESTINATION_IP
940 s = self.find_notify_payload(packet, 16389)
941 self.assertIsNotNone(s)
942 dst_sha = self.sa.compute_nat_sha1(
943 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
944 self.assertEqual(s.load, dst_sha)
946 def verify_sa_init_request(self, packet):
948 self.sa.dport = udp.sport
949 ih = packet[ikev2.IKEv2]
950 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
951 self.assertEqual(ih.exch_type, 34) # SA_INIT
952 self.sa.ispi = ih.init_SPI
953 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
954 self.assertIn('Initiator', ih.flags)
955 self.assertNotIn('Response', ih.flags)
956 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
957 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
959 prop = packet[ikev2.IKEv2_payload_Proposal]
960 self.assertEqual(prop.proto, 1) # proto = ikev2
961 self.assertEqual(prop.proposal, 1)
962 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
963 self.assertEqual(prop.trans[0].transform_id,
964 self.p.ike_transforms['crypto_alg'])
965 self.assertEqual(prop.trans[1].transform_type, 2) # prf
966 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
967 self.assertEqual(prop.trans[2].transform_type, 4) # dh
968 self.assertEqual(prop.trans[2].transform_id,
969 self.p.ike_transforms['dh_group'])
971 self.verify_nat_detection(packet)
972 self.sa.set_ike_props(
973 crypto='AES-GCM-16ICV', crypto_key_len=32,
974 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
975 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
976 integ='SHA2-256-128')
977 self.sa.generate_dh_data()
978 self.sa.complete_dh_data()
981 def update_esp_transforms(self, trans, sa):
983 if trans.transform_type == 1: # ecryption
984 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
985 elif trans.transform_type == 3: # integrity
986 sa.esp_integ = INTEG_IDS[trans.transform_id]
987 trans = trans.payload
989 def verify_sa_auth_req(self, packet):
991 self.sa.dport = udp.sport
992 ih = self.get_ike_header(packet)
993 self.assertEqual(ih.resp_SPI, self.sa.rspi)
994 self.assertEqual(ih.init_SPI, self.sa.ispi)
995 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
996 self.assertIn('Initiator', ih.flags)
997 self.assertNotIn('Response', ih.flags)
1000 self.verify_udp(udp)
1001 self.assertEqual(ih.id, self.sa.msg_id + 1)
1003 plain = self.sa.hmac_and_decrypt(ih)
1004 idi = ikev2.IKEv2_payload_IDi(plain)
1005 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1006 self.assertEqual(idi.load, self.sa.i_id)
1007 self.assertEqual(idr.load, self.sa.r_id)
1008 prop = idi[ikev2.IKEv2_payload_Proposal]
1009 c = self.sa.child_sas[0]
1011 self.update_esp_transforms(
1012 prop[ikev2.IKEv2_payload_Transform], self.sa)
1014 def send_init_response(self):
1015 tr_attr = self.sa.ike_crypto_attr()
1016 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1017 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1018 key_length=tr_attr[0]) /
1019 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1020 transform_id=self.sa.ike_integ) /
1021 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1022 transform_id=self.sa.ike_prf_alg.name) /
1023 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1024 transform_id=self.sa.ike_dh))
1025 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1026 trans_nb=4, trans=trans))
1028 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1030 dst_address = b'\x0a\x0a\x0a\x0a'
1032 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1033 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1034 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1036 self.sa.init_resp_packet = (
1037 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1038 exch_type='IKE_SA_INIT', flags='Response') /
1039 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1040 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1041 group=self.sa.ike_dh,
1042 load=self.sa.my_dh_pub_key) /
1043 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1044 next_payload='Notify') /
1045 ikev2.IKEv2_payload_Notify(
1046 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1047 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1048 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1050 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1051 self.sa.sport, self.sa.dport,
1053 self.pg_send(self.pg0, ike_msg)
1054 capture = self.pg0.get_capture(1)
1055 self.verify_sa_auth_req(capture[0])
1057 def initiate_sa_init(self):
1058 self.pg0.enable_capture()
1060 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1062 capture = self.pg0.get_capture(1)
1063 self.verify_sa_init_request(capture[0])
1064 self.send_init_response()
1066 def send_auth_response(self):
1067 tr_attr = self.sa.esp_crypto_attr()
1068 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1069 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1070 key_length=tr_attr[0]) /
1071 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1072 transform_id=self.sa.esp_integ) /
1073 ikev2.IKEv2_payload_Transform(
1074 transform_type='Extended Sequence Number',
1075 transform_id='No ESN') /
1076 ikev2.IKEv2_payload_Transform(
1077 transform_type='Extended Sequence Number',
1078 transform_id='ESN'))
1080 c = self.sa.child_sas[0]
1081 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1082 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1084 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1085 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1086 IDtype=self.sa.id_type, load=self.sa.i_id) /
1087 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1088 IDtype=self.sa.id_type, load=self.sa.r_id) /
1089 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1090 auth_type=AuthMethod.value(self.sa.auth_method),
1091 load=self.sa.auth_data) /
1092 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1093 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1094 number_of_TSs=len(tsi),
1095 traffic_selector=tsi) /
1096 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1097 number_of_TSs=len(tsr),
1098 traffic_selector=tsr) /
1099 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1101 header = ikev2.IKEv2(
1102 init_SPI=self.sa.ispi,
1103 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1104 flags='Response', exch_type='IKE_AUTH')
1106 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1107 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1108 self.sa.dport, self.sa.natt, self.ip6)
1109 self.pg_send(self.pg0, packet)
1111 def test_initiator(self):
1112 self.initiate_sa_init()
1114 self.sa.calc_child_keys()
1115 self.send_auth_response()
1116 self.verify_ike_sas()
1119 class TemplateResponder(IkePeer):
1120 """ responder test template """
1122 def initiate_del_sa_from_responder(self):
1123 self.pg0.enable_capture()
1125 self.vapi.ikev2_initiate_del_ike_sa(
1126 ispi=int.from_bytes(self.sa.ispi, 'little'))
1127 capture = self.pg0.get_capture(1)
1128 ih = self.get_ike_header(capture[0])
1129 self.assertNotIn('Response', ih.flags)
1130 self.assertNotIn('Initiator', ih.flags)
1131 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1132 plain = self.sa.hmac_and_decrypt(ih)
1133 d = ikev2.IKEv2_payload_Delete(plain)
1134 self.assertEqual(d.proto, 1) # proto=IKEv2
1135 self.assertEqual(ih.init_SPI, self.sa.ispi)
1136 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1137 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1138 flags='Initiator+Response',
1139 exch_type='INFORMATIONAL',
1140 id=ih.id, next_payload='Encrypted')
1141 resp = self.encrypt_ike_msg(header, b'', None)
1142 self.send_and_assert_no_replies(self.pg0, resp)
1144 def verify_del_sa(self, packet):
1145 ih = self.get_ike_header(packet)
1146 self.assertEqual(ih.id, self.sa.msg_id)
1147 self.assertEqual(ih.exch_type, 37) # exchange informational
1148 self.assertIn('Response', ih.flags)
1149 self.assertNotIn('Initiator', ih.flags)
1150 self.assertEqual(ih.next_payload, 46) # Encrypted
1151 self.assertEqual(ih.init_SPI, self.sa.ispi)
1152 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1153 plain = self.sa.hmac_and_decrypt(ih)
1154 self.assertEqual(plain, b'')
1156 def initiate_del_sa_from_initiator(self):
1157 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1158 flags='Initiator', exch_type='INFORMATIONAL',
1159 id=self.sa.new_msg_id())
1160 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1161 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1162 packet = self.create_packet(self.pg0, ike_msg,
1163 self.sa.sport, self.sa.dport,
1164 self.sa.natt, self.ip6)
1165 self.pg0.add_stream(packet)
1166 self.pg0.enable_capture()
1168 capture = self.pg0.get_capture(1)
1169 self.verify_del_sa(capture[0])
1171 def send_sa_init_req(self):
1172 tr_attr = self.sa.ike_crypto_attr()
1173 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1174 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1175 key_length=tr_attr[0]) /
1176 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1177 transform_id=self.sa.ike_integ) /
1178 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1179 transform_id=self.sa.ike_prf_alg.name) /
1180 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1181 transform_id=self.sa.ike_dh))
1183 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1184 trans_nb=4, trans=trans))
1186 next_payload = None if self.ip6 else 'Notify'
1188 self.sa.init_req_packet = (
1189 ikev2.IKEv2(init_SPI=self.sa.ispi,
1190 flags='Initiator', exch_type='IKE_SA_INIT') /
1191 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1192 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1193 group=self.sa.ike_dh,
1194 load=self.sa.my_dh_pub_key) /
1195 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1196 load=self.sa.i_nonce))
1200 src_address = b'\x0a\x0a\x0a\x01'
1202 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1205 dst_address = b'\x0a\x0a\x0a\x0a'
1207 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1209 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1210 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1211 nat_src_detection = ikev2.IKEv2_payload_Notify(
1212 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1213 next_payload='Notify')
1214 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1215 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1216 self.sa.init_req_packet = (self.sa.init_req_packet /
1220 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1221 self.sa.sport, self.sa.dport,
1222 self.sa.natt, self.ip6)
1223 self.pg0.add_stream(ike_msg)
1224 self.pg0.enable_capture()
1226 capture = self.pg0.get_capture(1)
1227 self.verify_sa_init(capture[0])
1229 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1230 tr_attr = self.sa.esp_crypto_attr()
1231 last_payload = last_payload or 'Notify'
1232 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1233 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1234 key_length=tr_attr[0]) /
1235 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1236 transform_id=self.sa.esp_integ) /
1237 ikev2.IKEv2_payload_Transform(
1238 transform_type='Extended Sequence Number',
1239 transform_id='No ESN') /
1240 ikev2.IKEv2_payload_Transform(
1241 transform_type='Extended Sequence Number',
1242 transform_id='ESN'))
1244 c = self.sa.child_sas[0]
1245 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1246 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1248 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1249 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1250 auth_type=AuthMethod.value(self.sa.auth_method),
1251 load=self.sa.auth_data) /
1252 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1253 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1254 number_of_TSs=len(tsi), traffic_selector=tsi) /
1255 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1256 number_of_TSs=len(tsr), traffic_selector=tsr))
1259 first_payload = 'Nonce'
1260 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1261 next_payload='SA') / plain /
1262 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1263 proto='ESP', SPI=c.ispi))
1265 first_payload = 'IDi'
1266 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1267 IDtype=self.sa.id_type, load=self.sa.i_id) /
1268 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1269 IDtype=self.sa.id_type, load=self.sa.r_id))
1271 return plain, first_payload
1273 def send_sa_auth(self):
1274 plain, first_payload = self.generate_auth_payload(
1275 last_payload='Notify')
1276 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1277 header = ikev2.IKEv2(
1278 init_SPI=self.sa.ispi,
1279 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1280 flags='Initiator', exch_type='IKE_AUTH')
1282 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1283 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1284 self.sa.dport, self.sa.natt, self.ip6)
1285 self.pg0.add_stream(packet)
1286 self.pg0.enable_capture()
1288 capture = self.pg0.get_capture(1)
1289 self.verify_sa_auth_resp(capture[0])
1291 def verify_sa_init(self, packet):
1292 ih = self.get_ike_header(packet)
1294 self.assertEqual(ih.id, self.sa.msg_id)
1295 self.assertEqual(ih.exch_type, 34)
1296 self.assertIn('Response', ih.flags)
1297 self.assertEqual(ih.init_SPI, self.sa.ispi)
1298 self.assertNotEqual(ih.resp_SPI, 0)
1299 self.sa.rspi = ih.resp_SPI
1301 sa = ih[ikev2.IKEv2_payload_SA]
1302 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1303 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1304 except IndexError as e:
1305 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1306 self.logger.error(ih.show())
1308 self.sa.complete_dh_data()
1312 def verify_sa_auth_resp(self, packet):
1313 ike = self.get_ike_header(packet)
1315 self.verify_udp(udp)
1316 self.assertEqual(ike.id, self.sa.msg_id)
1317 plain = self.sa.hmac_and_decrypt(ike)
1318 idr = ikev2.IKEv2_payload_IDr(plain)
1319 prop = idr[ikev2.IKEv2_payload_Proposal]
1320 self.assertEqual(prop.SPIsize, 4)
1321 self.sa.child_sas[0].rspi = prop.SPI
1322 self.sa.calc_child_keys()
1324 IKE_NODE_SUFFIX = 'ip4'
1326 def verify_counters(self):
1327 self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
1328 self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
1329 self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1331 r = self.vapi.ikev2_sa_dump()
1333 self.assertEqual(1, s.n_sa_auth_req)
1334 self.assertEqual(1, s.n_sa_init_req)
1336 def test_responder(self):
1337 self.send_sa_init_req()
1339 self.verify_ipsec_sas()
1340 self.verify_ike_sas()
1341 self.verify_counters()
1344 class Ikev2Params(object):
1345 def config_params(self, params={}):
1346 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1347 ei = VppEnum.vl_api_ipsec_integ_alg_t
1349 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1350 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1351 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1352 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1353 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1354 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1356 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1357 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1358 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1359 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1361 dpd_disabled = True if 'dpd_disabled' not in params else\
1362 params['dpd_disabled']
1364 self.vapi.cli('ikev2 dpd disable')
1365 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1366 not in params else params['del_sa_from_responder']
1367 i_natt = False if 'i_natt' not in params else params['i_natt']
1368 r_natt = False if 'r_natt' not in params else params['r_natt']
1369 self.p = Profile(self, 'pr1')
1370 self.ip6 = False if 'ip6' not in params else params['ip6']
1372 if 'auth' in params and params['auth'] == 'rsa-sig':
1373 auth_method = 'rsa-sig'
1374 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1375 self.vapi.ikev2_set_local_key(
1376 key_file=work_dir + params['server-key'])
1378 client_file = work_dir + params['client-cert']
1379 server_pem = open(work_dir + params['server-cert']).read()
1380 client_priv = open(work_dir + params['client-key']).read()
1381 client_priv = load_pem_private_key(str.encode(client_priv), None,
1383 self.peer_cert = x509.load_pem_x509_certificate(
1384 str.encode(server_pem),
1386 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1389 auth_data = b'$3cr3tpa$$w0rd'
1390 self.p.add_auth(method='shared-key', data=auth_data)
1391 auth_method = 'shared-key'
1394 is_init = True if 'is_initiator' not in params else\
1395 params['is_initiator']
1397 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1398 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1400 self.p.add_local_id(**idr)
1401 self.p.add_remote_id(**idi)
1403 self.p.add_local_id(**idi)
1404 self.p.add_remote_id(**idr)
1406 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1407 'loc_ts' not in params else params['loc_ts']
1408 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1409 'rem_ts' not in params else params['rem_ts']
1410 self.p.add_local_ts(**loc_ts)
1411 self.p.add_remote_ts(**rem_ts)
1412 if 'responder' in params:
1413 self.p.add_responder(params['responder'])
1414 if 'ike_transforms' in params:
1415 self.p.add_ike_transforms(params['ike_transforms'])
1416 if 'esp_transforms' in params:
1417 self.p.add_esp_transforms(params['esp_transforms'])
1419 udp_encap = False if 'udp_encap' not in params else\
1422 self.p.set_udp_encap(True)
1424 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1425 is_initiator=is_init,
1426 id_type=self.p.local_id['id_type'],
1427 i_natt=i_natt, r_natt=r_natt,
1428 priv_key=client_priv, auth_method=auth_method,
1429 auth_data=auth_data, udp_encap=udp_encap,
1430 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1432 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1433 params['ike-crypto']
1434 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1436 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1439 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1440 params['esp-crypto']
1441 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1444 self.sa.set_ike_props(
1445 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1446 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1447 self.sa.set_esp_props(
1448 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1452 class TestApi(VppTestCase):
1453 """ Test IKEV2 API """
1455 def setUpClass(cls):
1456 super(TestApi, cls).setUpClass()
1459 def tearDownClass(cls):
1460 super(TestApi, cls).tearDownClass()
1463 super(TestApi, self).tearDown()
1464 self.p1.remove_vpp_config()
1465 self.p2.remove_vpp_config()
1466 r = self.vapi.ikev2_profile_dump()
1467 self.assertEqual(len(r), 0)
1469 def configure_profile(self, cfg):
1470 p = Profile(self, cfg['name'])
1471 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1472 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1473 p.add_local_ts(**cfg['loc_ts'])
1474 p.add_remote_ts(**cfg['rem_ts'])
1475 p.add_responder(cfg['responder'])
1476 p.add_ike_transforms(cfg['ike_ts'])
1477 p.add_esp_transforms(cfg['esp_ts'])
1478 p.add_auth(**cfg['auth'])
1479 p.set_udp_encap(cfg['udp_encap'])
1480 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1481 if 'lifetime_data' in cfg:
1482 p.set_lifetime_data(cfg['lifetime_data'])
1483 if 'tun_itf' in cfg:
1484 p.set_tunnel_interface(cfg['tun_itf'])
1485 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1490 def test_profile_api(self):
1491 """ test profile dump API """
1496 'start_addr': '3.3.3.2',
1497 'end_addr': '3.3.3.3',
1503 'start_addr': '4.5.76.80',
1504 'end_addr': '2.3.4.6',
1511 'start_addr': 'ab::1',
1512 'end_addr': 'ab::4',
1518 'start_addr': 'cd::12',
1519 'end_addr': 'cd::13',
1525 'natt_disabled': True,
1526 'loc_id': ('fqdn', b'vpp.home'),
1527 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1530 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1533 'crypto_key_size': 32,
1538 'crypto_key_size': 24,
1540 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1542 'ipsec_over_udp_port': 4501,
1545 'lifetime_maxdata': 20192,
1546 'lifetime_jitter': 9,
1551 'loc_id': ('ip4-addr', b'192.168.2.1'),
1552 'rem_id': ('ip6-addr', b'abcd::1'),
1555 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1558 'crypto_key_size': 16,
1563 'crypto_key_size': 24,
1565 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1567 'ipsec_over_udp_port': 4600,
1570 self.p1 = self.configure_profile(conf['p1'])
1571 self.p2 = self.configure_profile(conf['p2'])
1573 r = self.vapi.ikev2_profile_dump()
1574 self.assertEqual(len(r), 2)
1575 self.verify_profile(r[0].profile, conf['p1'])
1576 self.verify_profile(r[1].profile, conf['p2'])
1578 def verify_id(self, api_id, cfg_id):
1579 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1580 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1582 def verify_ts(self, api_ts, cfg_ts):
1583 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1584 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1585 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1586 self.assertEqual(api_ts.start_addr,
1587 ip_address(text_type(cfg_ts['start_addr'])))
1588 self.assertEqual(api_ts.end_addr,
1589 ip_address(text_type(cfg_ts['end_addr'])))
1591 def verify_responder(self, api_r, cfg_r):
1592 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1593 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1595 def verify_transforms(self, api_ts, cfg_ts):
1596 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1597 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1598 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1600 def verify_ike_transforms(self, api_ts, cfg_ts):
1601 self.verify_transforms(api_ts, cfg_ts)
1602 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1604 def verify_esp_transforms(self, api_ts, cfg_ts):
1605 self.verify_transforms(api_ts, cfg_ts)
1607 def verify_auth(self, api_auth, cfg_auth):
1608 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1609 self.assertEqual(api_auth.data, cfg_auth['data'])
1610 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1612 def verify_lifetime_data(self, p, ld):
1613 self.assertEqual(p.lifetime, ld['lifetime'])
1614 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1615 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1616 self.assertEqual(p.handover, ld['handover'])
1618 def verify_profile(self, ap, cp):
1619 self.assertEqual(ap.name, cp['name'])
1620 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1621 self.verify_id(ap.loc_id, cp['loc_id'])
1622 self.verify_id(ap.rem_id, cp['rem_id'])
1623 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1624 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1625 self.verify_responder(ap.responder, cp['responder'])
1626 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1627 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1628 self.verify_auth(ap.auth, cp['auth'])
1629 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1630 self.assertTrue(natt_dis == ap.natt_disabled)
1632 if 'lifetime_data' in cp:
1633 self.verify_lifetime_data(ap, cp['lifetime_data'])
1634 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1636 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1638 self.assertEqual(ap.tun_itf, 0xffffffff)
1641 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1642 """ test responder - responder behind NAT """
1644 IKE_NODE_SUFFIX = 'ip4-natt'
1646 def config_tc(self):
1647 self.config_params({'r_natt': True})
1650 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1651 """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1653 def config_tc(self):
1654 self.config_params({
1656 'is_initiator': False, # seen from test case perspective
1657 # thus vpp is initiator
1658 'responder': {'sw_if_index': self.pg0.sw_if_index,
1659 'addr': self.pg0.remote_ip4},
1660 'ike-crypto': ('AES-GCM-16ICV', 32),
1661 'ike-integ': 'NULL',
1662 'ike-dh': '3072MODPgr',
1664 'crypto_alg': 20, # "aes-gcm-16"
1665 'crypto_key_size': 256,
1666 'dh_group': 15, # "modp-3072"
1669 'crypto_alg': 12, # "aes-cbc"
1670 'crypto_key_size': 256,
1671 # "hmac-sha2-256-128"
1675 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1676 """ test ikev2 initiator - pre shared key auth """
1678 def config_tc(self):
1679 self.config_params({
1680 'is_initiator': False, # seen from test case perspective
1681 # thus vpp is initiator
1682 'responder': {'sw_if_index': self.pg0.sw_if_index,
1683 'addr': self.pg0.remote_ip4},
1684 'ike-crypto': ('AES-GCM-16ICV', 32),
1685 'ike-integ': 'NULL',
1686 'ike-dh': '3072MODPgr',
1688 'crypto_alg': 20, # "aes-gcm-16"
1689 'crypto_key_size': 256,
1690 'dh_group': 15, # "modp-3072"
1693 'crypto_alg': 12, # "aes-cbc"
1694 'crypto_key_size': 256,
1695 # "hmac-sha2-256-128"
1699 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1700 """ test initiator - request window size (1) """
1702 def rekey_respond(self, req, update_child_sa_data):
1703 ih = self.get_ike_header(req)
1704 plain = self.sa.hmac_and_decrypt(ih)
1705 sa = ikev2.IKEv2_payload_SA(plain)
1706 if update_child_sa_data:
1707 prop = sa[ikev2.IKEv2_payload_Proposal]
1708 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1709 self.sa.r_nonce = self.sa.i_nonce
1710 self.sa.child_sas[0].ispi = prop.SPI
1711 self.sa.child_sas[0].rspi = prop.SPI
1712 self.sa.calc_child_keys()
1714 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1715 flags='Response', exch_type=36,
1716 id=ih.id, next_payload='Encrypted')
1717 resp = self.encrypt_ike_msg(header, sa, 'SA')
1718 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1719 self.sa.dport, self.sa.natt, self.ip6)
1720 self.send_and_assert_no_replies(self.pg0, packet)
1722 def test_initiator(self):
1723 super(TestInitiatorRequestWindowSize, self).test_initiator()
1724 self.pg0.enable_capture()
1726 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1727 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1728 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1729 capture = self.pg0.get_capture(2)
1731 # reply in reverse order
1732 self.rekey_respond(capture[1], True)
1733 self.rekey_respond(capture[0], False)
1735 # verify that only the second request was accepted
1736 self.verify_ike_sas()
1737 self.verify_ipsec_sas(is_rekey=True)
1740 class TestInitiatorRekey(TestInitiatorPsk):
1741 """ test ikev2 initiator - rekey """
1743 def rekey_from_initiator(self):
1744 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1745 self.pg0.enable_capture()
1747 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1748 capture = self.pg0.get_capture(1)
1749 ih = self.get_ike_header(capture[0])
1750 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1751 self.assertNotIn('Response', ih.flags)
1752 self.assertIn('Initiator', ih.flags)
1753 plain = self.sa.hmac_and_decrypt(ih)
1754 sa = ikev2.IKEv2_payload_SA(plain)
1755 prop = sa[ikev2.IKEv2_payload_Proposal]
1756 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1757 self.sa.r_nonce = self.sa.i_nonce
1758 # update new responder SPI
1759 self.sa.child_sas[0].ispi = prop.SPI
1760 self.sa.child_sas[0].rspi = prop.SPI
1761 self.sa.calc_child_keys()
1762 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1763 flags='Response', exch_type=36,
1764 id=ih.id, next_payload='Encrypted')
1765 resp = self.encrypt_ike_msg(header, sa, 'SA')
1766 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1767 self.sa.dport, self.sa.natt, self.ip6)
1768 self.send_and_assert_no_replies(self.pg0, packet)
1770 def test_initiator(self):
1771 super(TestInitiatorRekey, self).test_initiator()
1772 self.rekey_from_initiator()
1773 self.verify_ike_sas()
1774 self.verify_ipsec_sas(is_rekey=True)
1777 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1778 """ test ikev2 initiator - delete IKE SA from responder """
1780 def config_tc(self):
1781 self.config_params({
1782 'del_sa_from_responder': True,
1783 'is_initiator': False, # seen from test case perspective
1784 # thus vpp is initiator
1785 'responder': {'sw_if_index': self.pg0.sw_if_index,
1786 'addr': self.pg0.remote_ip4},
1787 'ike-crypto': ('AES-GCM-16ICV', 32),
1788 'ike-integ': 'NULL',
1789 'ike-dh': '3072MODPgr',
1791 'crypto_alg': 20, # "aes-gcm-16"
1792 'crypto_key_size': 256,
1793 'dh_group': 15, # "modp-3072"
1796 'crypto_alg': 12, # "aes-cbc"
1797 'crypto_key_size': 256,
1798 # "hmac-sha2-256-128"
1802 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1803 """ test ikev2 responder - initiator behind NAT """
1805 IKE_NODE_SUFFIX = 'ip4-natt'
1807 def config_tc(self):
1812 class TestResponderPsk(TemplateResponder, Ikev2Params):
1813 """ test ikev2 responder - pre shared key auth """
1814 def config_tc(self):
1815 self.config_params()
1818 class TestResponderDpd(TestResponderPsk):
1820 Dead peer detection test
1822 def config_tc(self):
1823 self.config_params({'dpd_disabled': False})
1828 def test_responder(self):
1829 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1830 super(TestResponderDpd, self).test_responder()
1831 self.pg0.enable_capture()
1833 # capture empty request but don't reply
1834 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1835 ih = self.get_ike_header(capture[0])
1836 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1837 plain = self.sa.hmac_and_decrypt(ih)
1838 self.assertEqual(plain, b'')
1839 # wait for SA expiration
1841 ike_sas = self.vapi.ikev2_sa_dump()
1842 self.assertEqual(len(ike_sas), 0)
1843 ipsec_sas = self.vapi.ipsec_sa_dump()
1844 self.assertEqual(len(ipsec_sas), 0)
1847 class TestResponderRekey(TestResponderPsk):
1848 """ test ikev2 responder - rekey """
1850 def rekey_from_initiator(self):
1851 packet = self.create_rekey_request()
1852 self.pg0.add_stream(packet)
1853 self.pg0.enable_capture()
1855 capture = self.pg0.get_capture(1)
1856 ih = self.get_ike_header(capture[0])
1857 plain = self.sa.hmac_and_decrypt(ih)
1858 sa = ikev2.IKEv2_payload_SA(plain)
1859 prop = sa[ikev2.IKEv2_payload_Proposal]
1860 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1861 # update new responder SPI
1862 self.sa.child_sas[0].rspi = prop.SPI
1864 def test_responder(self):
1865 super(TestResponderRekey, self).test_responder()
1866 self.rekey_from_initiator()
1867 self.sa.calc_child_keys()
1868 self.verify_ike_sas()
1869 self.verify_ipsec_sas(is_rekey=True)
1870 self.assert_counter(1, 'rekey_req', 'ip4')
1871 r = self.vapi.ikev2_sa_dump()
1872 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
1875 class TestResponderVrf(TestResponderPsk, Ikev2Params):
1876 """ test ikev2 responder - non-default table id """
1879 def setUpClass(cls):
1880 import scapy.contrib.ikev2 as _ikev2
1881 globals()['ikev2'] = _ikev2
1882 super(IkePeer, cls).setUpClass()
1883 cls.create_pg_interfaces(range(1))
1884 cls.vapi.cli("ip table add 1")
1885 cls.vapi.cli("set interface ip table pg0 1")
1886 for i in cls.pg_interfaces:
1893 def config_tc(self):
1894 self.config_params({'dpd_disabled': False})
1896 def test_responder(self):
1897 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1898 super(TestResponderVrf, self).test_responder()
1899 self.pg0.enable_capture()
1901 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1902 ih = self.get_ike_header(capture[0])
1903 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1904 plain = self.sa.hmac_and_decrypt(ih)
1905 self.assertEqual(plain, b'')
1908 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1909 """ test ikev2 responder - cert based auth """
1910 def config_tc(self):
1911 self.config_params({
1914 'server-key': 'server-key.pem',
1915 'client-key': 'client-key.pem',
1916 'client-cert': 'client-cert.pem',
1917 'server-cert': 'server-cert.pem'})
1920 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1921 (TemplateResponder, Ikev2Params):
1923 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1925 def config_tc(self):
1926 self.config_params({
1927 'ike-crypto': ('AES-CBC', 16),
1928 'ike-integ': 'SHA2-256-128',
1929 'esp-crypto': ('AES-CBC', 24),
1930 'esp-integ': 'SHA2-384-192',
1931 'ike-dh': '2048MODPgr'})
1934 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1935 (TemplateResponder, Ikev2Params):
1938 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1940 def config_tc(self):
1941 self.config_params({
1942 'ike-crypto': ('AES-CBC', 32),
1943 'ike-integ': 'SHA2-256-128',
1944 'esp-crypto': ('AES-GCM-16ICV', 32),
1945 'esp-integ': 'NULL',
1946 'ike-dh': '3072MODPgr'})
1949 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1954 IKE_NODE_SUFFIX = 'ip6'
1956 def config_tc(self):
1957 self.config_params({
1958 'del_sa_from_responder': True,
1961 'ike-crypto': ('AES-GCM-16ICV', 32),
1962 'ike-integ': 'NULL',
1963 'ike-dh': '2048MODPgr',
1964 'loc_ts': {'start_addr': 'ab:cd::0',
1965 'end_addr': 'ab:cd::10'},
1966 'rem_ts': {'start_addr': '11::0',
1967 'end_addr': '11::100'}})
1970 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1972 Test for keep alive messages
1975 def send_empty_req_from_responder(self):
1976 packet = self.create_empty_request()
1977 self.pg0.add_stream(packet)
1978 self.pg0.enable_capture()
1980 capture = self.pg0.get_capture(1)
1981 ih = self.get_ike_header(capture[0])
1982 self.assertEqual(ih.id, self.sa.msg_id)
1983 plain = self.sa.hmac_and_decrypt(ih)
1984 self.assertEqual(plain, b'')
1985 self.assert_counter(1, 'keepalive', 'ip4')
1986 r = self.vapi.ikev2_sa_dump()
1987 self.assertEqual(1, r[0].sa.stats.n_keepalives)
1989 def test_initiator(self):
1990 super(TestInitiatorKeepaliveMsg, self).test_initiator()
1991 self.send_empty_req_from_responder()
1994 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1995 """ malformed packet test """
2000 def config_tc(self):
2001 self.config_params()
2003 def create_ike_init_msg(self, length=None, payload=None):
2004 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
2005 flags='Initiator', exch_type='IKE_SA_INIT')
2006 if payload is not None:
2008 return self.create_packet(self.pg0, msg, self.sa.sport,
2011 def verify_bad_packet_length(self):
2012 ike_msg = self.create_ike_init_msg(length=0xdead)
2013 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2014 self.assert_counter(self.pkt_count, 'bad_length')
2016 def verify_bad_sa_payload_length(self):
2017 p = ikev2.IKEv2_payload_SA(length=0xdead)
2018 ike_msg = self.create_ike_init_msg(payload=p)
2019 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2020 self.assert_counter(self.pkt_count, 'malformed_packet')
2022 def test_responder(self):
2023 self.pkt_count = 254
2024 self.verify_bad_packet_length()
2025 self.verify_bad_sa_payload_length()
2028 if __name__ == '__main__':
2029 unittest.main(testRunner=VppTestRunner)