3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import VppTestCase, VppTestRunner
22 from vpp_ikev2 import Profile, IDType, AuthMethod
23 from vpp_papi import VppEnum
30 KEY_PAD = b"Key Pad for IKEv2"
37 # tuple structure is (p, g, key_len)
39 '2048MODPgr': (long_converter("""
40 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
41 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
42 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
43 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
44 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
45 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
46 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
47 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
48 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
49 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
50 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
52 '3072MODPgr': (long_converter("""
53 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
54 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
55 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
56 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
57 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
58 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
59 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
60 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
61 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
62 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
63 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
64 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
65 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
66 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
67 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
68 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
72 class CryptoAlgo(object):
73 def __init__(self, name, cipher, mode):
77 if self.cipher is not None:
78 self.bs = self.cipher.block_size // 8
80 if self.name == 'AES-GCM-16ICV':
81 self.iv_len = GCM_IV_SIZE
85 def encrypt(self, data, key, aad=None):
86 iv = os.urandom(self.iv_len)
88 encryptor = Cipher(self.cipher(key), self.mode(iv),
89 default_backend()).encryptor()
90 return iv + encryptor.update(data) + encryptor.finalize()
92 salt = key[-SALT_SIZE:]
94 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
95 default_backend()).encryptor()
96 encryptor.authenticate_additional_data(aad)
97 data = encryptor.update(data) + encryptor.finalize()
98 data += encryptor.tag[:GCM_ICV_SIZE]
101 def decrypt(self, data, key, aad=None, icv=None):
103 iv = data[:self.iv_len]
104 ct = data[self.iv_len:]
105 decryptor = Cipher(algorithms.AES(key),
107 default_backend()).decryptor()
108 return decryptor.update(ct) + decryptor.finalize()
110 salt = key[-SALT_SIZE:]
111 nonce = salt + data[:GCM_IV_SIZE]
112 ct = data[GCM_IV_SIZE:]
113 key = key[:-SALT_SIZE]
114 decryptor = Cipher(algorithms.AES(key),
115 self.mode(nonce, icv, len(icv)),
116 default_backend()).decryptor()
117 decryptor.authenticate_additional_data(aad)
118 return decryptor.update(ct) + decryptor.finalize()
121 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122 data = data + b'\x00' * (pad_len - 1)
123 return data + bytes([pad_len - 1])
126 class AuthAlgo(object):
127 def __init__(self, name, mac, mod, key_len, trunc_len=None):
131 self.key_len = key_len
132 self.trunc_len = trunc_len or key_len
136 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
143 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
151 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
169 class IKEv2ChildSA(object):
170 def __init__(self, local_ts, remote_ts, is_initiator):
178 self.local_ts = local_ts
179 self.remote_ts = remote_ts
182 class IKEv2SA(object):
183 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
184 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
185 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
186 auth_method='shared-key', priv_key=None, i_natt=False,
187 r_natt=False, udp_encap=False):
188 self.udp_encap = udp_encap
198 self.dh_params = None
200 self.priv_key = priv_key
201 self.is_initiator = is_initiator
202 nonce = nonce or os.urandom(32)
203 self.auth_data = auth_data
206 if isinstance(id_type, str):
207 self.id_type = IDType.value(id_type)
209 self.id_type = id_type
210 self.auth_method = auth_method
211 if self.is_initiator:
212 self.rspi = 8 * b'\x00'
217 self.ispi = 8 * b'\x00'
219 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
222 def new_msg_id(self):
227 def my_dh_pub_key(self):
228 if self.is_initiator:
229 return self.i_dh_data
230 return self.r_dh_data
233 def peer_dh_pub_key(self):
234 if self.is_initiator:
235 return self.r_dh_data
236 return self.i_dh_data
240 return self.i_natt or self.r_natt
242 def compute_secret(self):
243 priv = self.dh_private_key
244 peer = self.peer_dh_pub_key
245 p, g, l = self.ike_group
246 return pow(int.from_bytes(peer, 'big'),
247 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
249 def generate_dh_data(self):
251 if self.ike_dh not in DH:
252 raise NotImplementedError('%s not in DH group' % self.ike_dh)
254 if self.dh_params is None:
255 dhg = DH[self.ike_dh]
256 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
257 self.dh_params = pn.parameters(default_backend())
259 priv = self.dh_params.generate_private_key()
260 pub = priv.public_key()
261 x = priv.private_numbers().x
262 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
263 y = pub.public_numbers().y
265 if self.is_initiator:
266 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
268 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
270 def complete_dh_data(self):
271 self.dh_shared_secret = self.compute_secret()
273 def calc_child_keys(self):
274 prf = self.ike_prf_alg.mod()
275 s = self.i_nonce + self.r_nonce
276 c = self.child_sas[0]
278 encr_key_len = self.esp_crypto_key_len
279 integ_key_len = self.esp_integ_alg.key_len
280 salt_len = 0 if integ_key_len else 4
282 l = (integ_key_len * 2 +
285 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
288 c.sk_ei = keymat[pos:pos+encr_key_len]
292 c.sk_ai = keymat[pos:pos+integ_key_len]
295 c.salt_ei = keymat[pos:pos+salt_len]
298 c.sk_er = keymat[pos:pos+encr_key_len]
302 c.sk_ar = keymat[pos:pos+integ_key_len]
305 c.salt_er = keymat[pos:pos+salt_len]
308 def calc_prfplus(self, prf, key, seed, length):
312 while len(r) < length and x < 255:
317 s = s + seed + bytes([x])
318 t = self.calc_prf(prf, key, s)
326 def calc_prf(self, prf, key, data):
327 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
332 prf = self.ike_prf_alg.mod()
333 # SKEYSEED = prf(Ni | Nr, g^ir)
334 s = self.i_nonce + self.r_nonce
335 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
337 # calculate S = Ni | Nr | SPIi SPIr
338 s = s + self.ispi + self.rspi
340 prf_key_trunc = self.ike_prf_alg.trunc_len
341 encr_key_len = self.ike_crypto_key_len
342 tr_prf_key_len = self.ike_prf_alg.key_len
343 integ_key_len = self.ike_integ_alg.key_len
344 if integ_key_len == 0:
354 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
357 self.sk_d = keymat[:pos+prf_key_trunc]
360 self.sk_ai = keymat[pos:pos+integ_key_len]
362 self.sk_ar = keymat[pos:pos+integ_key_len]
365 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
366 pos += encr_key_len + salt_size
367 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
368 pos += encr_key_len + salt_size
370 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
371 pos += tr_prf_key_len
372 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
374 def generate_authmsg(self, prf, packet):
375 if self.is_initiator:
383 data = bytes([self.id_type, 0, 0, 0]) + id
384 id_hash = self.calc_prf(prf, key, data)
385 return packet + nonce + id_hash
388 prf = self.ike_prf_alg.mod()
389 if self.is_initiator:
390 packet = self.init_req_packet
392 packet = self.init_resp_packet
393 authmsg = self.generate_authmsg(prf, raw(packet))
394 if self.auth_method == 'shared-key':
395 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
396 self.auth_data = self.calc_prf(prf, psk, authmsg)
397 elif self.auth_method == 'rsa-sig':
398 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
401 raise TypeError('unknown auth method type!')
403 def encrypt(self, data, aad=None):
404 data = self.ike_crypto_alg.pad(data)
405 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
408 def peer_authkey(self):
409 if self.is_initiator:
414 def my_authkey(self):
415 if self.is_initiator:
420 def my_cryptokey(self):
421 if self.is_initiator:
426 def peer_cryptokey(self):
427 if self.is_initiator:
431 def concat(self, alg, key_len):
432 return alg + '-' + str(key_len * 8)
435 def vpp_ike_cypto_alg(self):
436 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
439 def vpp_esp_cypto_alg(self):
440 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
442 def verify_hmac(self, ikemsg):
443 integ_trunc = self.ike_integ_alg.trunc_len
444 exp_hmac = ikemsg[-integ_trunc:]
445 data = ikemsg[:-integ_trunc]
446 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
447 self.peer_authkey, data)
448 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
450 def compute_hmac(self, integ, key, data):
451 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
455 def decrypt(self, data, aad=None, icv=None):
456 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
458 def hmac_and_decrypt(self, ike):
459 ep = ike[ikev2.IKEv2_payload_Encrypted]
460 if self.ike_crypto == 'AES-GCM-16ICV':
461 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
462 ct = ep.load[:-GCM_ICV_SIZE]
463 tag = ep.load[-GCM_ICV_SIZE:]
464 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
466 self.verify_hmac(raw(ike))
467 integ_trunc = self.ike_integ_alg.trunc_len
469 # remove ICV and decrypt payload
470 ct = ep.load[:-integ_trunc]
471 plain = self.decrypt(ct)
474 return plain[:-pad_len - 1]
476 def build_ts_addr(self, ts, version):
477 return {'starting_address_v' + version: ts['start_addr'],
478 'ending_address_v' + version: ts['end_addr']}
480 def generate_ts(self, is_ip4):
481 c = self.child_sas[0]
482 ts_data = {'IP_protocol_ID': 0,
486 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
487 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
488 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
489 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
491 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
492 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
493 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
494 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
496 if self.is_initiator:
497 return ([ts1], [ts2])
498 return ([ts2], [ts1])
500 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
501 if crypto not in CRYPTO_ALGOS:
502 raise TypeError('unsupported encryption algo %r' % crypto)
503 self.ike_crypto = crypto
504 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
505 self.ike_crypto_key_len = crypto_key_len
507 if integ not in AUTH_ALGOS:
508 raise TypeError('unsupported auth algo %r' % integ)
509 self.ike_integ = None if integ == 'NULL' else integ
510 self.ike_integ_alg = AUTH_ALGOS[integ]
512 if prf not in PRF_ALGOS:
513 raise TypeError('unsupported prf algo %r' % prf)
515 self.ike_prf_alg = PRF_ALGOS[prf]
517 self.ike_group = DH[self.ike_dh]
519 def set_esp_props(self, crypto, crypto_key_len, integ):
520 self.esp_crypto_key_len = crypto_key_len
521 if crypto not in CRYPTO_ALGOS:
522 raise TypeError('unsupported encryption algo %r' % crypto)
523 self.esp_crypto = crypto
524 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
526 if integ not in AUTH_ALGOS:
527 raise TypeError('unsupported auth algo %r' % integ)
528 self.esp_integ = None if integ == 'NULL' else integ
529 self.esp_integ_alg = AUTH_ALGOS[integ]
531 def crypto_attr(self, key_len):
532 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
533 return (0x800e << 16 | key_len << 3, 12)
535 raise Exception('unsupported attribute type')
537 def ike_crypto_attr(self):
538 return self.crypto_attr(self.ike_crypto_key_len)
540 def esp_crypto_attr(self):
541 return self.crypto_attr(self.esp_crypto_key_len)
543 def compute_nat_sha1(self, ip, port, rspi=None):
546 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
547 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
549 return digest.finalize()
552 class IkePeer(VppTestCase):
553 """ common class for initiator and responder """
557 import scapy.contrib.ikev2 as _ikev2
558 globals()['ikev2'] = _ikev2
559 super(IkePeer, cls).setUpClass()
560 cls.create_pg_interfaces(range(2))
561 for i in cls.pg_interfaces:
569 def tearDownClass(cls):
570 super(IkePeer, cls).tearDownClass()
573 super(IkePeer, self).tearDown()
574 if self.del_sa_from_responder:
575 self.initiate_del_sa_from_responder()
577 self.initiate_del_sa_from_initiator()
578 r = self.vapi.ikev2_sa_dump()
579 self.assertEqual(len(r), 0)
580 sas = self.vapi.ipsec_sa_dump()
581 self.assertEqual(len(sas), 0)
582 self.p.remove_vpp_config()
583 self.assertIsNone(self.p.query_vpp_config())
586 super(IkePeer, self).setUp()
588 self.p.add_vpp_config()
589 self.assertIsNotNone(self.p.query_vpp_config())
590 if self.sa.is_initiator:
591 self.sa.generate_dh_data()
592 self.vapi.cli('ikev2 set logging level 4')
593 self.vapi.cli('event-lo clear')
595 def assert_counter(self, count, name, version='ip4'):
596 node_name = '/err/ikev2-%s/' % version + name
597 self.assertEqual(count, self.statistics.get_err_counter(node_name))
599 def create_rekey_request(self):
600 sa, first_payload = self.generate_auth_payload(is_rekey=True)
601 header = ikev2.IKEv2(
602 init_SPI=self.sa.ispi,
603 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
604 flags='Initiator', exch_type='CREATE_CHILD_SA')
606 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
607 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
608 self.sa.dport, self.sa.natt, self.ip6)
610 def create_empty_request(self):
611 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
612 id=self.sa.new_msg_id(), flags='Initiator',
613 exch_type='INFORMATIONAL',
614 next_payload='Encrypted')
616 msg = self.encrypt_ike_msg(header, b'', None)
617 return self.create_packet(self.pg0, msg, self.sa.sport,
618 self.sa.dport, self.sa.natt, self.ip6)
620 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
623 src_ip = src_if.remote_ip6
624 dst_ip = src_if.local_ip6
627 src_ip = src_if.remote_ip4
628 dst_ip = src_if.local_ip4
630 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
631 ip_layer(src=src_ip, dst=dst_ip) /
632 UDP(sport=sport, dport=dport))
634 # insert non ESP marker
635 res = res / Raw(b'\x00' * 4)
638 def verify_udp(self, udp):
639 self.assertEqual(udp.sport, self.sa.sport)
640 self.assertEqual(udp.dport, self.sa.dport)
642 def get_ike_header(self, packet):
644 ih = packet[ikev2.IKEv2]
645 ih = self.verify_and_remove_non_esp_marker(ih)
646 except IndexError as e:
647 # this is a workaround for getting IKEv2 layer as both ikev2 and
648 # ipsec register for port 4500
650 ih = self.verify_and_remove_non_esp_marker(esp)
651 self.assertEqual(ih.version, 0x20)
652 self.assertNotIn('Version', ih.flags)
655 def verify_and_remove_non_esp_marker(self, packet):
657 # if we are in nat traversal mode check for non esp marker
660 self.assertEqual(data[:4], b'\x00' * 4)
661 return ikev2.IKEv2(data[4:])
665 def encrypt_ike_msg(self, header, plain, first_payload):
666 if self.sa.ike_crypto == 'AES-GCM-16ICV':
667 data = self.sa.ike_crypto_alg.pad(raw(plain))
668 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
669 len(ikev2.IKEv2_payload_Encrypted())
670 tlen = plen + len(ikev2.IKEv2())
673 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
677 encr = self.sa.encrypt(raw(plain), raw(res))
678 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
679 length=plen, load=encr)
682 encr = self.sa.encrypt(raw(plain))
683 trunc_len = self.sa.ike_integ_alg.trunc_len
684 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
685 tlen = plen + len(ikev2.IKEv2())
687 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
688 length=plen, load=encr)
692 integ_data = raw(res)
693 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
694 self.sa.my_authkey, integ_data)
695 res = res / Raw(hmac_data[:trunc_len])
696 assert(len(res) == tlen)
699 def verify_udp_encap(self, ipsec_sa):
700 e = VppEnum.vl_api_ipsec_sad_flags_t
701 if self.sa.udp_encap or self.sa.natt:
702 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
704 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
706 def verify_ipsec_sas(self, is_rekey=False):
707 sas = self.vapi.ipsec_sa_dump()
709 # after rekey there is a short period of time in which old
710 # inbound SA is still present
714 self.assertEqual(len(sas), sa_count)
715 if self.sa.is_initiator:
730 c = self.sa.child_sas[0]
732 self.verify_udp_encap(sa0)
733 self.verify_udp_encap(sa1)
734 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
735 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
736 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
738 if self.sa.esp_integ is None:
741 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
742 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
743 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
746 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
747 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
748 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
749 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
753 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
754 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
755 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
756 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
758 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
759 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
761 def verify_keymat(self, api_keys, keys, name):
762 km = getattr(keys, name)
763 api_km = getattr(api_keys, name)
764 api_km_len = getattr(api_keys, name + '_len')
765 self.assertEqual(len(km), api_km_len)
766 self.assertEqual(km, api_km[:api_km_len])
768 def verify_id(self, api_id, exp_id):
769 self.assertEqual(api_id.type, IDType.value(exp_id.type))
770 self.assertEqual(api_id.data_len, exp_id.data_len)
771 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
773 def verify_ike_sas(self):
774 r = self.vapi.ikev2_sa_dump()
775 self.assertEqual(len(r), 1)
777 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
778 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
780 if self.sa.is_initiator:
781 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
782 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
784 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
785 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
787 if self.sa.is_initiator:
788 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
789 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
791 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
792 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
793 self.verify_keymat(sa.keys, self.sa, 'sk_d')
794 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
795 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
796 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
797 self.verify_keymat(sa.keys, self.sa, 'sk_er')
798 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
799 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
801 self.assertEqual(sa.i_id.type, self.sa.id_type)
802 self.assertEqual(sa.r_id.type, self.sa.id_type)
803 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
804 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
805 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
806 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
808 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
809 self.assertEqual(len(r), 1)
811 self.assertEqual(csa.sa_index, sa.sa_index)
812 c = self.sa.child_sas[0]
813 if hasattr(c, 'sk_ai'):
814 self.verify_keymat(csa.keys, c, 'sk_ai')
815 self.verify_keymat(csa.keys, c, 'sk_ar')
816 self.verify_keymat(csa.keys, c, 'sk_ei')
817 self.verify_keymat(csa.keys, c, 'sk_er')
818 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
819 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
821 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
824 r = self.vapi.ikev2_traffic_selector_dump(
825 is_initiator=True, sa_index=sa.sa_index,
826 child_sa_index=csa.child_sa_index)
827 self.assertEqual(len(r), 1)
829 self.verify_ts(r[0].ts, tsi[0], True)
831 r = self.vapi.ikev2_traffic_selector_dump(
832 is_initiator=False, sa_index=sa.sa_index,
833 child_sa_index=csa.child_sa_index)
834 self.assertEqual(len(r), 1)
835 self.verify_ts(r[0].ts, tsr[0], False)
837 n = self.vapi.ikev2_nonce_get(is_initiator=True,
838 sa_index=sa.sa_index)
839 self.verify_nonce(n, self.sa.i_nonce)
840 n = self.vapi.ikev2_nonce_get(is_initiator=False,
841 sa_index=sa.sa_index)
842 self.verify_nonce(n, self.sa.r_nonce)
844 def verify_nonce(self, api_nonce, nonce):
845 self.assertEqual(api_nonce.data_len, len(nonce))
846 self.assertEqual(api_nonce.nonce, nonce)
848 def verify_ts(self, api_ts, ts, is_initiator):
850 self.assertTrue(api_ts.is_local)
852 self.assertFalse(api_ts.is_local)
855 self.assertEqual(api_ts.start_addr,
856 IPv4Address(ts.starting_address_v4))
857 self.assertEqual(api_ts.end_addr,
858 IPv4Address(ts.ending_address_v4))
860 self.assertEqual(api_ts.start_addr,
861 IPv6Address(ts.starting_address_v6))
862 self.assertEqual(api_ts.end_addr,
863 IPv6Address(ts.ending_address_v6))
864 self.assertEqual(api_ts.start_port, ts.start_port)
865 self.assertEqual(api_ts.end_port, ts.end_port)
866 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
869 class TemplateInitiator(IkePeer):
870 """ initiator test template """
872 def initiate_del_sa_from_initiator(self):
873 ispi = int.from_bytes(self.sa.ispi, 'little')
874 self.pg0.enable_capture()
876 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
877 capture = self.pg0.get_capture(1)
878 ih = self.get_ike_header(capture[0])
879 self.assertNotIn('Response', ih.flags)
880 self.assertIn('Initiator', ih.flags)
881 self.assertEqual(ih.init_SPI, self.sa.ispi)
882 self.assertEqual(ih.resp_SPI, self.sa.rspi)
883 plain = self.sa.hmac_and_decrypt(ih)
884 d = ikev2.IKEv2_payload_Delete(plain)
885 self.assertEqual(d.proto, 1) # proto=IKEv2
886 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
887 flags='Response', exch_type='INFORMATIONAL',
888 id=ih.id, next_payload='Encrypted')
889 resp = self.encrypt_ike_msg(header, b'', None)
890 self.send_and_assert_no_replies(self.pg0, resp)
892 def verify_del_sa(self, packet):
893 ih = self.get_ike_header(packet)
894 self.assertEqual(ih.id, self.sa.msg_id)
895 self.assertEqual(ih.exch_type, 37) # exchange informational
896 self.assertIn('Response', ih.flags)
897 self.assertIn('Initiator', ih.flags)
898 plain = self.sa.hmac_and_decrypt(ih)
899 self.assertEqual(plain, b'')
901 def initiate_del_sa_from_responder(self):
902 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
903 exch_type='INFORMATIONAL',
904 id=self.sa.new_msg_id())
905 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
906 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
907 packet = self.create_packet(self.pg0, ike_msg,
908 self.sa.sport, self.sa.dport,
909 self.sa.natt, self.ip6)
910 self.pg0.add_stream(packet)
911 self.pg0.enable_capture()
913 capture = self.pg0.get_capture(1)
914 self.verify_del_sa(capture[0])
917 def find_notify_payload(packet, notify_type):
918 n = packet[ikev2.IKEv2_payload_Notify]
920 if n.type == notify_type:
925 def verify_nat_detection(self, packet):
932 # NAT_DETECTION_SOURCE_IP
933 s = self.find_notify_payload(packet, 16388)
934 self.assertIsNotNone(s)
935 src_sha = self.sa.compute_nat_sha1(
936 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
937 self.assertEqual(s.load, src_sha)
939 # NAT_DETECTION_DESTINATION_IP
940 s = self.find_notify_payload(packet, 16389)
941 self.assertIsNotNone(s)
942 dst_sha = self.sa.compute_nat_sha1(
943 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
944 self.assertEqual(s.load, dst_sha)
946 def verify_sa_init_request(self, packet):
948 self.sa.dport = udp.sport
949 ih = packet[ikev2.IKEv2]
950 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
951 self.assertEqual(ih.exch_type, 34) # SA_INIT
952 self.sa.ispi = ih.init_SPI
953 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
954 self.assertIn('Initiator', ih.flags)
955 self.assertNotIn('Response', ih.flags)
956 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
957 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
959 prop = packet[ikev2.IKEv2_payload_Proposal]
960 self.assertEqual(prop.proto, 1) # proto = ikev2
961 self.assertEqual(prop.proposal, 1)
962 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
963 self.assertEqual(prop.trans[0].transform_id,
964 self.p.ike_transforms['crypto_alg'])
965 self.assertEqual(prop.trans[1].transform_type, 2) # prf
966 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
967 self.assertEqual(prop.trans[2].transform_type, 4) # dh
968 self.assertEqual(prop.trans[2].transform_id,
969 self.p.ike_transforms['dh_group'])
971 self.verify_nat_detection(packet)
972 self.sa.set_ike_props(
973 crypto='AES-GCM-16ICV', crypto_key_len=32,
974 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
975 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
976 integ='SHA2-256-128')
977 self.sa.generate_dh_data()
978 self.sa.complete_dh_data()
981 def update_esp_transforms(self, trans, sa):
983 if trans.transform_type == 1: # ecryption
984 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
985 elif trans.transform_type == 3: # integrity
986 sa.esp_integ = INTEG_IDS[trans.transform_id]
987 trans = trans.payload
989 def verify_sa_auth_req(self, packet):
991 self.sa.dport = udp.sport
992 ih = self.get_ike_header(packet)
993 self.assertEqual(ih.resp_SPI, self.sa.rspi)
994 self.assertEqual(ih.init_SPI, self.sa.ispi)
995 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
996 self.assertIn('Initiator', ih.flags)
997 self.assertNotIn('Response', ih.flags)
1000 self.verify_udp(udp)
1001 self.assertEqual(ih.id, self.sa.msg_id + 1)
1003 plain = self.sa.hmac_and_decrypt(ih)
1004 idi = ikev2.IKEv2_payload_IDi(plain)
1005 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1006 self.assertEqual(idi.load, self.sa.i_id)
1007 self.assertEqual(idr.load, self.sa.r_id)
1008 prop = idi[ikev2.IKEv2_payload_Proposal]
1009 c = self.sa.child_sas[0]
1011 self.update_esp_transforms(
1012 prop[ikev2.IKEv2_payload_Transform], self.sa)
1014 def send_init_response(self):
1015 tr_attr = self.sa.ike_crypto_attr()
1016 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1017 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1018 key_length=tr_attr[0]) /
1019 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1020 transform_id=self.sa.ike_integ) /
1021 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1022 transform_id=self.sa.ike_prf_alg.name) /
1023 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1024 transform_id=self.sa.ike_dh))
1025 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1026 trans_nb=4, trans=trans))
1028 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1030 dst_address = b'\x0a\x0a\x0a\x0a'
1032 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1033 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1034 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1036 self.sa.init_resp_packet = (
1037 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1038 exch_type='IKE_SA_INIT', flags='Response') /
1039 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1040 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1041 group=self.sa.ike_dh,
1042 load=self.sa.my_dh_pub_key) /
1043 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1044 next_payload='Notify') /
1045 ikev2.IKEv2_payload_Notify(
1046 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1047 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1048 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1050 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1051 self.sa.sport, self.sa.dport,
1053 self.pg_send(self.pg0, ike_msg)
1054 capture = self.pg0.get_capture(1)
1055 self.verify_sa_auth_req(capture[0])
1057 def initiate_sa_init(self):
1058 self.pg0.enable_capture()
1060 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1062 capture = self.pg0.get_capture(1)
1063 self.verify_sa_init_request(capture[0])
1064 self.send_init_response()
1066 def send_auth_response(self):
1067 tr_attr = self.sa.esp_crypto_attr()
1068 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1069 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1070 key_length=tr_attr[0]) /
1071 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1072 transform_id=self.sa.esp_integ) /
1073 ikev2.IKEv2_payload_Transform(
1074 transform_type='Extended Sequence Number',
1075 transform_id='No ESN') /
1076 ikev2.IKEv2_payload_Transform(
1077 transform_type='Extended Sequence Number',
1078 transform_id='ESN'))
1080 c = self.sa.child_sas[0]
1081 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1082 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1084 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1085 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1086 IDtype=self.sa.id_type, load=self.sa.i_id) /
1087 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1088 IDtype=self.sa.id_type, load=self.sa.r_id) /
1089 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1090 auth_type=AuthMethod.value(self.sa.auth_method),
1091 load=self.sa.auth_data) /
1092 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1093 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1094 number_of_TSs=len(tsi),
1095 traffic_selector=tsi) /
1096 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1097 number_of_TSs=len(tsr),
1098 traffic_selector=tsr) /
1099 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1101 header = ikev2.IKEv2(
1102 init_SPI=self.sa.ispi,
1103 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1104 flags='Response', exch_type='IKE_AUTH')
1106 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1107 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1108 self.sa.dport, self.sa.natt, self.ip6)
1109 self.pg_send(self.pg0, packet)
1111 def test_initiator(self):
1112 self.initiate_sa_init()
1114 self.sa.calc_child_keys()
1115 self.send_auth_response()
1116 self.verify_ike_sas()
1119 class TemplateResponder(IkePeer):
1120 """ responder test template """
1122 def initiate_del_sa_from_responder(self):
1123 self.pg0.enable_capture()
1125 self.vapi.ikev2_initiate_del_ike_sa(
1126 ispi=int.from_bytes(self.sa.ispi, 'little'))
1127 capture = self.pg0.get_capture(1)
1128 ih = self.get_ike_header(capture[0])
1129 self.assertNotIn('Response', ih.flags)
1130 self.assertNotIn('Initiator', ih.flags)
1131 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1132 plain = self.sa.hmac_and_decrypt(ih)
1133 d = ikev2.IKEv2_payload_Delete(plain)
1134 self.assertEqual(d.proto, 1) # proto=IKEv2
1135 self.assertEqual(ih.init_SPI, self.sa.ispi)
1136 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1137 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1138 flags='Initiator+Response',
1139 exch_type='INFORMATIONAL',
1140 id=ih.id, next_payload='Encrypted')
1141 resp = self.encrypt_ike_msg(header, b'', None)
1142 self.send_and_assert_no_replies(self.pg0, resp)
1144 def verify_del_sa(self, packet):
1145 ih = self.get_ike_header(packet)
1146 self.assertEqual(ih.id, self.sa.msg_id)
1147 self.assertEqual(ih.exch_type, 37) # exchange informational
1148 self.assertIn('Response', ih.flags)
1149 self.assertNotIn('Initiator', ih.flags)
1150 self.assertEqual(ih.next_payload, 46) # Encrypted
1151 self.assertEqual(ih.init_SPI, self.sa.ispi)
1152 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1153 plain = self.sa.hmac_and_decrypt(ih)
1154 self.assertEqual(plain, b'')
1156 def initiate_del_sa_from_initiator(self):
1157 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1158 flags='Initiator', exch_type='INFORMATIONAL',
1159 id=self.sa.new_msg_id())
1160 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1161 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1162 packet = self.create_packet(self.pg0, ike_msg,
1163 self.sa.sport, self.sa.dport,
1164 self.sa.natt, self.ip6)
1165 self.pg0.add_stream(packet)
1166 self.pg0.enable_capture()
1168 capture = self.pg0.get_capture(1)
1169 self.verify_del_sa(capture[0])
1171 def send_sa_init_req(self):
1172 tr_attr = self.sa.ike_crypto_attr()
1173 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1174 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1175 key_length=tr_attr[0]) /
1176 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1177 transform_id=self.sa.ike_integ) /
1178 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1179 transform_id=self.sa.ike_prf_alg.name) /
1180 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1181 transform_id=self.sa.ike_dh))
1183 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1184 trans_nb=4, trans=trans))
1186 next_payload = None if self.ip6 else 'Notify'
1188 self.sa.init_req_packet = (
1189 ikev2.IKEv2(init_SPI=self.sa.ispi,
1190 flags='Initiator', exch_type='IKE_SA_INIT') /
1191 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1192 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1193 group=self.sa.ike_dh,
1194 load=self.sa.my_dh_pub_key) /
1195 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1196 load=self.sa.i_nonce))
1200 src_address = b'\x0a\x0a\x0a\x01'
1202 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1205 dst_address = b'\x0a\x0a\x0a\x0a'
1207 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1209 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1210 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1211 nat_src_detection = ikev2.IKEv2_payload_Notify(
1212 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1213 next_payload='Notify')
1214 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1215 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1216 self.sa.init_req_packet = (self.sa.init_req_packet /
1220 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1221 self.sa.sport, self.sa.dport,
1222 self.sa.natt, self.ip6)
1223 self.pg0.add_stream(ike_msg)
1224 self.pg0.enable_capture()
1226 capture = self.pg0.get_capture(1)
1227 self.verify_sa_init(capture[0])
1229 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1230 tr_attr = self.sa.esp_crypto_attr()
1231 last_payload = last_payload or 'Notify'
1232 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1233 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1234 key_length=tr_attr[0]) /
1235 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1236 transform_id=self.sa.esp_integ) /
1237 ikev2.IKEv2_payload_Transform(
1238 transform_type='Extended Sequence Number',
1239 transform_id='No ESN') /
1240 ikev2.IKEv2_payload_Transform(
1241 transform_type='Extended Sequence Number',
1242 transform_id='ESN'))
1244 c = self.sa.child_sas[0]
1245 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1246 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1248 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1249 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1250 auth_type=AuthMethod.value(self.sa.auth_method),
1251 load=self.sa.auth_data) /
1252 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1253 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1254 number_of_TSs=len(tsi), traffic_selector=tsi) /
1255 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1256 number_of_TSs=len(tsr), traffic_selector=tsr))
1259 first_payload = 'Nonce'
1260 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1261 next_payload='SA') / plain /
1262 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1263 proto='ESP', SPI=c.ispi))
1265 first_payload = 'IDi'
1266 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1267 IDtype=self.sa.id_type, load=self.sa.i_id) /
1268 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1269 IDtype=self.sa.id_type, load=self.sa.r_id))
1271 return plain, first_payload
1273 def send_sa_auth(self):
1274 plain, first_payload = self.generate_auth_payload(
1275 last_payload='Notify')
1276 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1277 header = ikev2.IKEv2(
1278 init_SPI=self.sa.ispi,
1279 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1280 flags='Initiator', exch_type='IKE_AUTH')
1282 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1283 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1284 self.sa.dport, self.sa.natt, self.ip6)
1285 self.pg0.add_stream(packet)
1286 self.pg0.enable_capture()
1288 capture = self.pg0.get_capture(1)
1289 self.verify_sa_auth_resp(capture[0])
1291 def verify_sa_init(self, packet):
1292 ih = self.get_ike_header(packet)
1294 self.assertEqual(ih.id, self.sa.msg_id)
1295 self.assertEqual(ih.exch_type, 34)
1296 self.assertIn('Response', ih.flags)
1297 self.assertEqual(ih.init_SPI, self.sa.ispi)
1298 self.assertNotEqual(ih.resp_SPI, 0)
1299 self.sa.rspi = ih.resp_SPI
1301 sa = ih[ikev2.IKEv2_payload_SA]
1302 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1303 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1304 except IndexError as e:
1305 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1306 self.logger.error(ih.show())
1308 self.sa.complete_dh_data()
1312 def verify_sa_auth_resp(self, packet):
1313 ike = self.get_ike_header(packet)
1315 self.verify_udp(udp)
1316 self.assertEqual(ike.id, self.sa.msg_id)
1317 plain = self.sa.hmac_and_decrypt(ike)
1318 idr = ikev2.IKEv2_payload_IDr(plain)
1319 prop = idr[ikev2.IKEv2_payload_Proposal]
1320 self.assertEqual(prop.SPIsize, 4)
1321 self.sa.child_sas[0].rspi = prop.SPI
1322 self.sa.calc_child_keys()
1324 IKE_NODE_SUFFIX = 'ip4'
1326 def verify_counters(self):
1327 self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
1328 self.assert_counter(1, 'exchange_sa_req', self.IKE_NODE_SUFFIX)
1329 self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1331 def test_responder(self):
1332 self.send_sa_init_req()
1334 self.verify_ipsec_sas()
1335 self.verify_ike_sas()
1336 self.verify_counters()
1339 class Ikev2Params(object):
1340 def config_params(self, params={}):
1341 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1342 ei = VppEnum.vl_api_ipsec_integ_alg_t
1344 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1345 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1346 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1347 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1348 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1349 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1351 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1352 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1353 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1354 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1356 dpd_disabled = True if 'dpd_disabled' not in params else\
1357 params['dpd_disabled']
1359 self.vapi.cli('ikev2 dpd disable')
1360 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1361 not in params else params['del_sa_from_responder']
1362 i_natt = False if 'i_natt' not in params else params['i_natt']
1363 r_natt = False if 'r_natt' not in params else params['r_natt']
1364 self.p = Profile(self, 'pr1')
1365 self.ip6 = False if 'ip6' not in params else params['ip6']
1367 if 'auth' in params and params['auth'] == 'rsa-sig':
1368 auth_method = 'rsa-sig'
1369 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1370 self.vapi.ikev2_set_local_key(
1371 key_file=work_dir + params['server-key'])
1373 client_file = work_dir + params['client-cert']
1374 server_pem = open(work_dir + params['server-cert']).read()
1375 client_priv = open(work_dir + params['client-key']).read()
1376 client_priv = load_pem_private_key(str.encode(client_priv), None,
1378 self.peer_cert = x509.load_pem_x509_certificate(
1379 str.encode(server_pem),
1381 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1384 auth_data = b'$3cr3tpa$$w0rd'
1385 self.p.add_auth(method='shared-key', data=auth_data)
1386 auth_method = 'shared-key'
1389 is_init = True if 'is_initiator' not in params else\
1390 params['is_initiator']
1392 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1393 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1395 self.p.add_local_id(**idr)
1396 self.p.add_remote_id(**idi)
1398 self.p.add_local_id(**idi)
1399 self.p.add_remote_id(**idr)
1401 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1402 'loc_ts' not in params else params['loc_ts']
1403 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1404 'rem_ts' not in params else params['rem_ts']
1405 self.p.add_local_ts(**loc_ts)
1406 self.p.add_remote_ts(**rem_ts)
1407 if 'responder' in params:
1408 self.p.add_responder(params['responder'])
1409 if 'ike_transforms' in params:
1410 self.p.add_ike_transforms(params['ike_transforms'])
1411 if 'esp_transforms' in params:
1412 self.p.add_esp_transforms(params['esp_transforms'])
1414 udp_encap = False if 'udp_encap' not in params else\
1417 self.p.set_udp_encap(True)
1419 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1420 is_initiator=is_init,
1421 id_type=self.p.local_id['id_type'],
1422 i_natt=i_natt, r_natt=r_natt,
1423 priv_key=client_priv, auth_method=auth_method,
1424 auth_data=auth_data, udp_encap=udp_encap,
1425 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1427 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1428 params['ike-crypto']
1429 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1431 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1434 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1435 params['esp-crypto']
1436 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1439 self.sa.set_ike_props(
1440 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1441 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1442 self.sa.set_esp_props(
1443 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1447 class TestApi(VppTestCase):
1448 """ Test IKEV2 API """
1450 def setUpClass(cls):
1451 super(TestApi, cls).setUpClass()
1454 def tearDownClass(cls):
1455 super(TestApi, cls).tearDownClass()
1458 super(TestApi, self).tearDown()
1459 self.p1.remove_vpp_config()
1460 self.p2.remove_vpp_config()
1461 r = self.vapi.ikev2_profile_dump()
1462 self.assertEqual(len(r), 0)
1464 def configure_profile(self, cfg):
1465 p = Profile(self, cfg['name'])
1466 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1467 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1468 p.add_local_ts(**cfg['loc_ts'])
1469 p.add_remote_ts(**cfg['rem_ts'])
1470 p.add_responder(cfg['responder'])
1471 p.add_ike_transforms(cfg['ike_ts'])
1472 p.add_esp_transforms(cfg['esp_ts'])
1473 p.add_auth(**cfg['auth'])
1474 p.set_udp_encap(cfg['udp_encap'])
1475 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1476 if 'lifetime_data' in cfg:
1477 p.set_lifetime_data(cfg['lifetime_data'])
1478 if 'tun_itf' in cfg:
1479 p.set_tunnel_interface(cfg['tun_itf'])
1480 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1485 def test_profile_api(self):
1486 """ test profile dump API """
1491 'start_addr': '3.3.3.2',
1492 'end_addr': '3.3.3.3',
1498 'start_addr': '4.5.76.80',
1499 'end_addr': '2.3.4.6',
1506 'start_addr': 'ab::1',
1507 'end_addr': 'ab::4',
1513 'start_addr': 'cd::12',
1514 'end_addr': 'cd::13',
1520 'natt_disabled': True,
1521 'loc_id': ('fqdn', b'vpp.home'),
1522 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1525 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1528 'crypto_key_size': 32,
1533 'crypto_key_size': 24,
1535 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1537 'ipsec_over_udp_port': 4501,
1540 'lifetime_maxdata': 20192,
1541 'lifetime_jitter': 9,
1546 'loc_id': ('ip4-addr', b'192.168.2.1'),
1547 'rem_id': ('ip6-addr', b'abcd::1'),
1550 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1553 'crypto_key_size': 16,
1558 'crypto_key_size': 24,
1560 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1562 'ipsec_over_udp_port': 4600,
1565 self.p1 = self.configure_profile(conf['p1'])
1566 self.p2 = self.configure_profile(conf['p2'])
1568 r = self.vapi.ikev2_profile_dump()
1569 self.assertEqual(len(r), 2)
1570 self.verify_profile(r[0].profile, conf['p1'])
1571 self.verify_profile(r[1].profile, conf['p2'])
1573 def verify_id(self, api_id, cfg_id):
1574 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1575 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1577 def verify_ts(self, api_ts, cfg_ts):
1578 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1579 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1580 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1581 self.assertEqual(api_ts.start_addr,
1582 ip_address(text_type(cfg_ts['start_addr'])))
1583 self.assertEqual(api_ts.end_addr,
1584 ip_address(text_type(cfg_ts['end_addr'])))
1586 def verify_responder(self, api_r, cfg_r):
1587 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1588 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1590 def verify_transforms(self, api_ts, cfg_ts):
1591 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1592 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1593 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1595 def verify_ike_transforms(self, api_ts, cfg_ts):
1596 self.verify_transforms(api_ts, cfg_ts)
1597 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1599 def verify_esp_transforms(self, api_ts, cfg_ts):
1600 self.verify_transforms(api_ts, cfg_ts)
1602 def verify_auth(self, api_auth, cfg_auth):
1603 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1604 self.assertEqual(api_auth.data, cfg_auth['data'])
1605 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1607 def verify_lifetime_data(self, p, ld):
1608 self.assertEqual(p.lifetime, ld['lifetime'])
1609 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1610 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1611 self.assertEqual(p.handover, ld['handover'])
1613 def verify_profile(self, ap, cp):
1614 self.assertEqual(ap.name, cp['name'])
1615 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1616 self.verify_id(ap.loc_id, cp['loc_id'])
1617 self.verify_id(ap.rem_id, cp['rem_id'])
1618 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1619 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1620 self.verify_responder(ap.responder, cp['responder'])
1621 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1622 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1623 self.verify_auth(ap.auth, cp['auth'])
1624 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1625 self.assertTrue(natt_dis == ap.natt_disabled)
1627 if 'lifetime_data' in cp:
1628 self.verify_lifetime_data(ap, cp['lifetime_data'])
1629 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1631 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1633 self.assertEqual(ap.tun_itf, 0xffffffff)
1636 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1637 """ test responder - responder behind NAT """
1639 IKE_NODE_SUFFIX = 'ip4-natt'
1641 def config_tc(self):
1642 self.config_params({'r_natt': True})
1645 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1646 """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1648 def config_tc(self):
1649 self.config_params({
1651 'is_initiator': False, # seen from test case perspective
1652 # thus vpp is initiator
1653 'responder': {'sw_if_index': self.pg0.sw_if_index,
1654 'addr': self.pg0.remote_ip4},
1655 'ike-crypto': ('AES-GCM-16ICV', 32),
1656 'ike-integ': 'NULL',
1657 'ike-dh': '3072MODPgr',
1659 'crypto_alg': 20, # "aes-gcm-16"
1660 'crypto_key_size': 256,
1661 'dh_group': 15, # "modp-3072"
1664 'crypto_alg': 12, # "aes-cbc"
1665 'crypto_key_size': 256,
1666 # "hmac-sha2-256-128"
1670 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1671 """ test ikev2 initiator - pre shared key auth """
1673 def config_tc(self):
1674 self.config_params({
1675 'is_initiator': False, # seen from test case perspective
1676 # thus vpp is initiator
1677 'responder': {'sw_if_index': self.pg0.sw_if_index,
1678 'addr': self.pg0.remote_ip4},
1679 'ike-crypto': ('AES-GCM-16ICV', 32),
1680 'ike-integ': 'NULL',
1681 'ike-dh': '3072MODPgr',
1683 'crypto_alg': 20, # "aes-gcm-16"
1684 'crypto_key_size': 256,
1685 'dh_group': 15, # "modp-3072"
1688 'crypto_alg': 12, # "aes-cbc"
1689 'crypto_key_size': 256,
1690 # "hmac-sha2-256-128"
1694 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1695 """ test initiator - request window size (1) """
1697 def rekey_respond(self, req, update_child_sa_data):
1698 ih = self.get_ike_header(req)
1699 plain = self.sa.hmac_and_decrypt(ih)
1700 sa = ikev2.IKEv2_payload_SA(plain)
1701 if update_child_sa_data:
1702 prop = sa[ikev2.IKEv2_payload_Proposal]
1703 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1704 self.sa.r_nonce = self.sa.i_nonce
1705 self.sa.child_sas[0].ispi = prop.SPI
1706 self.sa.child_sas[0].rspi = prop.SPI
1707 self.sa.calc_child_keys()
1709 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1710 flags='Response', exch_type=36,
1711 id=ih.id, next_payload='Encrypted')
1712 resp = self.encrypt_ike_msg(header, sa, 'SA')
1713 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1714 self.sa.dport, self.sa.natt, self.ip6)
1715 self.send_and_assert_no_replies(self.pg0, packet)
1717 def test_initiator(self):
1718 super(TestInitiatorRequestWindowSize, self).test_initiator()
1719 self.pg0.enable_capture()
1721 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1722 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1723 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1724 capture = self.pg0.get_capture(2)
1726 # reply in reverse order
1727 self.rekey_respond(capture[1], True)
1728 self.rekey_respond(capture[0], False)
1730 # verify that only the second request was accepted
1731 self.verify_ike_sas()
1732 self.verify_ipsec_sas(is_rekey=True)
1735 class TestInitiatorRekey(TestInitiatorPsk):
1736 """ test ikev2 initiator - rekey """
1738 def rekey_from_initiator(self):
1739 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1740 self.pg0.enable_capture()
1742 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1743 capture = self.pg0.get_capture(1)
1744 ih = self.get_ike_header(capture[0])
1745 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1746 self.assertNotIn('Response', ih.flags)
1747 self.assertIn('Initiator', ih.flags)
1748 plain = self.sa.hmac_and_decrypt(ih)
1749 sa = ikev2.IKEv2_payload_SA(plain)
1750 prop = sa[ikev2.IKEv2_payload_Proposal]
1751 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1752 self.sa.r_nonce = self.sa.i_nonce
1753 # update new responder SPI
1754 self.sa.child_sas[0].ispi = prop.SPI
1755 self.sa.child_sas[0].rspi = prop.SPI
1756 self.sa.calc_child_keys()
1757 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1758 flags='Response', exch_type=36,
1759 id=ih.id, next_payload='Encrypted')
1760 resp = self.encrypt_ike_msg(header, sa, 'SA')
1761 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1762 self.sa.dport, self.sa.natt, self.ip6)
1763 self.send_and_assert_no_replies(self.pg0, packet)
1765 def test_initiator(self):
1766 super(TestInitiatorRekey, self).test_initiator()
1767 self.rekey_from_initiator()
1768 self.verify_ike_sas()
1769 self.verify_ipsec_sas(is_rekey=True)
1772 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1773 """ test ikev2 initiator - delete IKE SA from responder """
1775 def config_tc(self):
1776 self.config_params({
1777 'del_sa_from_responder': True,
1778 'is_initiator': False, # seen from test case perspective
1779 # thus vpp is initiator
1780 'responder': {'sw_if_index': self.pg0.sw_if_index,
1781 'addr': self.pg0.remote_ip4},
1782 'ike-crypto': ('AES-GCM-16ICV', 32),
1783 'ike-integ': 'NULL',
1784 'ike-dh': '3072MODPgr',
1786 'crypto_alg': 20, # "aes-gcm-16"
1787 'crypto_key_size': 256,
1788 'dh_group': 15, # "modp-3072"
1791 'crypto_alg': 12, # "aes-cbc"
1792 'crypto_key_size': 256,
1793 # "hmac-sha2-256-128"
1797 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1798 """ test ikev2 responder - initiator behind NAT """
1800 IKE_NODE_SUFFIX = 'ip4-natt'
1802 def config_tc(self):
1807 class TestResponderPsk(TemplateResponder, Ikev2Params):
1808 """ test ikev2 responder - pre shared key auth """
1809 def config_tc(self):
1810 self.config_params()
1813 class TestResponderDpd(TestResponderPsk):
1815 Dead peer detection test
1817 def config_tc(self):
1818 self.config_params({'dpd_disabled': False})
1823 def test_responder(self):
1824 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1825 super(TestResponderDpd, self).test_responder()
1826 self.pg0.enable_capture()
1828 # capture empty request but don't reply
1829 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1830 ih = self.get_ike_header(capture[0])
1831 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1832 plain = self.sa.hmac_and_decrypt(ih)
1833 self.assertEqual(plain, b'')
1834 # wait for SA expiration
1836 ike_sas = self.vapi.ikev2_sa_dump()
1837 self.assertEqual(len(ike_sas), 0)
1838 ipsec_sas = self.vapi.ipsec_sa_dump()
1839 self.assertEqual(len(ipsec_sas), 0)
1842 class TestResponderRekey(TestResponderPsk):
1843 """ test ikev2 responder - rekey """
1845 def rekey_from_initiator(self):
1846 packet = self.create_rekey_request()
1847 self.pg0.add_stream(packet)
1848 self.pg0.enable_capture()
1850 capture = self.pg0.get_capture(1)
1851 ih = self.get_ike_header(capture[0])
1852 plain = self.sa.hmac_and_decrypt(ih)
1853 sa = ikev2.IKEv2_payload_SA(plain)
1854 prop = sa[ikev2.IKEv2_payload_Proposal]
1855 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1856 # update new responder SPI
1857 self.sa.child_sas[0].rspi = prop.SPI
1859 def test_responder(self):
1860 super(TestResponderRekey, self).test_responder()
1861 self.rekey_from_initiator()
1862 self.sa.calc_child_keys()
1863 self.verify_ike_sas()
1864 self.verify_ipsec_sas(is_rekey=True)
1867 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1868 """ test ikev2 responder - cert based auth """
1869 def config_tc(self):
1870 self.config_params({
1873 'server-key': 'server-key.pem',
1874 'client-key': 'client-key.pem',
1875 'client-cert': 'client-cert.pem',
1876 'server-cert': 'server-cert.pem'})
1879 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1880 (TemplateResponder, Ikev2Params):
1882 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1884 def config_tc(self):
1885 self.config_params({
1886 'ike-crypto': ('AES-CBC', 16),
1887 'ike-integ': 'SHA2-256-128',
1888 'esp-crypto': ('AES-CBC', 24),
1889 'esp-integ': 'SHA2-384-192',
1890 'ike-dh': '2048MODPgr'})
1893 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1894 (TemplateResponder, Ikev2Params):
1897 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1899 def config_tc(self):
1900 self.config_params({
1901 'ike-crypto': ('AES-CBC', 32),
1902 'ike-integ': 'SHA2-256-128',
1903 'esp-crypto': ('AES-GCM-16ICV', 32),
1904 'esp-integ': 'NULL',
1905 'ike-dh': '3072MODPgr'})
1908 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1913 IKE_NODE_SUFFIX = 'ip6'
1915 def config_tc(self):
1916 self.config_params({
1917 'del_sa_from_responder': True,
1920 'ike-crypto': ('AES-GCM-16ICV', 32),
1921 'ike-integ': 'NULL',
1922 'ike-dh': '2048MODPgr',
1923 'loc_ts': {'start_addr': 'ab:cd::0',
1924 'end_addr': 'ab:cd::10'},
1925 'rem_ts': {'start_addr': '11::0',
1926 'end_addr': '11::100'}})
1929 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1931 Test for keep alive messages
1934 def send_empty_req_from_responder(self):
1935 packet = self.create_empty_request()
1936 self.pg0.add_stream(packet)
1937 self.pg0.enable_capture()
1939 capture = self.pg0.get_capture(1)
1940 ih = self.get_ike_header(capture[0])
1941 self.assertEqual(ih.id, self.sa.msg_id)
1942 plain = self.sa.hmac_and_decrypt(ih)
1943 self.assertEqual(plain, b'')
1944 self.assert_counter(1, 'keepalive', 'ip4')
1946 def test_initiator(self):
1947 super(TestInitiatorKeepaliveMsg, self).test_initiator()
1948 self.send_empty_req_from_responder()
1951 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1952 """ malformed packet test """
1957 def config_tc(self):
1958 self.config_params()
1960 def create_ike_init_msg(self, length=None, payload=None):
1961 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1962 flags='Initiator', exch_type='IKE_SA_INIT')
1963 if payload is not None:
1965 return self.create_packet(self.pg0, msg, self.sa.sport,
1968 def verify_bad_packet_length(self):
1969 ike_msg = self.create_ike_init_msg(length=0xdead)
1970 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1971 self.assert_counter(self.pkt_count, 'bad_length')
1973 def verify_bad_sa_payload_length(self):
1974 p = ikev2.IKEv2_payload_SA(length=0xdead)
1975 ike_msg = self.create_ike_init_msg(payload=p)
1976 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1977 self.assert_counter(self.pkt_count, 'malformed_packet')
1979 def test_responder(self):
1980 self.pkt_count = 254
1981 self.verify_bad_packet_length()
1982 self.verify_bad_sa_payload_length()
1985 if __name__ == '__main__':
1986 unittest.main(testRunner=VppTestRunner)