3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import tag_fixme_vpp_workers
22 from framework import VppTestCase, VppTestRunner
23 from vpp_ikev2 import Profile, IDType, AuthMethod
24 from vpp_papi import VppEnum
31 KEY_PAD = b"Key Pad for IKEv2"
38 # tuple structure is (p, g, key_len)
40 '2048MODPgr': (long_converter("""
41 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
42 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
43 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
44 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
45 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
46 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
47 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
48 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
49 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
50 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
51 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
53 '3072MODPgr': (long_converter("""
54 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
55 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
56 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
57 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
58 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
59 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
60 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
61 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
62 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
63 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
64 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
65 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
66 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
67 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
68 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
69 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
73 class CryptoAlgo(object):
74 def __init__(self, name, cipher, mode):
78 if self.cipher is not None:
79 self.bs = self.cipher.block_size // 8
81 if self.name == 'AES-GCM-16ICV':
82 self.iv_len = GCM_IV_SIZE
86 def encrypt(self, data, key, aad=None):
87 iv = os.urandom(self.iv_len)
89 encryptor = Cipher(self.cipher(key), self.mode(iv),
90 default_backend()).encryptor()
91 return iv + encryptor.update(data) + encryptor.finalize()
93 salt = key[-SALT_SIZE:]
95 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
96 default_backend()).encryptor()
97 encryptor.authenticate_additional_data(aad)
98 data = encryptor.update(data) + encryptor.finalize()
99 data += encryptor.tag[:GCM_ICV_SIZE]
102 def decrypt(self, data, key, aad=None, icv=None):
104 iv = data[:self.iv_len]
105 ct = data[self.iv_len:]
106 decryptor = Cipher(algorithms.AES(key),
108 default_backend()).decryptor()
109 return decryptor.update(ct) + decryptor.finalize()
111 salt = key[-SALT_SIZE:]
112 nonce = salt + data[:GCM_IV_SIZE]
113 ct = data[GCM_IV_SIZE:]
114 key = key[:-SALT_SIZE]
115 decryptor = Cipher(algorithms.AES(key),
116 self.mode(nonce, icv, len(icv)),
117 default_backend()).decryptor()
118 decryptor.authenticate_additional_data(aad)
119 return decryptor.update(ct) + decryptor.finalize()
122 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
123 data = data + b'\x00' * (pad_len - 1)
124 return data + bytes([pad_len - 1])
127 class AuthAlgo(object):
128 def __init__(self, name, mac, mod, key_len, trunc_len=None):
132 self.key_len = key_len
133 self.trunc_len = trunc_len or key_len
137 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
138 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
139 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
144 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
145 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
146 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
147 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
148 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
152 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
153 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
170 class IKEv2ChildSA(object):
171 def __init__(self, local_ts, remote_ts, is_initiator):
179 self.local_ts = local_ts
180 self.remote_ts = remote_ts
183 class IKEv2SA(object):
184 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
185 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
186 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
187 auth_method='shared-key', priv_key=None, i_natt=False,
188 r_natt=False, udp_encap=False):
189 self.udp_encap = udp_encap
199 self.dh_params = None
201 self.priv_key = priv_key
202 self.is_initiator = is_initiator
203 nonce = nonce or os.urandom(32)
204 self.auth_data = auth_data
207 if isinstance(id_type, str):
208 self.id_type = IDType.value(id_type)
210 self.id_type = id_type
211 self.auth_method = auth_method
212 if self.is_initiator:
213 self.rspi = 8 * b'\x00'
218 self.ispi = 8 * b'\x00'
220 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
223 def new_msg_id(self):
228 def my_dh_pub_key(self):
229 if self.is_initiator:
230 return self.i_dh_data
231 return self.r_dh_data
234 def peer_dh_pub_key(self):
235 if self.is_initiator:
236 return self.r_dh_data
237 return self.i_dh_data
241 return self.i_natt or self.r_natt
243 def compute_secret(self):
244 priv = self.dh_private_key
245 peer = self.peer_dh_pub_key
246 p, g, l = self.ike_group
247 return pow(int.from_bytes(peer, 'big'),
248 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
250 def generate_dh_data(self):
252 if self.ike_dh not in DH:
253 raise NotImplementedError('%s not in DH group' % self.ike_dh)
255 if self.dh_params is None:
256 dhg = DH[self.ike_dh]
257 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
258 self.dh_params = pn.parameters(default_backend())
260 priv = self.dh_params.generate_private_key()
261 pub = priv.public_key()
262 x = priv.private_numbers().x
263 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
264 y = pub.public_numbers().y
266 if self.is_initiator:
267 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
269 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
271 def complete_dh_data(self):
272 self.dh_shared_secret = self.compute_secret()
274 def calc_child_keys(self):
275 prf = self.ike_prf_alg.mod()
276 s = self.i_nonce + self.r_nonce
277 c = self.child_sas[0]
279 encr_key_len = self.esp_crypto_key_len
280 integ_key_len = self.esp_integ_alg.key_len
281 salt_len = 0 if integ_key_len else 4
283 l = (integ_key_len * 2 +
286 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
289 c.sk_ei = keymat[pos:pos+encr_key_len]
293 c.sk_ai = keymat[pos:pos+integ_key_len]
296 c.salt_ei = keymat[pos:pos+salt_len]
299 c.sk_er = keymat[pos:pos+encr_key_len]
303 c.sk_ar = keymat[pos:pos+integ_key_len]
306 c.salt_er = keymat[pos:pos+salt_len]
309 def calc_prfplus(self, prf, key, seed, length):
313 while len(r) < length and x < 255:
318 s = s + seed + bytes([x])
319 t = self.calc_prf(prf, key, s)
327 def calc_prf(self, prf, key, data):
328 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
333 prf = self.ike_prf_alg.mod()
334 # SKEYSEED = prf(Ni | Nr, g^ir)
335 s = self.i_nonce + self.r_nonce
336 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
338 # calculate S = Ni | Nr | SPIi SPIr
339 s = s + self.ispi + self.rspi
341 prf_key_trunc = self.ike_prf_alg.trunc_len
342 encr_key_len = self.ike_crypto_key_len
343 tr_prf_key_len = self.ike_prf_alg.key_len
344 integ_key_len = self.ike_integ_alg.key_len
345 if integ_key_len == 0:
355 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
358 self.sk_d = keymat[:pos+prf_key_trunc]
361 self.sk_ai = keymat[pos:pos+integ_key_len]
363 self.sk_ar = keymat[pos:pos+integ_key_len]
366 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
367 pos += encr_key_len + salt_size
368 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
369 pos += encr_key_len + salt_size
371 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
372 pos += tr_prf_key_len
373 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
375 def generate_authmsg(self, prf, packet):
376 if self.is_initiator:
384 data = bytes([self.id_type, 0, 0, 0]) + id
385 id_hash = self.calc_prf(prf, key, data)
386 return packet + nonce + id_hash
389 prf = self.ike_prf_alg.mod()
390 if self.is_initiator:
391 packet = self.init_req_packet
393 packet = self.init_resp_packet
394 authmsg = self.generate_authmsg(prf, raw(packet))
395 if self.auth_method == 'shared-key':
396 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
397 self.auth_data = self.calc_prf(prf, psk, authmsg)
398 elif self.auth_method == 'rsa-sig':
399 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
402 raise TypeError('unknown auth method type!')
404 def encrypt(self, data, aad=None):
405 data = self.ike_crypto_alg.pad(data)
406 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
409 def peer_authkey(self):
410 if self.is_initiator:
415 def my_authkey(self):
416 if self.is_initiator:
421 def my_cryptokey(self):
422 if self.is_initiator:
427 def peer_cryptokey(self):
428 if self.is_initiator:
432 def concat(self, alg, key_len):
433 return alg + '-' + str(key_len * 8)
436 def vpp_ike_cypto_alg(self):
437 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
440 def vpp_esp_cypto_alg(self):
441 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
443 def verify_hmac(self, ikemsg):
444 integ_trunc = self.ike_integ_alg.trunc_len
445 exp_hmac = ikemsg[-integ_trunc:]
446 data = ikemsg[:-integ_trunc]
447 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
448 self.peer_authkey, data)
449 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
451 def compute_hmac(self, integ, key, data):
452 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
456 def decrypt(self, data, aad=None, icv=None):
457 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
459 def hmac_and_decrypt(self, ike):
460 ep = ike[ikev2.IKEv2_payload_Encrypted]
461 if self.ike_crypto == 'AES-GCM-16ICV':
462 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
463 ct = ep.load[:-GCM_ICV_SIZE]
464 tag = ep.load[-GCM_ICV_SIZE:]
465 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
467 self.verify_hmac(raw(ike))
468 integ_trunc = self.ike_integ_alg.trunc_len
470 # remove ICV and decrypt payload
471 ct = ep.load[:-integ_trunc]
472 plain = self.decrypt(ct)
475 return plain[:-pad_len - 1]
477 def build_ts_addr(self, ts, version):
478 return {'starting_address_v' + version: ts['start_addr'],
479 'ending_address_v' + version: ts['end_addr']}
481 def generate_ts(self, is_ip4):
482 c = self.child_sas[0]
483 ts_data = {'IP_protocol_ID': 0,
487 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
488 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
489 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
490 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
492 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
493 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
494 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
495 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
497 if self.is_initiator:
498 return ([ts1], [ts2])
499 return ([ts2], [ts1])
501 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
502 if crypto not in CRYPTO_ALGOS:
503 raise TypeError('unsupported encryption algo %r' % crypto)
504 self.ike_crypto = crypto
505 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
506 self.ike_crypto_key_len = crypto_key_len
508 if integ not in AUTH_ALGOS:
509 raise TypeError('unsupported auth algo %r' % integ)
510 self.ike_integ = None if integ == 'NULL' else integ
511 self.ike_integ_alg = AUTH_ALGOS[integ]
513 if prf not in PRF_ALGOS:
514 raise TypeError('unsupported prf algo %r' % prf)
516 self.ike_prf_alg = PRF_ALGOS[prf]
518 self.ike_group = DH[self.ike_dh]
520 def set_esp_props(self, crypto, crypto_key_len, integ):
521 self.esp_crypto_key_len = crypto_key_len
522 if crypto not in CRYPTO_ALGOS:
523 raise TypeError('unsupported encryption algo %r' % crypto)
524 self.esp_crypto = crypto
525 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
527 if integ not in AUTH_ALGOS:
528 raise TypeError('unsupported auth algo %r' % integ)
529 self.esp_integ = None if integ == 'NULL' else integ
530 self.esp_integ_alg = AUTH_ALGOS[integ]
532 def crypto_attr(self, key_len):
533 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
534 return (0x800e << 16 | key_len << 3, 12)
536 raise Exception('unsupported attribute type')
538 def ike_crypto_attr(self):
539 return self.crypto_attr(self.ike_crypto_key_len)
541 def esp_crypto_attr(self):
542 return self.crypto_attr(self.esp_crypto_key_len)
544 def compute_nat_sha1(self, ip, port, rspi=None):
547 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
548 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
550 return digest.finalize()
553 class IkePeer(VppTestCase):
554 """ common class for initiator and responder """
558 import scapy.contrib.ikev2 as _ikev2
559 globals()['ikev2'] = _ikev2
560 super(IkePeer, cls).setUpClass()
561 cls.create_pg_interfaces(range(2))
562 for i in cls.pg_interfaces:
570 def tearDownClass(cls):
571 super(IkePeer, cls).tearDownClass()
574 super(IkePeer, self).tearDown()
575 if self.del_sa_from_responder:
576 self.initiate_del_sa_from_responder()
578 self.initiate_del_sa_from_initiator()
579 r = self.vapi.ikev2_sa_dump()
580 self.assertEqual(len(r), 0)
581 sas = self.vapi.ipsec_sa_dump()
582 self.assertEqual(len(sas), 0)
583 self.p.remove_vpp_config()
584 self.assertIsNone(self.p.query_vpp_config())
587 super(IkePeer, self).setUp()
589 self.p.add_vpp_config()
590 self.assertIsNotNone(self.p.query_vpp_config())
591 if self.sa.is_initiator:
592 self.sa.generate_dh_data()
593 self.vapi.cli('ikev2 set logging level 4')
594 self.vapi.cli('event-lo clear')
596 def assert_counter(self, count, name, version='ip4'):
597 node_name = '/err/ikev2-%s/' % version + name
598 self.assertEqual(count, self.statistics.get_err_counter(node_name))
600 def create_rekey_request(self):
601 sa, first_payload = self.generate_auth_payload(is_rekey=True)
602 header = ikev2.IKEv2(
603 init_SPI=self.sa.ispi,
604 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
605 flags='Initiator', exch_type='CREATE_CHILD_SA')
607 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
608 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
609 self.sa.dport, self.sa.natt, self.ip6)
611 def create_empty_request(self):
612 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
613 id=self.sa.new_msg_id(), flags='Initiator',
614 exch_type='INFORMATIONAL',
615 next_payload='Encrypted')
617 msg = self.encrypt_ike_msg(header, b'', None)
618 return self.create_packet(self.pg0, msg, self.sa.sport,
619 self.sa.dport, self.sa.natt, self.ip6)
621 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
624 src_ip = src_if.remote_ip6
625 dst_ip = src_if.local_ip6
628 src_ip = src_if.remote_ip4
629 dst_ip = src_if.local_ip4
631 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
632 ip_layer(src=src_ip, dst=dst_ip) /
633 UDP(sport=sport, dport=dport))
635 # insert non ESP marker
636 res = res / Raw(b'\x00' * 4)
639 def verify_udp(self, udp):
640 self.assertEqual(udp.sport, self.sa.sport)
641 self.assertEqual(udp.dport, self.sa.dport)
643 def get_ike_header(self, packet):
645 ih = packet[ikev2.IKEv2]
646 ih = self.verify_and_remove_non_esp_marker(ih)
647 except IndexError as e:
648 # this is a workaround for getting IKEv2 layer as both ikev2 and
649 # ipsec register for port 4500
651 ih = self.verify_and_remove_non_esp_marker(esp)
652 self.assertEqual(ih.version, 0x20)
653 self.assertNotIn('Version', ih.flags)
656 def verify_and_remove_non_esp_marker(self, packet):
658 # if we are in nat traversal mode check for non esp marker
661 self.assertEqual(data[:4], b'\x00' * 4)
662 return ikev2.IKEv2(data[4:])
666 def encrypt_ike_msg(self, header, plain, first_payload):
667 if self.sa.ike_crypto == 'AES-GCM-16ICV':
668 data = self.sa.ike_crypto_alg.pad(raw(plain))
669 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
670 len(ikev2.IKEv2_payload_Encrypted())
671 tlen = plen + len(ikev2.IKEv2())
674 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
678 encr = self.sa.encrypt(raw(plain), raw(res))
679 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
680 length=plen, load=encr)
683 encr = self.sa.encrypt(raw(plain))
684 trunc_len = self.sa.ike_integ_alg.trunc_len
685 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
686 tlen = plen + len(ikev2.IKEv2())
688 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
689 length=plen, load=encr)
693 integ_data = raw(res)
694 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
695 self.sa.my_authkey, integ_data)
696 res = res / Raw(hmac_data[:trunc_len])
697 assert(len(res) == tlen)
700 def verify_udp_encap(self, ipsec_sa):
701 e = VppEnum.vl_api_ipsec_sad_flags_t
702 if self.sa.udp_encap or self.sa.natt:
703 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
705 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
707 def verify_ipsec_sas(self, is_rekey=False):
708 sas = self.vapi.ipsec_sa_dump()
710 # after rekey there is a short period of time in which old
711 # inbound SA is still present
715 self.assertEqual(len(sas), sa_count)
716 if self.sa.is_initiator:
731 c = self.sa.child_sas[0]
733 self.verify_udp_encap(sa0)
734 self.verify_udp_encap(sa1)
735 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
736 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
737 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
739 if self.sa.esp_integ is None:
742 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
743 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
744 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
747 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
748 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
749 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
750 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
754 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
755 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
756 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
757 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
759 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
760 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
762 def verify_keymat(self, api_keys, keys, name):
763 km = getattr(keys, name)
764 api_km = getattr(api_keys, name)
765 api_km_len = getattr(api_keys, name + '_len')
766 self.assertEqual(len(km), api_km_len)
767 self.assertEqual(km, api_km[:api_km_len])
769 def verify_id(self, api_id, exp_id):
770 self.assertEqual(api_id.type, IDType.value(exp_id.type))
771 self.assertEqual(api_id.data_len, exp_id.data_len)
772 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
774 def verify_ike_sas(self):
775 r = self.vapi.ikev2_sa_dump()
776 self.assertEqual(len(r), 1)
778 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
779 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
781 if self.sa.is_initiator:
782 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
783 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
785 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
786 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
788 if self.sa.is_initiator:
789 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
790 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
792 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
793 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
794 self.verify_keymat(sa.keys, self.sa, 'sk_d')
795 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
796 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
797 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
798 self.verify_keymat(sa.keys, self.sa, 'sk_er')
799 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
800 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
802 self.assertEqual(sa.i_id.type, self.sa.id_type)
803 self.assertEqual(sa.r_id.type, self.sa.id_type)
804 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
805 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
806 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
807 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
809 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
810 self.assertEqual(len(r), 1)
812 self.assertEqual(csa.sa_index, sa.sa_index)
813 c = self.sa.child_sas[0]
814 if hasattr(c, 'sk_ai'):
815 self.verify_keymat(csa.keys, c, 'sk_ai')
816 self.verify_keymat(csa.keys, c, 'sk_ar')
817 self.verify_keymat(csa.keys, c, 'sk_ei')
818 self.verify_keymat(csa.keys, c, 'sk_er')
819 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
820 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
822 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
825 r = self.vapi.ikev2_traffic_selector_dump(
826 is_initiator=True, sa_index=sa.sa_index,
827 child_sa_index=csa.child_sa_index)
828 self.assertEqual(len(r), 1)
830 self.verify_ts(r[0].ts, tsi[0], True)
832 r = self.vapi.ikev2_traffic_selector_dump(
833 is_initiator=False, sa_index=sa.sa_index,
834 child_sa_index=csa.child_sa_index)
835 self.assertEqual(len(r), 1)
836 self.verify_ts(r[0].ts, tsr[0], False)
838 n = self.vapi.ikev2_nonce_get(is_initiator=True,
839 sa_index=sa.sa_index)
840 self.verify_nonce(n, self.sa.i_nonce)
841 n = self.vapi.ikev2_nonce_get(is_initiator=False,
842 sa_index=sa.sa_index)
843 self.verify_nonce(n, self.sa.r_nonce)
845 def verify_nonce(self, api_nonce, nonce):
846 self.assertEqual(api_nonce.data_len, len(nonce))
847 self.assertEqual(api_nonce.nonce, nonce)
849 def verify_ts(self, api_ts, ts, is_initiator):
851 self.assertTrue(api_ts.is_local)
853 self.assertFalse(api_ts.is_local)
856 self.assertEqual(api_ts.start_addr,
857 IPv4Address(ts.starting_address_v4))
858 self.assertEqual(api_ts.end_addr,
859 IPv4Address(ts.ending_address_v4))
861 self.assertEqual(api_ts.start_addr,
862 IPv6Address(ts.starting_address_v6))
863 self.assertEqual(api_ts.end_addr,
864 IPv6Address(ts.ending_address_v6))
865 self.assertEqual(api_ts.start_port, ts.start_port)
866 self.assertEqual(api_ts.end_port, ts.end_port)
867 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
870 class TemplateInitiator(IkePeer):
871 """ initiator test template """
873 def initiate_del_sa_from_initiator(self):
874 ispi = int.from_bytes(self.sa.ispi, 'little')
875 self.pg0.enable_capture()
877 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
878 capture = self.pg0.get_capture(1)
879 ih = self.get_ike_header(capture[0])
880 self.assertNotIn('Response', ih.flags)
881 self.assertIn('Initiator', ih.flags)
882 self.assertEqual(ih.init_SPI, self.sa.ispi)
883 self.assertEqual(ih.resp_SPI, self.sa.rspi)
884 plain = self.sa.hmac_and_decrypt(ih)
885 d = ikev2.IKEv2_payload_Delete(plain)
886 self.assertEqual(d.proto, 1) # proto=IKEv2
887 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
888 flags='Response', exch_type='INFORMATIONAL',
889 id=ih.id, next_payload='Encrypted')
890 resp = self.encrypt_ike_msg(header, b'', None)
891 self.send_and_assert_no_replies(self.pg0, resp)
893 def verify_del_sa(self, packet):
894 ih = self.get_ike_header(packet)
895 self.assertEqual(ih.id, self.sa.msg_id)
896 self.assertEqual(ih.exch_type, 37) # exchange informational
897 self.assertIn('Response', ih.flags)
898 self.assertIn('Initiator', ih.flags)
899 plain = self.sa.hmac_and_decrypt(ih)
900 self.assertEqual(plain, b'')
902 def initiate_del_sa_from_responder(self):
903 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
904 exch_type='INFORMATIONAL',
905 id=self.sa.new_msg_id())
906 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
907 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
908 packet = self.create_packet(self.pg0, ike_msg,
909 self.sa.sport, self.sa.dport,
910 self.sa.natt, self.ip6)
911 self.pg0.add_stream(packet)
912 self.pg0.enable_capture()
914 capture = self.pg0.get_capture(1)
915 self.verify_del_sa(capture[0])
918 def find_notify_payload(packet, notify_type):
919 n = packet[ikev2.IKEv2_payload_Notify]
921 if n.type == notify_type:
926 def verify_nat_detection(self, packet):
933 # NAT_DETECTION_SOURCE_IP
934 s = self.find_notify_payload(packet, 16388)
935 self.assertIsNotNone(s)
936 src_sha = self.sa.compute_nat_sha1(
937 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
938 self.assertEqual(s.load, src_sha)
940 # NAT_DETECTION_DESTINATION_IP
941 s = self.find_notify_payload(packet, 16389)
942 self.assertIsNotNone(s)
943 dst_sha = self.sa.compute_nat_sha1(
944 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
945 self.assertEqual(s.load, dst_sha)
947 def verify_sa_init_request(self, packet):
949 self.sa.dport = udp.sport
950 ih = packet[ikev2.IKEv2]
951 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
952 self.assertEqual(ih.exch_type, 34) # SA_INIT
953 self.sa.ispi = ih.init_SPI
954 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
955 self.assertIn('Initiator', ih.flags)
956 self.assertNotIn('Response', ih.flags)
957 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
958 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
960 prop = packet[ikev2.IKEv2_payload_Proposal]
961 self.assertEqual(prop.proto, 1) # proto = ikev2
962 self.assertEqual(prop.proposal, 1)
963 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
964 self.assertEqual(prop.trans[0].transform_id,
965 self.p.ike_transforms['crypto_alg'])
966 self.assertEqual(prop.trans[1].transform_type, 2) # prf
967 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
968 self.assertEqual(prop.trans[2].transform_type, 4) # dh
969 self.assertEqual(prop.trans[2].transform_id,
970 self.p.ike_transforms['dh_group'])
972 self.verify_nat_detection(packet)
973 self.sa.set_ike_props(
974 crypto='AES-GCM-16ICV', crypto_key_len=32,
975 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
976 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
977 integ='SHA2-256-128')
978 self.sa.generate_dh_data()
979 self.sa.complete_dh_data()
982 def update_esp_transforms(self, trans, sa):
984 if trans.transform_type == 1: # ecryption
985 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
986 elif trans.transform_type == 3: # integrity
987 sa.esp_integ = INTEG_IDS[trans.transform_id]
988 trans = trans.payload
990 def verify_sa_auth_req(self, packet):
992 self.sa.dport = udp.sport
993 ih = self.get_ike_header(packet)
994 self.assertEqual(ih.resp_SPI, self.sa.rspi)
995 self.assertEqual(ih.init_SPI, self.sa.ispi)
996 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
997 self.assertIn('Initiator', ih.flags)
998 self.assertNotIn('Response', ih.flags)
1001 self.verify_udp(udp)
1002 self.assertEqual(ih.id, self.sa.msg_id + 1)
1004 plain = self.sa.hmac_and_decrypt(ih)
1005 idi = ikev2.IKEv2_payload_IDi(plain)
1006 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1007 self.assertEqual(idi.load, self.sa.i_id)
1008 self.assertEqual(idr.load, self.sa.r_id)
1009 prop = idi[ikev2.IKEv2_payload_Proposal]
1010 c = self.sa.child_sas[0]
1012 self.update_esp_transforms(
1013 prop[ikev2.IKEv2_payload_Transform], self.sa)
1015 def send_init_response(self):
1016 tr_attr = self.sa.ike_crypto_attr()
1017 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1018 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1019 key_length=tr_attr[0]) /
1020 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1021 transform_id=self.sa.ike_integ) /
1022 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1023 transform_id=self.sa.ike_prf_alg.name) /
1024 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1025 transform_id=self.sa.ike_dh))
1026 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1027 trans_nb=4, trans=trans))
1029 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1031 dst_address = b'\x0a\x0a\x0a\x0a'
1033 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1034 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1035 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1037 self.sa.init_resp_packet = (
1038 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1039 exch_type='IKE_SA_INIT', flags='Response') /
1040 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1041 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1042 group=self.sa.ike_dh,
1043 load=self.sa.my_dh_pub_key) /
1044 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1045 next_payload='Notify') /
1046 ikev2.IKEv2_payload_Notify(
1047 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1048 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1049 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1051 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1052 self.sa.sport, self.sa.dport,
1054 self.pg_send(self.pg0, ike_msg)
1055 capture = self.pg0.get_capture(1)
1056 self.verify_sa_auth_req(capture[0])
1058 def initiate_sa_init(self):
1059 self.pg0.enable_capture()
1061 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1063 capture = self.pg0.get_capture(1)
1064 self.verify_sa_init_request(capture[0])
1065 self.send_init_response()
1067 def send_auth_response(self):
1068 tr_attr = self.sa.esp_crypto_attr()
1069 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1070 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1071 key_length=tr_attr[0]) /
1072 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1073 transform_id=self.sa.esp_integ) /
1074 ikev2.IKEv2_payload_Transform(
1075 transform_type='Extended Sequence Number',
1076 transform_id='No ESN') /
1077 ikev2.IKEv2_payload_Transform(
1078 transform_type='Extended Sequence Number',
1079 transform_id='ESN'))
1081 c = self.sa.child_sas[0]
1082 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1083 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1085 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1086 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1087 IDtype=self.sa.id_type, load=self.sa.i_id) /
1088 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1089 IDtype=self.sa.id_type, load=self.sa.r_id) /
1090 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1091 auth_type=AuthMethod.value(self.sa.auth_method),
1092 load=self.sa.auth_data) /
1093 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1094 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1095 number_of_TSs=len(tsi),
1096 traffic_selector=tsi) /
1097 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1098 number_of_TSs=len(tsr),
1099 traffic_selector=tsr) /
1100 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1102 header = ikev2.IKEv2(
1103 init_SPI=self.sa.ispi,
1104 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1105 flags='Response', exch_type='IKE_AUTH')
1107 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1108 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1109 self.sa.dport, self.sa.natt, self.ip6)
1110 self.pg_send(self.pg0, packet)
1112 def test_initiator(self):
1113 self.initiate_sa_init()
1115 self.sa.calc_child_keys()
1116 self.send_auth_response()
1117 self.verify_ike_sas()
1120 class TemplateResponder(IkePeer):
1121 """ responder test template """
1123 def initiate_del_sa_from_responder(self):
1124 self.pg0.enable_capture()
1126 self.vapi.ikev2_initiate_del_ike_sa(
1127 ispi=int.from_bytes(self.sa.ispi, 'little'))
1128 capture = self.pg0.get_capture(1)
1129 ih = self.get_ike_header(capture[0])
1130 self.assertNotIn('Response', ih.flags)
1131 self.assertNotIn('Initiator', ih.flags)
1132 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1133 plain = self.sa.hmac_and_decrypt(ih)
1134 d = ikev2.IKEv2_payload_Delete(plain)
1135 self.assertEqual(d.proto, 1) # proto=IKEv2
1136 self.assertEqual(ih.init_SPI, self.sa.ispi)
1137 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1138 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1139 flags='Initiator+Response',
1140 exch_type='INFORMATIONAL',
1141 id=ih.id, next_payload='Encrypted')
1142 resp = self.encrypt_ike_msg(header, b'', None)
1143 self.send_and_assert_no_replies(self.pg0, resp)
1145 def verify_del_sa(self, packet):
1146 ih = self.get_ike_header(packet)
1147 self.assertEqual(ih.id, self.sa.msg_id)
1148 self.assertEqual(ih.exch_type, 37) # exchange informational
1149 self.assertIn('Response', ih.flags)
1150 self.assertNotIn('Initiator', ih.flags)
1151 self.assertEqual(ih.next_payload, 46) # Encrypted
1152 self.assertEqual(ih.init_SPI, self.sa.ispi)
1153 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1154 plain = self.sa.hmac_and_decrypt(ih)
1155 self.assertEqual(plain, b'')
1157 def initiate_del_sa_from_initiator(self):
1158 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1159 flags='Initiator', exch_type='INFORMATIONAL',
1160 id=self.sa.new_msg_id())
1161 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1162 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1163 packet = self.create_packet(self.pg0, ike_msg,
1164 self.sa.sport, self.sa.dport,
1165 self.sa.natt, self.ip6)
1166 self.pg0.add_stream(packet)
1167 self.pg0.enable_capture()
1169 capture = self.pg0.get_capture(1)
1170 self.verify_del_sa(capture[0])
1172 def send_sa_init_req(self):
1173 tr_attr = self.sa.ike_crypto_attr()
1174 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1175 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1176 key_length=tr_attr[0]) /
1177 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1178 transform_id=self.sa.ike_integ) /
1179 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1180 transform_id=self.sa.ike_prf_alg.name) /
1181 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1182 transform_id=self.sa.ike_dh))
1184 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1185 trans_nb=4, trans=trans))
1187 next_payload = None if self.ip6 else 'Notify'
1189 self.sa.init_req_packet = (
1190 ikev2.IKEv2(init_SPI=self.sa.ispi,
1191 flags='Initiator', exch_type='IKE_SA_INIT') /
1192 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1193 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1194 group=self.sa.ike_dh,
1195 load=self.sa.my_dh_pub_key) /
1196 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1197 load=self.sa.i_nonce))
1201 src_address = b'\x0a\x0a\x0a\x01'
1203 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1206 dst_address = b'\x0a\x0a\x0a\x0a'
1208 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1210 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1211 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1212 nat_src_detection = ikev2.IKEv2_payload_Notify(
1213 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1214 next_payload='Notify')
1215 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1216 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1217 self.sa.init_req_packet = (self.sa.init_req_packet /
1221 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1222 self.sa.sport, self.sa.dport,
1223 self.sa.natt, self.ip6)
1224 self.pg0.add_stream(ike_msg)
1225 self.pg0.enable_capture()
1227 capture = self.pg0.get_capture(1)
1228 self.verify_sa_init(capture[0])
1230 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1231 tr_attr = self.sa.esp_crypto_attr()
1232 last_payload = last_payload or 'Notify'
1233 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1234 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1235 key_length=tr_attr[0]) /
1236 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1237 transform_id=self.sa.esp_integ) /
1238 ikev2.IKEv2_payload_Transform(
1239 transform_type='Extended Sequence Number',
1240 transform_id='No ESN') /
1241 ikev2.IKEv2_payload_Transform(
1242 transform_type='Extended Sequence Number',
1243 transform_id='ESN'))
1245 c = self.sa.child_sas[0]
1246 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1247 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1249 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1250 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1251 auth_type=AuthMethod.value(self.sa.auth_method),
1252 load=self.sa.auth_data) /
1253 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1254 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1255 number_of_TSs=len(tsi), traffic_selector=tsi) /
1256 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1257 number_of_TSs=len(tsr), traffic_selector=tsr))
1260 first_payload = 'Nonce'
1261 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1262 next_payload='SA') / plain /
1263 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1264 proto='ESP', SPI=c.ispi))
1266 first_payload = 'IDi'
1267 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1268 IDtype=self.sa.id_type, load=self.sa.i_id) /
1269 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1270 IDtype=self.sa.id_type, load=self.sa.r_id))
1272 return plain, first_payload
1274 def send_sa_auth(self):
1275 plain, first_payload = self.generate_auth_payload(
1276 last_payload='Notify')
1277 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1278 header = ikev2.IKEv2(
1279 init_SPI=self.sa.ispi,
1280 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1281 flags='Initiator', exch_type='IKE_AUTH')
1283 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1284 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1285 self.sa.dport, self.sa.natt, self.ip6)
1286 self.pg0.add_stream(packet)
1287 self.pg0.enable_capture()
1289 capture = self.pg0.get_capture(1)
1290 self.verify_sa_auth_resp(capture[0])
1292 def verify_sa_init(self, packet):
1293 ih = self.get_ike_header(packet)
1295 self.assertEqual(ih.id, self.sa.msg_id)
1296 self.assertEqual(ih.exch_type, 34)
1297 self.assertIn('Response', ih.flags)
1298 self.assertEqual(ih.init_SPI, self.sa.ispi)
1299 self.assertNotEqual(ih.resp_SPI, 0)
1300 self.sa.rspi = ih.resp_SPI
1302 sa = ih[ikev2.IKEv2_payload_SA]
1303 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1304 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1305 except IndexError as e:
1306 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1307 self.logger.error(ih.show())
1309 self.sa.complete_dh_data()
1313 def verify_sa_auth_resp(self, packet):
1314 ike = self.get_ike_header(packet)
1316 self.verify_udp(udp)
1317 self.assertEqual(ike.id, self.sa.msg_id)
1318 plain = self.sa.hmac_and_decrypt(ike)
1319 idr = ikev2.IKEv2_payload_IDr(plain)
1320 prop = idr[ikev2.IKEv2_payload_Proposal]
1321 self.assertEqual(prop.SPIsize, 4)
1322 self.sa.child_sas[0].rspi = prop.SPI
1323 self.sa.calc_child_keys()
1325 IKE_NODE_SUFFIX = 'ip4'
1327 def verify_counters(self):
1328 self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
1329 self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
1330 self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1332 r = self.vapi.ikev2_sa_dump()
1334 self.assertEqual(1, s.n_sa_auth_req)
1335 self.assertEqual(1, s.n_sa_init_req)
1337 def test_responder(self):
1338 self.send_sa_init_req()
1340 self.verify_ipsec_sas()
1341 self.verify_ike_sas()
1342 self.verify_counters()
1345 class Ikev2Params(object):
1346 def config_params(self, params={}):
1347 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1348 ei = VppEnum.vl_api_ipsec_integ_alg_t
1350 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1351 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1352 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1353 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1354 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1355 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1357 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1358 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1359 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1360 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1362 dpd_disabled = True if 'dpd_disabled' not in params else\
1363 params['dpd_disabled']
1365 self.vapi.cli('ikev2 dpd disable')
1366 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1367 not in params else params['del_sa_from_responder']
1368 i_natt = False if 'i_natt' not in params else params['i_natt']
1369 r_natt = False if 'r_natt' not in params else params['r_natt']
1370 self.p = Profile(self, 'pr1')
1371 self.ip6 = False if 'ip6' not in params else params['ip6']
1373 if 'auth' in params and params['auth'] == 'rsa-sig':
1374 auth_method = 'rsa-sig'
1375 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1376 self.vapi.ikev2_set_local_key(
1377 key_file=work_dir + params['server-key'])
1379 client_file = work_dir + params['client-cert']
1380 server_pem = open(work_dir + params['server-cert']).read()
1381 client_priv = open(work_dir + params['client-key']).read()
1382 client_priv = load_pem_private_key(str.encode(client_priv), None,
1384 self.peer_cert = x509.load_pem_x509_certificate(
1385 str.encode(server_pem),
1387 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1390 auth_data = b'$3cr3tpa$$w0rd'
1391 self.p.add_auth(method='shared-key', data=auth_data)
1392 auth_method = 'shared-key'
1395 is_init = True if 'is_initiator' not in params else\
1396 params['is_initiator']
1398 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1399 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1401 self.p.add_local_id(**idr)
1402 self.p.add_remote_id(**idi)
1404 self.p.add_local_id(**idi)
1405 self.p.add_remote_id(**idr)
1407 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1408 'loc_ts' not in params else params['loc_ts']
1409 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1410 'rem_ts' not in params else params['rem_ts']
1411 self.p.add_local_ts(**loc_ts)
1412 self.p.add_remote_ts(**rem_ts)
1413 if 'responder' in params:
1414 self.p.add_responder(params['responder'])
1415 if 'ike_transforms' in params:
1416 self.p.add_ike_transforms(params['ike_transforms'])
1417 if 'esp_transforms' in params:
1418 self.p.add_esp_transforms(params['esp_transforms'])
1420 udp_encap = False if 'udp_encap' not in params else\
1423 self.p.set_udp_encap(True)
1425 if 'responder_hostname' in params:
1426 hn = params['responder_hostname']
1427 self.p.add_responder_hostname(hn)
1429 # configure static dns record
1430 self.vapi.dns_name_server_add_del(
1432 server_address=IPv4Address(u'8.8.8.8').packed)
1433 self.vapi.dns_enable_disable(enable=1)
1435 cmd = "dns cache add {} {}".format(hn['hostname'],
1436 self.pg0.remote_ip4)
1439 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1440 is_initiator=is_init,
1441 id_type=self.p.local_id['id_type'],
1442 i_natt=i_natt, r_natt=r_natt,
1443 priv_key=client_priv, auth_method=auth_method,
1444 auth_data=auth_data, udp_encap=udp_encap,
1445 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1447 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1448 params['ike-crypto']
1449 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1451 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1454 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1455 params['esp-crypto']
1456 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1459 self.sa.set_ike_props(
1460 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1461 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1462 self.sa.set_esp_props(
1463 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1467 class TestApi(VppTestCase):
1468 """ Test IKEV2 API """
1470 def setUpClass(cls):
1471 super(TestApi, cls).setUpClass()
1474 def tearDownClass(cls):
1475 super(TestApi, cls).tearDownClass()
1478 super(TestApi, self).tearDown()
1479 self.p1.remove_vpp_config()
1480 self.p2.remove_vpp_config()
1481 r = self.vapi.ikev2_profile_dump()
1482 self.assertEqual(len(r), 0)
1484 def configure_profile(self, cfg):
1485 p = Profile(self, cfg['name'])
1486 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1487 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1488 p.add_local_ts(**cfg['loc_ts'])
1489 p.add_remote_ts(**cfg['rem_ts'])
1490 p.add_responder(cfg['responder'])
1491 p.add_ike_transforms(cfg['ike_ts'])
1492 p.add_esp_transforms(cfg['esp_ts'])
1493 p.add_auth(**cfg['auth'])
1494 p.set_udp_encap(cfg['udp_encap'])
1495 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1496 if 'lifetime_data' in cfg:
1497 p.set_lifetime_data(cfg['lifetime_data'])
1498 if 'tun_itf' in cfg:
1499 p.set_tunnel_interface(cfg['tun_itf'])
1500 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1505 def test_profile_api(self):
1506 """ test profile dump API """
1511 'start_addr': '3.3.3.2',
1512 'end_addr': '3.3.3.3',
1518 'start_addr': '4.5.76.80',
1519 'end_addr': '2.3.4.6',
1526 'start_addr': 'ab::1',
1527 'end_addr': 'ab::4',
1533 'start_addr': 'cd::12',
1534 'end_addr': 'cd::13',
1540 'natt_disabled': True,
1541 'loc_id': ('fqdn', b'vpp.home'),
1542 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1545 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1548 'crypto_key_size': 32,
1553 'crypto_key_size': 24,
1555 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1557 'ipsec_over_udp_port': 4501,
1560 'lifetime_maxdata': 20192,
1561 'lifetime_jitter': 9,
1566 'loc_id': ('ip4-addr', b'192.168.2.1'),
1567 'rem_id': ('ip6-addr', b'abcd::1'),
1570 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1573 'crypto_key_size': 16,
1578 'crypto_key_size': 24,
1580 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1582 'ipsec_over_udp_port': 4600,
1585 self.p1 = self.configure_profile(conf['p1'])
1586 self.p2 = self.configure_profile(conf['p2'])
1588 r = self.vapi.ikev2_profile_dump()
1589 self.assertEqual(len(r), 2)
1590 self.verify_profile(r[0].profile, conf['p1'])
1591 self.verify_profile(r[1].profile, conf['p2'])
1593 def verify_id(self, api_id, cfg_id):
1594 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1595 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1597 def verify_ts(self, api_ts, cfg_ts):
1598 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1599 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1600 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1601 self.assertEqual(api_ts.start_addr,
1602 ip_address(text_type(cfg_ts['start_addr'])))
1603 self.assertEqual(api_ts.end_addr,
1604 ip_address(text_type(cfg_ts['end_addr'])))
1606 def verify_responder(self, api_r, cfg_r):
1607 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1608 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1610 def verify_transforms(self, api_ts, cfg_ts):
1611 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1612 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1613 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1615 def verify_ike_transforms(self, api_ts, cfg_ts):
1616 self.verify_transforms(api_ts, cfg_ts)
1617 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1619 def verify_esp_transforms(self, api_ts, cfg_ts):
1620 self.verify_transforms(api_ts, cfg_ts)
1622 def verify_auth(self, api_auth, cfg_auth):
1623 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1624 self.assertEqual(api_auth.data, cfg_auth['data'])
1625 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1627 def verify_lifetime_data(self, p, ld):
1628 self.assertEqual(p.lifetime, ld['lifetime'])
1629 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1630 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1631 self.assertEqual(p.handover, ld['handover'])
1633 def verify_profile(self, ap, cp):
1634 self.assertEqual(ap.name, cp['name'])
1635 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1636 self.verify_id(ap.loc_id, cp['loc_id'])
1637 self.verify_id(ap.rem_id, cp['rem_id'])
1638 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1639 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1640 self.verify_responder(ap.responder, cp['responder'])
1641 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1642 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1643 self.verify_auth(ap.auth, cp['auth'])
1644 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1645 self.assertTrue(natt_dis == ap.natt_disabled)
1647 if 'lifetime_data' in cp:
1648 self.verify_lifetime_data(ap, cp['lifetime_data'])
1649 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1651 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1653 self.assertEqual(ap.tun_itf, 0xffffffff)
1656 @tag_fixme_vpp_workers
1657 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1658 """ test responder - responder behind NAT """
1660 IKE_NODE_SUFFIX = 'ip4-natt'
1662 def config_tc(self):
1663 self.config_params({'r_natt': True})
1666 @tag_fixme_vpp_workers
1667 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1668 """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1670 def config_tc(self):
1671 self.config_params({
1673 'is_initiator': False, # seen from test case perspective
1674 # thus vpp is initiator
1675 'responder': {'sw_if_index': self.pg0.sw_if_index,
1676 'addr': self.pg0.remote_ip4},
1677 'ike-crypto': ('AES-GCM-16ICV', 32),
1678 'ike-integ': 'NULL',
1679 'ike-dh': '3072MODPgr',
1681 'crypto_alg': 20, # "aes-gcm-16"
1682 'crypto_key_size': 256,
1683 'dh_group': 15, # "modp-3072"
1686 'crypto_alg': 12, # "aes-cbc"
1687 'crypto_key_size': 256,
1688 # "hmac-sha2-256-128"
1692 @tag_fixme_vpp_workers
1693 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1694 """ test ikev2 initiator - pre shared key auth """
1696 def config_tc(self):
1697 self.config_params({
1698 'is_initiator': False, # seen from test case perspective
1699 # thus vpp is initiator
1700 'ike-crypto': ('AES-GCM-16ICV', 32),
1701 'ike-integ': 'NULL',
1702 'ike-dh': '3072MODPgr',
1704 'crypto_alg': 20, # "aes-gcm-16"
1705 'crypto_key_size': 256,
1706 'dh_group': 15, # "modp-3072"
1709 'crypto_alg': 12, # "aes-cbc"
1710 'crypto_key_size': 256,
1711 # "hmac-sha2-256-128"
1713 'responder_hostname': {'hostname': 'vpp.responder.org',
1714 'sw_if_index': self.pg0.sw_if_index}})
1717 @tag_fixme_vpp_workers
1718 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1719 """ test initiator - request window size (1) """
1721 def rekey_respond(self, req, update_child_sa_data):
1722 ih = self.get_ike_header(req)
1723 plain = self.sa.hmac_and_decrypt(ih)
1724 sa = ikev2.IKEv2_payload_SA(plain)
1725 if update_child_sa_data:
1726 prop = sa[ikev2.IKEv2_payload_Proposal]
1727 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1728 self.sa.r_nonce = self.sa.i_nonce
1729 self.sa.child_sas[0].ispi = prop.SPI
1730 self.sa.child_sas[0].rspi = prop.SPI
1731 self.sa.calc_child_keys()
1733 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1734 flags='Response', exch_type=36,
1735 id=ih.id, next_payload='Encrypted')
1736 resp = self.encrypt_ike_msg(header, sa, 'SA')
1737 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1738 self.sa.dport, self.sa.natt, self.ip6)
1739 self.send_and_assert_no_replies(self.pg0, packet)
1741 def test_initiator(self):
1742 super(TestInitiatorRequestWindowSize, self).test_initiator()
1743 self.pg0.enable_capture()
1745 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1746 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1747 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1748 capture = self.pg0.get_capture(2)
1750 # reply in reverse order
1751 self.rekey_respond(capture[1], True)
1752 self.rekey_respond(capture[0], False)
1754 # verify that only the second request was accepted
1755 self.verify_ike_sas()
1756 self.verify_ipsec_sas(is_rekey=True)
1759 @tag_fixme_vpp_workers
1760 class TestInitiatorRekey(TestInitiatorPsk):
1761 """ test ikev2 initiator - rekey """
1763 def rekey_from_initiator(self):
1764 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1765 self.pg0.enable_capture()
1767 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1768 capture = self.pg0.get_capture(1)
1769 ih = self.get_ike_header(capture[0])
1770 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1771 self.assertNotIn('Response', ih.flags)
1772 self.assertIn('Initiator', ih.flags)
1773 plain = self.sa.hmac_and_decrypt(ih)
1774 sa = ikev2.IKEv2_payload_SA(plain)
1775 prop = sa[ikev2.IKEv2_payload_Proposal]
1776 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1777 self.sa.r_nonce = self.sa.i_nonce
1778 # update new responder SPI
1779 self.sa.child_sas[0].ispi = prop.SPI
1780 self.sa.child_sas[0].rspi = prop.SPI
1781 self.sa.calc_child_keys()
1782 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1783 flags='Response', exch_type=36,
1784 id=ih.id, next_payload='Encrypted')
1785 resp = self.encrypt_ike_msg(header, sa, 'SA')
1786 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1787 self.sa.dport, self.sa.natt, self.ip6)
1788 self.send_and_assert_no_replies(self.pg0, packet)
1790 def test_initiator(self):
1791 super(TestInitiatorRekey, self).test_initiator()
1792 self.rekey_from_initiator()
1793 self.verify_ike_sas()
1794 self.verify_ipsec_sas(is_rekey=True)
1797 @tag_fixme_vpp_workers
1798 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1799 """ test ikev2 initiator - delete IKE SA from responder """
1801 def config_tc(self):
1802 self.config_params({
1803 'del_sa_from_responder': True,
1804 'is_initiator': False, # seen from test case perspective
1805 # thus vpp is initiator
1806 'responder': {'sw_if_index': self.pg0.sw_if_index,
1807 'addr': self.pg0.remote_ip4},
1808 'ike-crypto': ('AES-GCM-16ICV', 32),
1809 'ike-integ': 'NULL',
1810 'ike-dh': '3072MODPgr',
1812 'crypto_alg': 20, # "aes-gcm-16"
1813 'crypto_key_size': 256,
1814 'dh_group': 15, # "modp-3072"
1817 'crypto_alg': 12, # "aes-cbc"
1818 'crypto_key_size': 256,
1819 # "hmac-sha2-256-128"
1823 @tag_fixme_vpp_workers
1824 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1825 """ test ikev2 responder - initiator behind NAT """
1827 IKE_NODE_SUFFIX = 'ip4-natt'
1829 def config_tc(self):
1834 @tag_fixme_vpp_workers
1835 class TestResponderPsk(TemplateResponder, Ikev2Params):
1836 """ test ikev2 responder - pre shared key auth """
1837 def config_tc(self):
1838 self.config_params()
1841 @tag_fixme_vpp_workers
1842 class TestResponderDpd(TestResponderPsk):
1844 Dead peer detection test
1846 def config_tc(self):
1847 self.config_params({'dpd_disabled': False})
1852 def test_responder(self):
1853 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1854 super(TestResponderDpd, self).test_responder()
1855 self.pg0.enable_capture()
1857 # capture empty request but don't reply
1858 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1859 ih = self.get_ike_header(capture[0])
1860 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1861 plain = self.sa.hmac_and_decrypt(ih)
1862 self.assertEqual(plain, b'')
1863 # wait for SA expiration
1865 ike_sas = self.vapi.ikev2_sa_dump()
1866 self.assertEqual(len(ike_sas), 0)
1867 ipsec_sas = self.vapi.ipsec_sa_dump()
1868 self.assertEqual(len(ipsec_sas), 0)
1871 @tag_fixme_vpp_workers
1872 class TestResponderRekey(TestResponderPsk):
1873 """ test ikev2 responder - rekey """
1875 def rekey_from_initiator(self):
1876 packet = self.create_rekey_request()
1877 self.pg0.add_stream(packet)
1878 self.pg0.enable_capture()
1880 capture = self.pg0.get_capture(1)
1881 ih = self.get_ike_header(capture[0])
1882 plain = self.sa.hmac_and_decrypt(ih)
1883 sa = ikev2.IKEv2_payload_SA(plain)
1884 prop = sa[ikev2.IKEv2_payload_Proposal]
1885 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1886 # update new responder SPI
1887 self.sa.child_sas[0].rspi = prop.SPI
1889 def test_responder(self):
1890 super(TestResponderRekey, self).test_responder()
1891 self.rekey_from_initiator()
1892 self.sa.calc_child_keys()
1893 self.verify_ike_sas()
1894 self.verify_ipsec_sas(is_rekey=True)
1895 self.assert_counter(1, 'rekey_req', 'ip4')
1896 r = self.vapi.ikev2_sa_dump()
1897 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
1900 class TestResponderVrf(TestResponderPsk, Ikev2Params):
1901 """ test ikev2 responder - non-default table id """
1904 def setUpClass(cls):
1905 import scapy.contrib.ikev2 as _ikev2
1906 globals()['ikev2'] = _ikev2
1907 super(IkePeer, cls).setUpClass()
1908 cls.create_pg_interfaces(range(1))
1909 cls.vapi.cli("ip table add 1")
1910 cls.vapi.cli("set interface ip table pg0 1")
1911 for i in cls.pg_interfaces:
1918 def config_tc(self):
1919 self.config_params({'dpd_disabled': False})
1921 def test_responder(self):
1922 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1923 super(TestResponderVrf, self).test_responder()
1924 self.pg0.enable_capture()
1926 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1927 ih = self.get_ike_header(capture[0])
1928 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1929 plain = self.sa.hmac_and_decrypt(ih)
1930 self.assertEqual(plain, b'')
1933 @tag_fixme_vpp_workers
1934 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1935 """ test ikev2 responder - cert based auth """
1936 def config_tc(self):
1937 self.config_params({
1940 'server-key': 'server-key.pem',
1941 'client-key': 'client-key.pem',
1942 'client-cert': 'client-cert.pem',
1943 'server-cert': 'server-cert.pem'})
1946 @tag_fixme_vpp_workers
1947 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1948 (TemplateResponder, Ikev2Params):
1950 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1952 def config_tc(self):
1953 self.config_params({
1954 'ike-crypto': ('AES-CBC', 16),
1955 'ike-integ': 'SHA2-256-128',
1956 'esp-crypto': ('AES-CBC', 24),
1957 'esp-integ': 'SHA2-384-192',
1958 'ike-dh': '2048MODPgr'})
1961 @tag_fixme_vpp_workers
1962 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1963 (TemplateResponder, Ikev2Params):
1966 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1968 def config_tc(self):
1969 self.config_params({
1970 'ike-crypto': ('AES-CBC', 32),
1971 'ike-integ': 'SHA2-256-128',
1972 'esp-crypto': ('AES-GCM-16ICV', 32),
1973 'esp-integ': 'NULL',
1974 'ike-dh': '3072MODPgr'})
1977 @tag_fixme_vpp_workers
1978 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1983 IKE_NODE_SUFFIX = 'ip6'
1985 def config_tc(self):
1986 self.config_params({
1987 'del_sa_from_responder': True,
1990 'ike-crypto': ('AES-GCM-16ICV', 32),
1991 'ike-integ': 'NULL',
1992 'ike-dh': '2048MODPgr',
1993 'loc_ts': {'start_addr': 'ab:cd::0',
1994 'end_addr': 'ab:cd::10'},
1995 'rem_ts': {'start_addr': '11::0',
1996 'end_addr': '11::100'}})
1999 @tag_fixme_vpp_workers
2000 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2002 Test for keep alive messages
2005 def send_empty_req_from_responder(self):
2006 packet = self.create_empty_request()
2007 self.pg0.add_stream(packet)
2008 self.pg0.enable_capture()
2010 capture = self.pg0.get_capture(1)
2011 ih = self.get_ike_header(capture[0])
2012 self.assertEqual(ih.id, self.sa.msg_id)
2013 plain = self.sa.hmac_and_decrypt(ih)
2014 self.assertEqual(plain, b'')
2015 self.assert_counter(1, 'keepalive', 'ip4')
2016 r = self.vapi.ikev2_sa_dump()
2017 self.assertEqual(1, r[0].sa.stats.n_keepalives)
2019 def test_initiator(self):
2020 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2021 self.send_empty_req_from_responder()
2024 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2025 """ malformed packet test """
2030 def config_tc(self):
2031 self.config_params()
2033 def create_ike_init_msg(self, length=None, payload=None):
2034 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
2035 flags='Initiator', exch_type='IKE_SA_INIT')
2036 if payload is not None:
2038 return self.create_packet(self.pg0, msg, self.sa.sport,
2041 def verify_bad_packet_length(self):
2042 ike_msg = self.create_ike_init_msg(length=0xdead)
2043 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2044 self.assert_counter(self.pkt_count, 'bad_length')
2046 def verify_bad_sa_payload_length(self):
2047 p = ikev2.IKEv2_payload_SA(length=0xdead)
2048 ike_msg = self.create_ike_init_msg(payload=p)
2049 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2050 self.assert_counter(self.pkt_count, 'malformed_packet')
2052 def test_responder(self):
2053 self.pkt_count = 254
2054 self.verify_bad_packet_length()
2055 self.verify_bad_sa_payload_length()
2058 if __name__ == '__main__':
2059 unittest.main(testRunner=VppTestRunner)