3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
32 KEY_PAD = b"Key Pad for IKEv2"
39 # tuple structure is (p, g, key_len)
41 '2048MODPgr': (long_converter("""
42 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
43 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
44 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
45 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
46 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
47 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
48 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
49 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
50 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
51 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
52 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
54 '3072MODPgr': (long_converter("""
55 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
56 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
57 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
58 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
59 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
60 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
61 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
62 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
63 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
64 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
65 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
66 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
67 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
68 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
69 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
70 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
74 class CryptoAlgo(object):
75 def __init__(self, name, cipher, mode):
79 if self.cipher is not None:
80 self.bs = self.cipher.block_size // 8
82 if self.name == 'AES-GCM-16ICV':
83 self.iv_len = GCM_IV_SIZE
87 def encrypt(self, data, key, aad=None):
88 iv = os.urandom(self.iv_len)
90 encryptor = Cipher(self.cipher(key), self.mode(iv),
91 default_backend()).encryptor()
92 return iv + encryptor.update(data) + encryptor.finalize()
94 salt = key[-SALT_SIZE:]
96 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
97 default_backend()).encryptor()
98 encryptor.authenticate_additional_data(aad)
99 data = encryptor.update(data) + encryptor.finalize()
100 data += encryptor.tag[:GCM_ICV_SIZE]
103 def decrypt(self, data, key, aad=None, icv=None):
105 iv = data[:self.iv_len]
106 ct = data[self.iv_len:]
107 decryptor = Cipher(algorithms.AES(key),
109 default_backend()).decryptor()
110 return decryptor.update(ct) + decryptor.finalize()
112 salt = key[-SALT_SIZE:]
113 nonce = salt + data[:GCM_IV_SIZE]
114 ct = data[GCM_IV_SIZE:]
115 key = key[:-SALT_SIZE]
116 decryptor = Cipher(algorithms.AES(key),
117 self.mode(nonce, icv, len(icv)),
118 default_backend()).decryptor()
119 decryptor.authenticate_additional_data(aad)
120 return decryptor.update(ct) + decryptor.finalize()
123 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
124 data = data + b'\x00' * (pad_len - 1)
125 return data + bytes([pad_len - 1])
128 class AuthAlgo(object):
129 def __init__(self, name, mac, mod, key_len, trunc_len=None):
133 self.key_len = key_len
134 self.trunc_len = trunc_len or key_len
138 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
139 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
140 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
145 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
146 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
147 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
148 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
149 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
153 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
154 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
171 class IKEv2ChildSA(object):
172 def __init__(self, local_ts, remote_ts, is_initiator):
180 self.local_ts = local_ts
181 self.remote_ts = remote_ts
184 class IKEv2SA(object):
185 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
186 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
187 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
188 auth_method='shared-key', priv_key=None, i_natt=False,
189 r_natt=False, udp_encap=False):
190 self.udp_encap = udp_encap
200 self.dh_params = None
202 self.priv_key = priv_key
203 self.is_initiator = is_initiator
204 nonce = nonce or os.urandom(32)
205 self.auth_data = auth_data
208 if isinstance(id_type, str):
209 self.id_type = IDType.value(id_type)
211 self.id_type = id_type
212 self.auth_method = auth_method
213 if self.is_initiator:
214 self.rspi = 8 * b'\x00'
219 self.ispi = 8 * b'\x00'
221 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
224 def new_msg_id(self):
def my_dh_pub_key(self):
    """Return this side's DH public value.

    The initiator's value lives in ``i_dh_data`` and the responder's in
    ``r_dh_data``; ``is_initiator`` decides which of the two is ours.
    """
    return self.i_dh_data if self.is_initiator else self.r_dh_data
def peer_dh_pub_key(self):
    """Return the other side's DH public value.

    Mirror image of our own value: when we are the initiator the peer's
    value is ``r_dh_data``, otherwise it is ``i_dh_data``.
    """
    return self.r_dh_data if self.is_initiator else self.i_dh_data
242 return self.i_natt or self.r_natt
def compute_secret(self):
    """Compute the Diffie-Hellman shared secret as big-endian bytes.

    The peer's public value is raised to our private exponent modulo the
    group prime. ``ike_group`` unpacks to ``(p, g, key_len)``; the result
    is serialised to exactly ``key_len`` bytes.
    """
    private_bytes = self.dh_private_key
    peer_bytes = self.peer_dh_pub_key
    prime, _generator, key_len = self.ike_group
    base = int.from_bytes(peer_bytes, 'big')
    exponent = int.from_bytes(private_bytes, 'big')
    shared = pow(base, exponent, prime)
    return shared.to_bytes(key_len, 'big')
251 def generate_dh_data(self):
253 if self.ike_dh not in DH:
254 raise NotImplementedError('%s not in DH group' % self.ike_dh)
256 if self.dh_params is None:
257 dhg = DH[self.ike_dh]
258 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
259 self.dh_params = pn.parameters(default_backend())
261 priv = self.dh_params.generate_private_key()
262 pub = priv.public_key()
263 x = priv.private_numbers().x
264 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
265 y = pub.public_numbers().y
267 if self.is_initiator:
268 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
270 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
    """Derive the DH shared secret and keep it in ``dh_shared_secret``."""
    secret = self.compute_secret()
    self.dh_shared_secret = secret
