3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 from scapy.layers.ipsec import ESP
16 from scapy.layers.inet import IP, UDP, Ether
17 from scapy.layers.inet6 import IPv6
18 from scapy.packet import raw, Raw
19 from scapy.utils import long_converter
20 from framework import VppTestCase, VppTestRunner
21 from vpp_ikev2 import Profile, IDType, AuthMethod
22 from vpp_papi import VppEnum
29 KEY_PAD = b"Key Pad for IKEv2"
36 # tuple structure is (p, g, key_len)
38 '2048MODPgr': (long_converter("""
39 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
40 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
41 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
42 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
43 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
44 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
45 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
46 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
47 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
48 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
49 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
51 '3072MODPgr': (long_converter("""
52 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
53 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
54 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
55 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
56 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
57 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
58 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
59 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
60 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
61 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
62 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
63 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
64 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
65 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
66 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
67 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
71 class CryptoAlgo(object):
72 def __init__(self, name, cipher, mode):
76 if self.cipher is not None:
77 self.bs = self.cipher.block_size // 8
79 if self.name == 'AES-GCM-16ICV':
80 self.iv_len = GCM_IV_SIZE
84 def encrypt(self, data, key, aad=None):
85 iv = os.urandom(self.iv_len)
87 encryptor = Cipher(self.cipher(key), self.mode(iv),
88 default_backend()).encryptor()
89 return iv + encryptor.update(data) + encryptor.finalize()
91 salt = key[-SALT_SIZE:]
93 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
94 default_backend()).encryptor()
95 encryptor.authenticate_additional_data(aad)
96 data = encryptor.update(data) + encryptor.finalize()
97 data += encryptor.tag[:GCM_ICV_SIZE]
100 def decrypt(self, data, key, aad=None, icv=None):
102 iv = data[:self.iv_len]
103 ct = data[self.iv_len:]
104 decryptor = Cipher(algorithms.AES(key),
106 default_backend()).decryptor()
107 return decryptor.update(ct) + decryptor.finalize()
109 salt = key[-SALT_SIZE:]
110 nonce = salt + data[:GCM_IV_SIZE]
111 ct = data[GCM_IV_SIZE:]
112 key = key[:-SALT_SIZE]
113 decryptor = Cipher(algorithms.AES(key),
114 self.mode(nonce, icv, len(icv)),
115 default_backend()).decryptor()
116 decryptor.authenticate_additional_data(aad)
117 return decryptor.update(ct) + decryptor.finalize()
120 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
121 data = data + b'\x00' * (pad_len - 1)
122 return data + bytes([pad_len - 1])
125 class AuthAlgo(object):
126 def __init__(self, name, mac, mod, key_len, trunc_len=None):
130 self.key_len = key_len
131 self.trunc_len = trunc_len or key_len
135 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
136 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
137 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
142 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
143 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
144 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
145 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
146 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
150 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
151 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
168 class IKEv2ChildSA(object):
169 def __init__(self, local_ts, remote_ts, is_initiator):
177 self.local_ts = local_ts
178 self.remote_ts = remote_ts
181 class IKEv2SA(object):
182 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
183 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
184 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
185 auth_method='shared-key', priv_key=None, natt=False,
187 self.udp_encap = udp_encap
196 self.dh_params = None
198 self.priv_key = priv_key
199 self.is_initiator = is_initiator
200 nonce = nonce or os.urandom(32)
201 self.auth_data = auth_data
204 if isinstance(id_type, str):
205 self.id_type = IDType.value(id_type)
207 self.id_type = id_type
208 self.auth_method = auth_method
209 if self.is_initiator:
210 self.rspi = 8 * b'\x00'
215 self.ispi = 8 * b'\x00'
217 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
220 def new_msg_id(self):
225 def my_dh_pub_key(self):
226 if self.is_initiator:
227 return self.i_dh_data
228 return self.r_dh_data
231 def peer_dh_pub_key(self):
232 if self.is_initiator:
233 return self.r_dh_data
234 return self.i_dh_data
236 def compute_secret(self):
237 priv = self.dh_private_key
238 peer = self.peer_dh_pub_key
239 p, g, l = self.ike_group
240 return pow(int.from_bytes(peer, 'big'),
241 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
243 def generate_dh_data(self):
245 if self.ike_dh not in DH:
246 raise NotImplementedError('%s not in DH group' % self.ike_dh)
248 if self.dh_params is None:
249 dhg = DH[self.ike_dh]
250 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
251 self.dh_params = pn.parameters(default_backend())
253 priv = self.dh_params.generate_private_key()
254 pub = priv.public_key()
255 x = priv.private_numbers().x
256 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
257 y = pub.public_numbers().y
259 if self.is_initiator:
260 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
262 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
264 def complete_dh_data(self):
265 self.dh_shared_secret = self.compute_secret()
267 def calc_child_keys(self):
268 prf = self.ike_prf_alg.mod()
269 s = self.i_nonce + self.r_nonce
270 c = self.child_sas[0]
272 encr_key_len = self.esp_crypto_key_len
273 integ_key_len = self.esp_integ_alg.key_len
274 salt_len = 0 if integ_key_len else 4
276 l = (integ_key_len * 2 +
279 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
282 c.sk_ei = keymat[pos:pos+encr_key_len]
286 c.sk_ai = keymat[pos:pos+integ_key_len]
289 c.salt_ei = keymat[pos:pos+salt_len]
292 c.sk_er = keymat[pos:pos+encr_key_len]
296 c.sk_ar = keymat[pos:pos+integ_key_len]
299 c.salt_er = keymat[pos:pos+salt_len]
302 def calc_prfplus(self, prf, key, seed, length):
306 while len(r) < length and x < 255:
311 s = s + seed + bytes([x])
312 t = self.calc_prf(prf, key, s)
320 def calc_prf(self, prf, key, data):
321 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
326 prf = self.ike_prf_alg.mod()
327 # SKEYSEED = prf(Ni | Nr, g^ir)
328 s = self.i_nonce + self.r_nonce
329 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
331 # calculate S = Ni | Nr | SPIi SPIr
332 s = s + self.ispi + self.rspi
334 prf_key_trunc = self.ike_prf_alg.trunc_len
335 encr_key_len = self.ike_crypto_key_len
336 tr_prf_key_len = self.ike_prf_alg.key_len
337 integ_key_len = self.ike_integ_alg.key_len
338 if integ_key_len == 0:
348 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
351 self.sk_d = keymat[:pos+prf_key_trunc]
354 self.sk_ai = keymat[pos:pos+integ_key_len]
356 self.sk_ar = keymat[pos:pos+integ_key_len]
359 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
360 pos += encr_key_len + salt_size
361 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
362 pos += encr_key_len + salt_size
364 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
365 pos += tr_prf_key_len
366 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
368 def generate_authmsg(self, prf, packet):
369 if self.is_initiator:
377 data = bytes([self.id_type, 0, 0, 0]) + id
378 id_hash = self.calc_prf(prf, key, data)
379 return packet + nonce + id_hash
382 prf = self.ike_prf_alg.mod()
383 if self.is_initiator:
384 packet = self.init_req_packet
386 packet = self.init_resp_packet
387 authmsg = self.generate_authmsg(prf, raw(packet))
388 if self.auth_method == 'shared-key':
389 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
390 self.auth_data = self.calc_prf(prf, psk, authmsg)
391 elif self.auth_method == 'rsa-sig':
392 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
395 raise TypeError('unknown auth method type!')
397 def encrypt(self, data, aad=None):
398 data = self.ike_crypto_alg.pad(data)
399 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
402 def peer_authkey(self):
403 if self.is_initiator:
408 def my_authkey(self):
409 if self.is_initiator:
414 def my_cryptokey(self):
415 if self.is_initiator:
420 def peer_cryptokey(self):
421 if self.is_initiator:
425 def concat(self, alg, key_len):
426 return alg + '-' + str(key_len * 8)
429 def vpp_ike_cypto_alg(self):
430 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
433 def vpp_esp_cypto_alg(self):
434 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
436 def verify_hmac(self, ikemsg):
437 integ_trunc = self.ike_integ_alg.trunc_len
438 exp_hmac = ikemsg[-integ_trunc:]
439 data = ikemsg[:-integ_trunc]
440 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
441 self.peer_authkey, data)
442 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
444 def compute_hmac(self, integ, key, data):
445 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
449 def decrypt(self, data, aad=None, icv=None):
450 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
452 def hmac_and_decrypt(self, ike):
453 ep = ike[ikev2.IKEv2_payload_Encrypted]
454 if self.ike_crypto == 'AES-GCM-16ICV':
455 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
456 ct = ep.load[:-GCM_ICV_SIZE]
457 tag = ep.load[-GCM_ICV_SIZE:]
458 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
460 self.verify_hmac(raw(ike))
461 integ_trunc = self.ike_integ_alg.trunc_len
463 # remove ICV and decrypt payload
464 ct = ep.load[:-integ_trunc]
465 plain = self.decrypt(ct)
468 return plain[:-pad_len - 1]
470 def build_ts_addr(self, ts, version):
471 return {'starting_address_v' + version: ts['start_addr'],
472 'ending_address_v' + version: ts['end_addr']}
474 def generate_ts(self, is_ip4):
475 c = self.child_sas[0]
476 ts_data = {'IP_protocol_ID': 0,
480 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
481 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
482 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
483 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
485 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
486 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
487 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
488 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
490 if self.is_initiator:
491 return ([ts1], [ts2])
492 return ([ts2], [ts1])
494 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
495 if crypto not in CRYPTO_ALGOS:
496 raise TypeError('unsupported encryption algo %r' % crypto)
497 self.ike_crypto = crypto
498 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
499 self.ike_crypto_key_len = crypto_key_len
501 if integ not in AUTH_ALGOS:
502 raise TypeError('unsupported auth algo %r' % integ)
503 self.ike_integ = None if integ == 'NULL' else integ
504 self.ike_integ_alg = AUTH_ALGOS[integ]
506 if prf not in PRF_ALGOS:
507 raise TypeError('unsupported prf algo %r' % prf)
509 self.ike_prf_alg = PRF_ALGOS[prf]
511 self.ike_group = DH[self.ike_dh]
513 def set_esp_props(self, crypto, crypto_key_len, integ):
514 self.esp_crypto_key_len = crypto_key_len
515 if crypto not in CRYPTO_ALGOS:
516 raise TypeError('unsupported encryption algo %r' % crypto)
517 self.esp_crypto = crypto
518 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
520 if integ not in AUTH_ALGOS:
521 raise TypeError('unsupported auth algo %r' % integ)
522 self.esp_integ = None if integ == 'NULL' else integ
523 self.esp_integ_alg = AUTH_ALGOS[integ]
525 def crypto_attr(self, key_len):
526 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
527 return (0x800e << 16 | key_len << 3, 12)
529 raise Exception('unsupported attribute type')
531 def ike_crypto_attr(self):
532 return self.crypto_attr(self.ike_crypto_key_len)
534 def esp_crypto_attr(self):
535 return self.crypto_attr(self.esp_crypto_key_len)
537 def compute_nat_sha1(self, ip, port, rspi=None):
540 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
541 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
543 return digest.finalize()
546 class IkePeer(VppTestCase):
547 """ common class for initiator and responder """
551 import scapy.contrib.ikev2 as _ikev2
552 globals()['ikev2'] = _ikev2
553 super(IkePeer, cls).setUpClass()
554 cls.create_pg_interfaces(range(2))
555 for i in cls.pg_interfaces:
563 def tearDownClass(cls):
564 super(IkePeer, cls).tearDownClass()
567 super(IkePeer, self).tearDown()
568 if self.del_sa_from_responder:
569 self.initiate_del_sa_from_responder()
571 self.initiate_del_sa_from_initiator()
572 r = self.vapi.ikev2_sa_dump()
573 self.assertEqual(len(r), 0)
574 sas = self.vapi.ipsec_sa_dump()
575 self.assertEqual(len(sas), 0)
576 self.p.remove_vpp_config()
577 self.assertIsNone(self.p.query_vpp_config())
580 super(IkePeer, self).setUp()
582 self.p.add_vpp_config()
583 self.assertIsNotNone(self.p.query_vpp_config())
584 if self.sa.is_initiator:
585 self.sa.generate_dh_data()
586 self.vapi.cli('ikev2 set logging level 4')
587 self.vapi.cli('event-lo clear')
589 def create_rekey_request(self):
590 sa, first_payload = self.generate_auth_payload(is_rekey=True)
591 header = ikev2.IKEv2(
592 init_SPI=self.sa.ispi,
593 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
594 flags='Initiator', exch_type='CREATE_CHILD_SA')
596 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
597 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
598 self.sa.dport, self.sa.natt, self.ip6)
600 def create_empty_request(self):
601 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
602 id=self.sa.new_msg_id(), flags='Initiator',
603 exch_type='INFORMATIONAL',
604 next_payload='Encrypted')
606 msg = self.encrypt_ike_msg(header, b'', None)
607 return self.create_packet(self.pg0, msg, self.sa.sport,
608 self.sa.dport, self.sa.natt, self.ip6)
610 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
613 src_ip = src_if.remote_ip6
614 dst_ip = src_if.local_ip6
617 src_ip = src_if.remote_ip4
618 dst_ip = src_if.local_ip4
620 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
621 ip_layer(src=src_ip, dst=dst_ip) /
622 UDP(sport=sport, dport=dport))
624 # insert non ESP marker
625 res = res / Raw(b'\x00' * 4)
628 def verify_udp(self, udp):
629 self.assertEqual(udp.sport, self.sa.sport)
630 self.assertEqual(udp.dport, self.sa.dport)
632 def get_ike_header(self, packet):
634 ih = packet[ikev2.IKEv2]
635 except IndexError as e:
636 # this is a workaround for getting IKEv2 layer as both ikev2 and
637 # ipsec register for port 4500
639 ih = self.verify_and_remove_non_esp_marker(esp)
640 self.assertEqual(ih.version, 0x20)
641 self.assertNotIn('Version', ih.flags)
644 def verify_and_remove_non_esp_marker(self, packet):
646 # if we are in nat traversal mode check for non esp marker
649 self.assertEqual(data[:4], b'\x00' * 4)
650 return ikev2.IKEv2(data[4:])
654 def encrypt_ike_msg(self, header, plain, first_payload):
655 if self.sa.ike_crypto == 'AES-GCM-16ICV':
656 data = self.sa.ike_crypto_alg.pad(raw(plain))
657 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
658 len(ikev2.IKEv2_payload_Encrypted())
659 tlen = plen + len(ikev2.IKEv2())
662 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
666 encr = self.sa.encrypt(raw(plain), raw(res))
667 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
668 length=plen, load=encr)
671 encr = self.sa.encrypt(raw(plain))
672 trunc_len = self.sa.ike_integ_alg.trunc_len
673 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
674 tlen = plen + len(ikev2.IKEv2())
676 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
677 length=plen, load=encr)
681 integ_data = raw(res)
682 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
683 self.sa.my_authkey, integ_data)
684 res = res / Raw(hmac_data[:trunc_len])
685 assert(len(res) == tlen)
688 def verify_udp_encap(self, ipsec_sa):
689 e = VppEnum.vl_api_ipsec_sad_flags_t
690 if self.sa.udp_encap or self.sa.natt:
691 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
693 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
695 def verify_ipsec_sas(self, is_rekey=False):
696 sas = self.vapi.ipsec_sa_dump()
698 # after rekey there is a short period of time in which old
699 # inbound SA is still present
703 self.assertEqual(len(sas), sa_count)
704 if self.sa.is_initiator:
719 c = self.sa.child_sas[0]
721 self.verify_udp_encap(sa0)
722 self.verify_udp_encap(sa1)
723 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
724 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
725 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
727 if self.sa.esp_integ is None:
730 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
731 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
732 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
735 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
736 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
737 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
738 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
742 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
743 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
744 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
745 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
747 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
748 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
750 def verify_keymat(self, api_keys, keys, name):
751 km = getattr(keys, name)
752 api_km = getattr(api_keys, name)
753 api_km_len = getattr(api_keys, name + '_len')
754 self.assertEqual(len(km), api_km_len)
755 self.assertEqual(km, api_km[:api_km_len])
757 def verify_id(self, api_id, exp_id):
758 self.assertEqual(api_id.type, IDType.value(exp_id.type))
759 self.assertEqual(api_id.data_len, exp_id.data_len)
760 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
762 def verify_ike_sas(self):
763 r = self.vapi.ikev2_sa_dump()
764 self.assertEqual(len(r), 1)
766 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
767 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
769 if self.sa.is_initiator:
770 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
771 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
773 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
774 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
776 if self.sa.is_initiator:
777 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
778 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
780 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
781 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
782 self.verify_keymat(sa.keys, self.sa, 'sk_d')
783 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
784 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
785 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
786 self.verify_keymat(sa.keys, self.sa, 'sk_er')
787 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
788 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
790 self.assertEqual(sa.i_id.type, self.sa.id_type)
791 self.assertEqual(sa.r_id.type, self.sa.id_type)
792 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
793 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
794 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
795 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
797 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
798 self.assertEqual(len(r), 1)
800 self.assertEqual(csa.sa_index, sa.sa_index)
801 c = self.sa.child_sas[0]
802 if hasattr(c, 'sk_ai'):
803 self.verify_keymat(csa.keys, c, 'sk_ai')
804 self.verify_keymat(csa.keys, c, 'sk_ar')
805 self.verify_keymat(csa.keys, c, 'sk_ei')
806 self.verify_keymat(csa.keys, c, 'sk_er')
807 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
808 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
810 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
813 r = self.vapi.ikev2_traffic_selector_dump(
814 is_initiator=True, sa_index=sa.sa_index,
815 child_sa_index=csa.child_sa_index)
816 self.assertEqual(len(r), 1)
818 self.verify_ts(r[0].ts, tsi[0], True)
820 r = self.vapi.ikev2_traffic_selector_dump(
821 is_initiator=False, sa_index=sa.sa_index,
822 child_sa_index=csa.child_sa_index)
823 self.assertEqual(len(r), 1)
824 self.verify_ts(r[0].ts, tsr[0], False)
826 n = self.vapi.ikev2_nonce_get(is_initiator=True,
827 sa_index=sa.sa_index)
828 self.verify_nonce(n, self.sa.i_nonce)
829 n = self.vapi.ikev2_nonce_get(is_initiator=False,
830 sa_index=sa.sa_index)
831 self.verify_nonce(n, self.sa.r_nonce)
833 def verify_nonce(self, api_nonce, nonce):
834 self.assertEqual(api_nonce.data_len, len(nonce))
835 self.assertEqual(api_nonce.nonce, nonce)
837 def verify_ts(self, api_ts, ts, is_initiator):
839 self.assertTrue(api_ts.is_local)
841 self.assertFalse(api_ts.is_local)
844 self.assertEqual(api_ts.start_addr,
845 IPv4Address(ts.starting_address_v4))
846 self.assertEqual(api_ts.end_addr,
847 IPv4Address(ts.ending_address_v4))
849 self.assertEqual(api_ts.start_addr,
850 IPv6Address(ts.starting_address_v6))
851 self.assertEqual(api_ts.end_addr,
852 IPv6Address(ts.ending_address_v6))
853 self.assertEqual(api_ts.start_port, ts.start_port)
854 self.assertEqual(api_ts.end_port, ts.end_port)
855 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
858 class TemplateInitiator(IkePeer):
859 """ initiator test template """
861 def initiate_del_sa_from_initiator(self):
862 ispi = int.from_bytes(self.sa.ispi, 'little')
863 self.pg0.enable_capture()
865 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
866 capture = self.pg0.get_capture(1)
867 ih = self.get_ike_header(capture[0])
868 self.assertNotIn('Response', ih.flags)
869 self.assertIn('Initiator', ih.flags)
870 self.assertEqual(ih.init_SPI, self.sa.ispi)
871 self.assertEqual(ih.resp_SPI, self.sa.rspi)
872 plain = self.sa.hmac_and_decrypt(ih)
873 d = ikev2.IKEv2_payload_Delete(plain)
874 self.assertEqual(d.proto, 1) # proto=IKEv2
875 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
876 flags='Response', exch_type='INFORMATIONAL',
877 id=ih.id, next_payload='Encrypted')
878 resp = self.encrypt_ike_msg(header, b'', None)
879 self.send_and_assert_no_replies(self.pg0, resp)
881 def verify_del_sa(self, packet):
882 ih = self.get_ike_header(packet)
883 self.assertEqual(ih.id, self.sa.msg_id)
884 self.assertEqual(ih.exch_type, 37) # exchange informational
885 self.assertIn('Response', ih.flags)
886 self.assertIn('Initiator', ih.flags)
887 plain = self.sa.hmac_and_decrypt(ih)
888 self.assertEqual(plain, b'')
890 def initiate_del_sa_from_responder(self):
891 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
892 exch_type='INFORMATIONAL',
893 id=self.sa.new_msg_id())
894 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
895 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
896 packet = self.create_packet(self.pg0, ike_msg,
897 self.sa.sport, self.sa.dport,
898 self.sa.natt, self.ip6)
899 self.pg0.add_stream(packet)
900 self.pg0.enable_capture()
902 capture = self.pg0.get_capture(1)
903 self.verify_del_sa(capture[0])
906 def find_notify_payload(packet, notify_type):
907 n = packet[ikev2.IKEv2_payload_Notify]
909 if n.type == notify_type:
914 def verify_nat_detection(self, packet):
921 # NAT_DETECTION_SOURCE_IP
922 s = self.find_notify_payload(packet, 16388)
923 self.assertIsNotNone(s)
924 src_sha = self.sa.compute_nat_sha1(
925 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
926 self.assertEqual(s.load, src_sha)
928 # NAT_DETECTION_DESTINATION_IP
929 s = self.find_notify_payload(packet, 16389)
930 self.assertIsNotNone(s)
931 dst_sha = self.sa.compute_nat_sha1(
932 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
933 self.assertEqual(s.load, dst_sha)
935 def verify_sa_init_request(self, packet):
936 ih = packet[ikev2.IKEv2]
937 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
938 self.assertEqual(ih.exch_type, 34) # SA_INIT
939 self.sa.ispi = ih.init_SPI
940 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
941 self.assertIn('Initiator', ih.flags)
942 self.assertNotIn('Response', ih.flags)
943 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
944 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
946 prop = packet[ikev2.IKEv2_payload_Proposal]
947 self.assertEqual(prop.proto, 1) # proto = ikev2
948 self.assertEqual(prop.proposal, 1)
949 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
950 self.assertEqual(prop.trans[0].transform_id,
951 self.p.ike_transforms['crypto_alg'])
952 self.assertEqual(prop.trans[1].transform_type, 2) # prf
953 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
954 self.assertEqual(prop.trans[2].transform_type, 4) # dh
955 self.assertEqual(prop.trans[2].transform_id,
956 self.p.ike_transforms['dh_group'])
958 self.verify_nat_detection(packet)
959 self.sa.set_ike_props(
960 crypto='AES-GCM-16ICV', crypto_key_len=32,
961 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
962 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
963 integ='SHA2-256-128')
964 self.sa.generate_dh_data()
965 self.sa.complete_dh_data()
968 def update_esp_transforms(self, trans, sa):
970 if trans.transform_type == 1: # ecryption
971 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
972 elif trans.transform_type == 3: # integrity
973 sa.esp_integ = INTEG_IDS[trans.transform_id]
974 trans = trans.payload
976 def verify_sa_auth_req(self, packet):
977 ih = self.get_ike_header(packet)
978 self.assertEqual(ih.resp_SPI, self.sa.rspi)
979 self.assertEqual(ih.init_SPI, self.sa.ispi)
980 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
981 self.assertIn('Initiator', ih.flags)
982 self.assertNotIn('Response', ih.flags)
986 self.assertEqual(ih.id, self.sa.msg_id + 1)
988 plain = self.sa.hmac_and_decrypt(ih)
989 idi = ikev2.IKEv2_payload_IDi(plain)
990 idr = ikev2.IKEv2_payload_IDr(idi.payload)
991 self.assertEqual(idi.load, self.sa.i_id)
992 self.assertEqual(idr.load, self.sa.r_id)
993 prop = idi[ikev2.IKEv2_payload_Proposal]
994 c = self.sa.child_sas[0]
996 self.update_esp_transforms(
997 prop[ikev2.IKEv2_payload_Transform], self.sa)
999 def send_init_response(self):
1000 tr_attr = self.sa.ike_crypto_attr()
1001 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1002 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1003 key_length=tr_attr[0]) /
1004 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1005 transform_id=self.sa.ike_integ) /
1006 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1007 transform_id=self.sa.ike_prf_alg.name) /
1008 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1009 transform_id=self.sa.ike_dh))
1010 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1011 trans_nb=4, trans=trans))
1012 self.sa.init_resp_packet = (
1013 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1014 exch_type='IKE_SA_INIT', flags='Response') /
1015 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1016 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1017 group=self.sa.ike_dh,
1018 load=self.sa.my_dh_pub_key) /
1019 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
1021 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1022 self.sa.sport, self.sa.dport,
1023 self.sa.natt, self.ip6)
1024 self.pg_send(self.pg0, ike_msg)
1025 capture = self.pg0.get_capture(1)
1026 self.verify_sa_auth_req(capture[0])
1028 def initiate_sa_init(self):
1029 self.pg0.enable_capture()
1031 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1033 capture = self.pg0.get_capture(1)
1034 self.verify_sa_init_request(capture[0])
1035 self.send_init_response()
1037 def send_auth_response(self):
1038 tr_attr = self.sa.esp_crypto_attr()
1039 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1040 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1041 key_length=tr_attr[0]) /
1042 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1043 transform_id=self.sa.esp_integ) /
1044 ikev2.IKEv2_payload_Transform(
1045 transform_type='Extended Sequence Number',
1046 transform_id='No ESN') /
1047 ikev2.IKEv2_payload_Transform(
1048 transform_type='Extended Sequence Number',
1049 transform_id='ESN'))
1051 c = self.sa.child_sas[0]
1052 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1053 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1055 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1056 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1057 IDtype=self.sa.id_type, load=self.sa.i_id) /
1058 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1059 IDtype=self.sa.id_type, load=self.sa.r_id) /
1060 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1061 auth_type=AuthMethod.value(self.sa.auth_method),
1062 load=self.sa.auth_data) /
1063 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1064 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1065 number_of_TSs=len(tsi),
1066 traffic_selector=tsi) /
1067 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1068 number_of_TSs=len(tsr),
1069 traffic_selector=tsr) /
1070 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1072 header = ikev2.IKEv2(
1073 init_SPI=self.sa.ispi,
1074 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1075 flags='Response', exch_type='IKE_AUTH')
1077 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1078 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1079 self.sa.dport, self.sa.natt, self.ip6)
1080 self.pg_send(self.pg0, packet)
1082 def test_initiator(self):
1083 self.initiate_sa_init()
1085 self.sa.calc_child_keys()
1086 self.send_auth_response()
1087 self.verify_ike_sas()
1090 class TemplateResponder(IkePeer):
1091 """ responder test template """
1093 def initiate_del_sa_from_responder(self):
1094 self.pg0.enable_capture()
1096 self.vapi.ikev2_initiate_del_ike_sa(
1097 ispi=int.from_bytes(self.sa.ispi, 'little'))
1098 capture = self.pg0.get_capture(1)
1099 ih = self.get_ike_header(capture[0])
1100 self.assertNotIn('Response', ih.flags)
1101 self.assertNotIn('Initiator', ih.flags)
1102 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1103 plain = self.sa.hmac_and_decrypt(ih)
1104 d = ikev2.IKEv2_payload_Delete(plain)
1105 self.assertEqual(d.proto, 1) # proto=IKEv2
1106 self.assertEqual(ih.init_SPI, self.sa.ispi)
1107 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1108 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1109 flags='Initiator+Response',
1110 exch_type='INFORMATIONAL',
1111 id=ih.id, next_payload='Encrypted')
1112 resp = self.encrypt_ike_msg(header, b'', None)
1113 self.send_and_assert_no_replies(self.pg0, resp)
1115 def verify_del_sa(self, packet):
1116 ih = self.get_ike_header(packet)
1117 self.assertEqual(ih.id, self.sa.msg_id)
1118 self.assertEqual(ih.exch_type, 37) # exchange informational
1119 self.assertIn('Response', ih.flags)
1120 self.assertNotIn('Initiator', ih.flags)
1121 self.assertEqual(ih.next_payload, 46) # Encrypted
1122 self.assertEqual(ih.init_SPI, self.sa.ispi)
1123 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1124 plain = self.sa.hmac_and_decrypt(ih)
1125 self.assertEqual(plain, b'')
1127 def initiate_del_sa_from_initiator(self):
1128 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1129 flags='Initiator', exch_type='INFORMATIONAL',
1130 id=self.sa.new_msg_id())
1131 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1132 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1133 packet = self.create_packet(self.pg0, ike_msg,
1134 self.sa.sport, self.sa.dport,
1135 self.sa.natt, self.ip6)
1136 self.pg0.add_stream(packet)
1137 self.pg0.enable_capture()
1139 capture = self.pg0.get_capture(1)
1140 self.verify_del_sa(capture[0])
1142 def send_sa_init_req(self, behind_nat=False):
1143 tr_attr = self.sa.ike_crypto_attr()
1144 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1145 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1146 key_length=tr_attr[0]) /
1147 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1148 transform_id=self.sa.ike_integ) /
1149 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1150 transform_id=self.sa.ike_prf_alg.name) /
1151 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1152 transform_id=self.sa.ike_dh))
1154 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1155 trans_nb=4, trans=trans))
1157 self.sa.init_req_packet = (
1158 ikev2.IKEv2(init_SPI=self.sa.ispi,
1159 flags='Initiator', exch_type='IKE_SA_INIT') /
1160 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1161 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1162 group=self.sa.ike_dh,
1163 load=self.sa.my_dh_pub_key) /
1164 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1165 load=self.sa.i_nonce))
1168 src_address = b'\x0a\x0a\x0a\x01'
1170 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1172 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1173 dst_nat = self.sa.compute_nat_sha1(
1174 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1176 nat_src_detection = ikev2.IKEv2_payload_Notify(
1177 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1178 next_payload='Notify')
1179 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1180 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1181 self.sa.init_req_packet = (self.sa.init_req_packet /
1185 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1186 self.sa.sport, self.sa.dport,
1187 self.sa.natt, self.ip6)
1188 self.pg0.add_stream(ike_msg)
1189 self.pg0.enable_capture()
1191 capture = self.pg0.get_capture(1)
1192 self.verify_sa_init(capture[0])
1194 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1195 tr_attr = self.sa.esp_crypto_attr()
1196 last_payload = last_payload or 'Notify'
1197 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1198 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1199 key_length=tr_attr[0]) /
1200 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1201 transform_id=self.sa.esp_integ) /
1202 ikev2.IKEv2_payload_Transform(
1203 transform_type='Extended Sequence Number',
1204 transform_id='No ESN') /
1205 ikev2.IKEv2_payload_Transform(
1206 transform_type='Extended Sequence Number',
1207 transform_id='ESN'))
1209 c = self.sa.child_sas[0]
1210 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1211 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1213 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1214 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1215 auth_type=AuthMethod.value(self.sa.auth_method),
1216 load=self.sa.auth_data) /
1217 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1218 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1219 number_of_TSs=len(tsi), traffic_selector=tsi) /
1220 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1221 number_of_TSs=len(tsr), traffic_selector=tsr))
1224 first_payload = 'Nonce'
1225 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1226 next_payload='SA') / plain /
1227 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1228 proto='ESP', SPI=c.ispi))
1230 first_payload = 'IDi'
1231 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1232 IDtype=self.sa.id_type, load=self.sa.i_id) /
1233 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1234 IDtype=self.sa.id_type, load=self.sa.r_id))
1236 return plain, first_payload
1238 def send_sa_auth(self):
1239 plain, first_payload = self.generate_auth_payload(
1240 last_payload='Notify')
1241 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1242 header = ikev2.IKEv2(
1243 init_SPI=self.sa.ispi,
1244 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1245 flags='Initiator', exch_type='IKE_AUTH')
1247 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1248 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1249 self.sa.dport, self.sa.natt, self.ip6)
1250 self.pg0.add_stream(packet)
1251 self.pg0.enable_capture()
1253 capture = self.pg0.get_capture(1)
1254 self.verify_sa_auth_resp(capture[0])
1256 def verify_sa_init(self, packet):
1257 ih = self.get_ike_header(packet)
1259 self.assertEqual(ih.id, self.sa.msg_id)
1260 self.assertEqual(ih.exch_type, 34)
1261 self.assertIn('Response', ih.flags)
1262 self.assertEqual(ih.init_SPI, self.sa.ispi)
1263 self.assertNotEqual(ih.resp_SPI, 0)
1264 self.sa.rspi = ih.resp_SPI
1266 sa = ih[ikev2.IKEv2_payload_SA]
1267 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1268 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1269 except IndexError as e:
1270 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1271 self.logger.error(ih.show())
1273 self.sa.complete_dh_data()
1277 def verify_sa_auth_resp(self, packet):
1278 ike = self.get_ike_header(packet)
1280 self.verify_udp(udp)
1281 self.assertEqual(ike.id, self.sa.msg_id)
1282 plain = self.sa.hmac_and_decrypt(ike)
1283 idr = ikev2.IKEv2_payload_IDr(plain)
1284 prop = idr[ikev2.IKEv2_payload_Proposal]
1285 self.assertEqual(prop.SPIsize, 4)
1286 self.sa.child_sas[0].rspi = prop.SPI
1287 self.sa.calc_child_keys()
1289 def test_responder(self):
1290 self.send_sa_init_req(self.sa.natt)
1292 self.verify_ipsec_sas()
1293 self.verify_ike_sas()
1296 class Ikev2Params(object):
1297 def config_params(self, params={}):
1298 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1299 ei = VppEnum.vl_api_ipsec_integ_alg_t
1301 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1302 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1303 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1304 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1305 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1306 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1308 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1309 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1310 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1311 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1313 dpd_disabled = True if 'dpd_disabled' not in params else\
1314 params['dpd_disabled']
1316 self.vapi.cli('ikev2 dpd disable')
1317 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1318 not in params else params['del_sa_from_responder']
1319 is_natt = 'natt' in params and params['natt'] or False
1320 self.p = Profile(self, 'pr1')
1321 self.ip6 = False if 'ip6' not in params else params['ip6']
1323 if 'auth' in params and params['auth'] == 'rsa-sig':
1324 auth_method = 'rsa-sig'
1325 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1326 self.vapi.ikev2_set_local_key(
1327 key_file=work_dir + params['server-key'])
1329 client_file = work_dir + params['client-cert']
1330 server_pem = open(work_dir + params['server-cert']).read()
1331 client_priv = open(work_dir + params['client-key']).read()
1332 client_priv = load_pem_private_key(str.encode(client_priv), None,
1334 self.peer_cert = x509.load_pem_x509_certificate(
1335 str.encode(server_pem),
1337 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1340 auth_data = b'$3cr3tpa$$w0rd'
1341 self.p.add_auth(method='shared-key', data=auth_data)
1342 auth_method = 'shared-key'
1345 is_init = True if 'is_initiator' not in params else\
1346 params['is_initiator']
1348 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1349 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1351 self.p.add_local_id(**idr)
1352 self.p.add_remote_id(**idi)
1354 self.p.add_local_id(**idi)
1355 self.p.add_remote_id(**idr)
1357 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1358 'loc_ts' not in params else params['loc_ts']
1359 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1360 'rem_ts' not in params else params['rem_ts']
1361 self.p.add_local_ts(**loc_ts)
1362 self.p.add_remote_ts(**rem_ts)
1363 if 'responder' in params:
1364 self.p.add_responder(params['responder'])
1365 if 'ike_transforms' in params:
1366 self.p.add_ike_transforms(params['ike_transforms'])
1367 if 'esp_transforms' in params:
1368 self.p.add_esp_transforms(params['esp_transforms'])
1370 udp_encap = False if 'udp_encap' not in params else\
1373 self.p.set_udp_encap(True)
1375 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1376 is_initiator=is_init,
1377 id_type=self.p.local_id['id_type'], natt=is_natt,
1378 priv_key=client_priv, auth_method=auth_method,
1379 auth_data=auth_data, udp_encap=udp_encap,
1380 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1382 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1383 params['ike-crypto']
1384 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1386 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1389 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1390 params['esp-crypto']
1391 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1394 self.sa.set_ike_props(
1395 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1396 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1397 self.sa.set_esp_props(
1398 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1402 class TestApi(VppTestCase):
1403 """ Test IKEV2 API """
1405 def setUpClass(cls):
1406 super(TestApi, cls).setUpClass()
1409 def tearDownClass(cls):
1410 super(TestApi, cls).tearDownClass()
1413 super(TestApi, self).tearDown()
1414 self.p1.remove_vpp_config()
1415 self.p2.remove_vpp_config()
1416 r = self.vapi.ikev2_profile_dump()
1417 self.assertEqual(len(r), 0)
1419 def configure_profile(self, cfg):
1420 p = Profile(self, cfg['name'])
1421 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1422 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1423 p.add_local_ts(**cfg['loc_ts'])
1424 p.add_remote_ts(**cfg['rem_ts'])
1425 p.add_responder(cfg['responder'])
1426 p.add_ike_transforms(cfg['ike_ts'])
1427 p.add_esp_transforms(cfg['esp_ts'])
1428 p.add_auth(**cfg['auth'])
1429 p.set_udp_encap(cfg['udp_encap'])
1430 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1431 if 'lifetime_data' in cfg:
1432 p.set_lifetime_data(cfg['lifetime_data'])
1433 if 'tun_itf' in cfg:
1434 p.set_tunnel_interface(cfg['tun_itf'])
1435 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1440 def test_profile_api(self):
1441 """ test profile dump API """
1446 'start_addr': '3.3.3.2',
1447 'end_addr': '3.3.3.3',
1453 'start_addr': '4.5.76.80',
1454 'end_addr': '2.3.4.6',
1461 'start_addr': 'ab::1',
1462 'end_addr': 'ab::4',
1468 'start_addr': 'cd::12',
1469 'end_addr': 'cd::13',
1475 'natt_disabled': True,
1476 'loc_id': ('fqdn', b'vpp.home'),
1477 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1480 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1483 'crypto_key_size': 32,
1488 'crypto_key_size': 24,
1490 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1492 'ipsec_over_udp_port': 4501,
1495 'lifetime_maxdata': 20192,
1496 'lifetime_jitter': 9,
1501 'loc_id': ('ip4-addr', b'192.168.2.1'),
1502 'rem_id': ('ip6-addr', b'abcd::1'),
1505 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1508 'crypto_key_size': 16,
1513 'crypto_key_size': 24,
1515 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1517 'ipsec_over_udp_port': 4600,
1520 self.p1 = self.configure_profile(conf['p1'])
1521 self.p2 = self.configure_profile(conf['p2'])
1523 r = self.vapi.ikev2_profile_dump()
1524 self.assertEqual(len(r), 2)
1525 self.verify_profile(r[0].profile, conf['p1'])
1526 self.verify_profile(r[1].profile, conf['p2'])
1528 def verify_id(self, api_id, cfg_id):
1529 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1530 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1532 def verify_ts(self, api_ts, cfg_ts):
1533 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1534 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1535 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1536 self.assertEqual(api_ts.start_addr,
1537 ip_address(text_type(cfg_ts['start_addr'])))
1538 self.assertEqual(api_ts.end_addr,
1539 ip_address(text_type(cfg_ts['end_addr'])))
1541 def verify_responder(self, api_r, cfg_r):
1542 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1543 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1545 def verify_transforms(self, api_ts, cfg_ts):
1546 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1547 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1548 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1550 def verify_ike_transforms(self, api_ts, cfg_ts):
1551 self.verify_transforms(api_ts, cfg_ts)
1552 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1554 def verify_esp_transforms(self, api_ts, cfg_ts):
1555 self.verify_transforms(api_ts, cfg_ts)
1557 def verify_auth(self, api_auth, cfg_auth):
1558 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1559 self.assertEqual(api_auth.data, cfg_auth['data'])
1560 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1562 def verify_lifetime_data(self, p, ld):
1563 self.assertEqual(p.lifetime, ld['lifetime'])
1564 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1565 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1566 self.assertEqual(p.handover, ld['handover'])
1568 def verify_profile(self, ap, cp):
1569 self.assertEqual(ap.name, cp['name'])
1570 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1571 self.verify_id(ap.loc_id, cp['loc_id'])
1572 self.verify_id(ap.rem_id, cp['rem_id'])
1573 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1574 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1575 self.verify_responder(ap.responder, cp['responder'])
1576 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1577 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1578 self.verify_auth(ap.auth, cp['auth'])
1579 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1580 self.assertTrue(natt_dis == ap.natt_disabled)
1582 if 'lifetime_data' in cp:
1583 self.verify_lifetime_data(ap, cp['lifetime_data'])
1584 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1586 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1588 self.assertEqual(ap.tun_itf, 0xffffffff)
1591 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1592 """ test ikev2 initiator - pre shared key auth """
1594 def config_tc(self):
1595 self.config_params({
1596 'is_initiator': False, # seen from test case perspective
1597 # thus vpp is initiator
1598 'responder': {'sw_if_index': self.pg0.sw_if_index,
1599 'addr': self.pg0.remote_ip4},
1600 'ike-crypto': ('AES-GCM-16ICV', 32),
1601 'ike-integ': 'NULL',
1602 'ike-dh': '3072MODPgr',
1604 'crypto_alg': 20, # "aes-gcm-16"
1605 'crypto_key_size': 256,
1606 'dh_group': 15, # "modp-3072"
1609 'crypto_alg': 12, # "aes-cbc"
1610 'crypto_key_size': 256,
1611 # "hmac-sha2-256-128"
1615 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1616 """ test initiator - request window size (1) """
1618 def rekey_respond(self, req, update_child_sa_data):
1619 ih = self.get_ike_header(req)
1620 plain = self.sa.hmac_and_decrypt(ih)
1621 sa = ikev2.IKEv2_payload_SA(plain)
1622 if update_child_sa_data:
1623 prop = sa[ikev2.IKEv2_payload_Proposal]
1624 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1625 self.sa.r_nonce = self.sa.i_nonce
1626 self.sa.child_sas[0].ispi = prop.SPI
1627 self.sa.child_sas[0].rspi = prop.SPI
1628 self.sa.calc_child_keys()
1630 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1631 flags='Response', exch_type=36,
1632 id=ih.id, next_payload='Encrypted')
1633 resp = self.encrypt_ike_msg(header, sa, 'SA')
1634 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1635 self.sa.dport, self.sa.natt, self.ip6)
1636 self.send_and_assert_no_replies(self.pg0, packet)
1638 def test_initiator(self):
1639 super(TestInitiatorRequestWindowSize, self).test_initiator()
1640 self.pg0.enable_capture()
1642 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1643 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1644 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1645 capture = self.pg0.get_capture(2)
1647 # reply in reverse order
1648 self.rekey_respond(capture[1], True)
1649 self.rekey_respond(capture[0], False)
1651 # verify that only the second request was accepted
1652 self.verify_ike_sas()
1653 self.verify_ipsec_sas(is_rekey=True)
1656 class TestInitiatorRekey(TestInitiatorPsk):
1657 """ test ikev2 initiator - rekey """
1659 def rekey_from_initiator(self):
1660 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1661 self.pg0.enable_capture()
1663 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1664 capture = self.pg0.get_capture(1)
1665 ih = self.get_ike_header(capture[0])
1666 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1667 self.assertNotIn('Response', ih.flags)
1668 self.assertIn('Initiator', ih.flags)
1669 plain = self.sa.hmac_and_decrypt(ih)
1670 sa = ikev2.IKEv2_payload_SA(plain)
1671 prop = sa[ikev2.IKEv2_payload_Proposal]
1672 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1673 self.sa.r_nonce = self.sa.i_nonce
1674 # update new responder SPI
1675 self.sa.child_sas[0].ispi = prop.SPI
1676 self.sa.child_sas[0].rspi = prop.SPI
1677 self.sa.calc_child_keys()
1678 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1679 flags='Response', exch_type=36,
1680 id=ih.id, next_payload='Encrypted')
1681 resp = self.encrypt_ike_msg(header, sa, 'SA')
1682 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1683 self.sa.dport, self.sa.natt, self.ip6)
1684 self.send_and_assert_no_replies(self.pg0, packet)
1686 def test_initiator(self):
1687 super(TestInitiatorRekey, self).test_initiator()
1688 self.rekey_from_initiator()
1689 self.verify_ike_sas()
1690 self.verify_ipsec_sas(is_rekey=True)
1693 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1694 """ test ikev2 initiator - delete IKE SA from responder """
1696 def config_tc(self):
1697 self.config_params({
1698 'del_sa_from_responder': True,
1699 'is_initiator': False, # seen from test case perspective
1700 # thus vpp is initiator
1701 'responder': {'sw_if_index': self.pg0.sw_if_index,
1702 'addr': self.pg0.remote_ip4},
1703 'ike-crypto': ('AES-GCM-16ICV', 32),
1704 'ike-integ': 'NULL',
1705 'ike-dh': '3072MODPgr',
1707 'crypto_alg': 20, # "aes-gcm-16"
1708 'crypto_key_size': 256,
1709 'dh_group': 15, # "modp-3072"
1712 'crypto_alg': 12, # "aes-cbc"
1713 'crypto_key_size': 256,
1714 # "hmac-sha2-256-128"
1718 class TestResponderNATT(TemplateResponder, Ikev2Params):
1719 """ test ikev2 responder - nat traversal """
1720 def config_tc(self):
1725 class TestResponderPsk(TemplateResponder, Ikev2Params):
1726 """ test ikev2 responder - pre shared key auth """
1727 def config_tc(self):
1728 self.config_params()
1731 class TestResponderDpd(TestResponderPsk):
1733 Dead peer detection test
1735 def config_tc(self):
1736 self.config_params({'dpd_disabled': False})
1741 def test_responder(self):
1742 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1743 super(TestResponderDpd, self).test_responder()
1744 self.pg0.enable_capture()
1746 # capture empty request but don't reply
1747 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1748 ih = self.get_ike_header(capture[0])
1749 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1750 plain = self.sa.hmac_and_decrypt(ih)
1751 self.assertEqual(plain, b'')
1752 # wait for SA expiration
1754 ike_sas = self.vapi.ikev2_sa_dump()
1755 self.assertEqual(len(ike_sas), 0)
1756 ipsec_sas = self.vapi.ipsec_sa_dump()
1757 self.assertEqual(len(ipsec_sas), 0)
1760 class TestResponderRekey(TestResponderPsk):
1761 """ test ikev2 responder - rekey """
1763 def rekey_from_initiator(self):
1764 packet = self.create_rekey_request()
1765 self.pg0.add_stream(packet)
1766 self.pg0.enable_capture()
1768 capture = self.pg0.get_capture(1)
1769 ih = self.get_ike_header(capture[0])
1770 plain = self.sa.hmac_and_decrypt(ih)
1771 sa = ikev2.IKEv2_payload_SA(plain)
1772 prop = sa[ikev2.IKEv2_payload_Proposal]
1773 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1774 # update new responder SPI
1775 self.sa.child_sas[0].rspi = prop.SPI
1777 def test_responder(self):
1778 super(TestResponderRekey, self).test_responder()
1779 self.rekey_from_initiator()
1780 self.sa.calc_child_keys()
1781 self.verify_ike_sas()
1782 self.verify_ipsec_sas(is_rekey=True)
1785 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1786 """ test ikev2 responder - cert based auth """
1787 def config_tc(self):
1788 self.config_params({
1791 'server-key': 'server-key.pem',
1792 'client-key': 'client-key.pem',
1793 'client-cert': 'client-cert.pem',
1794 'server-cert': 'server-cert.pem'})
1797 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1798 (TemplateResponder, Ikev2Params):
1800 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1802 def config_tc(self):
1803 self.config_params({
1804 'ike-crypto': ('AES-CBC', 16),
1805 'ike-integ': 'SHA2-256-128',
1806 'esp-crypto': ('AES-CBC', 24),
1807 'esp-integ': 'SHA2-384-192',
1808 'ike-dh': '2048MODPgr'})
1811 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1812 (TemplateResponder, Ikev2Params):
1814 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1816 def config_tc(self):
1817 self.config_params({
1818 'ike-crypto': ('AES-CBC', 32),
1819 'ike-integ': 'SHA2-256-128',
1820 'esp-crypto': ('AES-GCM-16ICV', 32),
1821 'esp-integ': 'NULL',
1822 'ike-dh': '3072MODPgr'})
1825 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1829 def config_tc(self):
1830 self.config_params({
1831 'del_sa_from_responder': True,
1834 'ike-crypto': ('AES-GCM-16ICV', 32),
1835 'ike-integ': 'NULL',
1836 'ike-dh': '2048MODPgr',
1837 'loc_ts': {'start_addr': 'ab:cd::0',
1838 'end_addr': 'ab:cd::10'},
1839 'rem_ts': {'start_addr': '11::0',
1840 'end_addr': '11::100'}})
1843 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1845 Test for keep alive messages
1848 def send_empty_req_from_responder(self):
1849 packet = self.create_empty_request()
1850 self.pg0.add_stream(packet)
1851 self.pg0.enable_capture()
1853 capture = self.pg0.get_capture(1)
1854 ih = self.get_ike_header(capture[0])
1855 self.assertEqual(ih.id, self.sa.msg_id)
1856 plain = self.sa.hmac_and_decrypt(ih)
1857 self.assertEqual(plain, b'')
1859 def test_initiator(self):
1860 super(TestInitiatorKeepaliveMsg, self).test_initiator()
1861 self.send_empty_req_from_responder()
1864 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1865 """ malformed packet test """
1870 def config_tc(self):
1871 self.config_params()
1873 def assert_counter(self, count, name, version='ip4'):
1874 node_name = '/err/ikev2-%s/' % version + name
1875 self.assertEqual(count, self.statistics.get_err_counter(node_name))
1877 def create_ike_init_msg(self, length=None, payload=None):
1878 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1879 flags='Initiator', exch_type='IKE_SA_INIT')
1880 if payload is not None:
1882 return self.create_packet(self.pg0, msg, self.sa.sport,
1885 def verify_bad_packet_length(self):
1886 ike_msg = self.create_ike_init_msg(length=0xdead)
1887 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1888 self.assert_counter(self.pkt_count, 'Bad packet length')
1890 def verify_bad_sa_payload_length(self):
1891 p = ikev2.IKEv2_payload_SA(length=0xdead)
1892 ike_msg = self.create_ike_init_msg(payload=p)
1893 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1894 self.assert_counter(self.pkt_count, 'Malformed packet')
1896 def test_responder(self):
1897 self.pkt_count = 254
1898 self.verify_bad_packet_length()
1899 self.verify_bad_sa_payload_length()
1902 if __name__ == '__main__':
1903 unittest.main(testRunner=VppTestRunner)