3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 from scapy.layers.ipsec import ESP
16 from scapy.layers.inet import IP, UDP, Ether
17 from scapy.layers.inet6 import IPv6
18 from scapy.packet import raw, Raw
19 from scapy.utils import long_converter
20 from framework import VppTestCase, VppTestRunner
21 from vpp_ikev2 import Profile, IDType, AuthMethod
22 from vpp_papi import VppEnum
29 KEY_PAD = b"Key Pad for IKEv2"
36 # tuple structure is (p, g, key_len)
38 '2048MODPgr': (long_converter("""
39 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
40 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
41 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
42 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
43 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
44 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
45 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
46 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
47 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
48 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
49 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
51 '3072MODPgr': (long_converter("""
52 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
53 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
54 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
55 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
56 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
57 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
58 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
59 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
60 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
61 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
62 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
63 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
64 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
65 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
66 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
67 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
71 class CryptoAlgo(object):
72 def __init__(self, name, cipher, mode):
76 if self.cipher is not None:
77 self.bs = self.cipher.block_size // 8
79 if self.name == 'AES-GCM-16ICV':
80 self.iv_len = GCM_IV_SIZE
84 def encrypt(self, data, key, aad=None):
85 iv = os.urandom(self.iv_len)
87 encryptor = Cipher(self.cipher(key), self.mode(iv),
88 default_backend()).encryptor()
89 return iv + encryptor.update(data) + encryptor.finalize()
91 salt = key[-SALT_SIZE:]
93 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
94 default_backend()).encryptor()
95 encryptor.authenticate_additional_data(aad)
96 data = encryptor.update(data) + encryptor.finalize()
97 data += encryptor.tag[:GCM_ICV_SIZE]
100 def decrypt(self, data, key, aad=None, icv=None):
102 iv = data[:self.iv_len]
103 ct = data[self.iv_len:]
104 decryptor = Cipher(algorithms.AES(key),
106 default_backend()).decryptor()
107 return decryptor.update(ct) + decryptor.finalize()
109 salt = key[-SALT_SIZE:]
110 nonce = salt + data[:GCM_IV_SIZE]
111 ct = data[GCM_IV_SIZE:]
112 key = key[:-SALT_SIZE]
113 decryptor = Cipher(algorithms.AES(key),
114 self.mode(nonce, icv, len(icv)),
115 default_backend()).decryptor()
116 decryptor.authenticate_additional_data(aad)
117 return decryptor.update(ct) + decryptor.finalize()
120 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
121 data = data + b'\x00' * (pad_len - 1)
122 return data + bytes([pad_len - 1])
125 class AuthAlgo(object):
126 def __init__(self, name, mac, mod, key_len, trunc_len=None):
130 self.key_len = key_len
131 self.trunc_len = trunc_len or key_len
135 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
136 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
137 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
142 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
143 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
144 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
145 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
146 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
150 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
151 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
168 class IKEv2ChildSA(object):
169 def __init__(self, local_ts, remote_ts, is_initiator):
177 self.local_ts = local_ts
178 self.remote_ts = remote_ts
181 class IKEv2SA(object):
182 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
183 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
184 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
185 auth_method='shared-key', priv_key=None, natt=False,
187 self.udp_encap = udp_encap
196 self.dh_params = None
198 self.priv_key = priv_key
199 self.is_initiator = is_initiator
200 nonce = nonce or os.urandom(32)
201 self.auth_data = auth_data
204 if isinstance(id_type, str):
205 self.id_type = IDType.value(id_type)
207 self.id_type = id_type
208 self.auth_method = auth_method
209 if self.is_initiator:
210 self.rspi = 8 * b'\x00'
215 self.ispi = 8 * b'\x00'
217 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
220 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return this side's DH public value.

    The initiator's value lives in ``i_dh_data`` and the responder's in
    ``r_dh_data``; the role is taken from ``is_initiator``.
    """
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the remote side's DH public value.

    Mirror image of the local accessor: an initiator reads ``r_dh_data``,
    a responder reads ``i_dh_data``.
    """
    return self.r_dh_data if self.is_initiator else self.i_dh_data
def compute_secret(self):
    """Compute the Diffie-Hellman shared secret as big-endian bytes.

    Raises the peer's public value to the local private exponent modulo
    the group prime and serialises the result to the group's key length.
    ``self.ike_group`` is a ``(p, g, key_len)`` tuple; the generator is
    not needed here.
    """
    exponent_bytes = self.dh_private_key
    peer_bytes = self.peer_dh_pub_key
    modulus, _generator, out_len = self.ike_group
    base = int.from_bytes(peer_bytes, 'big')
    exponent = int.from_bytes(exponent_bytes, 'big')
    shared = pow(base, exponent, modulus)
    return shared.to_bytes(out_len, 'big')
243 def generate_dh_data(self):
245 if self.ike_dh not in DH:
246 raise NotImplementedError('%s not in DH group' % self.ike_dh)
248 if self.dh_params is None:
249 dhg = DH[self.ike_dh]
250 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
251 self.dh_params = pn.parameters(default_backend())
253 priv = self.dh_params.generate_private_key()
254 pub = priv.public_key()
255 x = priv.private_numbers().x
256 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
257 y = pub.public_numbers().y
259 if self.is_initiator:
260 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
262 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Store the result of ``compute_secret()`` in ``dh_shared_secret``."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
267 def calc_child_keys(self):
268 prf = self.ike_prf_alg.mod()
269 s = self.i_nonce + self.r_nonce
270 c = self.child_sas[0]
272 encr_key_len = self.esp_crypto_key_len
273 integ_key_len = self.esp_integ_alg.key_len
274 salt_len = 0 if integ_key_len else 4
276 l = (integ_key_len * 2 +
279 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
282 c.sk_ei = keymat[pos:pos+encr_key_len]
286 c.sk_ai = keymat[pos:pos+integ_key_len]
289 c.salt_ei = keymat[pos:pos+salt_len]
292 c.sk_er = keymat[pos:pos+encr_key_len]
296 c.sk_ar = keymat[pos:pos+integ_key_len]
299 c.salt_er = keymat[pos:pos+salt_len]
302 def calc_prfplus(self, prf, key, seed, length):
306 while len(r) < length and x < 255:
311 s = s + seed + bytes([x])
312 t = self.calc_prf(prf, key, s)
320 def calc_prf(self, prf, key, data):
321 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
326 prf = self.ike_prf_alg.mod()
327 # SKEYSEED = prf(Ni | Nr, g^ir)
328 s = self.i_nonce + self.r_nonce
329 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
# calculate S = Ni | Nr | SPIi | SPIr
332 s = s + self.ispi + self.rspi
334 prf_key_trunc = self.ike_prf_alg.trunc_len
335 encr_key_len = self.ike_crypto_key_len
336 tr_prf_key_len = self.ike_prf_alg.key_len
337 integ_key_len = self.ike_integ_alg.key_len
338 if integ_key_len == 0:
348 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
351 self.sk_d = keymat[:pos+prf_key_trunc]
354 self.sk_ai = keymat[pos:pos+integ_key_len]
356 self.sk_ar = keymat[pos:pos+integ_key_len]
359 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
360 pos += encr_key_len + salt_size
361 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
362 pos += encr_key_len + salt_size
364 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
365 pos += tr_prf_key_len
366 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
368 def generate_authmsg(self, prf, packet):
369 if self.is_initiator:
377 data = bytes([self.id_type, 0, 0, 0]) + id
378 id_hash = self.calc_prf(prf, key, data)
379 return packet + nonce + id_hash
382 prf = self.ike_prf_alg.mod()
383 if self.is_initiator:
384 packet = self.init_req_packet
386 packet = self.init_resp_packet
387 authmsg = self.generate_authmsg(prf, raw(packet))
388 if self.auth_method == 'shared-key':
389 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
390 self.auth_data = self.calc_prf(prf, psk, authmsg)
391 elif self.auth_method == 'rsa-sig':
392 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
395 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad ``data`` and encrypt it with the IKE cipher and the local key.

    ``aad`` is forwarded unchanged to the cipher's ``encrypt``.
    """
    cipher = self.ike_crypto_alg
    padded = cipher.pad(data)
    return cipher.encrypt(padded, self.my_cryptokey, aad)
402 def peer_authkey(self):
403 if self.is_initiator:
408 def my_authkey(self):
409 if self.is_initiator:
414 def my_cryptokey(self):
415 if self.is_initiator:
420 def peer_cryptokey(self):
421 if self.is_initiator:
def concat(self, alg, key_len):
    """Return ``'<alg>-<bits>'`` where bits is ``key_len * 8``."""
    bits = str(key_len * 8)
    return '-'.join((alg, bits))
def vpp_ike_cypto_alg(self):
    """Return the IKE cipher name combined with its key size via concat."""
    name = self.ike_crypto
    key_len = self.ike_crypto_key_len
    return self.concat(name, key_len)
def vpp_esp_cypto_alg(self):
    """Return the ESP cipher name combined with its key size via concat."""
    name = self.esp_crypto
    key_len = self.esp_crypto_key_len
    return self.concat(name, key_len)
def verify_hmac(self, ikemsg):
    """Assert that the trailing ICV of ``ikemsg`` matches a fresh HMAC.

    The last ``trunc_len`` bytes of the message are taken as the received
    ICV; the HMAC is recomputed over everything before them with the
    peer's authentication key and compared after truncation.
    """
    icv_len = self.ike_integ_alg.trunc_len
    received_icv = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    digest = self.compute_hmac(self.ike_integ_alg.mod(),
                               self.peer_authkey, covered)
    self.test.assertEqual(digest[:icv_len], received_icv)
444 def compute_hmac(self, integ, key, data):
445 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the IKE cipher and the peer's key.

    ``aad`` and ``icv`` are passed through to the cipher's ``decrypt``.
    """
    cipher = self.ike_crypto_alg
    key = self.peer_cryptokey
    return cipher.decrypt(data, key, aad, icv)
452 def hmac_and_decrypt(self, ike):
453 ep = ike[ikev2.IKEv2_payload_Encrypted]
454 if self.ike_crypto == 'AES-GCM-16ICV':
455 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
456 ct = ep.load[:-GCM_ICV_SIZE]
457 tag = ep.load[-GCM_ICV_SIZE:]
458 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
460 self.verify_hmac(raw(ike))
461 integ_trunc = self.ike_integ_alg.trunc_len
463 # remove ICV and decrypt payload
464 ct = ep.load[:-integ_trunc]
465 plain = self.decrypt(ct)
468 return plain[:-pad_len - 1]
def build_ts_addr(self, ts, version):
    """Map a traffic selector's address range to versioned field names.

    ``ts`` must provide ``'start_addr'`` and ``'end_addr'``; ``version``
    is the string suffix appended to the resulting keys (the callers in
    this file pass ``'4'`` or ``'6'``).
    """
    field_map = (('starting_address_v', 'start_addr'),
                 ('ending_address_v', 'end_addr'))
    return {prefix + version: ts[src] for prefix, src in field_map}
474 def generate_ts(self, is_ip4):
475 c = self.child_sas[0]
476 ts_data = {'IP_protocol_ID': 0,
480 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
481 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
482 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
483 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
485 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
486 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
487 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
488 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
490 if self.is_initiator:
491 return ([ts1], [ts2])
492 return ([ts2], [ts1])
494 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
495 if crypto not in CRYPTO_ALGOS:
496 raise TypeError('unsupported encryption algo %r' % crypto)
497 self.ike_crypto = crypto
498 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
499 self.ike_crypto_key_len = crypto_key_len
501 if integ not in AUTH_ALGOS:
502 raise TypeError('unsupported auth algo %r' % integ)
503 self.ike_integ = None if integ == 'NULL' else integ
504 self.ike_integ_alg = AUTH_ALGOS[integ]
506 if prf not in PRF_ALGOS:
507 raise TypeError('unsupported prf algo %r' % prf)
509 self.ike_prf_alg = PRF_ALGOS[prf]
511 self.ike_group = DH[self.ike_dh]
def set_esp_props(self, crypto, crypto_key_len, integ):
    """Record the ESP cipher, its key length and the integrity algorithm.

    Raises TypeError when ``crypto`` is not a key of CRYPTO_ALGOS or
    ``integ`` is not a key of AUTH_ALGOS. ``esp_integ`` is stored as None
    for the 'NULL' integrity algorithm, otherwise as the given name.
    """
    # The key length is stored before any validation takes place.
    self.esp_crypto_key_len = crypto_key_len

    if crypto not in CRYPTO_ALGOS:
        raise TypeError('unsupported encryption algo %r' % crypto)
    self.esp_crypto = crypto
    self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

    if integ not in AUTH_ALGOS:
        raise TypeError('unsupported auth algo %r' % integ)
    if integ == 'NULL':
        self.esp_integ = None
    else:
        self.esp_integ = integ
    self.esp_integ_alg = AUTH_ALGOS[integ]
def crypto_attr(self, key_len):
    """Return ``(attribute, 12)`` for a key of ``key_len`` bytes.

    The attribute word carries 0x800e in its upper 16 bits and the key
    size in bits (``key_len * 8``) below. Elsewhere in this file the
    first element is passed as a transform's ``key_length`` and the
    second as its ``length``.

    Raises Exception when ``self.ike_crypto`` is neither 'AES-CBC' nor
    'AES-GCM-16ICV'.
    """
    if self.ike_crypto not in ('AES-CBC', 'AES-GCM-16ICV'):
        raise Exception('unsupported attribute type')
    key_bits = key_len << 3
    return ((0x800e << 16) | key_bits, 12)
def ike_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the IKE cipher key length."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the ESP cipher key length."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
537 def compute_nat_sha1(self, ip, port, rspi=None):
540 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
541 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
543 return digest.finalize()
546 class IkePeer(VppTestCase):
547 """ common class for initiator and responder """
551 import scapy.contrib.ikev2 as _ikev2
552 globals()['ikev2'] = _ikev2
553 super(IkePeer, cls).setUpClass()
554 cls.create_pg_interfaces(range(2))
555 for i in cls.pg_interfaces:
563 def tearDownClass(cls):
564 super(IkePeer, cls).tearDownClass()
567 super(IkePeer, self).tearDown()
568 if self.del_sa_from_responder:
569 self.initiate_del_sa_from_responder()
571 self.initiate_del_sa_from_initiator()
572 r = self.vapi.ikev2_sa_dump()
573 self.assertEqual(len(r), 0)
574 sas = self.vapi.ipsec_sa_dump()
575 self.assertEqual(len(sas), 0)
576 self.p.remove_vpp_config()
577 self.assertIsNone(self.p.query_vpp_config())
580 super(IkePeer, self).setUp()
582 self.p.add_vpp_config()
583 self.assertIsNotNone(self.p.query_vpp_config())
584 if self.sa.is_initiator:
585 self.sa.generate_dh_data()
586 self.vapi.cli('ikev2 set logging level 4')
587 self.vapi.cli('event-lo clear')
def create_rekey_request(self):
    """Build an encrypted CREATE_CHILD_SA request packet for pg0.

    The payload comes from ``generate_auth_payload(is_rekey=True)``; the
    header uses the SA's SPIs, a fresh message id and the Initiator flag.
    """
    sa_payload, first_payload = self.generate_auth_payload(is_rekey=True)
    header_fields = dict(
        init_SPI=self.sa.ispi,
        resp_SPI=self.sa.rspi,
        id=self.sa.new_msg_id(),
        flags='Initiator',
        exch_type='CREATE_CHILD_SA',
    )
    header = ikev2.IKEv2(**header_fields)

    encrypted = self.encrypt_ike_msg(header, sa_payload, first_payload)
    return self.create_packet(self.pg0, encrypted, self.sa.sport,
                              self.sa.dport, self.sa.natt, self.ip6)
def create_empty_request(self):
    """Build an INFORMATIONAL request with an empty encrypted payload.

    The header uses the SA's SPIs, a fresh message id and the Initiator
    flag; the resulting message is wrapped into a packet for pg0.
    """
    header_fields = dict(
        init_SPI=self.sa.ispi,
        resp_SPI=self.sa.rspi,
        id=self.sa.new_msg_id(),
        flags='Initiator',
        exch_type='INFORMATIONAL',
        next_payload='Encrypted',
    )
    header = ikev2.IKEv2(**header_fields)

    encrypted = self.encrypt_ike_msg(header, b'', None)
    return self.create_packet(self.pg0, encrypted, self.sa.sport,
                              self.sa.dport, self.sa.natt, self.ip6)
610 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
613 src_ip = src_if.remote_ip6
614 dst_ip = src_if.local_ip6
617 src_ip = src_if.remote_ip4
618 dst_ip = src_if.local_ip4
620 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
621 ip_layer(src=src_ip, dst=dst_ip) /
622 UDP(sport=sport, dport=dport))
624 # insert non ESP marker
625 res = res / Raw(b'\x00' * 4)
def verify_udp(self, udp):
    """Assert that the UDP source and destination ports match the SA's."""
    for port in ('sport', 'dport'):
        self.assertEqual(getattr(udp, port), getattr(self.sa, port))
632 def get_ike_header(self, packet):
634 ih = packet[ikev2.IKEv2]
635 ih = self.verify_and_remove_non_esp_marker(ih)
636 except IndexError as e:
637 # this is a workaround for getting IKEv2 layer as both ikev2 and
638 # ipsec register for port 4500
640 ih = self.verify_and_remove_non_esp_marker(esp)
641 self.assertEqual(ih.version, 0x20)
642 self.assertNotIn('Version', ih.flags)
645 def verify_and_remove_non_esp_marker(self, packet):
647 # if we are in nat traversal mode check for non esp marker
650 self.assertEqual(data[:4], b'\x00' * 4)
651 return ikev2.IKEv2(data[4:])
655 def encrypt_ike_msg(self, header, plain, first_payload):
656 if self.sa.ike_crypto == 'AES-GCM-16ICV':
657 data = self.sa.ike_crypto_alg.pad(raw(plain))
658 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
659 len(ikev2.IKEv2_payload_Encrypted())
660 tlen = plen + len(ikev2.IKEv2())
663 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
667 encr = self.sa.encrypt(raw(plain), raw(res))
668 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
669 length=plen, load=encr)
672 encr = self.sa.encrypt(raw(plain))
673 trunc_len = self.sa.ike_integ_alg.trunc_len
674 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
675 tlen = plen + len(ikev2.IKEv2())
677 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
678 length=plen, load=encr)
682 integ_data = raw(res)
683 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
684 self.sa.my_authkey, integ_data)
685 res = res / Raw(hmac_data[:trunc_len])
686 assert(len(res) == tlen)
689 def verify_udp_encap(self, ipsec_sa):
690 e = VppEnum.vl_api_ipsec_sad_flags_t
691 if self.sa.udp_encap or self.sa.natt:
692 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
694 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
696 def verify_ipsec_sas(self, is_rekey=False):
697 sas = self.vapi.ipsec_sa_dump()
699 # after rekey there is a short period of time in which old
700 # inbound SA is still present
704 self.assertEqual(len(sas), sa_count)
705 if self.sa.is_initiator:
720 c = self.sa.child_sas[0]
722 self.verify_udp_encap(sa0)
723 self.verify_udp_encap(sa1)
724 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
725 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
726 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
728 if self.sa.esp_integ is None:
731 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
732 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
733 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
736 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
737 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
738 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
739 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
743 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
744 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
745 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
746 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
748 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
749 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Assert that key ``name`` reported by the API equals the local one.

    ``api_keys`` carries the key under ``name`` and its length under
    ``name + '_len'``; only that many leading bytes of the API value are
    compared with the attribute ``name`` of ``keys``.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Assert that an identity reported by the API matches the expected one.

    Checks the identity type (the expected type is mapped through
    ``IDType.value``), the declared data length, and the identity bytes
    (the API value is converted with ``bytes(..., 'ascii')``).
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The identity bytes used to be compared with exp_id.type, i.e. with
    # the type name, so the identity itself was never actually checked.
    # NOTE(review): assumes exp_id exposes its identity bytes as `data`
    # (it already provides `type` and `data_len`) - confirm with callers.
    self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)