275 def calc_child_keys(self):
276 prf = self.ike_prf_alg.mod()
277 s = self.i_nonce + self.r_nonce
278 c = self.child_sas[0]
280 encr_key_len = self.esp_crypto_key_len
281 integ_key_len = self.esp_integ_alg.key_len
282 salt_len = 0 if integ_key_len else 4
284 l = (integ_key_len * 2 +
287 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
290 c.sk_ei = keymat[pos:pos+encr_key_len]
294 c.sk_ai = keymat[pos:pos+integ_key_len]
297 c.salt_ei = keymat[pos:pos+salt_len]
300 c.sk_er = keymat[pos:pos+encr_key_len]
304 c.sk_ar = keymat[pos:pos+integ_key_len]
307 c.salt_er = keymat[pos:pos+salt_len]
310 def calc_prfplus(self, prf, key, seed, length):
314 while len(r) < length and x < 255:
319 s = s + seed + bytes([x])
320 t = self.calc_prf(prf, key, s)
328 def calc_prf(self, prf, key, data):
329 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
334 prf = self.ike_prf_alg.mod()
335 # SKEYSEED = prf(Ni | Nr, g^ir)
336 s = self.i_nonce + self.r_nonce
337 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
339 # calculate S = Ni | Nr | SPIi SPIr
340 s = s + self.ispi + self.rspi
342 prf_key_trunc = self.ike_prf_alg.trunc_len
343 encr_key_len = self.ike_crypto_key_len
344 tr_prf_key_len = self.ike_prf_alg.key_len
345 integ_key_len = self.ike_integ_alg.key_len
346 if integ_key_len == 0:
356 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
359 self.sk_d = keymat[:pos+prf_key_trunc]
362 self.sk_ai = keymat[pos:pos+integ_key_len]
364 self.sk_ar = keymat[pos:pos+integ_key_len]
367 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
368 pos += encr_key_len + salt_size
369 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
370 pos += encr_key_len + salt_size
372 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
373 pos += tr_prf_key_len
374 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
376 def generate_authmsg(self, prf, packet):
377 if self.is_initiator:
385 data = bytes([self.id_type, 0, 0, 0]) + id
386 id_hash = self.calc_prf(prf, key, data)
387 return packet + nonce + id_hash
390 prf = self.ike_prf_alg.mod()
391 if self.is_initiator:
392 packet = self.init_req_packet
394 packet = self.init_resp_packet
395 authmsg = self.generate_authmsg(prf, raw(packet))
396 if self.auth_method == 'shared-key':
397 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
398 self.auth_data = self.calc_prf(prf, psk, authmsg)
399 elif self.auth_method == 'rsa-sig':
400 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
403 raise TypeError('unknown auth method type!')
def encrypt(self, data, aad=None):
    """Pad ``data`` and encrypt it with our own IKE encryption key.

    Padding and encryption are both delegated to ``ike_crypto_alg``;
    ``aad`` is forwarded unchanged to the algorithm's ``encrypt``.
    """
    algo = self.ike_crypto_alg
    padded = algo.pad(data)
    return algo.encrypt(padded, self.my_cryptokey, aad)
410 def peer_authkey(self):
411 if self.is_initiator:
416 def my_authkey(self):
417 if self.is_initiator:
422 def my_cryptokey(self):
423 if self.is_initiator:
428 def peer_cryptokey(self):
429 if self.is_initiator:
def concat(self, alg, key_len):
    """Build an algorithm label of the form ``<alg>-<key_len * 8>``.

    ``key_len`` is multiplied by eight before being appended, e.g.
    ``('AES-CBC', 32)`` gives ``'AES-CBC-256'``.
    """
    bits = key_len * 8
    return '-'.join((alg, str(bits)))
def vpp_ike_cypto_alg(self):
    """Return the label built by ``concat`` for the IKE cipher and key length."""
    cipher_name = self.ike_crypto
    key_len = self.ike_crypto_key_len
    return self.concat(cipher_name, key_len)
def vpp_esp_cypto_alg(self):
    """Return the label built by ``concat`` for the ESP cipher and key length."""
    cipher_name = self.esp_crypto
    key_len = self.esp_crypto_key_len
    return self.concat(cipher_name, key_len)
def verify_hmac(self, ikemsg):
    """Check the integrity checksum appended to a raw IKE message.

    The trailing ``trunc_len`` bytes of ``ikemsg`` are taken as the
    received checksum. The HMAC is recomputed over the preceding bytes
    with the peer's authentication key, truncated to the same length and
    compared through the owning test case's ``assertEqual``.
    """
    integ_alg = self.ike_integ_alg
    icv_len = integ_alg.trunc_len
    received = ikemsg[-icv_len:]
    covered = ikemsg[:-icv_len]
    recomputed = self.compute_hmac(integ_alg.mod(), self.peer_authkey,
                                   covered)
    self.test.assertEqual(recomputed[:icv_len], received)
452 def compute_hmac(self, integ, key, data):
453 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
def decrypt(self, data, aad=None, icv=None):
    """Decrypt ``data`` with the peer's IKE encryption key.

    ``aad`` and ``icv`` are passed straight through to the configured
    algorithm's ``decrypt``.
    """
    algo = self.ike_crypto_alg
    return algo.decrypt(data, self.peer_cryptokey, aad, icv)
460 def hmac_and_decrypt(self, ike):
461 ep = ike[ikev2.IKEv2_payload_Encrypted]
462 if self.ike_crypto == 'AES-GCM-16ICV':
463 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
464 ct = ep.load[:-GCM_ICV_SIZE]
465 tag = ep.load[-GCM_ICV_SIZE:]
466 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
468 self.verify_hmac(raw(ike))
469 integ_trunc = self.ike_integ_alg.trunc_len
471 # remove ICV and decrypt payload
472 ct = ep.load[:-integ_trunc]
473 plain = self.decrypt(ct)
476 return plain[:-pad_len - 1]
def build_ts_addr(self, ts, version):
    """Map a traffic selector's address range to versioned field names.

    ``version`` is appended to the key names (the callers in this file pass
    ``'4'`` or ``'6'``); values come from ``ts['start_addr']`` and
    ``ts['end_addr']``.
    """
    fields = {}
    fields['starting_address_v' + version] = ts['start_addr']
    fields['ending_address_v' + version] = ts['end_addr']
    return fields
482 def generate_ts(self, is_ip4):
483 c = self.child_sas[0]
484 ts_data = {'IP_protocol_ID': 0,
488 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
489 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
490 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
491 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
493 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
494 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
495 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
496 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
498 if self.is_initiator:
499 return ([ts1], [ts2])
500 return ([ts2], [ts1])
502 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
503 if crypto not in CRYPTO_ALGOS:
504 raise TypeError('unsupported encryption algo %r' % crypto)
505 self.ike_crypto = crypto
506 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
507 self.ike_crypto_key_len = crypto_key_len
509 if integ not in AUTH_ALGOS:
510 raise TypeError('unsupported auth algo %r' % integ)
511 self.ike_integ = None if integ == 'NULL' else integ
512 self.ike_integ_alg = AUTH_ALGOS[integ]
514 if prf not in PRF_ALGOS:
515 raise TypeError('unsupported prf algo %r' % prf)
517 self.ike_prf_alg = PRF_ALGOS[prf]
519 self.ike_group = DH[self.ike_dh]
521 def set_esp_props(self, crypto, crypto_key_len, integ):
522 self.esp_crypto_key_len = crypto_key_len
523 if crypto not in CRYPTO_ALGOS:
524 raise TypeError('unsupported encryption algo %r' % crypto)
525 self.esp_crypto = crypto
526 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
528 if integ not in AUTH_ALGOS:
529 raise TypeError('unsupported auth algo %r' % integ)
530 self.esp_integ = None if integ == 'NULL' else integ
531 self.esp_integ_alg = AUTH_ALGOS[integ]
def crypto_attr(self, key_len):
    """Return the ``(key_length, length)`` pair for an Encryption transform.

    The first element packs the constant ``0x800e`` into the upper 16 bits
    and ``key_len * 8`` into the lower bits; the second is the constant 12.
    Callers in this file pass them as the transform's ``key_length`` and
    ``length`` fields.

    Raises ``Exception`` when ``ike_crypto`` is neither ``'AES-CBC'`` nor
    ``'AES-GCM-16ICV'``.
    """
    if self.ike_crypto not in ('AES-CBC', 'AES-GCM-16ICV'):
        raise Exception('unsupported attribute type')
    key_length_attr = (0x800e << 16) | (key_len << 3)
    return (key_length_attr, 12)
def ike_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the IKE cipher key length."""
    key_len = self.ike_crypto_key_len
    return self.crypto_attr(key_len)
def esp_crypto_attr(self):
    """Return ``crypto_attr`` evaluated for the ESP cipher key length."""
    key_len = self.esp_crypto_key_len
    return self.crypto_attr(key_len)
545 def compute_nat_sha1(self, ip, port, rspi=None):
548 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
549 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
551 return digest.finalize()
554 class IkePeer(VppTestCase):
555 """ common class for initiator and responder """
559 import scapy.contrib.ikev2 as _ikev2
560 globals()['ikev2'] = _ikev2
561 super(IkePeer, cls).setUpClass()
562 cls.create_pg_interfaces(range(2))
563 for i in cls.pg_interfaces:
def tearDownClass(cls):
    """Class-level teardown; only delegates to the parent's tearDownClass."""
    super(IkePeer, cls).tearDownClass()
575 super(IkePeer, self).tearDown()
576 if self.del_sa_from_responder:
577 self.initiate_del_sa_from_responder()
579 self.initiate_del_sa_from_initiator()
580 r = self.vapi.ikev2_sa_dump()
581 self.assertEqual(len(r), 0)
582 sas = self.vapi.ipsec_sa_dump()
583 self.assertEqual(len(sas), 0)
584 self.p.remove_vpp_config()
585 self.assertIsNone(self.p.query_vpp_config())
588 super(IkePeer, self).setUp()
590 self.p.add_vpp_config()
591 self.assertIsNotNone(self.p.query_vpp_config())
592 if self.sa.is_initiator:
593 self.sa.generate_dh_data()
594 self.vapi.cli('ikev2 set logging level 4')
595 self.vapi.cli('event-lo clear')
def assert_counter(self, count, name, version='ip4'):
    """Assert that an ikev2 error counter holds the expected value.

    The counter is looked up under ``/err/ikev2-<version>/<name>`` via
    ``self.statistics.get_err_counter`` and compared with ``count``.
    """
    prefix = '/err/ikev2-%s/' % version
    actual = self.statistics.get_err_counter(prefix + name)
    self.assertEqual(count, actual)
601 def create_rekey_request(self):
602 sa, first_payload = self.generate_auth_payload(is_rekey=True)
603 header = ikev2.IKEv2(
604 init_SPI=self.sa.ispi,
605 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
606 flags='Initiator', exch_type='CREATE_CHILD_SA')
608 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
609 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
610 self.sa.dport, self.sa.natt, self.ip6)
612 def create_empty_request(self):
613 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
614 id=self.sa.new_msg_id(), flags='Initiator',
615 exch_type='INFORMATIONAL',
616 next_payload='Encrypted')
618 msg = self.encrypt_ike_msg(header, b'', None)
619 return self.create_packet(self.pg0, msg, self.sa.sport,
620 self.sa.dport, self.sa.natt, self.ip6)
622 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
625 src_ip = src_if.remote_ip6
626 dst_ip = src_if.local_ip6
629 src_ip = src_if.remote_ip4
630 dst_ip = src_if.local_ip4
632 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
633 ip_layer(src=src_ip, dst=dst_ip) /
634 UDP(sport=sport, dport=dport))
636 # insert non ESP marker
637 res = res / Raw(b'\x00' * 4)
def verify_udp(self, udp):
    """Assert that the UDP source and destination ports match the SA's."""
    for port_field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, port_field),
                         getattr(self.sa, port_field))
