3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import VppTestCase, VppTestRunner
22 from vpp_ikev2 import Profile, IDType, AuthMethod
23 from vpp_papi import VppEnum
30 KEY_PAD = b"Key Pad for IKEv2"
37 # tuple structure is (p, g, key_len)
39 '2048MODPgr': (long_converter("""
40 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
41 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
42 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
43 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
44 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
45 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
46 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
47 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
48 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
49 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
50 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
52 '3072MODPgr': (long_converter("""
53 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
54 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
55 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
56 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
57 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
58 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
59 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
60 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
61 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
62 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
63 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
64 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
65 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
66 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
67 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
68 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
72 class CryptoAlgo(object):
73 def __init__(self, name, cipher, mode):
77 if self.cipher is not None:
78 self.bs = self.cipher.block_size // 8
80 if self.name == 'AES-GCM-16ICV':
81 self.iv_len = GCM_IV_SIZE
85 def encrypt(self, data, key, aad=None):
86 iv = os.urandom(self.iv_len)
88 encryptor = Cipher(self.cipher(key), self.mode(iv),
89 default_backend()).encryptor()
90 return iv + encryptor.update(data) + encryptor.finalize()
92 salt = key[-SALT_SIZE:]
94 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
95 default_backend()).encryptor()
96 encryptor.authenticate_additional_data(aad)
97 data = encryptor.update(data) + encryptor.finalize()
98 data += encryptor.tag[:GCM_ICV_SIZE]
101 def decrypt(self, data, key, aad=None, icv=None):
103 iv = data[:self.iv_len]
104 ct = data[self.iv_len:]
105 decryptor = Cipher(algorithms.AES(key),
107 default_backend()).decryptor()
108 return decryptor.update(ct) + decryptor.finalize()
110 salt = key[-SALT_SIZE:]
111 nonce = salt + data[:GCM_IV_SIZE]
112 ct = data[GCM_IV_SIZE:]
113 key = key[:-SALT_SIZE]
114 decryptor = Cipher(algorithms.AES(key),
115 self.mode(nonce, icv, len(icv)),
116 default_backend()).decryptor()
117 decryptor.authenticate_additional_data(aad)
118 return decryptor.update(ct) + decryptor.finalize()
121 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122 data = data + b'\x00' * (pad_len - 1)
123 return data + bytes([pad_len - 1])
126 class AuthAlgo(object):
127 def __init__(self, name, mac, mod, key_len, trunc_len=None):
131 self.key_len = key_len
132 self.trunc_len = trunc_len or key_len
136 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
143 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
151 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
169 class IKEv2ChildSA(object):
170 def __init__(self, local_ts, remote_ts, is_initiator):
178 self.local_ts = local_ts
179 self.remote_ts = remote_ts
182 class IKEv2SA(object):
183 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
184 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
185 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
186 auth_method='shared-key', priv_key=None, i_natt=False,
187 r_natt=False, udp_encap=False):
188 self.udp_encap = udp_encap
198 self.dh_params = None
200 self.priv_key = priv_key
201 self.is_initiator = is_initiator
202 nonce = nonce or os.urandom(32)
203 self.auth_data = auth_data
206 if isinstance(id_type, str):
207 self.id_type = IDType.value(id_type)
209 self.id_type = id_type
210 self.auth_method = auth_method
211 if self.is_initiator:
212 self.rspi = 8 * b'\x00'
217 self.ispi = 8 * b'\x00'
219 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
222 def new_msg_id(self):
227 def my_dh_pub_key(self):
228 if self.is_initiator:
229 return self.i_dh_data
230 return self.r_dh_data
233 def peer_dh_pub_key(self):
234 if self.is_initiator:
235 return self.r_dh_data
236 return self.i_dh_data
240 return self.i_natt or self.r_natt
242 def compute_secret(self):
243 priv = self.dh_private_key
244 peer = self.peer_dh_pub_key
245 p, g, l = self.ike_group
246 return pow(int.from_bytes(peer, 'big'),
247 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
249 def generate_dh_data(self):
251 if self.ike_dh not in DH:
252 raise NotImplementedError('%s not in DH group' % self.ike_dh)
254 if self.dh_params is None:
255 dhg = DH[self.ike_dh]
256 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
257 self.dh_params = pn.parameters(default_backend())
259 priv = self.dh_params.generate_private_key()
260 pub = priv.public_key()
261 x = priv.private_numbers().x
262 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
263 y = pub.public_numbers().y
265 if self.is_initiator:
266 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
268 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
270 def complete_dh_data(self):
271 self.dh_shared_secret = self.compute_secret()
273 def calc_child_keys(self):
274 prf = self.ike_prf_alg.mod()
275 s = self.i_nonce + self.r_nonce
276 c = self.child_sas[0]
278 encr_key_len = self.esp_crypto_key_len
279 integ_key_len = self.esp_integ_alg.key_len
280 salt_len = 0 if integ_key_len else 4
282 l = (integ_key_len * 2 +
285 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
288 c.sk_ei = keymat[pos:pos+encr_key_len]
292 c.sk_ai = keymat[pos:pos+integ_key_len]
295 c.salt_ei = keymat[pos:pos+salt_len]
298 c.sk_er = keymat[pos:pos+encr_key_len]
302 c.sk_ar = keymat[pos:pos+integ_key_len]
305 c.salt_er = keymat[pos:pos+salt_len]
308 def calc_prfplus(self, prf, key, seed, length):
312 while len(r) < length and x < 255:
317 s = s + seed + bytes([x])
318 t = self.calc_prf(prf, key, s)
326 def calc_prf(self, prf, key, data):
327 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
332 prf = self.ike_prf_alg.mod()
333 # SKEYSEED = prf(Ni | Nr, g^ir)
334 s = self.i_nonce + self.r_nonce
335 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
337 # calculate S = Ni | Nr | SPIi SPIr
338 s = s + self.ispi + self.rspi
340 prf_key_trunc = self.ike_prf_alg.trunc_len
341 encr_key_len = self.ike_crypto_key_len
342 tr_prf_key_len = self.ike_prf_alg.key_len
343 integ_key_len = self.ike_integ_alg.key_len
344 if integ_key_len == 0:
354 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
357 self.sk_d = keymat[:pos+prf_key_trunc]
360 self.sk_ai = keymat[pos:pos+integ_key_len]
362 self.sk_ar = keymat[pos:pos+integ_key_len]
365 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
366 pos += encr_key_len + salt_size
367 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
368 pos += encr_key_len + salt_size
370 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
371 pos += tr_prf_key_len
372 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
374 def generate_authmsg(self, prf, packet):
375 if self.is_initiator:
383 data = bytes([self.id_type, 0, 0, 0]) + id
384 id_hash = self.calc_prf(prf, key, data)
385 return packet + nonce + id_hash
388 prf = self.ike_prf_alg.mod()
389 if self.is_initiator:
390 packet = self.init_req_packet
392 packet = self.init_resp_packet
393 authmsg = self.generate_authmsg(prf, raw(packet))
394 if self.auth_method == 'shared-key':
395 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
396 self.auth_data = self.calc_prf(prf, psk, authmsg)
397 elif self.auth_method == 'rsa-sig':
398 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
401 raise TypeError('unknown auth method type!')
403 def encrypt(self, data, aad=None):
404 data = self.ike_crypto_alg.pad(data)
405 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
408 def peer_authkey(self):
409 if self.is_initiator:
414 def my_authkey(self):
415 if self.is_initiator:
420 def my_cryptokey(self):
421 if self.is_initiator:
426 def peer_cryptokey(self):
427 if self.is_initiator:
431 def concat(self, alg, key_len):
432 return alg + '-' + str(key_len * 8)
435 def vpp_ike_cypto_alg(self):
436 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
439 def vpp_esp_cypto_alg(self):
440 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
442 def verify_hmac(self, ikemsg):
443 integ_trunc = self.ike_integ_alg.trunc_len
444 exp_hmac = ikemsg[-integ_trunc:]
445 data = ikemsg[:-integ_trunc]
446 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
447 self.peer_authkey, data)
448 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
450 def compute_hmac(self, integ, key, data):
451 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
455 def decrypt(self, data, aad=None, icv=None):
456 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
458 def hmac_and_decrypt(self, ike):
459 ep = ike[ikev2.IKEv2_payload_Encrypted]
460 if self.ike_crypto == 'AES-GCM-16ICV':
461 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
462 ct = ep.load[:-GCM_ICV_SIZE]
463 tag = ep.load[-GCM_ICV_SIZE:]
464 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
466 self.verify_hmac(raw(ike))
467 integ_trunc = self.ike_integ_alg.trunc_len
469 # remove ICV and decrypt payload
470 ct = ep.load[:-integ_trunc]
471 plain = self.decrypt(ct)
474 return plain[:-pad_len - 1]
476 def build_ts_addr(self, ts, version):
477 return {'starting_address_v' + version: ts['start_addr'],
478 'ending_address_v' + version: ts['end_addr']}
480 def generate_ts(self, is_ip4):
481 c = self.child_sas[0]
482 ts_data = {'IP_protocol_ID': 0,
486 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
487 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
488 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
489 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
491 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
492 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
493 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
494 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
496 if self.is_initiator:
497 return ([ts1], [ts2])
498 return ([ts2], [ts1])
500 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
501 if crypto not in CRYPTO_ALGOS:
502 raise TypeError('unsupported encryption algo %r' % crypto)
503 self.ike_crypto = crypto
504 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
505 self.ike_crypto_key_len = crypto_key_len
507 if integ not in AUTH_ALGOS:
508 raise TypeError('unsupported auth algo %r' % integ)
509 self.ike_integ = None if integ == 'NULL' else integ
510 self.ike_integ_alg = AUTH_ALGOS[integ]
512 if prf not in PRF_ALGOS:
513 raise TypeError('unsupported prf algo %r' % prf)
515 self.ike_prf_alg = PRF_ALGOS[prf]
517 self.ike_group = DH[self.ike_dh]
519 def set_esp_props(self, crypto, crypto_key_len, integ):
520 self.esp_crypto_key_len = crypto_key_len
521 if crypto not in CRYPTO_ALGOS:
522 raise TypeError('unsupported encryption algo %r' % crypto)
523 self.esp_crypto = crypto
524 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
526 if integ not in AUTH_ALGOS:
527 raise TypeError('unsupported auth algo %r' % integ)
528 self.esp_integ = None if integ == 'NULL' else integ
529 self.esp_integ_alg = AUTH_ALGOS[integ]
531 def crypto_attr(self, key_len):
532 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
533 return (0x800e << 16 | key_len << 3, 12)
535 raise Exception('unsupported attribute type')
537 def ike_crypto_attr(self):
538 return self.crypto_attr(self.ike_crypto_key_len)
540 def esp_crypto_attr(self):
541 return self.crypto_attr(self.esp_crypto_key_len)
543 def compute_nat_sha1(self, ip, port, rspi=None):
546 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
547 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
549 return digest.finalize()
552 class IkePeer(VppTestCase):
553 """ common class for initiator and responder """
557 import scapy.contrib.ikev2 as _ikev2
558 globals()['ikev2'] = _ikev2
559 super(IkePeer, cls).setUpClass()
560 cls.create_pg_interfaces(range(2))
561 for i in cls.pg_interfaces:
569 def tearDownClass(cls):
570 super(IkePeer, cls).tearDownClass()
573 super(IkePeer, self).tearDown()
574 if self.del_sa_from_responder:
575 self.initiate_del_sa_from_responder()
577 self.initiate_del_sa_from_initiator()
578 r = self.vapi.ikev2_sa_dump()
579 self.assertEqual(len(r), 0)
580 sas = self.vapi.ipsec_sa_dump()
581 self.assertEqual(len(sas), 0)
582 self.p.remove_vpp_config()
583 self.assertIsNone(self.p.query_vpp_config())
586 super(IkePeer, self).setUp()
588 self.p.add_vpp_config()
589 self.assertIsNotNone(self.p.query_vpp_config())
590 if self.sa.is_initiator:
591 self.sa.generate_dh_data()
592 self.vapi.cli('ikev2 set logging level 4')
593 self.vapi.cli('event-lo clear')
595 def create_rekey_request(self):
596 sa, first_payload = self.generate_auth_payload(is_rekey=True)
597 header = ikev2.IKEv2(
598 init_SPI=self.sa.ispi,
599 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
600 flags='Initiator', exch_type='CREATE_CHILD_SA')
602 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
603 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
604 self.sa.dport, self.sa.natt, self.ip6)
606 def create_empty_request(self):
607 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
608 id=self.sa.new_msg_id(), flags='Initiator',
609 exch_type='INFORMATIONAL',
610 next_payload='Encrypted')
612 msg = self.encrypt_ike_msg(header, b'', None)
613 return self.create_packet(self.pg0, msg, self.sa.sport,
614 self.sa.dport, self.sa.natt, self.ip6)
616 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
619 src_ip = src_if.remote_ip6
620 dst_ip = src_if.local_ip6
623 src_ip = src_if.remote_ip4
624 dst_ip = src_if.local_ip4
626 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
627 ip_layer(src=src_ip, dst=dst_ip) /
628 UDP(sport=sport, dport=dport))
630 # insert non ESP marker
631 res = res / Raw(b'\x00' * 4)
634 def verify_udp(self, udp):
635 self.assertEqual(udp.sport, self.sa.sport)
636 self.assertEqual(udp.dport, self.sa.dport)
638 def get_ike_header(self, packet):
640 ih = packet[ikev2.IKEv2]
641 ih = self.verify_and_remove_non_esp_marker(ih)
642 except IndexError as e:
643 # this is a workaround for getting IKEv2 layer as both ikev2 and
644 # ipsec register for port 4500
646 ih = self.verify_and_remove_non_esp_marker(esp)
647 self.assertEqual(ih.version, 0x20)
648 self.assertNotIn('Version', ih.flags)
651 def verify_and_remove_non_esp_marker(self, packet):
653 # if we are in nat traversal mode check for non esp marker
656 self.assertEqual(data[:4], b'\x00' * 4)
657 return ikev2.IKEv2(data[4:])
661 def encrypt_ike_msg(self, header, plain, first_payload):
662 if self.sa.ike_crypto == 'AES-GCM-16ICV':
663 data = self.sa.ike_crypto_alg.pad(raw(plain))
664 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
665 len(ikev2.IKEv2_payload_Encrypted())
666 tlen = plen + len(ikev2.IKEv2())
669 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
673 encr = self.sa.encrypt(raw(plain), raw(res))
674 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
675 length=plen, load=encr)
678 encr = self.sa.encrypt(raw(plain))
679 trunc_len = self.sa.ike_integ_alg.trunc_len
680 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
681 tlen = plen + len(ikev2.IKEv2())
683 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
684 length=plen, load=encr)
688 integ_data = raw(res)
689 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
690 self.sa.my_authkey, integ_data)
691 res = res / Raw(hmac_data[:trunc_len])
692 assert(len(res) == tlen)
695 def verify_udp_encap(self, ipsec_sa):
696 e = VppEnum.vl_api_ipsec_sad_flags_t
697 if self.sa.udp_encap or self.sa.natt:
698 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
700 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
702 def verify_ipsec_sas(self, is_rekey=False):
703 sas = self.vapi.ipsec_sa_dump()
705 # after rekey there is a short period of time in which old
706 # inbound SA is still present
710 self.assertEqual(len(sas), sa_count)
711 if self.sa.is_initiator:
726 c = self.sa.child_sas[0]
728 self.verify_udp_encap(sa0)
729 self.verify_udp_encap(sa1)
730 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
731 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
732 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
734 if self.sa.esp_integ is None:
737 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
738 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
739 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
742 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
743 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
744 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
745 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
749 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
750 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
751 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
752 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
754 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
755 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
757 def verify_keymat(self, api_keys, keys, name):
758 km = getattr(keys, name)
759 api_km = getattr(api_keys, name)
760 api_km_len = getattr(api_keys, name + '_len')
761 self.assertEqual(len(km), api_km_len)
762 self.assertEqual(km, api_km[:api_km_len])
764 def verify_id(self, api_id, exp_id):
765 self.assertEqual(api_id.type, IDType.value(exp_id.type))
766 self.assertEqual(api_id.data_len, exp_id.data_len)
767 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
769 def verify_ike_sas(self):
770 r = self.vapi.ikev2_sa_dump()
771 self.assertEqual(len(r), 1)
773 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
774 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
776 if self.sa.is_initiator:
777 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
778 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
780 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
781 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
783 if self.sa.is_initiator:
784 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
785 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
787 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
788 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
789 self.verify_keymat(sa.keys, self.sa, 'sk_d')
790 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
791 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
792 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
793 self.verify_keymat(sa.keys, self.sa, 'sk_er')
794 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
795 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
797 self.assertEqual(sa.i_id.type, self.sa.id_type)
798 self.assertEqual(sa.r_id.type, self.sa.id_type)
799 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
800 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
801 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
802 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
804 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
805 self.assertEqual(len(r), 1)
807 self.assertEqual(csa.sa_index, sa.sa_index)
808 c = self.sa.child_sas[0]
809 if hasattr(c, 'sk_ai'):
810 self.verify_keymat(csa.keys, c, 'sk_ai')
811 self.verify_keymat(csa.keys, c, 'sk_ar')
812 self.verify_keymat(csa.keys, c, 'sk_ei')
813 self.verify_keymat(csa.keys, c, 'sk_er')
814 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
815 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
817 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
820 r = self.vapi.ikev2_traffic_selector_dump(
821 is_initiator=True, sa_index=sa.sa_index,
822 child_sa_index=csa.child_sa_index)
823 self.assertEqual(len(r), 1)
825 self.verify_ts(r[0].ts, tsi[0], True)
827 r = self.vapi.ikev2_traffic_selector_dump(
828 is_initiator=False, sa_index=sa.sa_index,
829 child_sa_index=csa.child_sa_index)
830 self.assertEqual(len(r), 1)
831 self.verify_ts(r[0].ts, tsr[0], False)
833 n = self.vapi.ikev2_nonce_get(is_initiator=True,
834 sa_index=sa.sa_index)
835 self.verify_nonce(n, self.sa.i_nonce)
836 n = self.vapi.ikev2_nonce_get(is_initiator=False,
837 sa_index=sa.sa_index)
838 self.verify_nonce(n, self.sa.r_nonce)
840 def verify_nonce(self, api_nonce, nonce):
841 self.assertEqual(api_nonce.data_len, len(nonce))
842 self.assertEqual(api_nonce.nonce, nonce)
844 def verify_ts(self, api_ts, ts, is_initiator):
846 self.assertTrue(api_ts.is_local)
848 self.assertFalse(api_ts.is_local)
851 self.assertEqual(api_ts.start_addr,
852 IPv4Address(ts.starting_address_v4))
853 self.assertEqual(api_ts.end_addr,
854 IPv4Address(ts.ending_address_v4))
856 self.assertEqual(api_ts.start_addr,
857 IPv6Address(ts.starting_address_v6))
858 self.assertEqual(api_ts.end_addr,
859 IPv6Address(ts.ending_address_v6))
860 self.assertEqual(api_ts.start_port, ts.start_port)
861 self.assertEqual(api_ts.end_port, ts.end_port)
862 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
865 class TemplateInitiator(IkePeer):
866 """ initiator test template """
868 def initiate_del_sa_from_initiator(self):
869 ispi = int.from_bytes(self.sa.ispi, 'little')
870 self.pg0.enable_capture()
872 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
873 capture = self.pg0.get_capture(1)
874 ih = self.get_ike_header(capture[0])
875 self.assertNotIn('Response', ih.flags)
876 self.assertIn('Initiator', ih.flags)
877 self.assertEqual(ih.init_SPI, self.sa.ispi)
878 self.assertEqual(ih.resp_SPI, self.sa.rspi)
879 plain = self.sa.hmac_and_decrypt(ih)
880 d = ikev2.IKEv2_payload_Delete(plain)
881 self.assertEqual(d.proto, 1) # proto=IKEv2
882 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
883 flags='Response', exch_type='INFORMATIONAL',
884 id=ih.id, next_payload='Encrypted')
885 resp = self.encrypt_ike_msg(header, b'', None)
886 self.send_and_assert_no_replies(self.pg0, resp)
888 def verify_del_sa(self, packet):
889 ih = self.get_ike_header(packet)
890 self.assertEqual(ih.id, self.sa.msg_id)
891 self.assertEqual(ih.exch_type, 37) # exchange informational
892 self.assertIn('Response', ih.flags)
893 self.assertIn('Initiator', ih.flags)
894 plain = self.sa.hmac_and_decrypt(ih)
895 self.assertEqual(plain, b'')
897 def initiate_del_sa_from_responder(self):
898 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
899 exch_type='INFORMATIONAL',
900 id=self.sa.new_msg_id())
901 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
902 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
903 packet = self.create_packet(self.pg0, ike_msg,
904 self.sa.sport, self.sa.dport,
905 self.sa.natt, self.ip6)
906 self.pg0.add_stream(packet)
907 self.pg0.enable_capture()
909 capture = self.pg0.get_capture(1)
910 self.verify_del_sa(capture[0])
913 def find_notify_payload(packet, notify_type):
914 n = packet[ikev2.IKEv2_payload_Notify]
916 if n.type == notify_type:
921 def verify_nat_detection(self, packet):
928 # NAT_DETECTION_SOURCE_IP
929 s = self.find_notify_payload(packet, 16388)
930 self.assertIsNotNone(s)
931 src_sha = self.sa.compute_nat_sha1(
932 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
933 self.assertEqual(s.load, src_sha)
935 # NAT_DETECTION_DESTINATION_IP
936 s = self.find_notify_payload(packet, 16389)
937 self.assertIsNotNone(s)
938 dst_sha = self.sa.compute_nat_sha1(
939 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
940 self.assertEqual(s.load, dst_sha)
942 def verify_sa_init_request(self, packet):
944 self.sa.dport = udp.sport
945 ih = packet[ikev2.IKEv2]
946 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
947 self.assertEqual(ih.exch_type, 34) # SA_INIT
948 self.sa.ispi = ih.init_SPI
949 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
950 self.assertIn('Initiator', ih.flags)
951 self.assertNotIn('Response', ih.flags)
952 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
953 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
955 prop = packet[ikev2.IKEv2_payload_Proposal]
956 self.assertEqual(prop.proto, 1) # proto = ikev2
957 self.assertEqual(prop.proposal, 1)
958 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
959 self.assertEqual(prop.trans[0].transform_id,
960 self.p.ike_transforms['crypto_alg'])
961 self.assertEqual(prop.trans[1].transform_type, 2) # prf
962 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
963 self.assertEqual(prop.trans[2].transform_type, 4) # dh
964 self.assertEqual(prop.trans[2].transform_id,
965 self.p.ike_transforms['dh_group'])
967 self.verify_nat_detection(packet)
968 self.sa.set_ike_props(
969 crypto='AES-GCM-16ICV', crypto_key_len=32,
970 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
971 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
972 integ='SHA2-256-128')
973 self.sa.generate_dh_data()
974 self.sa.complete_dh_data()
977 def update_esp_transforms(self, trans, sa):
979 if trans.transform_type == 1: # ecryption
980 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
981 elif trans.transform_type == 3: # integrity
982 sa.esp_integ = INTEG_IDS[trans.transform_id]
983 trans = trans.payload
985 def verify_sa_auth_req(self, packet):
987 self.sa.dport = udp.sport
988 ih = self.get_ike_header(packet)
989 self.assertEqual(ih.resp_SPI, self.sa.rspi)
990 self.assertEqual(ih.init_SPI, self.sa.ispi)
991 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
992 self.assertIn('Initiator', ih.flags)
993 self.assertNotIn('Response', ih.flags)
997 self.assertEqual(ih.id, self.sa.msg_id + 1)
999 plain = self.sa.hmac_and_decrypt(ih)
1000 idi = ikev2.IKEv2_payload_IDi(plain)
1001 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1002 self.assertEqual(idi.load, self.sa.i_id)
1003 self.assertEqual(idr.load, self.sa.r_id)
1004 prop = idi[ikev2.IKEv2_payload_Proposal]
1005 c = self.sa.child_sas[0]
1007 self.update_esp_transforms(
1008 prop[ikev2.IKEv2_payload_Transform], self.sa)
1010 def send_init_response(self):
1011 tr_attr = self.sa.ike_crypto_attr()
1012 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1013 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1014 key_length=tr_attr[0]) /
1015 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1016 transform_id=self.sa.ike_integ) /
1017 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1018 transform_id=self.sa.ike_prf_alg.name) /
1019 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1020 transform_id=self.sa.ike_dh))
1021 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1022 trans_nb=4, trans=trans))
1024 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1026 dst_address = b'\x0a\x0a\x0a\x0a'
1028 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1029 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1030 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1032 self.sa.init_resp_packet = (
1033 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1034 exch_type='IKE_SA_INIT', flags='Response') /
1035 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1036 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1037 group=self.sa.ike_dh,
1038 load=self.sa.my_dh_pub_key) /
1039 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1040 next_payload='Notify') /
1041 ikev2.IKEv2_payload_Notify(
1042 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1043 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1044 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1046 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1047 self.sa.sport, self.sa.dport,
1049 self.pg_send(self.pg0, ike_msg)
1050 capture = self.pg0.get_capture(1)
1051 self.verify_sa_auth_req(capture[0])
1053 def initiate_sa_init(self):
1054 self.pg0.enable_capture()
1056 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1058 capture = self.pg0.get_capture(1)
1059 self.verify_sa_init_request(capture[0])
1060 self.send_init_response()
1062 def send_auth_response(self):
1063 tr_attr = self.sa.esp_crypto_attr()
1064 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1065 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1066 key_length=tr_attr[0]) /
1067 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1068 transform_id=self.sa.esp_integ) /
1069 ikev2.IKEv2_payload_Transform(
1070 transform_type='Extended Sequence Number',
1071 transform_id='No ESN') /
1072 ikev2.IKEv2_payload_Transform(
1073 transform_type='Extended Sequence Number',
1074 transform_id='ESN'))
1076 c = self.sa.child_sas[0]
1077 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1078 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1080 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1081 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1082 IDtype=self.sa.id_type, load=self.sa.i_id) /
1083 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1084 IDtype=self.sa.id_type, load=self.sa.r_id) /
1085 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1086 auth_type=AuthMethod.value(self.sa.auth_method),
1087 load=self.sa.auth_data) /
1088 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1089 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1090 number_of_TSs=len(tsi),
1091 traffic_selector=tsi) /
1092 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1093 number_of_TSs=len(tsr),
1094 traffic_selector=tsr) /
1095 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1097 header = ikev2.IKEv2(
1098 init_SPI=self.sa.ispi,
1099 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1100 flags='Response', exch_type='IKE_AUTH')
1102 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1103 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1104 self.sa.dport, self.sa.natt, self.ip6)
1105 self.pg_send(self.pg0, packet)
1107 def test_initiator(self):
1108 self.initiate_sa_init()
1110 self.sa.calc_child_keys()
1111 self.send_auth_response()
1112 self.verify_ike_sas()
1115 class TemplateResponder(IkePeer):
1116 """ responder test template """
1118 def initiate_del_sa_from_responder(self):
1119 self.pg0.enable_capture()
1121 self.vapi.ikev2_initiate_del_ike_sa(
1122 ispi=int.from_bytes(self.sa.ispi, 'little'))
1123 capture = self.pg0.get_capture(1)
1124 ih = self.get_ike_header(capture[0])
1125 self.assertNotIn('Response', ih.flags)
1126 self.assertNotIn('Initiator', ih.flags)
1127 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1128 plain = self.sa.hmac_and_decrypt(ih)
1129 d = ikev2.IKEv2_payload_Delete(plain)
1130 self.assertEqual(d.proto, 1) # proto=IKEv2
1131 self.assertEqual(ih.init_SPI, self.sa.ispi)
1132 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1133 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1134 flags='Initiator+Response',
1135 exch_type='INFORMATIONAL',
1136 id=ih.id, next_payload='Encrypted')
1137 resp = self.encrypt_ike_msg(header, b'', None)
1138 self.send_and_assert_no_replies(self.pg0, resp)
1140 def verify_del_sa(self, packet):
1141 ih = self.get_ike_header(packet)
1142 self.assertEqual(ih.id, self.sa.msg_id)
1143 self.assertEqual(ih.exch_type, 37) # exchange informational
1144 self.assertIn('Response', ih.flags)
1145 self.assertNotIn('Initiator', ih.flags)
1146 self.assertEqual(ih.next_payload, 46) # Encrypted
1147 self.assertEqual(ih.init_SPI, self.sa.ispi)
1148 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1149 plain = self.sa.hmac_and_decrypt(ih)
1150 self.assertEqual(plain, b'')
1152 def initiate_del_sa_from_initiator(self):
1153 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1154 flags='Initiator', exch_type='INFORMATIONAL',
1155 id=self.sa.new_msg_id())
1156 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1157 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1158 packet = self.create_packet(self.pg0, ike_msg,
1159 self.sa.sport, self.sa.dport,
1160 self.sa.natt, self.ip6)
1161 self.pg0.add_stream(packet)
1162 self.pg0.enable_capture()
1164 capture = self.pg0.get_capture(1)
1165 self.verify_del_sa(capture[0])
1167 def send_sa_init_req(self):
1168 tr_attr = self.sa.ike_crypto_attr()
1169 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1170 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1171 key_length=tr_attr[0]) /
1172 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1173 transform_id=self.sa.ike_integ) /
1174 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1175 transform_id=self.sa.ike_prf_alg.name) /
1176 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1177 transform_id=self.sa.ike_dh))
1179 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1180 trans_nb=4, trans=trans))
1182 next_payload = None if self.ip6 else 'Notify'
1184 self.sa.init_req_packet = (
1185 ikev2.IKEv2(init_SPI=self.sa.ispi,
1186 flags='Initiator', exch_type='IKE_SA_INIT') /
1187 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1188 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1189 group=self.sa.ike_dh,
1190 load=self.sa.my_dh_pub_key) /
1191 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1192 load=self.sa.i_nonce))
1196 src_address = b'\x0a\x0a\x0a\x01'
1198 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1201 dst_address = b'\x0a\x0a\x0a\x0a'
1203 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1205 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1206 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1207 nat_src_detection = ikev2.IKEv2_payload_Notify(
1208 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1209 next_payload='Notify')
1210 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1211 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1212 self.sa.init_req_packet = (self.sa.init_req_packet /
1216 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1217 self.sa.sport, self.sa.dport,
1218 self.sa.natt, self.ip6)
1219 self.pg0.add_stream(ike_msg)
1220 self.pg0.enable_capture()
1222 capture = self.pg0.get_capture(1)
1223 self.verify_sa_init(capture[0])
1225 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1226 tr_attr = self.sa.esp_crypto_attr()
1227 last_payload = last_payload or 'Notify'
1228 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1229 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1230 key_length=tr_attr[0]) /
1231 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1232 transform_id=self.sa.esp_integ) /
1233 ikev2.IKEv2_payload_Transform(
1234 transform_type='Extended Sequence Number',
1235 transform_id='No ESN') /
1236 ikev2.IKEv2_payload_Transform(
1237 transform_type='Extended Sequence Number',
1238 transform_id='ESN'))
1240 c = self.sa.child_sas[0]
1241 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1242 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1244 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1245 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1246 auth_type=AuthMethod.value(self.sa.auth_method),
1247 load=self.sa.auth_data) /
1248 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1249 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1250 number_of_TSs=len(tsi), traffic_selector=tsi) /
1251 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1252 number_of_TSs=len(tsr), traffic_selector=tsr))
1255 first_payload = 'Nonce'
1256 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1257 next_payload='SA') / plain /
1258 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1259 proto='ESP', SPI=c.ispi))
1261 first_payload = 'IDi'
1262 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1263 IDtype=self.sa.id_type, load=self.sa.i_id) /
1264 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1265 IDtype=self.sa.id_type, load=self.sa.r_id))
1267 return plain, first_payload
1269 def send_sa_auth(self):
1270 plain, first_payload = self.generate_auth_payload(
1271 last_payload='Notify')
1272 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1273 header = ikev2.IKEv2(
1274 init_SPI=self.sa.ispi,
1275 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1276 flags='Initiator', exch_type='IKE_AUTH')
1278 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1279 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1280 self.sa.dport, self.sa.natt, self.ip6)
1281 self.pg0.add_stream(packet)
1282 self.pg0.enable_capture()
1284 capture = self.pg0.get_capture(1)
1285 self.verify_sa_auth_resp(capture[0])
1287 def verify_sa_init(self, packet):
1288 ih = self.get_ike_header(packet)
1290 self.assertEqual(ih.id, self.sa.msg_id)
1291 self.assertEqual(ih.exch_type, 34)
1292 self.assertIn('Response', ih.flags)
1293 self.assertEqual(ih.init_SPI, self.sa.ispi)
1294 self.assertNotEqual(ih.resp_SPI, 0)
1295 self.sa.rspi = ih.resp_SPI
1297 sa = ih[ikev2.IKEv2_payload_SA]
1298 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1299 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1300 except IndexError as e:
1301 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1302 self.logger.error(ih.show())
1304 self.sa.complete_dh_data()
1308 def verify_sa_auth_resp(self, packet):
1309 ike = self.get_ike_header(packet)
1311 self.verify_udp(udp)
1312 self.assertEqual(ike.id, self.sa.msg_id)
1313 plain = self.sa.hmac_and_decrypt(ike)
1314 idr = ikev2.IKEv2_payload_IDr(plain)
1315 prop = idr[ikev2.IKEv2_payload_Proposal]
1316 self.assertEqual(prop.SPIsize, 4)
1317 self.sa.child_sas[0].rspi = prop.SPI
1318 self.sa.calc_child_keys()
1320 def test_responder(self):
1321 self.send_sa_init_req()
1323 self.verify_ipsec_sas()
1324 self.verify_ike_sas()
1327 class Ikev2Params(object):
1328 def config_params(self, params={}):
1329 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1330 ei = VppEnum.vl_api_ipsec_integ_alg_t
1332 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1333 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1334 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1335 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1336 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1337 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1339 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1340 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1341 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1342 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1344 dpd_disabled = True if 'dpd_disabled' not in params else\
1345 params['dpd_disabled']
1347 self.vapi.cli('ikev2 dpd disable')
1348 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1349 not in params else params['del_sa_from_responder']
1350 i_natt = False if 'i_natt' not in params else params['i_natt']
1351 r_natt = False if 'r_natt' not in params else params['r_natt']
1352 self.p = Profile(self, 'pr1')
1353 self.ip6 = False if 'ip6' not in params else params['ip6']
1355 if 'auth' in params and params['auth'] == 'rsa-sig':
1356 auth_method = 'rsa-sig'
1357 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1358 self.vapi.ikev2_set_local_key(
1359 key_file=work_dir + params['server-key'])
1361 client_file = work_dir + params['client-cert']
1362 server_pem = open(work_dir + params['server-cert']).read()
1363 client_priv = open(work_dir + params['client-key']).read()
1364 client_priv = load_pem_private_key(str.encode(client_priv), None,
1366 self.peer_cert = x509.load_pem_x509_certificate(
1367 str.encode(server_pem),
1369 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1372 auth_data = b'$3cr3tpa$$w0rd'
1373 self.p.add_auth(method='shared-key', data=auth_data)
1374 auth_method = 'shared-key'
1377 is_init = True if 'is_initiator' not in params else\
1378 params['is_initiator']
1380 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1381 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1383 self.p.add_local_id(**idr)
1384 self.p.add_remote_id(**idi)
1386 self.p.add_local_id(**idi)
1387 self.p.add_remote_id(**idr)
1389 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1390 'loc_ts' not in params else params['loc_ts']
1391 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1392 'rem_ts' not in params else params['rem_ts']
1393 self.p.add_local_ts(**loc_ts)
1394 self.p.add_remote_ts(**rem_ts)
1395 if 'responder' in params:
1396 self.p.add_responder(params['responder'])
1397 if 'ike_transforms' in params:
1398 self.p.add_ike_transforms(params['ike_transforms'])
1399 if 'esp_transforms' in params:
1400 self.p.add_esp_transforms(params['esp_transforms'])
1402 udp_encap = False if 'udp_encap' not in params else\
1405 self.p.set_udp_encap(True)
1407 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1408 is_initiator=is_init,
1409 id_type=self.p.local_id['id_type'],
1410 i_natt=i_natt, r_natt=r_natt,
1411 priv_key=client_priv, auth_method=auth_method,
1412 auth_data=auth_data, udp_encap=udp_encap,
1413 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1415 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1416 params['ike-crypto']
1417 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1419 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1422 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1423 params['esp-crypto']
1424 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1427 self.sa.set_ike_props(
1428 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1429 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1430 self.sa.set_esp_props(
1431 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1435 class TestApi(VppTestCase):
1436 """ Test IKEV2 API """
1438 def setUpClass(cls):
1439 super(TestApi, cls).setUpClass()
1442 def tearDownClass(cls):
1443 super(TestApi, cls).tearDownClass()
1446 super(TestApi, self).tearDown()
1447 self.p1.remove_vpp_config()
1448 self.p2.remove_vpp_config()
1449 r = self.vapi.ikev2_profile_dump()
1450 self.assertEqual(len(r), 0)
1452 def configure_profile(self, cfg):
1453 p = Profile(self, cfg['name'])
1454 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1455 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1456 p.add_local_ts(**cfg['loc_ts'])
1457 p.add_remote_ts(**cfg['rem_ts'])
1458 p.add_responder(cfg['responder'])
1459 p.add_ike_transforms(cfg['ike_ts'])
1460 p.add_esp_transforms(cfg['esp_ts'])
1461 p.add_auth(**cfg['auth'])
1462 p.set_udp_encap(cfg['udp_encap'])
1463 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1464 if 'lifetime_data' in cfg:
1465 p.set_lifetime_data(cfg['lifetime_data'])
1466 if 'tun_itf' in cfg:
1467 p.set_tunnel_interface(cfg['tun_itf'])
1468 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1473 def test_profile_api(self):
1474 """ test profile dump API """
1479 'start_addr': '3.3.3.2',
1480 'end_addr': '3.3.3.3',
1486 'start_addr': '4.5.76.80',
1487 'end_addr': '2.3.4.6',
1494 'start_addr': 'ab::1',
1495 'end_addr': 'ab::4',
1501 'start_addr': 'cd::12',
1502 'end_addr': 'cd::13',
1508 'natt_disabled': True,
1509 'loc_id': ('fqdn', b'vpp.home'),
1510 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1513 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1516 'crypto_key_size': 32,
1521 'crypto_key_size': 24,
1523 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1525 'ipsec_over_udp_port': 4501,
1528 'lifetime_maxdata': 20192,
1529 'lifetime_jitter': 9,
1534 'loc_id': ('ip4-addr', b'192.168.2.1'),
1535 'rem_id': ('ip6-addr', b'abcd::1'),
1538 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1541 'crypto_key_size': 16,
1546 'crypto_key_size': 24,
1548 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1550 'ipsec_over_udp_port': 4600,
1553 self.p1 = self.configure_profile(conf['p1'])
1554 self.p2 = self.configure_profile(conf['p2'])
1556 r = self.vapi.ikev2_profile_dump()
1557 self.assertEqual(len(r), 2)
1558 self.verify_profile(r[0].profile, conf['p1'])
1559 self.verify_profile(r[1].profile, conf['p2'])
1561 def verify_id(self, api_id, cfg_id):
1562 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1563 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1565 def verify_ts(self, api_ts, cfg_ts):
1566 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1567 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1568 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1569 self.assertEqual(api_ts.start_addr,
1570 ip_address(text_type(cfg_ts['start_addr'])))
1571 self.assertEqual(api_ts.end_addr,
1572 ip_address(text_type(cfg_ts['end_addr'])))
1574 def verify_responder(self, api_r, cfg_r):
1575 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1576 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1578 def verify_transforms(self, api_ts, cfg_ts):
1579 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1580 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1581 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1583 def verify_ike_transforms(self, api_ts, cfg_ts):
1584 self.verify_transforms(api_ts, cfg_ts)
1585 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1587 def verify_esp_transforms(self, api_ts, cfg_ts):
1588 self.verify_transforms(api_ts, cfg_ts)
1590 def verify_auth(self, api_auth, cfg_auth):
1591 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1592 self.assertEqual(api_auth.data, cfg_auth['data'])
1593 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1595 def verify_lifetime_data(self, p, ld):
1596 self.assertEqual(p.lifetime, ld['lifetime'])
1597 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1598 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1599 self.assertEqual(p.handover, ld['handover'])
1601 def verify_profile(self, ap, cp):
1602 self.assertEqual(ap.name, cp['name'])
1603 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1604 self.verify_id(ap.loc_id, cp['loc_id'])
1605 self.verify_id(ap.rem_id, cp['rem_id'])
1606 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1607 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1608 self.verify_responder(ap.responder, cp['responder'])
1609 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1610 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1611 self.verify_auth(ap.auth, cp['auth'])
1612 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1613 self.assertTrue(natt_dis == ap.natt_disabled)
1615 if 'lifetime_data' in cp:
1616 self.verify_lifetime_data(ap, cp['lifetime_data'])
1617 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1619 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1621 self.assertEqual(ap.tun_itf, 0xffffffff)
1624 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1625 """ test responder - responder behind NAT """
1627 def config_tc(self):
1628 self.config_params({'r_natt': True})
1631 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1632 """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1634 def config_tc(self):
1635 self.config_params({
1637 'is_initiator': False, # seen from test case perspective
1638 # thus vpp is initiator
1639 'responder': {'sw_if_index': self.pg0.sw_if_index,
1640 'addr': self.pg0.remote_ip4},
1641 'ike-crypto': ('AES-GCM-16ICV', 32),
1642 'ike-integ': 'NULL',
1643 'ike-dh': '3072MODPgr',
1645 'crypto_alg': 20, # "aes-gcm-16"
1646 'crypto_key_size': 256,
1647 'dh_group': 15, # "modp-3072"
1650 'crypto_alg': 12, # "aes-cbc"
1651 'crypto_key_size': 256,
1652 # "hmac-sha2-256-128"
1656 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1657 """ test ikev2 initiator - pre shared key auth """
1659 def config_tc(self):
1660 self.config_params({
1661 'is_initiator': False, # seen from test case perspective
1662 # thus vpp is initiator
1663 'responder': {'sw_if_index': self.pg0.sw_if_index,
1664 'addr': self.pg0.remote_ip4},
1665 'ike-crypto': ('AES-GCM-16ICV', 32),
1666 'ike-integ': 'NULL',
1667 'ike-dh': '3072MODPgr',
1669 'crypto_alg': 20, # "aes-gcm-16"
1670 'crypto_key_size': 256,
1671 'dh_group': 15, # "modp-3072"
1674 'crypto_alg': 12, # "aes-cbc"
1675 'crypto_key_size': 256,
1676 # "hmac-sha2-256-128"
1680 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1681 """ test initiator - request window size (1) """
1683 def rekey_respond(self, req, update_child_sa_data):
1684 ih = self.get_ike_header(req)
1685 plain = self.sa.hmac_and_decrypt(ih)
1686 sa = ikev2.IKEv2_payload_SA(plain)
1687 if update_child_sa_data:
1688 prop = sa[ikev2.IKEv2_payload_Proposal]
1689 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1690 self.sa.r_nonce = self.sa.i_nonce
1691 self.sa.child_sas[0].ispi = prop.SPI
1692 self.sa.child_sas[0].rspi = prop.SPI
1693 self.sa.calc_child_keys()
1695 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1696 flags='Response', exch_type=36,
1697 id=ih.id, next_payload='Encrypted')
1698 resp = self.encrypt_ike_msg(header, sa, 'SA')
1699 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1700 self.sa.dport, self.sa.natt, self.ip6)
1701 self.send_and_assert_no_replies(self.pg0, packet)
1703 def test_initiator(self):
1704 super(TestInitiatorRequestWindowSize, self).test_initiator()
1705 self.pg0.enable_capture()
1707 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1708 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1709 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1710 capture = self.pg0.get_capture(2)
1712 # reply in reverse order
1713 self.rekey_respond(capture[1], True)
1714 self.rekey_respond(capture[0], False)
1716 # verify that only the second request was accepted
1717 self.verify_ike_sas()
1718 self.verify_ipsec_sas(is_rekey=True)
1721 class TestInitiatorRekey(TestInitiatorPsk):
1722 """ test ikev2 initiator - rekey """
1724 def rekey_from_initiator(self):
1725 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1726 self.pg0.enable_capture()
1728 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1729 capture = self.pg0.get_capture(1)
1730 ih = self.get_ike_header(capture[0])
1731 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1732 self.assertNotIn('Response', ih.flags)
1733 self.assertIn('Initiator', ih.flags)
1734 plain = self.sa.hmac_and_decrypt(ih)
1735 sa = ikev2.IKEv2_payload_SA(plain)
1736 prop = sa[ikev2.IKEv2_payload_Proposal]
1737 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1738 self.sa.r_nonce = self.sa.i_nonce
1739 # update new responder SPI
1740 self.sa.child_sas[0].ispi = prop.SPI
1741 self.sa.child_sas[0].rspi = prop.SPI
1742 self.sa.calc_child_keys()
1743 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1744 flags='Response', exch_type=36,
1745 id=ih.id, next_payload='Encrypted')
1746 resp = self.encrypt_ike_msg(header, sa, 'SA')
1747 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1748 self.sa.dport, self.sa.natt, self.ip6)
1749 self.send_and_assert_no_replies(self.pg0, packet)
1751 def test_initiator(self):
1752 super(TestInitiatorRekey, self).test_initiator()
1753 self.rekey_from_initiator()
1754 self.verify_ike_sas()
1755 self.verify_ipsec_sas(is_rekey=True)
1758 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1759 """ test ikev2 initiator - delete IKE SA from responder """
1761 def config_tc(self):
1762 self.config_params({
1763 'del_sa_from_responder': True,
1764 'is_initiator': False, # seen from test case perspective
1765 # thus vpp is initiator
1766 'responder': {'sw_if_index': self.pg0.sw_if_index,
1767 'addr': self.pg0.remote_ip4},
1768 'ike-crypto': ('AES-GCM-16ICV', 32),
1769 'ike-integ': 'NULL',
1770 'ike-dh': '3072MODPgr',
1772 'crypto_alg': 20, # "aes-gcm-16"
1773 'crypto_key_size': 256,
1774 'dh_group': 15, # "modp-3072"
1777 'crypto_alg': 12, # "aes-cbc"
1778 'crypto_key_size': 256,
1779 # "hmac-sha2-256-128"
1783 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1784 """ test ikev2 responder - initiator behind NAT """
1785 def config_tc(self):
1790 class TestResponderPsk(TemplateResponder, Ikev2Params):
1791 """ test ikev2 responder - pre shared key auth """
1792 def config_tc(self):
1793 self.config_params()
1796 class TestResponderDpd(TestResponderPsk):
1798 Dead peer detection test
1800 def config_tc(self):
1801 self.config_params({'dpd_disabled': False})
1806 def test_responder(self):
1807 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1808 super(TestResponderDpd, self).test_responder()
1809 self.pg0.enable_capture()
1811 # capture empty request but don't reply
1812 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1813 ih = self.get_ike_header(capture[0])
1814 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1815 plain = self.sa.hmac_and_decrypt(ih)
1816 self.assertEqual(plain, b'')
1817 # wait for SA expiration
1819 ike_sas = self.vapi.ikev2_sa_dump()
1820 self.assertEqual(len(ike_sas), 0)
1821 ipsec_sas = self.vapi.ipsec_sa_dump()
1822 self.assertEqual(len(ipsec_sas), 0)
1825 class TestResponderRekey(TestResponderPsk):
1826 """ test ikev2 responder - rekey """
1828 def rekey_from_initiator(self):
1829 packet = self.create_rekey_request()
1830 self.pg0.add_stream(packet)
1831 self.pg0.enable_capture()
1833 capture = self.pg0.get_capture(1)
1834 ih = self.get_ike_header(capture[0])
1835 plain = self.sa.hmac_and_decrypt(ih)
1836 sa = ikev2.IKEv2_payload_SA(plain)
1837 prop = sa[ikev2.IKEv2_payload_Proposal]
1838 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1839 # update new responder SPI
1840 self.sa.child_sas[0].rspi = prop.SPI
1842 def test_responder(self):
1843 super(TestResponderRekey, self).test_responder()
1844 self.rekey_from_initiator()
1845 self.sa.calc_child_keys()
1846 self.verify_ike_sas()
1847 self.verify_ipsec_sas(is_rekey=True)
1850 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1851 """ test ikev2 responder - cert based auth """
1852 def config_tc(self):
1853 self.config_params({
1856 'server-key': 'server-key.pem',
1857 'client-key': 'client-key.pem',
1858 'client-cert': 'client-cert.pem',
1859 'server-cert': 'server-cert.pem'})
1862 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1863 (TemplateResponder, Ikev2Params):
1865 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1867 def config_tc(self):
1868 self.config_params({
1869 'ike-crypto': ('AES-CBC', 16),
1870 'ike-integ': 'SHA2-256-128',
1871 'esp-crypto': ('AES-CBC', 24),
1872 'esp-integ': 'SHA2-384-192',
1873 'ike-dh': '2048MODPgr'})
1876 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1877 (TemplateResponder, Ikev2Params):
1879 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1881 def config_tc(self):
1882 self.config_params({
1883 'ike-crypto': ('AES-CBC', 32),
1884 'ike-integ': 'SHA2-256-128',
1885 'esp-crypto': ('AES-GCM-16ICV', 32),
1886 'esp-integ': 'NULL',
1887 'ike-dh': '3072MODPgr'})
1890 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1894 def config_tc(self):
1895 self.config_params({
1896 'del_sa_from_responder': True,
1899 'ike-crypto': ('AES-GCM-16ICV', 32),
1900 'ike-integ': 'NULL',
1901 'ike-dh': '2048MODPgr',
1902 'loc_ts': {'start_addr': 'ab:cd::0',
1903 'end_addr': 'ab:cd::10'},
1904 'rem_ts': {'start_addr': '11::0',
1905 'end_addr': '11::100'}})
1908 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1910 Test for keep alive messages
1913 def send_empty_req_from_responder(self):
1914 packet = self.create_empty_request()
1915 self.pg0.add_stream(packet)
1916 self.pg0.enable_capture()
1918 capture = self.pg0.get_capture(1)
1919 ih = self.get_ike_header(capture[0])
1920 self.assertEqual(ih.id, self.sa.msg_id)
1921 plain = self.sa.hmac_and_decrypt(ih)
1922 self.assertEqual(plain, b'')
1924 def test_initiator(self):
1925 super(TestInitiatorKeepaliveMsg, self).test_initiator()
1926 self.send_empty_req_from_responder()
1929 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1930 """ malformed packet test """
1935 def config_tc(self):
1936 self.config_params()
1938 def assert_counter(self, count, name, version='ip4'):
1939 node_name = '/err/ikev2-%s/' % version + name
1940 self.assertEqual(count, self.statistics.get_err_counter(node_name))
1942 def create_ike_init_msg(self, length=None, payload=None):
1943 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1944 flags='Initiator', exch_type='IKE_SA_INIT')
1945 if payload is not None:
1947 return self.create_packet(self.pg0, msg, self.sa.sport,
1950 def verify_bad_packet_length(self):
1951 ike_msg = self.create_ike_init_msg(length=0xdead)
1952 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1953 self.assert_counter(self.pkt_count, 'Bad packet length')
1955 def verify_bad_sa_payload_length(self):
1956 p = ikev2.IKEv2_payload_SA(length=0xdead)
1957 ike_msg = self.create_ike_init_msg(payload=p)
1958 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1959 self.assert_counter(self.pkt_count, 'Malformed packet')
1961 def test_responder(self):
1962 self.pkt_count = 254
1963 self.verify_bad_packet_length()
1964 self.verify_bad_sa_payload_length()
1967 if __name__ == '__main__':
1968 unittest.main(testRunner=VppTestRunner)