763 def verify_ike_sas(self):
764 r = self.vapi.ikev2_sa_dump()
765 self.assertEqual(len(r), 1)
767 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
768 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
770 if self.sa.is_initiator:
771 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
772 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
774 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
775 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
777 if self.sa.is_initiator:
778 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
779 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
781 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
782 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
783 self.verify_keymat(sa.keys, self.sa, 'sk_d')
784 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
785 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
786 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
787 self.verify_keymat(sa.keys, self.sa, 'sk_er')
788 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
789 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
791 self.assertEqual(sa.i_id.type, self.sa.id_type)
792 self.assertEqual(sa.r_id.type, self.sa.id_type)
793 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
794 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
795 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
796 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
798 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
799 self.assertEqual(len(r), 1)
801 self.assertEqual(csa.sa_index, sa.sa_index)
802 c = self.sa.child_sas[0]
803 if hasattr(c, 'sk_ai'):
804 self.verify_keymat(csa.keys, c, 'sk_ai')
805 self.verify_keymat(csa.keys, c, 'sk_ar')
806 self.verify_keymat(csa.keys, c, 'sk_ei')
807 self.verify_keymat(csa.keys, c, 'sk_er')
808 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
809 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
811 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
814 r = self.vapi.ikev2_traffic_selector_dump(
815 is_initiator=True, sa_index=sa.sa_index,
816 child_sa_index=csa.child_sa_index)
817 self.assertEqual(len(r), 1)
819 self.verify_ts(r[0].ts, tsi[0], True)
821 r = self.vapi.ikev2_traffic_selector_dump(
822 is_initiator=False, sa_index=sa.sa_index,
823 child_sa_index=csa.child_sa_index)
824 self.assertEqual(len(r), 1)
825 self.verify_ts(r[0].ts, tsr[0], False)
827 n = self.vapi.ikev2_nonce_get(is_initiator=True,
828 sa_index=sa.sa_index)
829 self.verify_nonce(n, self.sa.i_nonce)
830 n = self.vapi.ikev2_nonce_get(is_initiator=False,
831 sa_index=sa.sa_index)
832 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API nonce has the expected length and content."""
    reported_len = api_nonce.data_len
    self.assertEqual(reported_len, len(nonce))
    reported = api_nonce.nonce
    self.assertEqual(reported, nonce)
838 def verify_ts(self, api_ts, ts, is_initiator):
840 self.assertTrue(api_ts.is_local)
842 self.assertFalse(api_ts.is_local)
845 self.assertEqual(api_ts.start_addr,
846 IPv4Address(ts.starting_address_v4))
847 self.assertEqual(api_ts.end_addr,
848 IPv4Address(ts.ending_address_v4))
850 self.assertEqual(api_ts.start_addr,
851 IPv6Address(ts.starting_address_v6))
852 self.assertEqual(api_ts.end_addr,
853 IPv6Address(ts.ending_address_v6))
854 self.assertEqual(api_ts.start_port, ts.start_port)
855 self.assertEqual(api_ts.end_port, ts.end_port)
856 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
859 class TemplateInitiator(IkePeer):
860 """ initiator test template """
862 def initiate_del_sa_from_initiator(self):
863 ispi = int.from_bytes(self.sa.ispi, 'little')
864 self.pg0.enable_capture()
866 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
867 capture = self.pg0.get_capture(1)
868 ih = self.get_ike_header(capture[0])
869 self.assertNotIn('Response', ih.flags)
870 self.assertIn('Initiator', ih.flags)
871 self.assertEqual(ih.init_SPI, self.sa.ispi)
872 self.assertEqual(ih.resp_SPI, self.sa.rspi)
873 plain = self.sa.hmac_and_decrypt(ih)
874 d = ikev2.IKEv2_payload_Delete(plain)
875 self.assertEqual(d.proto, 1) # proto=IKEv2
876 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
877 flags='Response', exch_type='INFORMATIONAL',
878 id=ih.id, next_payload='Encrypted')
879 resp = self.encrypt_ike_msg(header, b'', None)
880 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Check the reply to a delete request sent by the test.

    The IKE header must carry the SA's current message id, exchange type
    37 and both the Response and Initiator flags; after integrity check
    and decryption the payload must be empty.
    """
    header = self.get_ike_header(packet)
    self.assertEqual(header.id, self.sa.msg_id)
    self.assertEqual(header.exch_type, 37)  # exchange informational
    for flag in ('Response', 'Initiator'):
        self.assertIn(flag, header.flags)
    decrypted = self.sa.hmac_and_decrypt(header)
    self.assertEqual(decrypted, b'')