644 def get_ike_header(self, packet):
646 ih = packet[ikev2.IKEv2]
647 ih = self.verify_and_remove_non_esp_marker(ih)
648 except IndexError as e:
649 # this is a workaround for getting IKEv2 layer as both ikev2 and
650 # ipsec register for port 4500
652 ih = self.verify_and_remove_non_esp_marker(esp)
653 self.assertEqual(ih.version, 0x20)
654 self.assertNotIn('Version', ih.flags)
657 def verify_and_remove_non_esp_marker(self, packet):
659 # if we are in nat traversal mode check for non esp marker
662 self.assertEqual(data[:4], b'\x00' * 4)
663 return ikev2.IKEv2(data[4:])
667 def encrypt_ike_msg(self, header, plain, first_payload):
668 if self.sa.ike_crypto == 'AES-GCM-16ICV':
669 data = self.sa.ike_crypto_alg.pad(raw(plain))
670 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
671 len(ikev2.IKEv2_payload_Encrypted())
672 tlen = plen + len(ikev2.IKEv2())
675 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
679 encr = self.sa.encrypt(raw(plain), raw(res))
680 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
681 length=plen, load=encr)
684 encr = self.sa.encrypt(raw(plain))
685 trunc_len = self.sa.ike_integ_alg.trunc_len
686 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
687 tlen = plen + len(ikev2.IKEv2())
689 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
690 length=plen, load=encr)
694 integ_data = raw(res)
695 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
696 self.sa.my_authkey, integ_data)
697 res = res / Raw(hmac_data[:trunc_len])
698 assert(len(res) == tlen)
701 def verify_udp_encap(self, ipsec_sa):
702 e = VppEnum.vl_api_ipsec_sad_flags_t
703 if self.sa.udp_encap or self.sa.natt:
704 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
706 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
708 def verify_ipsec_sas(self, is_rekey=False):
709 sas = self.vapi.ipsec_sa_dump()
711 # after rekey there is a short period of time in which old
712 # inbound SA is still present
716 self.assertEqual(len(sas), sa_count)
717 if self.sa.is_initiator:
732 c = self.sa.child_sas[0]
734 self.verify_udp_encap(sa0)
735 self.verify_udp_encap(sa1)
736 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
737 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
738 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
740 if self.sa.esp_integ is None:
743 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
744 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
745 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
748 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
749 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
750 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
751 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
755 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
756 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
757 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
758 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
760 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
761 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
    """Compare one piece of key material reported by the API with ours.

    ``name`` selects the attribute on both objects. The API object also
    carries ``<name>_len``; only that many leading bytes of the API value
    are compared with the expected key.
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    self.assertEqual(expected, reported[:reported_len])
def verify_id(self, api_id, exp_id):
    """Check an identity reported by the API against the expected identity.

    Compares the identity type (the expected type is translated with
    ``IDType.value``), the payload length and the payload itself.
    """
    self.assertEqual(api_id.type, IDType.value(exp_id.type))
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # The payload has to be compared with the expected payload. It was
    # previously compared with exp_id.type, which IDType.value() treats as
    # a type name, so that assertion could never hold.
    # NOTE(review): assumes exp_id exposes its payload as bytes in `.data`,
    # alongside the `.data_len` read above - confirm against callers.
    self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)
775 def verify_ike_sas(self):
776 r = self.vapi.ikev2_sa_dump()
777 self.assertEqual(len(r), 1)
779 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
780 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
782 if self.sa.is_initiator:
783 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
784 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
786 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
787 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
789 if self.sa.is_initiator:
790 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
791 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
793 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
794 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
795 self.verify_keymat(sa.keys, self.sa, 'sk_d')
796 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
797 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
798 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
799 self.verify_keymat(sa.keys, self.sa, 'sk_er')
800 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
801 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
803 self.assertEqual(sa.i_id.type, self.sa.id_type)
804 self.assertEqual(sa.r_id.type, self.sa.id_type)
805 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
806 self.assertEqual(sa.r_id.data_len, len(self.idr))
807 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
808 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.idr)
810 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
811 self.assertEqual(len(r), 1)
813 self.assertEqual(csa.sa_index, sa.sa_index)
814 c = self.sa.child_sas[0]
815 if hasattr(c, 'sk_ai'):
816 self.verify_keymat(csa.keys, c, 'sk_ai')
817 self.verify_keymat(csa.keys, c, 'sk_ar')
818 self.verify_keymat(csa.keys, c, 'sk_ei')
819 self.verify_keymat(csa.keys, c, 'sk_er')
820 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
821 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
823 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
826 r = self.vapi.ikev2_traffic_selector_dump(
827 is_initiator=True, sa_index=sa.sa_index,
828 child_sa_index=csa.child_sa_index)
829 self.assertEqual(len(r), 1)
831 self.verify_ts(r[0].ts, tsi[0], True)
833 r = self.vapi.ikev2_traffic_selector_dump(
834 is_initiator=False, sa_index=sa.sa_index,
835 child_sa_index=csa.child_sa_index)
836 self.assertEqual(len(r), 1)
837 self.verify_ts(r[0].ts, tsr[0], False)
839 n = self.vapi.ikev2_nonce_get(is_initiator=True,
840 sa_index=sa.sa_index)
841 self.verify_nonce(n, self.sa.i_nonce)
842 n = self.vapi.ikev2_nonce_get(is_initiator=False,
843 sa_index=sa.sa_index)
844 self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
    """Assert that the API-reported nonce has the expected length and value."""
    expectations = (('data_len', len(nonce)), ('nonce', nonce))
    for field, expected in expectations:
        self.assertEqual(getattr(api_nonce, field), expected)
850 def verify_ts(self, api_ts, ts, is_initiator):
852 self.assertTrue(api_ts.is_local)
854 self.assertFalse(api_ts.is_local)
857 self.assertEqual(api_ts.start_addr,
858 IPv4Address(ts.starting_address_v4))
859 self.assertEqual(api_ts.end_addr,
860 IPv4Address(ts.ending_address_v4))
862 self.assertEqual(api_ts.start_addr,
863 IPv6Address(ts.starting_address_v6))
864 self.assertEqual(api_ts.end_addr,
865 IPv6Address(ts.ending_address_v6))
866 self.assertEqual(api_ts.start_port, ts.start_port)
867 self.assertEqual(api_ts.end_port, ts.end_port)
868 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
871 class TemplateInitiator(IkePeer):
872 """ initiator test template """
874 def initiate_del_sa_from_initiator(self):
875 ispi = int.from_bytes(self.sa.ispi, 'little')
876 self.pg0.enable_capture()
878 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
879 capture = self.pg0.get_capture(1)
880 ih = self.get_ike_header(capture[0])
881 self.assertNotIn('Response', ih.flags)
882 self.assertIn('Initiator', ih.flags)
883 self.assertEqual(ih.init_SPI, self.sa.ispi)
884 self.assertEqual(ih.resp_SPI, self.sa.rspi)
885 plain = self.sa.hmac_and_decrypt(ih)
886 d = ikev2.IKEv2_payload_Delete(plain)
887 self.assertEqual(d.proto, 1) # proto=IKEv2
888 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
889 flags='Response', exch_type='INFORMATIONAL',
890 id=ih.id, next_payload='Encrypted')
891 resp = self.encrypt_ike_msg(header, b'', None)
892 self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
    """Verify the reply received after requesting deletion of the IKE SA.

    The IKE header must carry the SA's current message id, exchange type
    37 (informational) and both the 'Response' and 'Initiator' flags; the
    decrypted payload must be empty.
    """
    ih = self.get_ike_header(packet)
    self.assertEqual(ih.id, self.sa.msg_id)
    self.assertEqual(ih.exch_type, 37)  # exchange informational
    for flag in ('Response', 'Initiator'):
        self.assertIn(flag, ih.flags)
    self.assertEqual(self.sa.hmac_and_decrypt(ih), b'')
903 def initiate_del_sa_from_responder(self):
904 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
905 exch_type='INFORMATIONAL',
906 id=self.sa.new_msg_id())
907 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
908 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
909 packet = self.create_packet(self.pg0, ike_msg,
910 self.sa.sport, self.sa.dport,
911 self.sa.natt, self.ip6)
912 self.pg0.add_stream(packet)
913 self.pg0.enable_capture()
915 capture = self.pg0.get_capture(1)
916 self.verify_del_sa(capture[0])
919 def find_notify_payload(packet, notify_type):
920 n = packet[ikev2.IKEv2_payload_Notify]
922 if n.type == notify_type:
927 def verify_nat_detection(self, packet):
934 # NAT_DETECTION_SOURCE_IP
935 s = self.find_notify_payload(packet, 16388)
936 self.assertIsNotNone(s)
937 src_sha = self.sa.compute_nat_sha1(
938 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
939 self.assertEqual(s.load, src_sha)
941 # NAT_DETECTION_DESTINATION_IP
942 s = self.find_notify_payload(packet, 16389)
943 self.assertIsNotNone(s)
944 dst_sha = self.sa.compute_nat_sha1(
945 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
946 self.assertEqual(s.load, dst_sha)
948 def verify_sa_init_request(self, packet):
950 self.sa.dport = udp.sport
951 ih = packet[ikev2.IKEv2]
952 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
953 self.assertEqual(ih.exch_type, 34) # SA_INIT
954 self.sa.ispi = ih.init_SPI
955 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
956 self.assertIn('Initiator', ih.flags)
957 self.assertNotIn('Response', ih.flags)
958 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
959 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
961 prop = packet[ikev2.IKEv2_payload_Proposal]
962 self.assertEqual(prop.proto, 1) # proto = ikev2
963 self.assertEqual(prop.proposal, 1)
964 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
965 self.assertEqual(prop.trans[0].transform_id,
966 self.p.ike_transforms['crypto_alg'])
967 self.assertEqual(prop.trans[1].transform_type, 2) # prf
968 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
969 self.assertEqual(prop.trans[2].transform_type, 4) # dh
970 self.assertEqual(prop.trans[2].transform_id,
971 self.p.ike_transforms['dh_group'])
973 self.verify_nat_detection(packet)
974 self.sa.set_ike_props(
975 crypto='AES-GCM-16ICV', crypto_key_len=32,
976 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
977 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
978 integ='SHA2-256-128')
979 self.sa.generate_dh_data()
980 self.sa.complete_dh_data()
983 def update_esp_transforms(self, trans, sa):
985 if trans.transform_type == 1: # ecryption
986 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
987 elif trans.transform_type == 3: # integrity
988 sa.esp_integ = INTEG_IDS[trans.transform_id]
989 trans = trans.payload
991 def verify_sa_auth_req(self, packet):
993 self.sa.dport = udp.sport
994 ih = self.get_ike_header(packet)
995 self.assertEqual(ih.resp_SPI, self.sa.rspi)
996 self.assertEqual(ih.init_SPI, self.sa.ispi)
997 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
998 self.assertIn('Initiator', ih.flags)
999 self.assertNotIn('Response', ih.flags)
1002 self.verify_udp(udp)
1003 self.assertEqual(ih.id, self.sa.msg_id + 1)
1005 plain = self.sa.hmac_and_decrypt(ih)
1006 idi = ikev2.IKEv2_payload_IDi(plain)
1007 self.assertEqual(idi.load, self.sa.i_id)
1008 if self.no_idr_auth:
1009 self.assertEqual(idi.next_payload, 39) # AUTH
1011 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1012 self.assertEqual(idr.load, self.sa.r_id)
1013 prop = idi[ikev2.IKEv2_payload_Proposal]
1014 c = self.sa.child_sas[0]
1016 self.update_esp_transforms(
1017 prop[ikev2.IKEv2_payload_Transform], self.sa)
1019 def send_init_response(self):
1020 tr_attr = self.sa.ike_crypto_attr()
1021 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1022 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1023 key_length=tr_attr[0]) /
1024 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1025 transform_id=self.sa.ike_integ) /
1026 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1027 transform_id=self.sa.ike_prf_alg.name) /
1028 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1029 transform_id=self.sa.ike_dh))
1030 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1031 trans_nb=4, trans=trans))
1033 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1035 dst_address = b'\x0a\x0a\x0a\x0a'
1037 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1038 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1039 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1041 self.sa.init_resp_packet = (
1042 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1043 exch_type='IKE_SA_INIT', flags='Response') /
1044 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1045 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1046 group=self.sa.ike_dh,
1047 load=self.sa.my_dh_pub_key) /
1048 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1049 next_payload='Notify') /
1050 ikev2.IKEv2_payload_Notify(
1051 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1052 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1053 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1055 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1056 self.sa.sport, self.sa.dport,
1058 self.pg_send(self.pg0, ike_msg)
1059 capture = self.pg0.get_capture(1)
1060 self.verify_sa_auth_req(capture[0])
1062 def initiate_sa_init(self):
1063 self.pg0.enable_capture()
1065 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1067 capture = self.pg0.get_capture(1)
1068 self.verify_sa_init_request(capture[0])
1069 self.send_init_response()
1071 def send_auth_response(self):
1072 tr_attr = self.sa.esp_crypto_attr()
1073 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1074 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1075 key_length=tr_attr[0]) /
1076 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1077 transform_id=self.sa.esp_integ) /
1078 ikev2.IKEv2_payload_Transform(
1079 transform_type='Extended Sequence Number',
1080 transform_id='No ESN') /
1081 ikev2.IKEv2_payload_Transform(
1082 transform_type='Extended Sequence Number',
1083 transform_id='ESN'))
1085 c = self.sa.child_sas[0]
1086 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1087 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1089 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1090 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1091 IDtype=self.sa.id_type, load=self.sa.i_id) /
1092 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1093 IDtype=self.sa.id_type, load=self.sa.r_id) /
1094 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1095 auth_type=AuthMethod.value(self.sa.auth_method),
1096 load=self.sa.auth_data) /
1097 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1098 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1099 number_of_TSs=len(tsi),
1100 traffic_selector=tsi) /
1101 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1102 number_of_TSs=len(tsr),
1103 traffic_selector=tsr) /
1104 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1106 header = ikev2.IKEv2(
1107 init_SPI=self.sa.ispi,
1108 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1109 flags='Response', exch_type='IKE_AUTH')
1111 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1112 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1113 self.sa.dport, self.sa.natt, self.ip6)
1114 self.pg_send(self.pg0, packet)
def test_initiator(self):
    # IKE_SA_INIT exchange first ...
    self.initiate_sa_init()

    # ... then derive the child keys before answering IKE_AUTH
    self.sa.calc_child_keys()
    self.send_auth_response()

    # finally check the IKE SA state
    self.verify_ike_sas()
1124 class TemplateResponder(IkePeer):
1125 """ responder test template """
def initiate_del_sa_from_responder(self):
    """Trigger an IKE SA delete through the API and acknowledge it.

    The INFORMATIONAL request captured on pg0 is checked (flags, Delete
    payload, SPIs) and answered with an empty encrypted response, after
    which no further replies are expected.
    """
    self.pg0.enable_capture()
    ike_ispi = int.from_bytes(self.sa.ispi, 'little')
    self.vapi.ikev2_initiate_del_ike_sa(ispi=ike_ispi)

    request = self.get_ike_header(self.pg0.get_capture(1)[0])
    for unexpected_flag in ('Response', 'Initiator'):
        self.assertNotIn(unexpected_flag, request.flags)
    self.assertEqual(request.exch_type, 37)  # INFORMATIONAL

    delete = ikev2.IKEv2_payload_Delete(self.sa.hmac_and_decrypt(request))
    self.assertEqual(delete.proto, 1)  # proto=IKEv2
    self.assertEqual(request.init_SPI, self.sa.ispi)
    self.assertEqual(request.resp_SPI, self.sa.rspi)

    # reply reuses the message id of the request
    reply_hdr = ikev2.IKEv2(
        init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
        flags='Initiator+Response', exch_type='INFORMATIONAL',
        id=request.id, next_payload='Encrypted')
    reply = self.encrypt_ike_msg(reply_hdr, b'', None)
    self.send_and_assert_no_replies(self.pg0, reply)
def verify_del_sa(self, packet):
    """Check that *packet* is the empty INFORMATIONAL delete response.

    Message id and both SPIs must match the local SA, the Response flag
    must be set without Initiator, and the decrypted body must be empty.
    """
    hdr = self.get_ike_header(packet)
    self.assertEqual(hdr.id, self.sa.msg_id)
    self.assertEqual(hdr.exch_type, 37)  # exchange informational
    self.assertIn('Response', hdr.flags)
    self.assertNotIn('Initiator', hdr.flags)
    self.assertEqual(hdr.next_payload, 46)  # Encrypted
    self.assertEqual(hdr.init_SPI, self.sa.ispi)
    self.assertEqual(hdr.resp_SPI, self.sa.rspi)
    self.assertEqual(self.sa.hmac_and_decrypt(hdr), b'')
def initiate_del_sa_from_initiator(self):
    """Queue an encrypted Delete(IKEv2) request and verify the reply."""
    request_hdr = ikev2.IKEv2(
        init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, flags='Initiator',
        exch_type='INFORMATIONAL', id=self.sa.new_msg_id())
    delete = ikev2.IKEv2_payload_Delete(proto='IKEv2')
    encrypted = self.encrypt_ike_msg(request_hdr, delete, 'Delete')

    request = self.create_packet(
        self.pg0, encrypted, self.sa.sport, self.sa.dport,
        self.sa.natt, self.ip6)
    self.pg0.add_stream(request)
    self.pg0.enable_capture()

    reply = self.pg0.get_capture(1)[0]
    self.verify_del_sa(reply)
1176 def send_sa_init_req(self):
1177 tr_attr = self.sa.ike_crypto_attr()
1178 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1179 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1180 key_length=tr_attr[0]) /
1181 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1182 transform_id=self.sa.ike_integ) /
1183 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1184 transform_id=self.sa.ike_prf_alg.name) /
1185 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1186 transform_id=self.sa.ike_dh))
1188 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1189 trans_nb=4, trans=trans))
1191 next_payload = None if self.ip6 else 'Notify'
1193 self.sa.init_req_packet = (
1194 ikev2.IKEv2(init_SPI=self.sa.ispi,
1195 flags='Initiator', exch_type='IKE_SA_INIT') /
1196 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1197 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1198 group=self.sa.ike_dh,
1199 load=self.sa.my_dh_pub_key) /
1200 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1201 load=self.sa.i_nonce))
1205 src_address = b'\x0a\x0a\x0a\x01'
1207 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1210 dst_address = b'\x0a\x0a\x0a\x0a'
1212 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1214 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1215 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1216 nat_src_detection = ikev2.IKEv2_payload_Notify(
1217 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1218 next_payload='Notify')
1219 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1220 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1221 self.sa.init_req_packet = (self.sa.init_req_packet /
1225 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1226 self.sa.sport, self.sa.dport,
1227 self.sa.natt, self.ip6)
1228 self.pg0.add_stream(ike_msg)
1229 self.pg0.enable_capture()
1231 capture = self.pg0.get_capture(1)
1232 self.verify_sa_init(capture[0])
1234 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1235 tr_attr = self.sa.esp_crypto_attr()
1236 last_payload = last_payload or 'Notify'
1237 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1238 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1239 key_length=tr_attr[0]) /
1240 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1241 transform_id=self.sa.esp_integ) /
1242 ikev2.IKEv2_payload_Transform(
1243 transform_type='Extended Sequence Number',
1244 transform_id='No ESN') /
1245 ikev2.IKEv2_payload_Transform(
1246 transform_type='Extended Sequence Number',
1247 transform_id='ESN'))
1249 c = self.sa.child_sas[0]
1250 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1251 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1253 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1254 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1255 auth_type=AuthMethod.value(self.sa.auth_method),
1256 load=self.sa.auth_data) /
1257 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1258 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1259 number_of_TSs=len(tsi), traffic_selector=tsi) /
1260 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1261 number_of_TSs=len(tsr), traffic_selector=tsr))
1264 first_payload = 'Nonce'
1265 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1266 next_payload='SA') / plain /
1267 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1268 proto='ESP', SPI=c.ispi))
1270 first_payload = 'IDi'
1271 if self.no_idr_auth:
1272 ids = ikev2.IKEv2_payload_IDi(next_payload='AUTH',
1273 IDtype=self.sa.id_type,
1276 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1277 IDtype=self.sa.id_type, load=self.sa.i_id) /
1278 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1279 IDtype=self.sa.id_type, load=self.sa.r_id))
1281 return plain, first_payload
def send_sa_auth(self):
    """Queue the IKE_AUTH request and verify the captured response.

    The payload chain from generate_auth_payload() is extended with an
    INITIAL_CONTACT notify before being encrypted.
    """
    payloads, first_payload = self.generate_auth_payload(
        last_payload='Notify')
    initial_contact = ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
    payloads = payloads / initial_contact

    request_hdr = ikev2.IKEv2(
        init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
        id=self.sa.new_msg_id(), flags='Initiator', exch_type='IKE_AUTH')

    encrypted = self.encrypt_ike_msg(request_hdr, payloads, first_payload)
    request = self.create_packet(
        self.pg0, encrypted, self.sa.sport, self.sa.dport,
        self.sa.natt, self.ip6)
    self.pg0.add_stream(request)
    self.pg0.enable_capture()

    response = self.pg0.get_capture(1)[0]
    self.verify_sa_auth_resp(response)
1301 def verify_sa_init(self, packet):
1302 ih = self.get_ike_header(packet)
1304 self.assertEqual(ih.id, self.sa.msg_id)
1305 self.assertEqual(ih.exch_type, 34)
1306 self.assertIn('Response', ih.flags)
1307 self.assertEqual(ih.init_SPI, self.sa.ispi)
1308 self.assertNotEqual(ih.resp_SPI, 0)
1309 self.sa.rspi = ih.resp_SPI
1311 sa = ih[ikev2.IKEv2_payload_SA]
1312 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1313 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1314 except IndexError as e:
1315 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1316 self.logger.error(ih.show())
1318 self.sa.complete_dh_data()
1322 def verify_sa_auth_resp(self, packet):
1323 ike = self.get_ike_header(packet)
1325 self.verify_udp(udp)
1326 self.assertEqual(ike.id, self.sa.msg_id)
1327 plain = self.sa.hmac_and_decrypt(ike)
1328 idr = ikev2.IKEv2_payload_IDr(plain)
1329 prop = idr[ikev2.IKEv2_payload_Proposal]
1330 self.assertEqual(prop.SPIsize, 4)
1331 self.sa.child_sas[0].rspi = prop.SPI
1332 self.sa.calc_child_keys()
1334 IKE_NODE_SUFFIX = 'ip4'
1336 def verify_counters(self):
1337 self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
1338 self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
1339 self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1341 r = self.vapi.ikev2_sa_dump()
1343 self.assertEqual(1, s.n_sa_auth_req)
1344 self.assertEqual(1, s.n_sa_init_req)
def test_responder(self):
    # drive the exchange towards VPP
    self.send_sa_init_req()

    # then check the resulting IPsec SAs, IKE SAs and node counters
    self.verify_ipsec_sas()
    self.verify_ike_sas()
    self.verify_counters()
1354 class Ikev2Params(object):
1355 def config_params(self, params={}):
1356 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1357 ei = VppEnum.vl_api_ipsec_integ_alg_t
1359 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1360 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1361 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1362 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1363 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1364 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1366 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1367 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1368 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1369 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1371 dpd_disabled = True if 'dpd_disabled' not in params else\
1372 params['dpd_disabled']
1374 self.vapi.cli('ikev2 dpd disable')
1375 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1376 not in params else params['del_sa_from_responder']
1377 i_natt = False if 'i_natt' not in params else params['i_natt']
1378 r_natt = False if 'r_natt' not in params else params['r_natt']
1379 self.p = Profile(self, 'pr1')
1380 self.ip6 = False if 'ip6' not in params else params['ip6']
1382 if 'auth' in params and params['auth'] == 'rsa-sig':
1383 auth_method = 'rsa-sig'
1384 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1385 self.vapi.ikev2_set_local_key(
1386 key_file=work_dir + params['server-key'])
1388 client_file = work_dir + params['client-cert']
1389 server_pem = open(work_dir + params['server-cert']).read()
1390 client_priv = open(work_dir + params['client-key']).read()
1391 client_priv = load_pem_private_key(str.encode(client_priv), None,
1393 self.peer_cert = x509.load_pem_x509_certificate(
1394 str.encode(server_pem),
1396 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1399 auth_data = b'$3cr3tpa$$w0rd'
1400 self.p.add_auth(method='shared-key', data=auth_data)
1401 auth_method = 'shared-key'
1404 is_init = True if 'is_initiator' not in params else\
1405 params['is_initiator']
1406 self.no_idr_auth = params.get('no_idr_in_auth', False)
1408 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1409 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1410 r_id = self.idr = idr['data']
1411 i_id = self.idi = idi['data']
1413 # scapy is initiator, VPP is responder
1414 self.p.add_local_id(**idr)
1415 self.p.add_remote_id(**idi)
1416 if self.no_idr_auth:
1419 # VPP is initiator, scapy is responder
1420 self.p.add_local_id(**idi)
1421 if not self.no_idr_auth:
1422 self.p.add_remote_id(**idr)
1424 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1425 'loc_ts' not in params else params['loc_ts']
1426 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1427 'rem_ts' not in params else params['rem_ts']
1428 self.p.add_local_ts(**loc_ts)
1429 self.p.add_remote_ts(**rem_ts)
1430 if 'responder' in params:
1431 self.p.add_responder(params['responder'])
1432 if 'ike_transforms' in params:
1433 self.p.add_ike_transforms(params['ike_transforms'])
1434 if 'esp_transforms' in params:
1435 self.p.add_esp_transforms(params['esp_transforms'])
1437 udp_encap = False if 'udp_encap' not in params else\
1440 self.p.set_udp_encap(True)
1442 if 'responder_hostname' in params:
1443 hn = params['responder_hostname']
1444 self.p.add_responder_hostname(hn)
1446 # configure static dns record
1447 self.vapi.dns_name_server_add_del(
1449 server_address=IPv4Address(u'8.8.8.8').packed)
1450 self.vapi.dns_enable_disable(enable=1)
1452 cmd = "dns cache add {} {}".format(hn['hostname'],
1453 self.pg0.remote_ip4)
1456 self.sa = IKEv2SA(self, i_id=i_id, r_id=r_id,
1457 is_initiator=is_init,
1458 id_type=self.p.local_id['id_type'],
1459 i_natt=i_natt, r_natt=r_natt,
1460 priv_key=client_priv, auth_method=auth_method,
1461 nonce=params.get('nonce'),
1462 auth_data=auth_data, udp_encap=udp_encap,
1463 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1466 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1467 params['ike-crypto']
1468 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1470 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1473 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1474 params['esp-crypto']
1475 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1478 self.sa.set_ike_props(
1479 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1480 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1481 self.sa.set_esp_props(
1482 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1486 class TestApi(VppTestCase):
1487 """ Test IKEV2 API """
def setUpClass(cls):
    """Class-level setup; defers entirely to the parent implementation."""
    super(TestApi, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent implementation."""
    super(TestApi, cls).tearDownClass()
