3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import tag_fixme_vpp_workers
22 from framework import VppTestCase, VppTestRunner
23 from vpp_ikev2 import Profile, IDType, AuthMethod
24 from vpp_papi import VppEnum
31 KEY_PAD = b"Key Pad for IKEv2"
38 # tuple structure is (p, g, key_len)
40 '2048MODPgr': (long_converter("""
41 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
42 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
43 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
44 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
45 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
46 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
47 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
48 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
49 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
50 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
51 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
53 '3072MODPgr': (long_converter("""
54 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
55 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
56 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
57 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
58 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
59 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
60 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
61 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
62 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
63 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
64 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
65 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
66 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
67 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
68 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
69 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
73 class CryptoAlgo(object):
74 def __init__(self, name, cipher, mode):
78 if self.cipher is not None:
79 self.bs = self.cipher.block_size // 8
81 if self.name == 'AES-GCM-16ICV':
82 self.iv_len = GCM_IV_SIZE
86 def encrypt(self, data, key, aad=None):
87 iv = os.urandom(self.iv_len)
89 encryptor = Cipher(self.cipher(key), self.mode(iv),
90 default_backend()).encryptor()
91 return iv + encryptor.update(data) + encryptor.finalize()
93 salt = key[-SALT_SIZE:]
95 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
96 default_backend()).encryptor()
97 encryptor.authenticate_additional_data(aad)
98 data = encryptor.update(data) + encryptor.finalize()
99 data += encryptor.tag[:GCM_ICV_SIZE]
102 def decrypt(self, data, key, aad=None, icv=None):
104 iv = data[:self.iv_len]
105 ct = data[self.iv_len:]
106 decryptor = Cipher(algorithms.AES(key),
108 default_backend()).decryptor()
109 return decryptor.update(ct) + decryptor.finalize()
111 salt = key[-SALT_SIZE:]
112 nonce = salt + data[:GCM_IV_SIZE]
113 ct = data[GCM_IV_SIZE:]
114 key = key[:-SALT_SIZE]
115 decryptor = Cipher(algorithms.AES(key),
116 self.mode(nonce, icv, len(icv)),
117 default_backend()).decryptor()
118 decryptor.authenticate_additional_data(aad)
119 return decryptor.update(ct) + decryptor.finalize()
122 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
123 data = data + b'\x00' * (pad_len - 1)
124 return data + bytes([pad_len - 1])
127 class AuthAlgo(object):
128 def __init__(self, name, mac, mod, key_len, trunc_len=None):
132 self.key_len = key_len
133 self.trunc_len = trunc_len or key_len
137 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
138 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
139 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
144 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
145 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
146 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
147 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
148 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
152 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
153 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
170 class IKEv2ChildSA(object):
171 def __init__(self, local_ts, remote_ts, is_initiator):
179 self.local_ts = local_ts
180 self.remote_ts = remote_ts
183 class IKEv2SA(object):
184 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
185 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
186 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
187 auth_method='shared-key', priv_key=None, i_natt=False,
188 r_natt=False, udp_encap=False):
189 self.udp_encap = udp_encap
199 self.dh_params = None
201 self.priv_key = priv_key
202 self.is_initiator = is_initiator
203 nonce = nonce or os.urandom(32)
204 self.auth_data = auth_data
207 if isinstance(id_type, str):
208 self.id_type = IDType.value(id_type)
210 self.id_type = id_type
211 self.auth_method = auth_method
212 if self.is_initiator:
213 self.rspi = 8 * b'\x00'
218 self.ispi = 8 * b'\x00'
220 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
223 def new_msg_id(self):
228 def my_dh_pub_key(self):
229 if self.is_initiator:
230 return self.i_dh_data
231 return self.r_dh_data
234 def peer_dh_pub_key(self):
235 if self.is_initiator:
236 return self.r_dh_data
237 return self.i_dh_data
241 return self.i_natt or self.r_natt
243 def compute_secret(self):
244 priv = self.dh_private_key
245 peer = self.peer_dh_pub_key
246 p, g, l = self.ike_group
247 return pow(int.from_bytes(peer, 'big'),
248 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
250 def generate_dh_data(self):
252 if self.ike_dh not in DH:
253 raise NotImplementedError('%s not in DH group' % self.ike_dh)
255 if self.dh_params is None:
256 dhg = DH[self.ike_dh]
257 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
258 self.dh_params = pn.parameters(default_backend())
260 priv = self.dh_params.generate_private_key()
261 pub = priv.public_key()
262 x = priv.private_numbers().x
263 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
264 y = pub.public_numbers().y
266 if self.is_initiator:
267 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
269 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
271 def complete_dh_data(self):
272 self.dh_shared_secret = self.compute_secret()
274 def calc_child_keys(self):
275 prf = self.ike_prf_alg.mod()
276 s = self.i_nonce + self.r_nonce
277 c = self.child_sas[0]
279 encr_key_len = self.esp_crypto_key_len
280 integ_key_len = self.esp_integ_alg.key_len
281 salt_len = 0 if integ_key_len else 4
283 l = (integ_key_len * 2 +
286 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
289 c.sk_ei = keymat[pos:pos+encr_key_len]
293 c.sk_ai = keymat[pos:pos+integ_key_len]
296 c.salt_ei = keymat[pos:pos+salt_len]
299 c.sk_er = keymat[pos:pos+encr_key_len]
303 c.sk_ar = keymat[pos:pos+integ_key_len]
306 c.salt_er = keymat[pos:pos+salt_len]
309 def calc_prfplus(self, prf, key, seed, length):
313 while len(r) < length and x < 255:
318 s = s + seed + bytes([x])
319 t = self.calc_prf(prf, key, s)
327 def calc_prf(self, prf, key, data):
328 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
333 prf = self.ike_prf_alg.mod()
334 # SKEYSEED = prf(Ni | Nr, g^ir)
335 s = self.i_nonce + self.r_nonce
336 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
338 # calculate S = Ni | Nr | SPIi SPIr
339 s = s + self.ispi + self.rspi
341 prf_key_trunc = self.ike_prf_alg.trunc_len
342 encr_key_len = self.ike_crypto_key_len
343 tr_prf_key_len = self.ike_prf_alg.key_len
344 integ_key_len = self.ike_integ_alg.key_len
345 if integ_key_len == 0:
355 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
358 self.sk_d = keymat[:pos+prf_key_trunc]
361 self.sk_ai = keymat[pos:pos+integ_key_len]
363 self.sk_ar = keymat[pos:pos+integ_key_len]
366 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
367 pos += encr_key_len + salt_size
368 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
369 pos += encr_key_len + salt_size
371 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
372 pos += tr_prf_key_len
373 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
375 def generate_authmsg(self, prf, packet):
376 if self.is_initiator:
384 data = bytes([self.id_type, 0, 0, 0]) + id
385 id_hash = self.calc_prf(prf, key, data)
386 return packet + nonce + id_hash
389 prf = self.ike_prf_alg.mod()
390 if self.is_initiator:
391 packet = self.init_req_packet
393 packet = self.init_resp_packet
394 authmsg = self.generate_authmsg(prf, raw(packet))
395 if self.auth_method == 'shared-key':
396 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
397 self.auth_data = self.calc_prf(prf, psk, authmsg)
398 elif self.auth_method == 'rsa-sig':
399 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
402 raise TypeError('unknown auth method type!')
404 def encrypt(self, data, aad=None):
405 data = self.ike_crypto_alg.pad(data)
406 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
409 def peer_authkey(self):
410 if self.is_initiator:
415 def my_authkey(self):
416 if self.is_initiator:
421 def my_cryptokey(self):
422 if self.is_initiator:
427 def peer_cryptokey(self):
428 if self.is_initiator:
432 def concat(self, alg, key_len):
433 return alg + '-' + str(key_len * 8)
436 def vpp_ike_cypto_alg(self):
437 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
440 def vpp_esp_cypto_alg(self):
441 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
443 def verify_hmac(self, ikemsg):
444 integ_trunc = self.ike_integ_alg.trunc_len
445 exp_hmac = ikemsg[-integ_trunc:]
446 data = ikemsg[:-integ_trunc]
447 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
448 self.peer_authkey, data)
449 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
451 def compute_hmac(self, integ, key, data):
452 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
456 def decrypt(self, data, aad=None, icv=None):
457 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
459 def hmac_and_decrypt(self, ike):
460 ep = ike[ikev2.IKEv2_payload_Encrypted]
461 if self.ike_crypto == 'AES-GCM-16ICV':
462 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
463 ct = ep.load[:-GCM_ICV_SIZE]
464 tag = ep.load[-GCM_ICV_SIZE:]
465 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
467 self.verify_hmac(raw(ike))
468 integ_trunc = self.ike_integ_alg.trunc_len
470 # remove ICV and decrypt payload
471 ct = ep.load[:-integ_trunc]
472 plain = self.decrypt(ct)
475 return plain[:-pad_len - 1]
477 def build_ts_addr(self, ts, version):
478 return {'starting_address_v' + version: ts['start_addr'],
479 'ending_address_v' + version: ts['end_addr']}
481 def generate_ts(self, is_ip4):
482 c = self.child_sas[0]
483 ts_data = {'IP_protocol_ID': 0,
487 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
488 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
489 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
490 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
492 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
493 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
494 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
495 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
497 if self.is_initiator:
498 return ([ts1], [ts2])
499 return ([ts2], [ts1])
501 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
502 if crypto not in CRYPTO_ALGOS:
503 raise TypeError('unsupported encryption algo %r' % crypto)
504 self.ike_crypto = crypto
505 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
506 self.ike_crypto_key_len = crypto_key_len
508 if integ not in AUTH_ALGOS:
509 raise TypeError('unsupported auth algo %r' % integ)
510 self.ike_integ = None if integ == 'NULL' else integ
511 self.ike_integ_alg = AUTH_ALGOS[integ]
513 if prf not in PRF_ALGOS:
514 raise TypeError('unsupported prf algo %r' % prf)
516 self.ike_prf_alg = PRF_ALGOS[prf]
518 self.ike_group = DH[self.ike_dh]
520 def set_esp_props(self, crypto, crypto_key_len, integ):
521 self.esp_crypto_key_len = crypto_key_len
522 if crypto not in CRYPTO_ALGOS:
523 raise TypeError('unsupported encryption algo %r' % crypto)
524 self.esp_crypto = crypto
525 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
527 if integ not in AUTH_ALGOS:
528 raise TypeError('unsupported auth algo %r' % integ)
529 self.esp_integ = None if integ == 'NULL' else integ
530 self.esp_integ_alg = AUTH_ALGOS[integ]
532 def crypto_attr(self, key_len):
533 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
534 return (0x800e << 16 | key_len << 3, 12)
536 raise Exception('unsupported attribute type')
538 def ike_crypto_attr(self):
539 return self.crypto_attr(self.ike_crypto_key_len)
541 def esp_crypto_attr(self):
542 return self.crypto_attr(self.esp_crypto_key_len)
544 def compute_nat_sha1(self, ip, port, rspi=None):
547 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
548 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
550 return digest.finalize()
553 class IkePeer(VppTestCase):
554 """ common class for initiator and responder """
558 import scapy.contrib.ikev2 as _ikev2
559 globals()['ikev2'] = _ikev2
560 super(IkePeer, cls).setUpClass()
561 cls.create_pg_interfaces(range(2))
562 for i in cls.pg_interfaces:
570 def tearDownClass(cls):
571 super(IkePeer, cls).tearDownClass()
574 super(IkePeer, self).tearDown()
575 if self.del_sa_from_responder:
576 self.initiate_del_sa_from_responder()
578 self.initiate_del_sa_from_initiator()
579 r = self.vapi.ikev2_sa_dump()
580 self.assertEqual(len(r), 0)
581 sas = self.vapi.ipsec_sa_dump()
582 self.assertEqual(len(sas), 0)
583 self.p.remove_vpp_config()
584 self.assertIsNone(self.p.query_vpp_config())
587 super(IkePeer, self).setUp()
589 self.p.add_vpp_config()
590 self.assertIsNotNone(self.p.query_vpp_config())
591 if self.sa.is_initiator:
592 self.sa.generate_dh_data()
593 self.vapi.cli('ikev2 set logging level 4')
594 self.vapi.cli('event-lo clear')
596 def assert_counter(self, count, name, version='ip4'):
597 node_name = '/err/ikev2-%s/' % version + name
598 self.assertEqual(count, self.statistics.get_err_counter(node_name))
600 def create_rekey_request(self):
601 sa, first_payload = self.generate_auth_payload(is_rekey=True)
602 header = ikev2.IKEv2(
603 init_SPI=self.sa.ispi,
604 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
605 flags='Initiator', exch_type='CREATE_CHILD_SA')
607 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
608 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
609 self.sa.dport, self.sa.natt, self.ip6)
611 def create_empty_request(self):
612 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
613 id=self.sa.new_msg_id(), flags='Initiator',
614 exch_type='INFORMATIONAL',
615 next_payload='Encrypted')
617 msg = self.encrypt_ike_msg(header, b'', None)
618 return self.create_packet(self.pg0, msg, self.sa.sport,
619 self.sa.dport, self.sa.natt, self.ip6)
621 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
624 src_ip = src_if.remote_ip6
625 dst_ip = src_if.local_ip6
628 src_ip = src_if.remote_ip4
629 dst_ip = src_if.local_ip4
631 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
632 ip_layer(src=src_ip, dst=dst_ip) /
633 UDP(sport=sport, dport=dport))
635 # insert non ESP marker
636 res = res / Raw(b'\x00' * 4)
639 def verify_udp(self, udp):
640 self.assertEqual(udp.sport, self.sa.sport)
641 self.assertEqual(udp.dport, self.sa.dport)
643 def get_ike_header(self, packet):
645 ih = packet[ikev2.IKEv2]
646 ih = self.verify_and_remove_non_esp_marker(ih)
647 except IndexError as e:
648 # this is a workaround for getting IKEv2 layer as both ikev2 and
649 # ipsec register for port 4500
651 ih = self.verify_and_remove_non_esp_marker(esp)
652 self.assertEqual(ih.version, 0x20)
653 self.assertNotIn('Version', ih.flags)
656 def verify_and_remove_non_esp_marker(self, packet):
658 # if we are in nat traversal mode check for non esp marker
661 self.assertEqual(data[:4], b'\x00' * 4)
662 return ikev2.IKEv2(data[4:])
666 def encrypt_ike_msg(self, header, plain, first_payload):
667 if self.sa.ike_crypto == 'AES-GCM-16ICV':
668 data = self.sa.ike_crypto_alg.pad(raw(plain))
669 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
670 len(ikev2.IKEv2_payload_Encrypted())
671 tlen = plen + len(ikev2.IKEv2())
674 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
678 encr = self.sa.encrypt(raw(plain), raw(res))
679 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
680 length=plen, load=encr)
683 encr = self.sa.encrypt(raw(plain))
684 trunc_len = self.sa.ike_integ_alg.trunc_len
685 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
686 tlen = plen + len(ikev2.IKEv2())
688 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
689 length=plen, load=encr)
693 integ_data = raw(res)
694 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
695 self.sa.my_authkey, integ_data)
696 res = res / Raw(hmac_data[:trunc_len])
697 assert(len(res) == tlen)
700 def verify_udp_encap(self, ipsec_sa):
701 e = VppEnum.vl_api_ipsec_sad_flags_t
702 if self.sa.udp_encap or self.sa.natt:
703 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
705 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
707 def verify_ipsec_sas(self, is_rekey=False):
708 sas = self.vapi.ipsec_sa_dump()
710 # after rekey there is a short period of time in which old
711 # inbound SA is still present
715 self.assertEqual(len(sas), sa_count)
716 if self.sa.is_initiator:
731 c = self.sa.child_sas[0]
733 self.verify_udp_encap(sa0)
734 self.verify_udp_encap(sa1)
735 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
736 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
737 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
739 if self.sa.esp_integ is None:
742 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
743 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
744 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
747 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
748 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
749 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
750 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
754 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
755 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
756 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
757 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
759 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
760 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
762 def verify_keymat(self, api_keys, keys, name):
763 km = getattr(keys, name)
764 api_km = getattr(api_keys, name)
765 api_km_len = getattr(api_keys, name + '_len')
766 self.assertEqual(len(km), api_km_len)
767 self.assertEqual(km, api_km[:api_km_len])
769 def verify_id(self, api_id, exp_id):
770 self.assertEqual(api_id.type, IDType.value(exp_id.type))
771 self.assertEqual(api_id.data_len, exp_id.data_len)
772 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
774 def verify_ike_sas(self):
775 r = self.vapi.ikev2_sa_dump()
776 self.assertEqual(len(r), 1)
778 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
779 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
781 if self.sa.is_initiator:
782 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
783 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
785 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
786 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
788 if self.sa.is_initiator:
789 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
790 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
792 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
793 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
794 self.verify_keymat(sa.keys, self.sa, 'sk_d')
795 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
796 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
797 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
798 self.verify_keymat(sa.keys, self.sa, 'sk_er')
799 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
800 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
802 self.assertEqual(sa.i_id.type, self.sa.id_type)
803 self.assertEqual(sa.r_id.type, self.sa.id_type)
804 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
805 self.assertEqual(sa.r_id.data_len, len(self.idr))
806 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
807 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.idr)
809 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
810 self.assertEqual(len(r), 1)
812 self.assertEqual(csa.sa_index, sa.sa_index)
813 c = self.sa.child_sas[0]
814 if hasattr(c, 'sk_ai'):
815 self.verify_keymat(csa.keys, c, 'sk_ai')
816 self.verify_keymat(csa.keys, c, 'sk_ar')
817 self.verify_keymat(csa.keys, c, 'sk_ei')
818 self.verify_keymat(csa.keys, c, 'sk_er')
819 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
820 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
822 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
825 r = self.vapi.ikev2_traffic_selector_dump(
826 is_initiator=True, sa_index=sa.sa_index,
827 child_sa_index=csa.child_sa_index)
828 self.assertEqual(len(r), 1)
830 self.verify_ts(r[0].ts, tsi[0], True)
832 r = self.vapi.ikev2_traffic_selector_dump(
833 is_initiator=False, sa_index=sa.sa_index,
834 child_sa_index=csa.child_sa_index)
835 self.assertEqual(len(r), 1)
836 self.verify_ts(r[0].ts, tsr[0], False)
838 n = self.vapi.ikev2_nonce_get(is_initiator=True,
839 sa_index=sa.sa_index)
840 self.verify_nonce(n, self.sa.i_nonce)
841 n = self.vapi.ikev2_nonce_get(is_initiator=False,
842 sa_index=sa.sa_index)
843 self.verify_nonce(n, self.sa.r_nonce)
845 def verify_nonce(self, api_nonce, nonce):
846 self.assertEqual(api_nonce.data_len, len(nonce))
847 self.assertEqual(api_nonce.nonce, nonce)
849 def verify_ts(self, api_ts, ts, is_initiator):
851 self.assertTrue(api_ts.is_local)
853 self.assertFalse(api_ts.is_local)
856 self.assertEqual(api_ts.start_addr,
857 IPv4Address(ts.starting_address_v4))
858 self.assertEqual(api_ts.end_addr,
859 IPv4Address(ts.ending_address_v4))
861 self.assertEqual(api_ts.start_addr,
862 IPv6Address(ts.starting_address_v6))
863 self.assertEqual(api_ts.end_addr,
864 IPv6Address(ts.ending_address_v6))
865 self.assertEqual(api_ts.start_port, ts.start_port)
866 self.assertEqual(api_ts.end_port, ts.end_port)
867 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
870 class TemplateInitiator(IkePeer):
871 """ initiator test template """
873 def initiate_del_sa_from_initiator(self):
874 ispi = int.from_bytes(self.sa.ispi, 'little')
875 self.pg0.enable_capture()
877 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
878 capture = self.pg0.get_capture(1)
879 ih = self.get_ike_header(capture[0])
880 self.assertNotIn('Response', ih.flags)
881 self.assertIn('Initiator', ih.flags)
882 self.assertEqual(ih.init_SPI, self.sa.ispi)
883 self.assertEqual(ih.resp_SPI, self.sa.rspi)
884 plain = self.sa.hmac_and_decrypt(ih)
885 d = ikev2.IKEv2_payload_Delete(plain)
886 self.assertEqual(d.proto, 1) # proto=IKEv2
887 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
888 flags='Response', exch_type='INFORMATIONAL',
889 id=ih.id, next_payload='Encrypted')
890 resp = self.encrypt_ike_msg(header, b'', None)
891 self.send_and_assert_no_replies(self.pg0, resp)
893 def verify_del_sa(self, packet):
894 ih = self.get_ike_header(packet)
895 self.assertEqual(ih.id, self.sa.msg_id)
896 self.assertEqual(ih.exch_type, 37) # exchange informational
897 self.assertIn('Response', ih.flags)
898 self.assertIn('Initiator', ih.flags)
899 plain = self.sa.hmac_and_decrypt(ih)
900 self.assertEqual(plain, b'')
902 def initiate_del_sa_from_responder(self):
903 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
904 exch_type='INFORMATIONAL',
905 id=self.sa.new_msg_id())
906 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
907 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
908 packet = self.create_packet(self.pg0, ike_msg,
909 self.sa.sport, self.sa.dport,
910 self.sa.natt, self.ip6)
911 self.pg0.add_stream(packet)
912 self.pg0.enable_capture()
914 capture = self.pg0.get_capture(1)
915 self.verify_del_sa(capture[0])
918 def find_notify_payload(packet, notify_type):
919 n = packet[ikev2.IKEv2_payload_Notify]
921 if n.type == notify_type:
926 def verify_nat_detection(self, packet):
933 # NAT_DETECTION_SOURCE_IP
934 s = self.find_notify_payload(packet, 16388)
935 self.assertIsNotNone(s)
936 src_sha = self.sa.compute_nat_sha1(
937 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
938 self.assertEqual(s.load, src_sha)
940 # NAT_DETECTION_DESTINATION_IP
941 s = self.find_notify_payload(packet, 16389)
942 self.assertIsNotNone(s)
943 dst_sha = self.sa.compute_nat_sha1(
944 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
945 self.assertEqual(s.load, dst_sha)
947 def verify_sa_init_request(self, packet):
949 self.sa.dport = udp.sport
950 ih = packet[ikev2.IKEv2]
951 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
952 self.assertEqual(ih.exch_type, 34) # SA_INIT
953 self.sa.ispi = ih.init_SPI
954 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
955 self.assertIn('Initiator', ih.flags)
956 self.assertNotIn('Response', ih.flags)
957 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
958 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
960 prop = packet[ikev2.IKEv2_payload_Proposal]
961 self.assertEqual(prop.proto, 1) # proto = ikev2
962 self.assertEqual(prop.proposal, 1)
963 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
964 self.assertEqual(prop.trans[0].transform_id,
965 self.p.ike_transforms['crypto_alg'])
966 self.assertEqual(prop.trans[1].transform_type, 2) # prf
967 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
968 self.assertEqual(prop.trans[2].transform_type, 4) # dh
969 self.assertEqual(prop.trans[2].transform_id,
970 self.p.ike_transforms['dh_group'])
972 self.verify_nat_detection(packet)
973 self.sa.set_ike_props(
974 crypto='AES-GCM-16ICV', crypto_key_len=32,
975 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
976 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
977 integ='SHA2-256-128')
978 self.sa.generate_dh_data()
979 self.sa.complete_dh_data()
982 def update_esp_transforms(self, trans, sa):
984 if trans.transform_type == 1: # ecryption
985 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
986 elif trans.transform_type == 3: # integrity
987 sa.esp_integ = INTEG_IDS[trans.transform_id]
988 trans = trans.payload
990 def verify_sa_auth_req(self, packet):
992 self.sa.dport = udp.sport
993 ih = self.get_ike_header(packet)
994 self.assertEqual(ih.resp_SPI, self.sa.rspi)
995 self.assertEqual(ih.init_SPI, self.sa.ispi)
996 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
997 self.assertIn('Initiator', ih.flags)
998 self.assertNotIn('Response', ih.flags)
1001 self.verify_udp(udp)
1002 self.assertEqual(ih.id, self.sa.msg_id + 1)
1004 plain = self.sa.hmac_and_decrypt(ih)
1005 idi = ikev2.IKEv2_payload_IDi(plain)
1006 self.assertEqual(idi.load, self.sa.i_id)
1007 if self.no_idr_auth:
1008 self.assertEqual(idi.next_payload, 39) # AUTH
1010 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1011 self.assertEqual(idr.load, self.sa.r_id)
1012 prop = idi[ikev2.IKEv2_payload_Proposal]
1013 c = self.sa.child_sas[0]
1015 self.update_esp_transforms(
1016 prop[ikev2.IKEv2_payload_Transform], self.sa)
1018 def send_init_response(self):
1019 tr_attr = self.sa.ike_crypto_attr()
1020 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1021 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1022 key_length=tr_attr[0]) /
1023 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1024 transform_id=self.sa.ike_integ) /
1025 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1026 transform_id=self.sa.ike_prf_alg.name) /
1027 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1028 transform_id=self.sa.ike_dh))
1029 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1030 trans_nb=4, trans=trans))
1032 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1034 dst_address = b'\x0a\x0a\x0a\x0a'
1036 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1037 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1038 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1040 self.sa.init_resp_packet = (
1041 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1042 exch_type='IKE_SA_INIT', flags='Response') /
1043 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1044 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1045 group=self.sa.ike_dh,
1046 load=self.sa.my_dh_pub_key) /
1047 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1048 next_payload='Notify') /
1049 ikev2.IKEv2_payload_Notify(
1050 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1051 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1052 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1054 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1055 self.sa.sport, self.sa.dport,
1057 self.pg_send(self.pg0, ike_msg)
1058 capture = self.pg0.get_capture(1)
1059 self.verify_sa_auth_req(capture[0])
1061 def initiate_sa_init(self):
1062 self.pg0.enable_capture()
1064 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1066 capture = self.pg0.get_capture(1)
1067 self.verify_sa_init_request(capture[0])
1068 self.send_init_response()
1070 def send_auth_response(self):
1071 tr_attr = self.sa.esp_crypto_attr()
1072 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1073 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1074 key_length=tr_attr[0]) /
1075 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1076 transform_id=self.sa.esp_integ) /
1077 ikev2.IKEv2_payload_Transform(
1078 transform_type='Extended Sequence Number',
1079 transform_id='No ESN') /
1080 ikev2.IKEv2_payload_Transform(
1081 transform_type='Extended Sequence Number',
1082 transform_id='ESN'))
1084 c = self.sa.child_sas[0]
1085 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1086 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1088 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1089 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1090 IDtype=self.sa.id_type, load=self.sa.i_id) /
1091 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1092 IDtype=self.sa.id_type, load=self.sa.r_id) /
1093 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1094 auth_type=AuthMethod.value(self.sa.auth_method),
1095 load=self.sa.auth_data) /
1096 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1097 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1098 number_of_TSs=len(tsi),
1099 traffic_selector=tsi) /
1100 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1101 number_of_TSs=len(tsr),
1102 traffic_selector=tsr) /
1103 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1105 header = ikev2.IKEv2(
1106 init_SPI=self.sa.ispi,
1107 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1108 flags='Response', exch_type='IKE_AUTH')
1110 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1111 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1112 self.sa.dport, self.sa.natt, self.ip6)
1113 self.pg_send(self.pg0, packet)
1115 def test_initiator(self):
1116 self.initiate_sa_init()
1118 self.sa.calc_child_keys()
1119 self.send_auth_response()
1120 self.verify_ike_sas()
1123 class TemplateResponder(IkePeer):
1124 """ responder test template """
1126 def initiate_del_sa_from_responder(self):
1127 self.pg0.enable_capture()
1129 self.vapi.ikev2_initiate_del_ike_sa(
1130 ispi=int.from_bytes(self.sa.ispi, 'little'))
1131 capture = self.pg0.get_capture(1)
1132 ih = self.get_ike_header(capture[0])
1133 self.assertNotIn('Response', ih.flags)
1134 self.assertNotIn('Initiator', ih.flags)
1135 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1136 plain = self.sa.hmac_and_decrypt(ih)
1137 d = ikev2.IKEv2_payload_Delete(plain)
1138 self.assertEqual(d.proto, 1) # proto=IKEv2
1139 self.assertEqual(ih.init_SPI, self.sa.ispi)
1140 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1141 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1142 flags='Initiator+Response',
1143 exch_type='INFORMATIONAL',
1144 id=ih.id, next_payload='Encrypted')
1145 resp = self.encrypt_ike_msg(header, b'', None)
1146 self.send_and_assert_no_replies(self.pg0, resp)
1148 def verify_del_sa(self, packet):
1149 ih = self.get_ike_header(packet)
1150 self.assertEqual(ih.id, self.sa.msg_id)
1151 self.assertEqual(ih.exch_type, 37) # exchange informational
1152 self.assertIn('Response', ih.flags)
1153 self.assertNotIn('Initiator', ih.flags)
1154 self.assertEqual(ih.next_payload, 46) # Encrypted
1155 self.assertEqual(ih.init_SPI, self.sa.ispi)
1156 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1157 plain = self.sa.hmac_and_decrypt(ih)
1158 self.assertEqual(plain, b'')
1160 def initiate_del_sa_from_initiator(self):
1161 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1162 flags='Initiator', exch_type='INFORMATIONAL',
1163 id=self.sa.new_msg_id())
1164 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1165 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1166 packet = self.create_packet(self.pg0, ike_msg,
1167 self.sa.sport, self.sa.dport,
1168 self.sa.natt, self.ip6)
1169 self.pg0.add_stream(packet)
1170 self.pg0.enable_capture()
1172 capture = self.pg0.get_capture(1)
1173 self.verify_del_sa(capture[0])
1175 def send_sa_init_req(self):
1176 tr_attr = self.sa.ike_crypto_attr()
1177 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1178 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1179 key_length=tr_attr[0]) /
1180 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1181 transform_id=self.sa.ike_integ) /
1182 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1183 transform_id=self.sa.ike_prf_alg.name) /
1184 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1185 transform_id=self.sa.ike_dh))
1187 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1188 trans_nb=4, trans=trans))
1190 next_payload = None if self.ip6 else 'Notify'
1192 self.sa.init_req_packet = (
1193 ikev2.IKEv2(init_SPI=self.sa.ispi,
1194 flags='Initiator', exch_type='IKE_SA_INIT') /
1195 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1196 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1197 group=self.sa.ike_dh,
1198 load=self.sa.my_dh_pub_key) /
1199 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1200 load=self.sa.i_nonce))
1204 src_address = b'\x0a\x0a\x0a\x01'
1206 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1209 dst_address = b'\x0a\x0a\x0a\x0a'
1211 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1213 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1214 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1215 nat_src_detection = ikev2.IKEv2_payload_Notify(
1216 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1217 next_payload='Notify')
1218 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1219 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1220 self.sa.init_req_packet = (self.sa.init_req_packet /
1224 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1225 self.sa.sport, self.sa.dport,
1226 self.sa.natt, self.ip6)
1227 self.pg0.add_stream(ike_msg)
1228 self.pg0.enable_capture()
1230 capture = self.pg0.get_capture(1)
1231 self.verify_sa_init(capture[0])
1233 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1234 tr_attr = self.sa.esp_crypto_attr()
1235 last_payload = last_payload or 'Notify'
1236 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1237 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1238 key_length=tr_attr[0]) /
1239 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1240 transform_id=self.sa.esp_integ) /
1241 ikev2.IKEv2_payload_Transform(
1242 transform_type='Extended Sequence Number',
1243 transform_id='No ESN') /
1244 ikev2.IKEv2_payload_Transform(
1245 transform_type='Extended Sequence Number',
1246 transform_id='ESN'))
1248 c = self.sa.child_sas[0]
1249 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1250 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1252 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1253 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1254 auth_type=AuthMethod.value(self.sa.auth_method),
1255 load=self.sa.auth_data) /
1256 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1257 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1258 number_of_TSs=len(tsi), traffic_selector=tsi) /
1259 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1260 number_of_TSs=len(tsr), traffic_selector=tsr))
1263 first_payload = 'Nonce'
1264 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1265 next_payload='SA') / plain /
1266 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1267 proto='ESP', SPI=c.ispi))
1269 first_payload = 'IDi'
1270 if self.no_idr_auth:
1271 ids = ikev2.IKEv2_payload_IDi(next_payload='AUTH',
1272 IDtype=self.sa.id_type,
1275 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1276 IDtype=self.sa.id_type, load=self.sa.i_id) /
1277 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1278 IDtype=self.sa.id_type, load=self.sa.r_id))
1280 return plain, first_payload
1282 def send_sa_auth(self):
1283 plain, first_payload = self.generate_auth_payload(
1284 last_payload='Notify')
1285 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1286 header = ikev2.IKEv2(
1287 init_SPI=self.sa.ispi,
1288 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1289 flags='Initiator', exch_type='IKE_AUTH')
1291 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1292 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1293 self.sa.dport, self.sa.natt, self.ip6)
1294 self.pg0.add_stream(packet)
1295 self.pg0.enable_capture()
1297 capture = self.pg0.get_capture(1)
1298 self.verify_sa_auth_resp(capture[0])
1300 def verify_sa_init(self, packet):
1301 ih = self.get_ike_header(packet)
1303 self.assertEqual(ih.id, self.sa.msg_id)
1304 self.assertEqual(ih.exch_type, 34)
1305 self.assertIn('Response', ih.flags)
1306 self.assertEqual(ih.init_SPI, self.sa.ispi)
1307 self.assertNotEqual(ih.resp_SPI, 0)
1308 self.sa.rspi = ih.resp_SPI
1310 sa = ih[ikev2.IKEv2_payload_SA]
1311 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1312 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1313 except IndexError as e:
1314 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1315 self.logger.error(ih.show())
1317 self.sa.complete_dh_data()
1321 def verify_sa_auth_resp(self, packet):
1322 ike = self.get_ike_header(packet)
1324 self.verify_udp(udp)
1325 self.assertEqual(ike.id, self.sa.msg_id)
1326 plain = self.sa.hmac_and_decrypt(ike)
1327 idr = ikev2.IKEv2_payload_IDr(plain)
1328 prop = idr[ikev2.IKEv2_payload_Proposal]
1329 self.assertEqual(prop.SPIsize, 4)
1330 self.sa.child_sas[0].rspi = prop.SPI
1331 self.sa.calc_child_keys()
1333 IKE_NODE_SUFFIX = 'ip4'
1335 def verify_counters(self):
1336 self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
1337 self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
1338 self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1340 r = self.vapi.ikev2_sa_dump()
1342 self.assertEqual(1, s.n_sa_auth_req)
1343 self.assertEqual(1, s.n_sa_init_req)
1345 def test_responder(self):
1346 self.send_sa_init_req()
1348 self.verify_ipsec_sas()
1349 self.verify_ike_sas()
1350 self.verify_counters()
1353 class Ikev2Params(object):
1354 def config_params(self, params={}):
1355 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1356 ei = VppEnum.vl_api_ipsec_integ_alg_t
1358 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1359 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1360 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1361 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1362 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1363 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1365 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1366 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1367 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1368 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1370 dpd_disabled = True if 'dpd_disabled' not in params else\
1371 params['dpd_disabled']
1373 self.vapi.cli('ikev2 dpd disable')
1374 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1375 not in params else params['del_sa_from_responder']
1376 i_natt = False if 'i_natt' not in params else params['i_natt']
1377 r_natt = False if 'r_natt' not in params else params['r_natt']
1378 self.p = Profile(self, 'pr1')
1379 self.ip6 = False if 'ip6' not in params else params['ip6']
1381 if 'auth' in params and params['auth'] == 'rsa-sig':
1382 auth_method = 'rsa-sig'
1383 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1384 self.vapi.ikev2_set_local_key(
1385 key_file=work_dir + params['server-key'])
1387 client_file = work_dir + params['client-cert']
1388 server_pem = open(work_dir + params['server-cert']).read()
1389 client_priv = open(work_dir + params['client-key']).read()
1390 client_priv = load_pem_private_key(str.encode(client_priv), None,
1392 self.peer_cert = x509.load_pem_x509_certificate(
1393 str.encode(server_pem),
1395 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1398 auth_data = b'$3cr3tpa$$w0rd'
1399 self.p.add_auth(method='shared-key', data=auth_data)
1400 auth_method = 'shared-key'
1403 is_init = True if 'is_initiator' not in params else\
1404 params['is_initiator']
1405 self.no_idr_auth = params.get('no_idr_in_auth', False)
1407 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1408 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1409 r_id = self.idr = idr['data']
1410 i_id = self.idi = idi['data']
1412 # scapy is initiator, VPP is responder
1413 self.p.add_local_id(**idr)
1414 self.p.add_remote_id(**idi)
1415 if self.no_idr_auth:
1418 # VPP is initiator, scapy is responder
1419 self.p.add_local_id(**idi)
1420 if not self.no_idr_auth:
1421 self.p.add_remote_id(**idr)
1423 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1424 'loc_ts' not in params else params['loc_ts']
1425 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1426 'rem_ts' not in params else params['rem_ts']
1427 self.p.add_local_ts(**loc_ts)
1428 self.p.add_remote_ts(**rem_ts)
1429 if 'responder' in params:
1430 self.p.add_responder(params['responder'])
1431 if 'ike_transforms' in params:
1432 self.p.add_ike_transforms(params['ike_transforms'])
1433 if 'esp_transforms' in params:
1434 self.p.add_esp_transforms(params['esp_transforms'])
1436 udp_encap = False if 'udp_encap' not in params else\
1439 self.p.set_udp_encap(True)
1441 if 'responder_hostname' in params:
1442 hn = params['responder_hostname']
1443 self.p.add_responder_hostname(hn)
1445 # configure static dns record
1446 self.vapi.dns_name_server_add_del(
1448 server_address=IPv4Address(u'8.8.8.8').packed)
1449 self.vapi.dns_enable_disable(enable=1)
1451 cmd = "dns cache add {} {}".format(hn['hostname'],
1452 self.pg0.remote_ip4)
1455 self.sa = IKEv2SA(self, i_id=i_id, r_id=r_id,
1456 is_initiator=is_init,
1457 id_type=self.p.local_id['id_type'],
1458 i_natt=i_natt, r_natt=r_natt,
1459 priv_key=client_priv, auth_method=auth_method,
1460 nonce=params.get('nonce'),
1461 auth_data=auth_data, udp_encap=udp_encap,
1462 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1465 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1466 params['ike-crypto']
1467 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1469 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1472 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1473 params['esp-crypto']
1474 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1477 self.sa.set_ike_props(
1478 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1479 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1480 self.sa.set_esp_props(
1481 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1485 class TestApi(VppTestCase):
1486 """ Test IKEV2 API """
1488 def setUpClass(cls):
1489 super(TestApi, cls).setUpClass()
1492 def tearDownClass(cls):
1493 super(TestApi, cls).tearDownClass()
1496 super(TestApi, self).tearDown()
1497 self.p1.remove_vpp_config()
1498 self.p2.remove_vpp_config()
1499 r = self.vapi.ikev2_profile_dump()
1500 self.assertEqual(len(r), 0)
1502 def configure_profile(self, cfg):
1503 p = Profile(self, cfg['name'])
1504 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1505 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1506 p.add_local_ts(**cfg['loc_ts'])
1507 p.add_remote_ts(**cfg['rem_ts'])
1508 p.add_responder(cfg['responder'])
1509 p.add_ike_transforms(cfg['ike_ts'])
1510 p.add_esp_transforms(cfg['esp_ts'])
1511 p.add_auth(**cfg['auth'])
1512 p.set_udp_encap(cfg['udp_encap'])
1513 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1514 if 'lifetime_data' in cfg:
1515 p.set_lifetime_data(cfg['lifetime_data'])
1516 if 'tun_itf' in cfg:
1517 p.set_tunnel_interface(cfg['tun_itf'])
1518 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1523 def test_profile_api(self):
1524 """ test profile dump API """
1529 'start_addr': '3.3.3.2',
1530 'end_addr': '3.3.3.3',
1536 'start_addr': '4.5.76.80',
1537 'end_addr': '2.3.4.6',
1544 'start_addr': 'ab::1',
1545 'end_addr': 'ab::4',
1551 'start_addr': 'cd::12',
1552 'end_addr': 'cd::13',
1558 'natt_disabled': True,
1559 'loc_id': ('fqdn', b'vpp.home'),
1560 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1563 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1566 'crypto_key_size': 32,
1571 'crypto_key_size': 24,
1573 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1575 'ipsec_over_udp_port': 4501,
1578 'lifetime_maxdata': 20192,
1579 'lifetime_jitter': 9,
1584 'loc_id': ('ip4-addr', b'192.168.2.1'),
1585 'rem_id': ('ip6-addr', b'abcd::1'),
1588 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1591 'crypto_key_size': 16,
1596 'crypto_key_size': 24,
1598 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1600 'ipsec_over_udp_port': 4600,
1603 self.p1 = self.configure_profile(conf['p1'])
1604 self.p2 = self.configure_profile(conf['p2'])
1606 r = self.vapi.ikev2_profile_dump()
1607 self.assertEqual(len(r), 2)
1608 self.verify_profile(r[0].profile, conf['p1'])
1609 self.verify_profile(r[1].profile, conf['p2'])
1611 def verify_id(self, api_id, cfg_id):
1612 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1613 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1615 def verify_ts(self, api_ts, cfg_ts):
1616 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1617 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1618 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1619 self.assertEqual(api_ts.start_addr,
1620 ip_address(text_type(cfg_ts['start_addr'])))
1621 self.assertEqual(api_ts.end_addr,
1622 ip_address(text_type(cfg_ts['end_addr'])))
1624 def verify_responder(self, api_r, cfg_r):
1625 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1626 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1628 def verify_transforms(self, api_ts, cfg_ts):
1629 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1630 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1631 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1633 def verify_ike_transforms(self, api_ts, cfg_ts):
1634 self.verify_transforms(api_ts, cfg_ts)
1635 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1637 def verify_esp_transforms(self, api_ts, cfg_ts):
1638 self.verify_transforms(api_ts, cfg_ts)
1640 def verify_auth(self, api_auth, cfg_auth):
1641 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1642 self.assertEqual(api_auth.data, cfg_auth['data'])
1643 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1645 def verify_lifetime_data(self, p, ld):
1646 self.assertEqual(p.lifetime, ld['lifetime'])
1647 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1648 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1649 self.assertEqual(p.handover, ld['handover'])
1651 def verify_profile(self, ap, cp):
1652 self.assertEqual(ap.name, cp['name'])
1653 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1654 self.verify_id(ap.loc_id, cp['loc_id'])
1655 self.verify_id(ap.rem_id, cp['rem_id'])
1656 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1657 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1658 self.verify_responder(ap.responder, cp['responder'])
1659 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1660 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1661 self.verify_auth(ap.auth, cp['auth'])
1662 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1663 self.assertTrue(natt_dis == ap.natt_disabled)
1665 if 'lifetime_data' in cp:
1666 self.verify_lifetime_data(ap, cp['lifetime_data'])
1667 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1669 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1671 self.assertEqual(ap.tun_itf, 0xffffffff)
1674 @tag_fixme_vpp_workers
1675 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1676 """ test responder - responder behind NAT """
1678 IKE_NODE_SUFFIX = 'ip4-natt'
1680 def config_tc(self):
1681 self.config_params({'r_natt': True})
1684 @tag_fixme_vpp_workers
1685 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1686 """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1688 def config_tc(self):
1689 self.config_params({
1691 'is_initiator': False, # seen from test case perspective
1692 # thus vpp is initiator
1693 'responder': {'sw_if_index': self.pg0.sw_if_index,
1694 'addr': self.pg0.remote_ip4},
1695 'ike-crypto': ('AES-GCM-16ICV', 32),
1696 'ike-integ': 'NULL',
1697 'ike-dh': '3072MODPgr',
1699 'crypto_alg': 20, # "aes-gcm-16"
1700 'crypto_key_size': 256,
1701 'dh_group': 15, # "modp-3072"
1704 'crypto_alg': 12, # "aes-cbc"
1705 'crypto_key_size': 256,
1706 # "hmac-sha2-256-128"
1710 @tag_fixme_vpp_workers
1711 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1712 """ test ikev2 initiator - pre shared key auth """
1714 def config_tc(self):
1715 self.config_params({
1716 'is_initiator': False, # seen from test case perspective
1717 # thus vpp is initiator
1718 'ike-crypto': ('AES-GCM-16ICV', 32),
1719 'ike-integ': 'NULL',
1720 'ike-dh': '3072MODPgr',
1722 'crypto_alg': 20, # "aes-gcm-16"
1723 'crypto_key_size': 256,
1724 'dh_group': 15, # "modp-3072"
1727 'crypto_alg': 12, # "aes-cbc"
1728 'crypto_key_size': 256,
1729 # "hmac-sha2-256-128"
1731 'responder_hostname': {'hostname': 'vpp.responder.org',
1732 'sw_if_index': self.pg0.sw_if_index}})
1735 @tag_fixme_vpp_workers
1736 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1737 """ test initiator - request window size (1) """
1739 def rekey_respond(self, req, update_child_sa_data):
1740 ih = self.get_ike_header(req)
1741 plain = self.sa.hmac_and_decrypt(ih)
1742 sa = ikev2.IKEv2_payload_SA(plain)
1743 if update_child_sa_data:
1744 prop = sa[ikev2.IKEv2_payload_Proposal]
1745 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1746 self.sa.r_nonce = self.sa.i_nonce
1747 self.sa.child_sas[0].ispi = prop.SPI
1748 self.sa.child_sas[0].rspi = prop.SPI
1749 self.sa.calc_child_keys()
1751 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1752 flags='Response', exch_type=36,
1753 id=ih.id, next_payload='Encrypted')
1754 resp = self.encrypt_ike_msg(header, sa, 'SA')
1755 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1756 self.sa.dport, self.sa.natt, self.ip6)
1757 self.send_and_assert_no_replies(self.pg0, packet)
1759 def test_initiator(self):
1760 super(TestInitiatorRequestWindowSize, self).test_initiator()
1761 self.pg0.enable_capture()
1763 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1764 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1765 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1766 capture = self.pg0.get_capture(2)
1768 # reply in reverse order
1769 self.rekey_respond(capture[1], True)
1770 self.rekey_respond(capture[0], False)
1772 # verify that only the second request was accepted
1773 self.verify_ike_sas()
1774 self.verify_ipsec_sas(is_rekey=True)
1777 @tag_fixme_vpp_workers
1778 class TestInitiatorRekey(TestInitiatorPsk):
1779 """ test ikev2 initiator - rekey """
1781 def rekey_from_initiator(self):
1782 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1783 self.pg0.enable_capture()
1785 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1786 capture = self.pg0.get_capture(1)
1787 ih = self.get_ike_header(capture[0])
1788 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1789 self.assertNotIn('Response', ih.flags)
1790 self.assertIn('Initiator', ih.flags)
1791 plain = self.sa.hmac_and_decrypt(ih)
1792 sa = ikev2.IKEv2_payload_SA(plain)
1793 prop = sa[ikev2.IKEv2_payload_Proposal]
1794 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1795 self.sa.r_nonce = self.sa.i_nonce
1796 # update new responder SPI
1797 self.sa.child_sas[0].ispi = prop.SPI
1798 self.sa.child_sas[0].rspi = prop.SPI
1799 self.sa.calc_child_keys()
1800 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1801 flags='Response', exch_type=36,
1802 id=ih.id, next_payload='Encrypted')
1803 resp = self.encrypt_ike_msg(header, sa, 'SA')
1804 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1805 self.sa.dport, self.sa.natt, self.ip6)
1806 self.send_and_assert_no_replies(self.pg0, packet)
1808 def test_initiator(self):
1809 super(TestInitiatorRekey, self).test_initiator()
1810 self.rekey_from_initiator()
1811 self.verify_ike_sas()
1812 self.verify_ipsec_sas(is_rekey=True)
1815 @tag_fixme_vpp_workers
1816 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1817 """ test ikev2 initiator - delete IKE SA from responder """
1819 def config_tc(self):
1820 self.config_params({
1821 'del_sa_from_responder': True,
1822 'is_initiator': False, # seen from test case perspective
1823 # thus vpp is initiator
1824 'responder': {'sw_if_index': self.pg0.sw_if_index,
1825 'addr': self.pg0.remote_ip4},
1826 'ike-crypto': ('AES-GCM-16ICV', 32),
1827 'ike-integ': 'NULL',
1828 'ike-dh': '3072MODPgr',
1830 'crypto_alg': 20, # "aes-gcm-16"
1831 'crypto_key_size': 256,
1832 'dh_group': 15, # "modp-3072"
1835 'crypto_alg': 12, # "aes-cbc"
1836 'crypto_key_size': 256,
1837 # "hmac-sha2-256-128"
1839 'no_idr_in_auth': True})
1842 @tag_fixme_vpp_workers
1843 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1844 """ test ikev2 responder - initiator behind NAT """
1846 IKE_NODE_SUFFIX = 'ip4-natt'
1848 def config_tc(self):
1853 @tag_fixme_vpp_workers
1854 class TestResponderPsk(TemplateResponder, Ikev2Params):
1855 """ test ikev2 responder - pre shared key auth """
1856 def config_tc(self):
1857 self.config_params()
1860 @tag_fixme_vpp_workers
1861 class TestResponderDpd(TestResponderPsk):
1863 Dead peer detection test
1865 def config_tc(self):
1866 self.config_params({'dpd_disabled': False})
1871 def test_responder(self):
1872 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1873 super(TestResponderDpd, self).test_responder()
1874 self.pg0.enable_capture()
1876 # capture empty request but don't reply
1877 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1878 ih = self.get_ike_header(capture[0])
1879 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1880 plain = self.sa.hmac_and_decrypt(ih)
1881 self.assertEqual(plain, b'')
1882 # wait for SA expiration
1884 ike_sas = self.vapi.ikev2_sa_dump()
1885 self.assertEqual(len(ike_sas), 0)
1886 ipsec_sas = self.vapi.ipsec_sa_dump()
1887 self.assertEqual(len(ipsec_sas), 0)
1890 @tag_fixme_vpp_workers
1891 class TestResponderRekey(TestResponderPsk):
1892 """ test ikev2 responder - rekey """
1894 def rekey_from_initiator(self):
1895 packet = self.create_rekey_request()
1896 self.pg0.add_stream(packet)
1897 self.pg0.enable_capture()
1899 capture = self.pg0.get_capture(1)
1900 ih = self.get_ike_header(capture[0])
1901 plain = self.sa.hmac_and_decrypt(ih)
1902 sa = ikev2.IKEv2_payload_SA(plain)
1903 prop = sa[ikev2.IKEv2_payload_Proposal]
1904 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1905 # update new responder SPI
1906 self.sa.child_sas[0].rspi = prop.SPI
1908 def test_responder(self):
1909 super(TestResponderRekey, self).test_responder()
1910 self.rekey_from_initiator()
1911 self.sa.calc_child_keys()
1912 self.verify_ike_sas()
1913 self.verify_ipsec_sas(is_rekey=True)
1914 self.assert_counter(1, 'rekey_req', 'ip4')
1915 r = self.vapi.ikev2_sa_dump()
1916 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
1919 class TestResponderVrf(TestResponderPsk, Ikev2Params):
1920 """ test ikev2 responder - non-default table id """
1923 def setUpClass(cls):
1924 import scapy.contrib.ikev2 as _ikev2
1925 globals()['ikev2'] = _ikev2
1926 super(IkePeer, cls).setUpClass()
1927 cls.create_pg_interfaces(range(1))
1928 cls.vapi.cli("ip table add 1")
1929 cls.vapi.cli("set interface ip table pg0 1")
1930 for i in cls.pg_interfaces:
1937 def config_tc(self):
1938 self.config_params({'dpd_disabled': False})
1940 def test_responder(self):
1941 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1942 super(TestResponderVrf, self).test_responder()
1943 self.pg0.enable_capture()
1945 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1946 ih = self.get_ike_header(capture[0])
1947 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1948 plain = self.sa.hmac_and_decrypt(ih)
1949 self.assertEqual(plain, b'')
1952 @tag_fixme_vpp_workers
1953 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1954 """ test ikev2 responder - cert based auth """
1955 def config_tc(self):
1956 self.config_params({
1959 'server-key': 'server-key.pem',
1960 'client-key': 'client-key.pem',
1961 'client-cert': 'client-cert.pem',
1962 'server-cert': 'server-cert.pem'})
1965 @tag_fixme_vpp_workers
1966 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1967 (TemplateResponder, Ikev2Params):
1969 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1971 def config_tc(self):
1972 self.config_params({
1973 'ike-crypto': ('AES-CBC', 16),
1974 'ike-integ': 'SHA2-256-128',
1975 'esp-crypto': ('AES-CBC', 24),
1976 'esp-integ': 'SHA2-384-192',
1977 'ike-dh': '2048MODPgr',
1978 'nonce': os.urandom(256),
1979 'no_idr_in_auth': True})
1982 @tag_fixme_vpp_workers
1983 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1984 (TemplateResponder, Ikev2Params):
1987 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1989 def config_tc(self):
1990 self.config_params({
1991 'ike-crypto': ('AES-CBC', 32),
1992 'ike-integ': 'SHA2-256-128',
1993 'esp-crypto': ('AES-GCM-16ICV', 32),
1994 'esp-integ': 'NULL',
1995 'ike-dh': '3072MODPgr'})
1998 @tag_fixme_vpp_workers
1999 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2004 IKE_NODE_SUFFIX = 'ip6'
2006 def config_tc(self):
2007 self.config_params({
2008 'del_sa_from_responder': True,
2011 'ike-crypto': ('AES-GCM-16ICV', 32),
2012 'ike-integ': 'NULL',
2013 'ike-dh': '2048MODPgr',
2014 'loc_ts': {'start_addr': 'ab:cd::0',
2015 'end_addr': 'ab:cd::10'},
2016 'rem_ts': {'start_addr': '11::0',
2017 'end_addr': '11::100'}})
2020 @tag_fixme_vpp_workers
2021 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2023 Test for keep alive messages
2026 def send_empty_req_from_responder(self):
2027 packet = self.create_empty_request()
2028 self.pg0.add_stream(packet)
2029 self.pg0.enable_capture()
2031 capture = self.pg0.get_capture(1)
2032 ih = self.get_ike_header(capture[0])
2033 self.assertEqual(ih.id, self.sa.msg_id)
2034 plain = self.sa.hmac_and_decrypt(ih)
2035 self.assertEqual(plain, b'')
2036 self.assert_counter(1, 'keepalive', 'ip4')
2037 r = self.vapi.ikev2_sa_dump()
2038 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2040 def test_initiator(self):
2041 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2042 self.send_empty_req_from_responder()
2045 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2046 """ malformed packet test """
2051 def config_tc(self):
2052 self.config_params()
2054 def create_ike_init_msg(self, length=None, payload=None):
2055 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
2056 flags='Initiator', exch_type='IKE_SA_INIT')
2057 if payload is not None:
2059 return self.create_packet(self.pg0, msg, self.sa.sport,
2062 def verify_bad_packet_length(self):
2063 ike_msg = self.create_ike_init_msg(length=0xdead)
2064 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2065 self.assert_counter(self.pkt_count, 'bad_length')
2067 def verify_bad_sa_payload_length(self):
2068 p = ikev2.IKEv2_payload_SA(length=0xdead)
2069 ike_msg = self.create_ike_init_msg(payload=p)
2070 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2071 self.assert_counter(self.pkt_count, 'malformed_packet')
2073 def test_responder(self):
2074 self.pkt_count = 254
2075 self.verify_bad_packet_length()
2076 self.verify_bad_sa_payload_length()
2079 if __name__ == '__main__':
2080 unittest.main(testRunner=VppTestRunner)