891 def initiate_del_sa_from_responder(self):
892 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
893 exch_type='INFORMATIONAL',
894 id=self.sa.new_msg_id())
895 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
896 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
897 packet = self.create_packet(self.pg0, ike_msg,
898 self.sa.sport, self.sa.dport,
899 self.sa.natt, self.ip6)
900 self.pg0.add_stream(packet)
901 self.pg0.enable_capture()
903 capture = self.pg0.get_capture(1)
904 self.verify_del_sa(capture[0])
907 def find_notify_payload(packet, notify_type):
908 n = packet[ikev2.IKEv2_payload_Notify]
910 if n.type == notify_type:
915 def verify_nat_detection(self, packet):
922 # NAT_DETECTION_SOURCE_IP
923 s = self.find_notify_payload(packet, 16388)
924 self.assertIsNotNone(s)
925 src_sha = self.sa.compute_nat_sha1(
926 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
927 self.assertEqual(s.load, src_sha)
929 # NAT_DETECTION_DESTINATION_IP
930 s = self.find_notify_payload(packet, 16389)
931 self.assertIsNotNone(s)
932 dst_sha = self.sa.compute_nat_sha1(
933 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
934 self.assertEqual(s.load, dst_sha)
936 def verify_sa_init_request(self, packet):
938 self.sa.dport = udp.sport
939 ih = packet[ikev2.IKEv2]
940 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
941 self.assertEqual(ih.exch_type, 34) # SA_INIT
942 self.sa.ispi = ih.init_SPI
943 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
944 self.assertIn('Initiator', ih.flags)
945 self.assertNotIn('Response', ih.flags)
946 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
947 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
949 prop = packet[ikev2.IKEv2_payload_Proposal]
950 self.assertEqual(prop.proto, 1) # proto = ikev2
951 self.assertEqual(prop.proposal, 1)
952 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
953 self.assertEqual(prop.trans[0].transform_id,
954 self.p.ike_transforms['crypto_alg'])
955 self.assertEqual(prop.trans[1].transform_type, 2) # prf
956 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
957 self.assertEqual(prop.trans[2].transform_type, 4) # dh
958 self.assertEqual(prop.trans[2].transform_id,
959 self.p.ike_transforms['dh_group'])
961 self.verify_nat_detection(packet)
962 self.sa.set_ike_props(
963 crypto='AES-GCM-16ICV', crypto_key_len=32,
964 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
965 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
966 integ='SHA2-256-128')
967 self.sa.generate_dh_data()
968 self.sa.complete_dh_data()
971 def update_esp_transforms(self, trans, sa):
973 if trans.transform_type == 1: # ecryption
974 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
975 elif trans.transform_type == 3: # integrity
976 sa.esp_integ = INTEG_IDS[trans.transform_id]
977 trans = trans.payload
979 def verify_sa_auth_req(self, packet):
981 self.sa.dport = udp.sport
982 ih = self.get_ike_header(packet)
983 self.assertEqual(ih.resp_SPI, self.sa.rspi)
984 self.assertEqual(ih.init_SPI, self.sa.ispi)
985 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
986 self.assertIn('Initiator', ih.flags)
987 self.assertNotIn('Response', ih.flags)
991 self.assertEqual(ih.id, self.sa.msg_id + 1)
993 plain = self.sa.hmac_and_decrypt(ih)
994 idi = ikev2.IKEv2_payload_IDi(plain)
995 idr = ikev2.IKEv2_payload_IDr(idi.payload)
996 self.assertEqual(idi.load, self.sa.i_id)
997 self.assertEqual(idr.load, self.sa.r_id)
998 prop = idi[ikev2.IKEv2_payload_Proposal]
999 c = self.sa.child_sas[0]
1001 self.update_esp_transforms(
1002 prop[ikev2.IKEv2_payload_Transform], self.sa)
1004 def send_init_response(self):
1005 tr_attr = self.sa.ike_crypto_attr()
1006 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1007 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1008 key_length=tr_attr[0]) /
1009 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1010 transform_id=self.sa.ike_integ) /
1011 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1012 transform_id=self.sa.ike_prf_alg.name) /
1013 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1014 transform_id=self.sa.ike_dh))
1015 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1016 trans_nb=4, trans=trans))
1018 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1020 dst_address = b'\x0a\x0a\x0a\x0a'
1022 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1023 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1024 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1026 self.sa.init_resp_packet = (
1027 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1028 exch_type='IKE_SA_INIT', flags='Response') /
1029 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1030 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1031 group=self.sa.ike_dh,
1032 load=self.sa.my_dh_pub_key) /
1033 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1034 next_payload='Notify') /
1035 ikev2.IKEv2_payload_Notify(
1036 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1037 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1038 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1040 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1041 self.sa.sport, self.sa.dport,
1043 self.pg_send(self.pg0, ike_msg)
1044 capture = self.pg0.get_capture(1)
1045 self.verify_sa_auth_req(capture[0])
1047 def initiate_sa_init(self):
1048 self.pg0.enable_capture()
1050 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1052 capture = self.pg0.get_capture(1)
1053 self.verify_sa_init_request(capture[0])
1054 self.send_init_response()
1056 def send_auth_response(self):
1057 tr_attr = self.sa.esp_crypto_attr()
1058 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1059 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1060 key_length=tr_attr[0]) /
1061 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1062 transform_id=self.sa.esp_integ) /
1063 ikev2.IKEv2_payload_Transform(
1064 transform_type='Extended Sequence Number',
1065 transform_id='No ESN') /
1066 ikev2.IKEv2_payload_Transform(
1067 transform_type='Extended Sequence Number',
1068 transform_id='ESN'))
1070 c = self.sa.child_sas[0]
1071 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1072 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1074 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1075 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1076 IDtype=self.sa.id_type, load=self.sa.i_id) /
1077 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1078 IDtype=self.sa.id_type, load=self.sa.r_id) /
1079 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1080 auth_type=AuthMethod.value(self.sa.auth_method),
1081 load=self.sa.auth_data) /
1082 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1083 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1084 number_of_TSs=len(tsi),
1085 traffic_selector=tsi) /
1086 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1087 number_of_TSs=len(tsr),
1088 traffic_selector=tsr) /
1089 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1091 header = ikev2.IKEv2(
1092 init_SPI=self.sa.ispi,
1093 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1094 flags='Response', exch_type='IKE_AUTH')
1096 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1097 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1098 self.sa.dport, self.sa.natt, self.ip6)
1099 self.pg_send(self.pg0, packet)
1101 def test_initiator(self):
1102 self.initiate_sa_init()
1104 self.sa.calc_child_keys()
1105 self.send_auth_response()
1106 self.verify_ike_sas()
1109 class TemplateResponder(IkePeer):
1110 """ responder test template """
def initiate_del_sa_from_responder(self):
    """Ask VPP to delete the IKE SA and acknowledge its Delete request.

    The deletion is triggered through the API using the initiator SPI.
    The single captured packet must be an INFORMATIONAL request carrying
    a Delete payload for the IKE SA; an empty encrypted response is then
    sent back and no further traffic is expected.
    """
    self.pg0.enable_capture()
    self.vapi.ikev2_initiate_del_ike_sa(
        ispi=int.from_bytes(self.sa.ispi, 'little'))
    rx = self.pg0.get_capture(1)
    ike_hdr = self.get_ike_header(rx[0])

    # the request carries neither the Response nor the Initiator flag
    for flag in ('Response', 'Initiator'):
        self.assertNotIn(flag, ike_hdr.flags)
    self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL

    decrypted = self.sa.hmac_and_decrypt(ike_hdr)
    delete = ikev2.IKEv2_payload_Delete(decrypted)
    self.assertEqual(delete.proto, 1)  # proto=IKEv2
    self.assertEqual(ike_hdr.init_SPI, self.sa.ispi)
    self.assertEqual(ike_hdr.resp_SPI, self.sa.rspi)

    # acknowledge with an empty encrypted payload, echoing the message id
    reply_hdr = ikev2.IKEv2(init_SPI=self.sa.ispi,
                            resp_SPI=self.sa.rspi,
                            flags='Initiator+Response',
                            exch_type='INFORMATIONAL',
                            id=ike_hdr.id,
                            next_payload='Encrypted')
    reply = self.encrypt_ike_msg(reply_hdr, b'', None)
    self.send_and_assert_no_replies(self.pg0, reply)