1497 super(TestApi, self).tearDown()
1498 self.p1.remove_vpp_config()
1499 self.p2.remove_vpp_config()
1500 r = self.vapi.ikev2_profile_dump()
1501 self.assertEqual(len(r), 0)
1503 def configure_profile(self, cfg):
1504 p = Profile(self, cfg['name'])
1505 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1506 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1507 p.add_local_ts(**cfg['loc_ts'])
1508 p.add_remote_ts(**cfg['rem_ts'])
1509 p.add_responder(cfg['responder'])
1510 p.add_ike_transforms(cfg['ike_ts'])
1511 p.add_esp_transforms(cfg['esp_ts'])
1512 p.add_auth(**cfg['auth'])
1513 p.set_udp_encap(cfg['udp_encap'])
1514 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1515 if 'lifetime_data' in cfg:
1516 p.set_lifetime_data(cfg['lifetime_data'])
1517 if 'tun_itf' in cfg:
1518 p.set_tunnel_interface(cfg['tun_itf'])
1519 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1524 def test_profile_api(self):
1525 """ test profile dump API """
1530 'start_addr': '3.3.3.2',
1531 'end_addr': '3.3.3.3',
1537 'start_addr': '4.5.76.80',
1538 'end_addr': '2.3.4.6',
1545 'start_addr': 'ab::1',
1546 'end_addr': 'ab::4',
1552 'start_addr': 'cd::12',
1553 'end_addr': 'cd::13',
1559 'natt_disabled': True,
1560 'loc_id': ('fqdn', b'vpp.home'),
1561 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1564 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1567 'crypto_key_size': 32,
1572 'crypto_key_size': 24,
1574 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1576 'ipsec_over_udp_port': 4501,
1579 'lifetime_maxdata': 20192,
1580 'lifetime_jitter': 9,
1585 'loc_id': ('ip4-addr', b'192.168.2.1'),
1586 'rem_id': ('ip6-addr', b'abcd::1'),
1589 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1592 'crypto_key_size': 16,
1597 'crypto_key_size': 24,
1599 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1601 'ipsec_over_udp_port': 4600,
1604 self.p1 = self.configure_profile(conf['p1'])
1605 self.p2 = self.configure_profile(conf['p2'])
1607 r = self.vapi.ikev2_profile_dump()
1608 self.assertEqual(len(r), 2)
1609 self.verify_profile(r[0].profile, conf['p1'])
1610 self.verify_profile(r[1].profile, conf['p2'])
def verify_id(self, api_id, cfg_id):
    """Check an identity from the API against a ``(type, data)`` config.

    The API side holds the data as text, so it is encoded to ASCII bytes
    before being compared with the configured bytes.
    """
    reported_type = api_id.type
    expected_type = IDType.value(cfg_id[0])
    self.assertEqual(reported_type, expected_type)

    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
    """Compare a traffic selector returned by the API with its config.

    :param api_ts: traffic selector object taken from the profile dump
    :param cfg_ts: dict with the keys 'proto', 'start_port', 'end_port',
                   'start_addr' and 'end_addr'
    """
    scalar_fields = (
        ('protocol_id', 'proto'),
        ('start_port', 'start_port'),
        ('end_port', 'end_port'),
    )
    for api_name, cfg_key in scalar_fields:
        self.assertEqual(getattr(api_ts, api_name), cfg_ts[cfg_key])

    # Addresses are configured as text and compared as ipaddress objects.
    # The file is Python 3 only, so plain str() replaces the old
    # text_type compatibility alias.
    self.assertEqual(api_ts.start_addr,
                     ip_address(str(cfg_ts['start_addr'])))
    self.assertEqual(api_ts.end_addr,
                     ip_address(str(cfg_ts['end_addr'])))
def verify_responder(self, api_r, cfg_r):
    """Check the responder's interface index and address from the API.

    The configured address is text and is parsed with ip_address()
    before the comparison.
    """
    self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
    expected_addr = ip_address(cfg_r['addr'])
    self.assertEqual(api_r.addr, expected_addr)
def verify_transforms(self, api_ts, cfg_ts):
    """Check the crypto algorithm, key size and integrity algorithm."""
    for name in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
        self.assertEqual(getattr(api_ts, name), cfg_ts[name])
def verify_ike_transforms(self, api_ts, cfg_ts):
    """Check the common transform fields plus the IKE-only DH group."""
    self.verify_transforms(api_ts, cfg_ts)
    expected_group = cfg_ts['dh_group']
    self.assertEqual(api_ts.dh_group, expected_group)
def verify_esp_transforms(self, api_ts, cfg_ts):
    """Check the ESP transforms; only the common fields are compared."""
    self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
    """Check the auth method, data and data length reported by the API."""
    expected_method = AuthMethod.value(cfg_auth['method'])
    self.assertEqual(api_auth.method, expected_method)

    self.assertEqual(api_auth.data, cfg_auth['data'])
    expected_len = len(cfg_auth['data'])
    self.assertEqual(api_auth.data_len, expected_len)