def verify_del_sa(self, packet):
    """Check that *packet* is the expected response to an IKE SA delete.

    The header must echo the current message id and both SPIs, be an
    INFORMATIONAL response without the Initiator flag, announce an
    Encrypted payload, and decrypt to an empty body.
    """
    hdr = self.get_ike_header(packet)
    sa = self.sa

    self.assertEqual(hdr.id, sa.msg_id)
    self.assertEqual(hdr.exch_type, 37)  # exchange informational
    self.assertIn('Response', hdr.flags)
    self.assertNotIn('Initiator', hdr.flags)
    self.assertEqual(hdr.next_payload, 46)  # Encrypted
    self.assertEqual(hdr.init_SPI, sa.ispi)
    self.assertEqual(hdr.resp_SPI, sa.rspi)

    decrypted = sa.hmac_and_decrypt(hdr)
    self.assertEqual(decrypted, b'')
def initiate_del_sa_from_initiator(self):
    """Send an INFORMATIONAL Delete for the IKE SA and verify the reply.

    The request uses the 'Initiator' flag and a fresh message id. It is
    encrypted, injected on pg0, and the single captured response is
    handed to verify_del_sa().
    """
    header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                         flags='Initiator', exch_type='INFORMATIONAL',
                         id=self.sa.new_msg_id())
    del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
    ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
    packet = self.create_packet(self.pg0, ike_msg,
                                self.sa.sport, self.sa.dport,
                                self.sa.natt, self.ip6)
    self.pg0.add_stream(packet)
    self.pg0.enable_capture()
    # A queued stream is only transmitted once the packet generator is
    # started; without this step get_capture() waits for a reply that
    # can never arrive.
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_del_sa(capture[0])
def send_sa_init_req(self, behind_nat=False):
    """Build and send the IKE_SA_INIT request, then verify the response.

    The request carries one IKE proposal (encryption, integrity, PRF and
    DH group taken from self.sa), the KE and Nonce payloads, and the two
    NAT detection notifications.

    :param behind_nat: when true, the source NAT-detection hash is
        computed over a fixed address (10.10.10.1) instead of the real
        source address of pg0.
    """
    tr_attr = self.sa.ike_crypto_attr()
    trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
             transform_id=self.sa.ike_crypto, length=tr_attr[1],
             key_length=tr_attr[0]) /
             ikev2.IKEv2_payload_Transform(transform_type='Integrity',
             transform_id=self.sa.ike_integ) /
             ikev2.IKEv2_payload_Transform(transform_type='PRF',
             transform_id=self.sa.ike_prf_alg.name) /
             ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
             transform_id=self.sa.ike_dh))

    props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
             trans_nb=4, trans=trans))

    self.sa.init_req_packet = (
        ikev2.IKEv2(init_SPI=self.sa.ispi,
                    flags='Initiator', exch_type='IKE_SA_INIT') /
        ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
        ikev2.IKEv2_payload_KE(next_payload='Nonce',
                               group=self.sa.ike_dh,
                               load=self.sa.my_dh_pub_key) /
        ikev2.IKEv2_payload_Nonce(next_payload='Notify',
                                  load=self.sa.i_nonce))

    # Select the address hashed into NAT_DETECTION_SOURCE_IP; previously
    # both assignments ran back to back and `behind_nat` had no effect.
    if behind_nat:
        src_address = b'\x0a\x0a\x0a\x01'
    else:
        src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)

    src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
    dst_nat = self.sa.compute_nat_sha1(
        inet_pton(socket.AF_INET, self.pg0.local_ip4),
        self.sa.dport)
    nat_src_detection = ikev2.IKEv2_payload_Notify(
        type='NAT_DETECTION_SOURCE_IP', load=src_nat,
        next_payload='Notify')
    nat_dst_detection = ikev2.IKEv2_payload_Notify(
        type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
    # the Nonce payload announces 'Notify' as next payload, so the two
    # notifications are appended in source/destination order
    self.sa.init_req_packet = (self.sa.init_req_packet /
                               nat_src_detection /
                               nat_dst_detection)

    ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
                                 self.sa.sport, self.sa.dport,
                                 self.sa.natt, self.ip6)
    self.pg0.add_stream(ike_msg)
    self.pg0.enable_capture()
    # start the packet generator so the queued request is actually sent
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_sa_init(capture[0])
def generate_auth_payload(self, last_payload=None, is_rekey=False):
    """Build the plaintext payload chain for IKE_AUTH or a child rekey.

    :param last_payload: name of the payload announced after TSr;
        defaults to 'Notify'.
    :param is_rekey: when true, build a rekey request (Nonce first,
        REKEY_SA notification last); otherwise build an IKE_AUTH body
        that starts with the IDi/IDr payloads.
    :returns: tuple ``(plain, first_payload)`` where *first_payload* is
        the name of the first payload in *plain*.
    """
    tr_attr = self.sa.esp_crypto_attr()
    last_payload = last_payload or 'Notify'
    trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
             transform_id=self.sa.esp_crypto, length=tr_attr[1],
             key_length=tr_attr[0]) /
             ikev2.IKEv2_payload_Transform(transform_type='Integrity',
             transform_id=self.sa.esp_integ) /
             ikev2.IKEv2_payload_Transform(
             transform_type='Extended Sequence Number',
             transform_id='No ESN') /
             ikev2.IKEv2_payload_Transform(
             transform_type='Extended Sequence Number',
             transform_id='ESN'))

    c = self.sa.child_sas[0]
    props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
             SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))

    tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
    plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
             auth_type=AuthMethod.value(self.sa.auth_method),
             load=self.sa.auth_data) /
             ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
             ikev2.IKEv2_payload_TSi(next_payload='TSr',
             number_of_TSs=len(tsi), traffic_selector=tsi) /
             ikev2.IKEv2_payload_TSr(next_payload=last_payload,
             number_of_TSs=len(tsr), traffic_selector=tsr))

    # The two variants are mutually exclusive. Previously both ran in
    # sequence, so `is_rekey` was ignored and `ids` was never used.
    if is_rekey:
        first_payload = 'Nonce'
        plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
                 next_payload='SA') / plain /
                 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
                 proto='ESP', SPI=c.ispi))
    else:
        first_payload = 'IDi'
        ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
               IDtype=self.sa.id_type, load=self.sa.i_id) /
               ikev2.IKEv2_payload_IDr(next_payload='AUTH',
               IDtype=self.sa.id_type, load=self.sa.r_id))
        # IDr announces AUTH as its successor, which is where plain starts
        plain = ids / plain

    return plain, first_payload
def send_sa_auth(self):
    """Send the IKE_AUTH request and verify the response.

    The payload chain from generate_auth_payload() is extended with an
    INITIAL_CONTACT notification, encrypted under a new message id and
    injected on pg0. The single captured reply is handed to
    verify_sa_auth_resp().
    """
    plain, first_payload = self.generate_auth_payload(
        last_payload='Notify')
    plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
    header = ikev2.IKEv2(
        init_SPI=self.sa.ispi,
        resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
        flags='Initiator', exch_type='IKE_AUTH')

    ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
    packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                self.sa.dport, self.sa.natt, self.ip6)
    self.pg0.add_stream(packet)
    self.pg0.enable_capture()
    # the queued request is only transmitted once the generator is started
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_sa_auth_resp(capture[0])
def verify_sa_init(self, packet):
    """Validate the IKE_SA_INIT response and record the peer's values.

    Checks message id, exchange type, the Response flag and the SPIs,
    then stores the responder SPI, nonce and DH public value on self.sa
    and completes the DH exchange.

    :raises IndexError: when the SA, Nonce or KE payload is missing
        from the response (the packet is logged before re-raising).
    """
    ih = self.get_ike_header(packet)

    self.assertEqual(ih.id, self.sa.msg_id)
    self.assertEqual(ih.exch_type, 34)  # IKE_SA_INIT
    self.assertIn('Response', ih.flags)
    self.assertEqual(ih.init_SPI, self.sa.ispi)
    self.assertNotEqual(ih.resp_SPI, 0)
    self.sa.rspi = ih.resp_SPI
    try:
        # looked up only to make sure the SA payload is present
        ih[ikev2.IKEv2_payload_SA]
        self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
    except IndexError:
        self.logger.error(
            "unexpected reply: SA/Nonce/KE payload not found!")
        # show() prints and returns None unless dump=True is passed
        self.logger.error(ih.show(dump=True))
        # do not continue the key exchange with incomplete peer data
        raise
    self.sa.complete_dh_data()