def verify_lifetime_data(self, p, ld):
    """Check the lifetime settings of profile *p* against the dict *ld*.

    Attribute names on the profile and keys in the dict are the same.
    """
    lifetime_fields = (
        'lifetime',
        'lifetime_maxdata',
        'lifetime_jitter',
        'handover',
    )
    for name in lifetime_fields:
        self.assertEqual(getattr(p, name), ld[name])
1652 def verify_profile(self, ap, cp):
1653 self.assertEqual(ap.name, cp['name'])
1654 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1655 self.verify_id(ap.loc_id, cp['loc_id'])
1656 self.verify_id(ap.rem_id, cp['rem_id'])
1657 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1658 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1659 self.verify_responder(ap.responder, cp['responder'])
1660 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1661 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1662 self.verify_auth(ap.auth, cp['auth'])
1663 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1664 self.assertTrue(natt_dis == ap.natt_disabled)
1666 if 'lifetime_data' in cp:
1667 self.verify_lifetime_data(ap, cp['lifetime_data'])
1668 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1670 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1672 self.assertEqual(ap.tun_itf, 0xffffffff)
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """ test responder - responder behind NAT """

    # counters are checked against this node suffix
    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        # only the responder side is flagged as being behind NAT
        natt_params = {'r_natt': True}
        self.config_params(natt_params)