def verify_sa_auth_resp(self, packet):
    """Validate the IKE_AUTH response and finish the child SA setup.

    Verifies the UDP layer and the message id, decrypts the body, takes
    the responder's ESP SPI from the proposal and derives the child SA
    keys.
    """
    ike = self.get_ike_header(packet)
    # `udp` was referenced without ever being bound; take the UDP layer
    # from the captured packet
    udp = packet[UDP]
    self.verify_udp(udp)
    self.assertEqual(ike.id, self.sa.msg_id)
    plain = self.sa.hmac_and_decrypt(ike)
    idr = ikev2.IKEv2_payload_IDr(plain)
    prop = idr[ikev2.IKEv2_payload_Proposal]
    self.assertEqual(prop.SPIsize, 4)
    self.sa.child_sas[0].rspi = prop.SPI
    self.sa.calc_child_keys()
def test_responder(self):
    # Scenario driver for VPP acting as responder: run IKE_SA_INIT,
    # then IKE_AUTH, and only then inspect the negotiated SAs.
    self.send_sa_init_req(self.sa.natt)
    # The child (IPsec) SA is negotiated during IKE_AUTH, so the AUTH
    # exchange has to happen before the SAs can be verified.
    self.send_sa_auth()
    self.verify_ipsec_sas()
    self.verify_ike_sas()