1685 @tag_fixme_vpp_workers
1686 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - NAT traversal (initiator behind NAT) """
1689 def config_tc(self):
1690 self.config_params({
1692 'is_initiator': False, # seen from test case perspective
1693 # thus vpp is initiator
1694 'responder': {'sw_if_index': self.pg0.sw_if_index,
1695 'addr': self.pg0.remote_ip4},
1696 'ike-crypto': ('AES-GCM-16ICV', 32),
1697 'ike-integ': 'NULL',
1698 'ike-dh': '3072MODPgr',
1700 'crypto_alg': 20, # "aes-gcm-16"
1701 'crypto_key_size': 256,
1702 'dh_group': 15, # "modp-3072"
1705 'crypto_alg': 12, # "aes-cbc"
1706 'crypto_key_size': 256,
1707 # "hmac-sha2-256-128"
1711 @tag_fixme_vpp_workers
1712 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1713 """ test ikev2 initiator - pre shared key auth """
1715 def config_tc(self):
1716 self.config_params({
1717 'is_initiator': False, # seen from test case perspective
1718 # thus vpp is initiator
1719 'ike-crypto': ('AES-GCM-16ICV', 32),
1720 'ike-integ': 'NULL',
1721 'ike-dh': '3072MODPgr',
1723 'crypto_alg': 20, # "aes-gcm-16"
1724 'crypto_key_size': 256,
1725 'dh_group': 15, # "modp-3072"
1728 'crypto_alg': 12, # "aes-cbc"
1729 'crypto_key_size': 256,
1730 # "hmac-sha2-256-128"
1732 'responder_hostname': {'hostname': 'vpp.responder.org',
1733 'sw_if_index': self.pg0.sw_if_index}})
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Answer one captured rekey request by echoing its SA payload.

        :param req: captured packet holding the rekey request
        :param update_child_sa_data: when true, take the nonce and SPI
            from the request into the local SA and recompute child keys
        """
        request = self.get_ike_header(req)
        sa_payload = ikev2.IKEv2_payload_SA(
            self.sa.hmac_and_decrypt(request))

        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            self.sa.child_sas[0].ispi = proposal.SPI
            self.sa.child_sas[0].rspi = proposal.SPI
            self.sa.calc_child_keys()

        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, flags='Response',
            exch_type=36, id=request.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply = self.create_packet(
            self.pg0, encrypted, self.sa.sport, self.sa.dport,
            self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        super().test_initiator()
        self.pg0.enable_capture()

        # request the same child SA rekey twice
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        requests = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(requests[1], True)
        self.rekey_respond(requests[0], False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Request a child SA rekey via the API and accept the proposal.

        The request captured on pg0 is checked, the local nonce and SPI
        state is updated from it, and its SA payload is sent back as the
        response.
        """
        old_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=old_ispi)

        request = self.get_ike_header(self.pg0.get_capture(1)[0])
        self.assertEqual(request.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', request.flags)
        self.assertIn('Initiator', request.flags)

        sa_payload = ikev2.IKEv2_payload_SA(
            self.sa.hmac_and_decrypt(request))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        self.sa.child_sas[0].ispi = proposal.SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        self.sa.calc_child_keys()

        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, flags='Response',
            exch_type=36, id=request.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply = self.create_packet(
            self.pg0, encrypted, self.sa.sport, self.sa.dport,
            self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # establish the SAs first, then rekey and re-check them
        super().test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1816 @tag_fixme_vpp_workers
1817 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1818 """ test ikev2 initiator - delete IKE SA from responder """
1820 def config_tc(self):
1821 self.config_params({
1822 'del_sa_from_responder': True,
1823 'is_initiator': False, # seen from test case perspective
1824 # thus vpp is initiator
1825 'responder': {'sw_if_index': self.pg0.sw_if_index,
1826 'addr': self.pg0.remote_ip4},
1827 'ike-crypto': ('AES-GCM-16ICV', 32),
1828 'ike-integ': 'NULL',
1829 'ike-dh': '3072MODPgr',
1831 'crypto_alg': 20, # "aes-gcm-16"
1832 'crypto_key_size': 256,
1833 'dh_group': 15, # "modp-3072"
1836 'crypto_alg': 12, # "aes-cbc"
1837 'crypto_key_size': 256,
1838 # "hmac-sha2-256-128"
1840 'no_idr_in_auth': True})
1843 @tag_fixme_vpp_workers
1844 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1845 """ test ikev2 responder - initiator behind NAT """
1847 IKE_NODE_SUFFIX = 'ip4-natt'
1849 def config_tc(self):
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """

    def config_tc(self):
        # no overrides: run with the default parameters
        self.config_params()
1861 @tag_fixme_vpp_workers
1862 class TestResponderDpd(TestResponderPsk):
1864 Dead peer detection test
def config_tc(self):
    # explicitly request that dead peer detection is not disabled
    dpd_params = {'dpd_disabled': False}
    self.config_params(dpd_params)
1872 def test_responder(self):
1873 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1874 super(TestResponderDpd, self).test_responder()
1875 self.pg0.enable_capture()
1877 # capture empty request but don't reply
1878 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1879 ih = self.get_ike_header(capture[0])
1880 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1881 plain = self.sa.hmac_and_decrypt(ih)
1882 self.assertEqual(plain, b'')
1883 # wait for SA expiration
1885 ike_sas = self.vapi.ikev2_sa_dump()
1886 self.assertEqual(len(ike_sas), 0)
1887 ipsec_sas = self.vapi.ipsec_sa_dump()
1888 self.assertEqual(len(ipsec_sas), 0)
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Queue a rekey request and record the nonce and SPI of the reply."""
        request = self.create_rekey_request()
        self.pg0.add_stream(request)
        self.pg0.enable_capture()

        reply = self.get_ike_header(self.pg0.get_capture(1)[0])
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(reply))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        super().test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)

        # exactly one rekey request must have been counted
        self.assert_counter(1, 'rekey_req', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
1920 class TestResponderVrf(TestResponderPsk, Ikev2Params):
1921 """ test ikev2 responder - non-default table id """
1924 def setUpClass(cls):
1925 import scapy.contrib.ikev2 as _ikev2
1926 globals()['ikev2'] = _ikev2
1927 super(IkePeer, cls).setUpClass()
1928 cls.create_pg_interfaces(range(1))
1929 cls.vapi.cli("ip table add 1")
1930 cls.vapi.cli("set interface ip table pg0 1")
1931 for i in cls.pg_interfaces:
def config_tc(self):
    # explicitly request that dead peer detection is not disabled
    dpd_params = {'dpd_disabled': False}
    self.config_params(dpd_params)
def test_responder(self):
    self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
    super(TestResponderVrf, self).test_responder()
    self.pg0.enable_capture()

    # expect a single INFORMATIONAL message with an empty body
    captured = self.pg0.get_capture(expected_count=1, timeout=5)
    hdr = self.get_ike_header(captured[0])
    self.assertEqual(hdr.exch_type, 37)  # INFORMATIONAL
    self.assertEqual(self.sa.hmac_and_decrypt(hdr), b'')
1953 @tag_fixme_vpp_workers
1954 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1955 """ test ikev2 responder - cert based auth """
1956 def config_tc(self):
1957 self.config_params({
1960 'server-key': 'server-key.pem',
1961 'client-key': 'client-key.pem',
1962 'client-cert': 'client-cert.pem',
1963 'server-cert': 'server-cert.pem'})
1966 @tag_fixme_vpp_workers
1967 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1968 (TemplateResponder, Ikev2Params):
1970 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1972 def config_tc(self):
1973 self.config_params({
1974 'ike-crypto': ('AES-CBC', 16),
1975 'ike-integ': 'SHA2-256-128',
1976 'esp-crypto': ('AES-CBC', 24),
1977 'esp-integ': 'SHA2-384-192',
1978 'ike-dh': '2048MODPgr',
1979 'nonce': os.urandom(256),
1980 'no_idr_in_auth': True})
1983 @tag_fixme_vpp_workers
1984 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1985 (TemplateResponder, Ikev2Params):
1988 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1990 def config_tc(self):
1991 self.config_params({
1992 'ike-crypto': ('AES-CBC', 32),
1993 'ike-integ': 'SHA2-256-128',
1994 'esp-crypto': ('AES-GCM-16ICV', 32),
1995 'esp-integ': 'NULL',
1996 'ike-dh': '3072MODPgr'})
1999 @tag_fixme_vpp_workers
2000 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2005 IKE_NODE_SUFFIX = 'ip6'
2007 def config_tc(self):
2008 self.config_params({
2009 'del_sa_from_responder': True,
2012 'ike-crypto': ('AES-GCM-16ICV', 32),
2013 'ike-integ': 'NULL',
2014 'ike-dh': '2048MODPgr',
2015 'loc_ts': {'start_addr': 'ab:cd::0',
2016 'end_addr': 'ab:cd::10'},
2017 'rem_ts': {'start_addr': '11::0',
2018 'end_addr': '11::100'}})
2021 @tag_fixme_vpp_workers
2022 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2024 Test for keep alive messages
def send_empty_req_from_responder(self):
    """Queue an empty request and check the empty reply and counters.

    The reply must carry the current message id and an empty body; the
    'keepalive' node counter and the SA's n_keepalives must both be 1.
    """
    request = self.create_empty_request()
    self.pg0.add_stream(request)
    self.pg0.enable_capture()

    reply = self.get_ike_header(self.pg0.get_capture(1)[0])
    self.assertEqual(reply.id, self.sa.msg_id)
    self.assertEqual(self.sa.hmac_and_decrypt(reply), b'')

    self.assert_counter(1, 'keepalive', 'ip4')
    sa_dump = self.vapi.ikev2_sa_dump()
    self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)
def test_initiator(self):
    # run the inherited initiator test first, then send the empty request
    super(TestInitiatorKeepaliveMsg, self).test_initiator()
    self.send_empty_req_from_responder()
2046 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2047 """ malformed packet test """
def config_tc(self):
    # no overrides: run with the default parameters
    self.config_params()
2055 def create_ike_init_msg(self, length=None, payload=None):
2056 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
2057 flags='Initiator', exch_type='IKE_SA_INIT')
2058 if payload is not None:
2060 return self.create_packet(self.pg0, msg, self.sa.sport,
def verify_bad_packet_length(self):
    """Send init messages with a bogus header length; expect no replies.

    Every packet sent must show up in the 'bad_length' counter.
    """
    bogus = self.create_ike_init_msg(length=0xdead)
    burst = bogus * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'bad_length')
def verify_bad_sa_payload_length(self):
    """Send init messages whose SA payload length is bogus.

    No replies are expected and every packet sent must show up in the
    'malformed_packet' counter.
    """
    bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
    bogus = self.create_ike_init_msg(payload=bad_sa)
    burst = bogus * self.pkt_count
    self.send_and_assert_no_replies(self.pg0, burst)
    self.assert_counter(self.pkt_count, 'malformed_packet')
def test_responder(self):
    # number of malformed packets sent by each check below
    self.pkt_count = 254

    self.verify_bad_packet_length()
    self.verify_bad_sa_payload_length()
if __name__ == '__main__':
    # executed directly: run the tests with VppTestRunner
    unittest.main(testRunner=VppTestRunner)