class Ikev2Params(object):
    """ builds the IKEv2 profile (self.p) and expected SA (self.sa) """

    # NOTE(review): several statements in this method are visibly
    # incomplete here (dangling `else\` continuations, unterminated
    # calls, mapping entries without their opening line). The code is
    # left exactly as found; confirm each marked spot against the
    # complete file.
    def config_params(self, params={}):
        # params is only read in the code visible here, so the shared
        # mutable default is not modified by it.
        #
        # Keys read from params and their visible defaults:
        #   dpd_disabled (True), del_sa_from_responder (False), natt,
        #   ip6 (False), auth ('rsa-sig' selects certificates, anything
        #   else the shared key), server-key / server-cert / client-key /
        #   client-cert, is_initiator (True), loc_ts, rem_ts, responder,
        #   ike_transforms, esp_transforms, udp_encap (False),
        #   ike-crypto (('AES-CBC', 32)), ike-integ ('HMAC-SHA1-96'),
        #   ike-dh ('2048MODPgr'), esp-crypto (('AES-CBC', 32)),
        #   esp-integ ('HMAC-SHA1-96').
        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        # NOTE(review): entries mapping algorithm names to VPP API enum
        # values; the line opening this mapping is not present here.
        'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
        'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
        'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
        'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
        'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
        'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
        'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
        'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
        'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
        'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        dpd_disabled = True if 'dpd_disabled' not in params else\
            params['dpd_disabled']
        # NOTE(review): presumably guarded by dpd_disabled — no
        # condition is visible here; verify.
        self.vapi.cli('ikev2 dpd disable')
        self.del_sa_from_responder = False if 'del_sa_from_responder'\
            not in params else params['del_sa_from_responder']
        is_natt = 'natt' in params and params['natt'] or False
        self.p = Profile(self, 'pr1')
        self.ip6 = False if 'ip6' not in params else params['ip6']

        if 'auth' in params and params['auth'] == 'rsa-sig':
            auth_method = 'rsa-sig'
            # certificate files are located relative to the BR
            # environment variable
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # NOTE(review): both files are opened without being closed
            server_pem = open(work_dir + params['server-cert']).read()
            client_priv = open(work_dir + params['client-key']).read()
            # NOTE(review): the next two calls are unterminated here
            client_priv = load_pem_private_key(str.encode(client_priv), None,
            self.peer_cert = x509.load_pem_x509_certificate(
                str.encode(server_pem),
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
        # NOTE(review): the shared-key settings below presumably form
        # the alternative branch; no `else` is visible here.
        auth_data = b'$3cr3tpa$$w0rd'
        self.p.add_auth(method='shared-key', data=auth_data)
        auth_method = 'shared-key'

        is_init = True if 'is_initiator' not in params else\
            params['is_initiator']

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        # NOTE(review): two local/remote id assignments follow without a
        # visible condition; presumably selected by is_init — verify.
        self.p.add_local_id(**idr)
        self.p.add_remote_id(**idi)
        self.p.add_local_id(**idi)
        self.p.add_remote_id(**idr)

        loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
            'loc_ts' not in params else params['loc_ts']
        rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
            'rem_ts' not in params else params['rem_ts']
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        # optional profile settings are applied only when supplied
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        # NOTE(review): the conditional expression below is cut off
        # after `else\`.
        udp_encap = False if 'udp_encap' not in params else\
        self.p.set_udp_encap(True)
        # the SA object models the peer of the profile: its local traffic
        # selector is the profile's remote one and vice versa
        self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'], natt=is_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data, udp_encap=udp_encap,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)

        ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
            params['ike-crypto']
        # NOTE(review): the ike_integ, ike_dh and esp_integ expressions
        # are each cut off after `else\`.
        ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
        ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
        esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
            params['esp-crypto']
        esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
        # the PRF is fixed; everything else comes from the values above
        self.sa.set_ike_props(
            crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
            integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
        # NOTE(review): this call is unterminated here
        self.sa.set_esp_props(
            crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1421 class TestApi(VppTestCase):
1422 """ Test IKEV2 API """
@classmethod
def setUpClass(cls):
    """Class-level fixture: defer to the base test case set-up."""
    # unittest invokes this hook on the class, so it has to be a
    # classmethod for `cls` to be bound
    super(TestApi, cls).setUpClass()
@classmethod
def tearDownClass(cls):
    """Class-level fixture: defer to the base test case tear-down."""
    # unittest invokes this hook on the class, so it has to be a
    # classmethod for `cls` to be bound
    super(TestApi, cls).tearDownClass()
def tearDown(self):
    """Remove both test profiles and check that none are left in VPP."""
    # These statements use `self` and chain to the instance-level
    # tearDown, so they belong in their own per-test hook rather than in
    # the class-level one.
    super(TestApi, self).tearDown()
    self.p1.remove_vpp_config()
    self.p2.remove_vpp_config()
    r = self.vapi.ikev2_profile_dump()
    self.assertEqual(len(r), 0)
def configure_profile(self, cfg):
    # Build a Profile named cfg['name'] from the cfg dict.
    #
    # Mandatory keys: loc_id / rem_id as (id_type, data) pairs, loc_ts,
    # rem_ts and auth as keyword dicts, responder, ike_ts, esp_ts,
    # udp_encap and ipsec_over_udp_port.
    # Optional keys: lifetime_data, tun_itf, natt_disabled.
    p = Profile(self, cfg['name'])
    p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
    p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
    p.add_local_ts(**cfg['loc_ts'])
    p.add_remote_ts(**cfg['rem_ts'])
    p.add_responder(cfg['responder'])
    p.add_ike_transforms(cfg['ike_ts'])
    p.add_esp_transforms(cfg['esp_ts'])
    p.add_auth(**cfg['auth'])
    p.set_udp_encap(cfg['udp_encap'])
    p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
    if 'lifetime_data' in cfg:
        p.set_lifetime_data(cfg['lifetime_data'])
    if 'tun_itf' in cfg:
        p.set_tunnel_interface(cfg['tun_itf'])
    # NOTE(review): the body of this condition and the rest of the
    # method are not present here. The caller stores the result and
    # later calls remove_vpp_config() on it, so the profile presumably
    # has to be returned — confirm against the complete file.
    if 'natt_disabled' in cfg and cfg['natt_disabled']:
def test_profile_api(self):
    """ test profile dump API """
    # NOTE(review): the configuration literal below is fragmentary here
    # (its opening line and nested keys are absent). The entries are
    # kept exactly as found; the code after them reads the configuration
    # as conf['p1'] and conf['p2'].
    'start_addr': '3.3.3.2',
    'end_addr': '3.3.3.3',
    'start_addr': '4.5.76.80',
    'end_addr': '2.3.4.6',
    'start_addr': 'ab::1',
    'end_addr': 'ab::4',
    'start_addr': 'cd::12',
    'end_addr': 'cd::13',
    'natt_disabled': True,
    'loc_id': ('fqdn', b'vpp.home'),
    'rem_id': ('fqdn', b'roadwarrior.example.com'),
    'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
    'crypto_key_size': 32,
    'crypto_key_size': 24,
    'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
    'ipsec_over_udp_port': 4501,
    'lifetime_maxdata': 20192,
    'lifetime_jitter': 9,
    'loc_id': ('ip4-addr', b'192.168.2.1'),
    'rem_id': ('ip6-addr', b'abcd::1'),
    'responder': {'sw_if_index': 4, 'addr': 'def::10'},
    'crypto_key_size': 16,
    'crypto_key_size': 24,
    'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
    'ipsec_over_udp_port': 4600,
    # create both profiles in VPP ...
    self.p1 = self.configure_profile(conf['p1'])
    self.p2 = self.configure_profile(conf['p2'])

    # ... and expect the dump to return them in creation order
    r = self.vapi.ikev2_profile_dump()
    self.assertEqual(len(r), 2)
    self.verify_profile(r[0].profile, conf['p1'])
    self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Compare a dumped identity with its ``(id_type, data)`` config.

    :param api_id: identity from the API dump (``type`` and ``data``)
    :param cfg_id: pair of id type name and id data as bytes
    """
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(api_id.type, expected_type)
    # the dump carries the data as text, the configuration as bytes
    dumped_data = bytes(api_id.data, 'ascii')
    self.assertEqual(dumped_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare a dumped traffic selector with its configuration dict.

    Protocol and port range are compared directly; the configured
    address strings are converted to address objects first.
    """
    self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
    for port_field in ('start_port', 'end_port'):
        self.assertEqual(getattr(api_ts, port_field), cfg_ts[port_field])
    for addr_field in ('start_addr', 'end_addr'):
        self.assertEqual(getattr(api_ts, addr_field),
                         ip_address(text_type(cfg_ts[addr_field])))
def verify_responder(self, api_r, cfg_r):
    """Compare the dumped responder with its configuration dict.

    :param api_r: responder from the API dump (``sw_if_index``, ``addr``)
    :param cfg_r: dict with ``sw_if_index`` and the address as a string
    """
    self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
    expected_addr = ip_address(cfg_r['addr'])
    self.assertEqual(api_r.addr, expected_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Check the transform fields shared by IKE and ESP proposals.

    Each listed attribute of *api_ts* must equal the entry of the same
    name in *cfg_ts*.
    """
    for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, field), cfg_ts[field])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Check IKE transforms: the common fields plus the DH group."""
    self.verify_transforms(api_ts, cfg_ts)
    # the DH group is compared here in addition to the common fields
    self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check ESP transforms; only the common fields are compared."""
    self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Compare dumped authentication settings with their configuration.

    :param api_auth: auth section of the API dump (``method``, ``data``,
        ``data_len``)
    :param cfg_auth: dict with the method name and the auth data
    """
    expected_method = AuthMethod.value(cfg_auth['method'])
    self.assertEqual(api_auth.method, expected_method)
    secret = cfg_auth['data']
    self.assertEqual(api_auth.data, secret)
    self.assertEqual(api_auth.data_len, len(secret))
def verify_lifetime_data(self, p, ld):
    """Check the lifetime-related fields of a dumped profile.

    :param p: profile from the API dump
    :param ld: dict holding the configured values under the same names
    """
    lifetime_fields = ('lifetime', 'lifetime_maxdata',
                       'lifetime_jitter', 'handover')
    for field in lifetime_fields:
        self.assertEqual(getattr(p, field), ld[field])
def verify_profile(self, ap, cp):
    """Compare a dumped profile against the configuration that built it.

    :param ap: profile as returned by the API dump
    :param cp: configuration dict used to create the profile; the keys
        ``natt_disabled``, ``lifetime_data`` and ``tun_itf`` are optional
    """
    self.assertEqual(ap.name, cp['name'])
    self.assertEqual(ap.udp_encap, cp['udp_encap'])
    self.verify_id(ap.loc_id, cp['loc_id'])
    self.verify_id(ap.rem_id, cp['rem_id'])
    self.verify_ts(ap.loc_ts, cp['loc_ts'])
    self.verify_ts(ap.rem_ts, cp['rem_ts'])
    self.verify_responder(ap.responder, cp['responder'])
    self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
    self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
    self.verify_auth(ap.auth, cp['auth'])
    # NAT traversal counts as enabled unless the configuration says so
    self.assertEqual(cp.get('natt_disabled', False), ap.natt_disabled)

    if 'lifetime_data' in cp:
        self.verify_lifetime_data(ap, cp['lifetime_data'])
    self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
    # The two expectations exclude each other: a configured tunnel
    # interface must be echoed back, otherwise the index must be the
    # 0xffffffff "not set" value.
    if 'tun_itf' in cp:
        self.assertEqual(ap.tun_itf, cp['tun_itf'])
    else:
        self.assertEqual(ap.tun_itf, 0xffffffff)
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - NAT traversal (initiator behind NAT) """

    def config_tc(self):
        # NOTE(review): the parameter literal is fragmentary here. The
        # keys introducing the nested groups are absent, which is why
        # 'crypto_alg' / 'crypto_key_size' appear twice, and the literal
        # is not closed. Entries are kept exactly as found.
        self.config_params({
            'is_initiator': False,  # seen from test case perspective
                                    # thus vpp is initiator
            'responder': {'sw_if_index': self.pg0.sw_if_index,
                          'addr': self.pg0.remote_ip4},
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            # "hmac-sha2-256-128"
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        # NOTE(review): the parameter literal is fragmentary here. The
        # keys introducing the nested groups are absent, which is why
        # 'crypto_alg' / 'crypto_key_size' appear twice, and the literal
        # is not closed. Entries are kept exactly as found.
        self.config_params({
            'is_initiator': False,  # seen from test case perspective
                                    # thus vpp is initiator
            'responder': {'sw_if_index': self.pg0.sw_if_index,
                          'addr': self.pg0.remote_ip4},
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            # "hmac-sha2-256-128"
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Answer a captured rekey request by echoing its SA payload.

        :param req: captured packet carrying the rekey request
        :param update_child_sa_data: when true, adopt the nonce and SPI
            of this request for the local child SA and re-derive the
            child keys before replying
        """
        req_hdr = self.get_ike_header(req)
        decrypted = self.sa.hmac_and_decrypt(req_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)

        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            # the same SPI is recorded for both directions
            child = self.sa.child_sas[0]
            child.ispi = proposal.SPI
            child.rspi = proposal.SPI
            self.sa.calc_child_keys()

        reply_hdr = ikev2.IKEv2(init_SPI=self.sa.ispi,
                                resp_SPI=self.sa.rspi,
                                flags='Response',
                                exch_type=36,
                                id=req_hdr.id,
                                next_payload='Encrypted')
        reply = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply_pkt = self.create_packet(self.pg0, reply, self.sa.sport,
                                       self.sa.dport, self.sa.natt,
                                       self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply_pkt)

    def test_initiator(self):
        # establish the SAs first, then trigger two rekeys in a row
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        requests = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(requests[1], True)
        self.rekey_respond(requests[0], False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Trigger a child SA rekey in VPP and answer the request.

        The captured request must be a non-response exchange of type 36
        with the Initiator flag. Its nonce and SPI are adopted for the
        local child SA, the child keys are re-derived and the SA payload
        is echoed back as the response.
        """
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        rx = self.pg0.get_capture(1)
        req_hdr = self.get_ike_header(rx[0])

        self.assertEqual(req_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', req_hdr.flags)
        self.assertIn('Initiator', req_hdr.flags)

        decrypted = self.sa.hmac_and_decrypt(req_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        child = self.sa.child_sas[0]
        child.ispi = proposal.SPI
        child.rspi = proposal.SPI
        self.sa.calc_child_keys()

        reply_hdr = ikev2.IKEv2(init_SPI=self.sa.ispi,
                                resp_SPI=self.sa.rspi,
                                flags='Response',
                                exch_type=36,
                                id=req_hdr.id,
                                next_payload='Encrypted')
        reply = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply_pkt = self.create_packet(self.pg0, reply, self.sa.sport,
                                       self.sa.dport, self.sa.natt,
                                       self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply_pkt)

    def test_initiator(self):
        # establish the SAs, rekey once, then check the resulting state
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        # NOTE(review): the parameter literal is fragmentary here. The
        # keys introducing the nested groups are absent, which is why
        # 'crypto_alg' / 'crypto_key_size' appear twice, and the literal
        # is not closed. Entries are kept exactly as found.
        self.config_params({
            'del_sa_from_responder': True,
            'is_initiator': False,  # seen from test case perspective
                                    # thus vpp is initiator
            'responder': {'sw_if_index': self.pg0.sw_if_index,
                          'addr': self.pg0.remote_ip4},
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            # "hmac-sha2-256-128"
class TestResponderNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - nat traversal """
    def config_tc(self):
        # NOTE(review): no body is present for this method here; confirm
        # the intended parameters against the complete file.
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # no overrides: run the responder template with default parameters
        self.config_params()
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """
    def config_tc(self):
        # keep dead peer detection active for this test
        self.config_params({'dpd_disabled': False})

    # NOTE(review): the embedded listing numbers skip several lines
    # between the two methods; confirm nothing is missing here.
    def test_responder(self):
        # short liveness period with a single retry before the base
        # responder scenario establishes the SAs
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        # capture empty request but don't reply
        capture = self.pg0.get_capture(expected_count=1, timeout=5)
        ih = self.get_ike_header(capture[0])
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')
        # wait for SA expiration
        # NOTE(review): no wait statement is present here although the
        # comment above announces one; confirm against the complete file.
        ike_sas = self.vapi.ikev2_sa_dump()
        self.assertEqual(len(ike_sas), 0)
        ipsec_sas = self.vapi.ipsec_sa_dump()
        self.assertEqual(len(ipsec_sas), 0)
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Send a rekey request to VPP and record the responder's answer.

        The nonce and the new SPI from the response are stored on the
        local SA; deriving the new child keys is left to the caller.
        """
        packet = self.create_rekey_request()
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        # the queued request is only transmitted once the packet
        # generator is started
        self.pg_start()
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        plain = self.sa.hmac_and_decrypt(ih)
        sa = ikev2.IKEv2_payload_SA(plain)
        prop = sa[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = prop.SPI

    def test_responder(self):
        # establish the SAs, rekey, derive the new child keys and verify
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        # File names of the key and certificate material. config_params()
        # only reads them when 'auth' is 'rsa-sig'.
        # NOTE(review): the embedded listing numbers skip two lines
        # before 'server-key' and no 'auth' entry is visible here;
        # confirm against the complete file.
        self.config_params({
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem'})
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        # key lengths are given in bytes: 16 -> 128 bit, 24 -> 192 bit
        self.config_params({
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
            'ike-dh': '2048MODPgr'})
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        # NOTE(review): the class name and description say AES_CBC_128,
        # but the IKE key length below is 32 bytes; the 128-bit
        # responder test in this file passes 16. Confirm which is
        # intended.
        self.config_params({
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
            'ike-dh': '3072MODPgr'})
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    # NOTE(review): the embedded listing numbers skip lines after the
    # class header and after 'del_sa_from_responder'; confirm against
    # the complete file that nothing is missing.
    def config_tc(self):
        # AES-GCM for IKE with IPv6 traffic selectors on both sides
        self.config_params({
            'del_sa_from_responder': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': {'start_addr': 'ab:cd::0',
                       'end_addr': 'ab:cd::10'},
            'rem_ts': {'start_addr': '11::0',
                       'end_addr': '11::100'}})
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request to VPP and expect an empty reply.

        The reply must echo the current message id and decrypt to an
        empty body.
        """
        packet = self.create_empty_request()
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        # the queued request is only transmitted once the packet
        # generator is started
        self.pg_start()
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertEqual(ih.id, self.sa.msg_id)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def test_initiator(self):
        # establish the SAs first, then probe with the empty request
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def config_tc(self):
        # default parameters are sufficient for the malformed-input checks
        self.config_params()

    def assert_counter(self, count, name, version='ip4'):
        """Assert that an ikev2 error counter holds *count*.

        :param count: expected counter value
        :param name: counter name below the node path
        :param version: node suffix, ``'ip4'`` by default
        """
        node_name = '/err/ikev2-%s/' % version + name
        self.assertEqual(count, self.statistics.get_err_counter(node_name))

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet with optional malformed parts.

        :param length: value forced into the IKE header length field
        :param payload: optional payload appended to the IKE header
        :returns: the packet as produced by create_packet()
        """
        msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                          flags='Initiator', exch_type='IKE_SA_INIT')
        if payload is not None:
            # previously the payload was accepted but never attached
            msg = msg / payload
        return self.create_packet(self.pg0, msg, self.sa.sport,
                                  self.sa.dport, self.sa.natt, self.ip6)

    def verify_bad_packet_length(self):
        """Send headers with a bogus length and check the error counter."""
        ike_msg = self.create_ike_init_msg(length=0xdead)
        self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
        self.assert_counter(self.pkt_count, 'Bad packet length')

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with a bogus length and check the counter."""
        p = ikev2.IKEv2_payload_SA(length=0xdead)
        ike_msg = self.create_ike_init_msg(payload=p)
        self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
        self.assert_counter(self.pkt_count, 'Malformed packet')

    def test_responder(self):
        # every malformed message is sent pkt_count times
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
# Run the tests with the VPP test runner when this module is executed
# directly.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)