ikev2: fix issue when sending multiple requests at once
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 from scapy.layers.ipsec import ESP
16 from scapy.layers.inet import IP, UDP, Ether
17 from scapy.layers.inet6 import IPv6
18 from scapy.packet import raw, Raw
19 from scapy.utils import long_converter
20 from framework import VppTestCase, VppTestRunner
21 from vpp_ikev2 import Profile, IDType, AuthMethod
22 from vpp_papi import VppEnum
23
# Compatibility alias: bind text_type to ``unicode`` when that builtin
# exists, otherwise fall back to ``str``.
try:
    text_type = unicode
except NameError:
    text_type = str

# constant hashed together with the shared secret in IKEv2SA.auth_init()
KEY_PAD = b"Key Pad for IKEv2"
# number of trailing key bytes CryptoAlgo uses as the AEAD nonce salt
SALT_SIZE = 4
# number of authentication tag bytes appended to AEAD ciphertext
GCM_ICV_SIZE = 16
# IV length (bytes) used by the 'AES-GCM-16ICV' algorithm
GCM_IV_SIZE = 8
33
34
# Diffie-Hellman MODP groups, defined in rfc3526
# tuple structure is (p, g, key_len):
#   p       - prime modulus (parsed from the hex dump by long_converter)
#   g       - generator
#   key_len - length in bytes used when serialising the shared secret
#             (see IKEv2SA.compute_secret)
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
69
70
class CryptoAlgo(object):
    """Symmetric cipher helper used to protect IKE payloads in the tests.

    name   -- algorithm name; 'AES-GCM-16ICV' selects the GCM IV length
    cipher -- cipher class taking the key (e.g. algorithms.AES) or None
    mode   -- mode class taking the IV/nonce (e.g. modes.CBC) or None

    When ``cipher`` is None no block size / IV length is set, so the
    encrypt/decrypt/pad helpers must not be used on such an instance.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is expressed in bits
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` and return ``iv + ciphertext [+ tag]``.

        Without ``aad`` the data is encrypted with ``key`` and a random IV.
        With ``aad`` the last SALT_SIZE bytes of ``key`` are used as salt,
        the nonce is ``salt + iv`` and the first GCM_ICV_SIZE bytes of the
        authentication tag are appended to the ciphertext.
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                               default_backend()).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` (``iv + ciphertext``) and return the plaintext.

        Mirrors encrypt(): the configured cipher class and IV length are
        used instead of hard-coded AES / GCM values, so both directions
        always agree. With ``aad`` the tag to verify is passed in ``icv``.
        """
        iv = data[:self.iv_len]
        ct = data[self.iv_len:]
        if aad is None:
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            key = key[:-SALT_SIZE]
            decryptor = Cipher(self.cipher(key),
                               self.mode(nonce, icv, len(icv)),
                               default_backend()).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` up to the next multiple of the block size.

        The padding consists of zero bytes followed by one byte holding
        the number of zero bytes; a full block is added when ``data`` is
        already block aligned.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
123
124
class AuthAlgo(object):
    """Description of an integrity / PRF algorithm.

    name      -- algorithm name
    mac       -- MAC constructor (None for the NULL algorithm)
    mod       -- hash class handed to the MAC (None for NULL)
    key_len   -- key length in bytes
    trunc_len -- truncated output length; a missing or zero value falls
                 back to ``key_len``
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        if trunc_len:
            self.trunc_len = trunc_len
        else:
            self.trunc_len = key_len
132
133
# encryption algorithms supported by the test peer, keyed by name
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}

# integrity algorithms: (name, mac, hash, key length, truncated length)
# each SHA2 variant must use the hash matching its key length
# (HMAC-SHA-384 / HMAC-SHA-512), not SHA256
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}

# pseudo-random functions used for key derivation
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}

# numeric id -> CRYPTO_ALGOS name
# (presumably IKEv2 encryption transform ids - confirm against callers)
CRYPTO_IDS = {
    12: 'AES-CBC',
    20: 'AES-GCM-16ICV',
}

# numeric id -> AUTH_ALGOS name
# (presumably IKEv2 integrity transform ids - confirm against callers)
INTEG_IDS = {
    2: 'HMAC-SHA1-96',
    12: 'SHA2-256-128',
    13: 'SHA2-384-192',
    14: 'SHA2-512-256',
}
166
167
class IKEv2ChildSA(object):
    """State of a single child SA negotiated by the test peer.

    A random 4 byte SPI is generated for the local side: it is stored in
    ``ispi`` when acting as initiator and in ``rspi`` otherwise, the other
    SPI starts out as None.
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        own_spi = os.urandom(4)
        self.ispi = own_spi if is_initiator else None
        self.rspi = None if is_initiator else own_spi
        self.local_ts = local_ts
        self.remote_ts = remote_ts
179
180
181 class IKEv2SA(object):
182     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
183                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
184                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
185                  auth_method='shared-key', priv_key=None, natt=False,
186                  udp_encap=False):
187         self.udp_encap = udp_encap
188         self.natt = natt
189         if natt:
190             self.sport = 4500
191             self.dport = 4500
192         else:
193             self.sport = 500
194             self.dport = 500
195         self.msg_id = 0
196         self.dh_params = None
197         self.test = test
198         self.priv_key = priv_key
199         self.is_initiator = is_initiator
200         nonce = nonce or os.urandom(32)
201         self.auth_data = auth_data
202         self.i_id = i_id
203         self.r_id = r_id
204         if isinstance(id_type, str):
205             self.id_type = IDType.value(id_type)
206         else:
207             self.id_type = id_type
208         self.auth_method = auth_method
209         if self.is_initiator:
210             self.rspi = 8 * b'\x00'
211             self.ispi = spi
212             self.i_nonce = nonce
213         else:
214             self.rspi = spi
215             self.ispi = 8 * b'\x00'
216             self.r_nonce = nonce
217         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
218                           self.is_initiator)]
219
220     def new_msg_id(self):
221         self.msg_id += 1
222         return self.msg_id
223
224     @property
225     def my_dh_pub_key(self):
226         if self.is_initiator:
227             return self.i_dh_data
228         return self.r_dh_data
229
230     @property
231     def peer_dh_pub_key(self):
232         if self.is_initiator:
233             return self.r_dh_data
234         return self.i_dh_data
235
236     def compute_secret(self):
237         priv = self.dh_private_key
238         peer = self.peer_dh_pub_key
239         p, g, l = self.ike_group
240         return pow(int.from_bytes(peer, 'big'),
241                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
242
243     def generate_dh_data(self):
244         # generate DH keys
245         if self.ike_dh not in DH:
246             raise NotImplementedError('%s not in DH group' % self.ike_dh)
247
248         if self.dh_params is None:
249             dhg = DH[self.ike_dh]
250             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
251             self.dh_params = pn.parameters(default_backend())
252
253         priv = self.dh_params.generate_private_key()
254         pub = priv.public_key()
255         x = priv.private_numbers().x
256         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
257         y = pub.public_numbers().y
258
259         if self.is_initiator:
260             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
261         else:
262             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
263
264     def complete_dh_data(self):
265         self.dh_shared_secret = self.compute_secret()
266
267     def calc_child_keys(self):
268         prf = self.ike_prf_alg.mod()
269         s = self.i_nonce + self.r_nonce
270         c = self.child_sas[0]
271
272         encr_key_len = self.esp_crypto_key_len
273         integ_key_len = self.esp_integ_alg.key_len
274         salt_len = 0 if integ_key_len else 4
275
276         l = (integ_key_len * 2 +
277              encr_key_len * 2 +
278              salt_len * 2)
279         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
280
281         pos = 0
282         c.sk_ei = keymat[pos:pos+encr_key_len]
283         pos += encr_key_len
284
285         if integ_key_len:
286             c.sk_ai = keymat[pos:pos+integ_key_len]
287             pos += integ_key_len
288         else:
289             c.salt_ei = keymat[pos:pos+salt_len]
290             pos += salt_len
291
292         c.sk_er = keymat[pos:pos+encr_key_len]
293         pos += encr_key_len
294
295         if integ_key_len:
296             c.sk_ar = keymat[pos:pos+integ_key_len]
297             pos += integ_key_len
298         else:
299             c.salt_er = keymat[pos:pos+salt_len]
300             pos += salt_len
301
302     def calc_prfplus(self, prf, key, seed, length):
303         r = b''
304         t = None
305         x = 1
306         while len(r) < length and x < 255:
307             if t is not None:
308                 s = t
309             else:
310                 s = b''
311             s = s + seed + bytes([x])
312             t = self.calc_prf(prf, key, s)
313             r = r + t
314             x = x + 1
315
316         if x == 255:
317             return None
318         return r
319
320     def calc_prf(self, prf, key, data):
321         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
322         h.update(data)
323         return h.finalize()
324
325     def calc_keys(self):
326         prf = self.ike_prf_alg.mod()
327         # SKEYSEED = prf(Ni | Nr, g^ir)
328         s = self.i_nonce + self.r_nonce
329         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
330
331         # calculate S = Ni | Nr | SPIi SPIr
332         s = s + self.ispi + self.rspi
333
334         prf_key_trunc = self.ike_prf_alg.trunc_len
335         encr_key_len = self.ike_crypto_key_len
336         tr_prf_key_len = self.ike_prf_alg.key_len
337         integ_key_len = self.ike_integ_alg.key_len
338         if integ_key_len == 0:
339             salt_size = 4
340         else:
341             salt_size = 0
342
343         l = (prf_key_trunc +
344              integ_key_len * 2 +
345              encr_key_len * 2 +
346              tr_prf_key_len * 2 +
347              salt_size * 2)
348         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
349
350         pos = 0
351         self.sk_d = keymat[:pos+prf_key_trunc]
352         pos += prf_key_trunc
353
354         self.sk_ai = keymat[pos:pos+integ_key_len]
355         pos += integ_key_len
356         self.sk_ar = keymat[pos:pos+integ_key_len]
357         pos += integ_key_len
358
359         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
360         pos += encr_key_len + salt_size
361         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
362         pos += encr_key_len + salt_size
363
364         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
365         pos += tr_prf_key_len
366         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
367
368     def generate_authmsg(self, prf, packet):
369         if self.is_initiator:
370             id = self.i_id
371             nonce = self.r_nonce
372             key = self.sk_pi
373         else:
374             id = self.r_id
375             nonce = self.i_nonce
376             key = self.sk_pr
377         data = bytes([self.id_type, 0, 0, 0]) + id
378         id_hash = self.calc_prf(prf, key, data)
379         return packet + nonce + id_hash
380
381     def auth_init(self):
382         prf = self.ike_prf_alg.mod()
383         if self.is_initiator:
384             packet = self.init_req_packet
385         else:
386             packet = self.init_resp_packet
387         authmsg = self.generate_authmsg(prf, raw(packet))
388         if self.auth_method == 'shared-key':
389             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
390             self.auth_data = self.calc_prf(prf, psk, authmsg)
391         elif self.auth_method == 'rsa-sig':
392             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
393                                                 hashes.SHA1())
394         else:
395             raise TypeError('unknown auth method type!')
396
397     def encrypt(self, data, aad=None):
398         data = self.ike_crypto_alg.pad(data)
399         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
400
401     @property
402     def peer_authkey(self):
403         if self.is_initiator:
404             return self.sk_ar
405         return self.sk_ai
406
407     @property
408     def my_authkey(self):
409         if self.is_initiator:
410             return self.sk_ai
411         return self.sk_ar
412
413     @property
414     def my_cryptokey(self):
415         if self.is_initiator:
416             return self.sk_ei
417         return self.sk_er
418
419     @property
420     def peer_cryptokey(self):
421         if self.is_initiator:
422             return self.sk_er
423         return self.sk_ei
424
425     def concat(self, alg, key_len):
426         return alg + '-' + str(key_len * 8)
427
428     @property
429     def vpp_ike_cypto_alg(self):
430         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
431
432     @property
433     def vpp_esp_cypto_alg(self):
434         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
435
436     def verify_hmac(self, ikemsg):
437         integ_trunc = self.ike_integ_alg.trunc_len
438         exp_hmac = ikemsg[-integ_trunc:]
439         data = ikemsg[:-integ_trunc]
440         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
441                                           self.peer_authkey, data)
442         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
443
444     def compute_hmac(self, integ, key, data):
445         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
446         h.update(data)
447         return h.finalize()
448
449     def decrypt(self, data, aad=None, icv=None):
450         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
451
452     def hmac_and_decrypt(self, ike):
453         ep = ike[ikev2.IKEv2_payload_Encrypted]
454         if self.ike_crypto == 'AES-GCM-16ICV':
455             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
456             ct = ep.load[:-GCM_ICV_SIZE]
457             tag = ep.load[-GCM_ICV_SIZE:]
458             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
459         else:
460             self.verify_hmac(raw(ike))
461             integ_trunc = self.ike_integ_alg.trunc_len
462
463             # remove ICV and decrypt payload
464             ct = ep.load[:-integ_trunc]
465             plain = self.decrypt(ct)
466         # remove padding
467         pad_len = plain[-1]
468         return plain[:-pad_len - 1]
469
470     def build_ts_addr(self, ts, version):
471         return {'starting_address_v' + version: ts['start_addr'],
472                 'ending_address_v' + version: ts['end_addr']}
473
474     def generate_ts(self, is_ip4):
475         c = self.child_sas[0]
476         ts_data = {'IP_protocol_ID': 0,
477                    'start_port': 0,
478                    'end_port': 0xffff}
479         if is_ip4:
480             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
481             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
482             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
483             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
484         else:
485             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
486             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
487             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
488             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
489
490         if self.is_initiator:
491             return ([ts1], [ts2])
492         return ([ts2], [ts1])
493
494     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
495         if crypto not in CRYPTO_ALGOS:
496             raise TypeError('unsupported encryption algo %r' % crypto)
497         self.ike_crypto = crypto
498         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
499         self.ike_crypto_key_len = crypto_key_len
500
501         if integ not in AUTH_ALGOS:
502             raise TypeError('unsupported auth algo %r' % integ)
503         self.ike_integ = None if integ == 'NULL' else integ
504         self.ike_integ_alg = AUTH_ALGOS[integ]
505
506         if prf not in PRF_ALGOS:
507             raise TypeError('unsupported prf algo %r' % prf)
508         self.ike_prf = prf
509         self.ike_prf_alg = PRF_ALGOS[prf]
510         self.ike_dh = dh
511         self.ike_group = DH[self.ike_dh]
512
513     def set_esp_props(self, crypto, crypto_key_len, integ):
514         self.esp_crypto_key_len = crypto_key_len
515         if crypto not in CRYPTO_ALGOS:
516             raise TypeError('unsupported encryption algo %r' % crypto)
517         self.esp_crypto = crypto
518         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
519
520         if integ not in AUTH_ALGOS:
521             raise TypeError('unsupported auth algo %r' % integ)
522         self.esp_integ = None if integ == 'NULL' else integ
523         self.esp_integ_alg = AUTH_ALGOS[integ]
524
525     def crypto_attr(self, key_len):
526         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
527             return (0x800e << 16 | key_len << 3, 12)
528         else:
529             raise Exception('unsupported attribute type')
530
531     def ike_crypto_attr(self):
532         return self.crypto_attr(self.ike_crypto_key_len)
533
534     def esp_crypto_attr(self):
535         return self.crypto_attr(self.esp_crypto_key_len)
536
537     def compute_nat_sha1(self, ip, port, rspi=None):
538         if rspi is None:
539             rspi = self.rspi
540         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
541         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
542         digest.update(data)
543         return digest.finalize()
544
545
546 class IkePeer(VppTestCase):
547     """ common class for initiator and responder """
548
549     @classmethod
550     def setUpClass(cls):
551         import scapy.contrib.ikev2 as _ikev2
552         globals()['ikev2'] = _ikev2
553         super(IkePeer, cls).setUpClass()
554         cls.create_pg_interfaces(range(2))
555         for i in cls.pg_interfaces:
556             i.admin_up()
557             i.config_ip4()
558             i.resolve_arp()
559             i.config_ip6()
560             i.resolve_ndp()
561
562     @classmethod
563     def tearDownClass(cls):
564         super(IkePeer, cls).tearDownClass()
565
566     def tearDown(self):
567         super(IkePeer, self).tearDown()
568         if self.del_sa_from_responder:
569             self.initiate_del_sa_from_responder()
570         else:
571             self.initiate_del_sa_from_initiator()
572         r = self.vapi.ikev2_sa_dump()
573         self.assertEqual(len(r), 0)
574         sas = self.vapi.ipsec_sa_dump()
575         self.assertEqual(len(sas), 0)
576         self.p.remove_vpp_config()
577         self.assertIsNone(self.p.query_vpp_config())
578
579     def setUp(self):
580         super(IkePeer, self).setUp()
581         self.config_tc()
582         self.p.add_vpp_config()
583         self.assertIsNotNone(self.p.query_vpp_config())
584         if self.sa.is_initiator:
585             self.sa.generate_dh_data()
586         self.vapi.cli('ikev2 set logging level 4')
587         self.vapi.cli('event-lo clear')
588
589     def create_rekey_request(self):
590         sa, first_payload = self.generate_auth_payload(is_rekey=True)
591         header = ikev2.IKEv2(
592                 init_SPI=self.sa.ispi,
593                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
594                 flags='Initiator', exch_type='CREATE_CHILD_SA')
595
596         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
597         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
598                                   self.sa.dport, self.sa.natt, self.ip6)
599
600     def create_empty_request(self):
601         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
602                              id=self.sa.new_msg_id(), flags='Initiator',
603                              exch_type='INFORMATIONAL',
604                              next_payload='Encrypted')
605
606         msg = self.encrypt_ike_msg(header, b'', None)
607         return self.create_packet(self.pg0, msg, self.sa.sport,
608                                   self.sa.dport, self.sa.natt, self.ip6)
609
610     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
611                       use_ip6=False):
612         if use_ip6:
613             src_ip = src_if.remote_ip6
614             dst_ip = src_if.local_ip6
615             ip_layer = IPv6
616         else:
617             src_ip = src_if.remote_ip4
618             dst_ip = src_if.local_ip4
619             ip_layer = IP
620         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
621                ip_layer(src=src_ip, dst=dst_ip) /
622                UDP(sport=sport, dport=dport))
623         if natt:
624             # insert non ESP marker
625             res = res / Raw(b'\x00' * 4)
626         return res / msg
627
628     def verify_udp(self, udp):
629         self.assertEqual(udp.sport, self.sa.sport)
630         self.assertEqual(udp.dport, self.sa.dport)
631
632     def get_ike_header(self, packet):
633         try:
634             ih = packet[ikev2.IKEv2]
635         except IndexError as e:
636             # this is a workaround for getting IKEv2 layer as both ikev2 and
637             # ipsec register for port 4500
638             esp = packet[ESP]
639             ih = self.verify_and_remove_non_esp_marker(esp)
640         self.assertEqual(ih.version, 0x20)
641         self.assertNotIn('Version', ih.flags)
642         return ih
643
644     def verify_and_remove_non_esp_marker(self, packet):
645         if self.sa.natt:
646             # if we are in nat traversal mode check for non esp marker
647             # and remove it
648             data = raw(packet)
649             self.assertEqual(data[:4], b'\x00' * 4)
650             return ikev2.IKEv2(data[4:])
651         else:
652             return packet
653
654     def encrypt_ike_msg(self, header, plain, first_payload):
655         if self.sa.ike_crypto == 'AES-GCM-16ICV':
656             data = self.sa.ike_crypto_alg.pad(raw(plain))
657             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
658                 len(ikev2.IKEv2_payload_Encrypted())
659             tlen = plen + len(ikev2.IKEv2())
660
661             # prepare aad data
662             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
663                                                  length=plen)
664             header.length = tlen
665             res = header / sk_p
666             encr = self.sa.encrypt(raw(plain), raw(res))
667             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
668                                                  length=plen, load=encr)
669             res = header / sk_p
670         else:
671             encr = self.sa.encrypt(raw(plain))
672             trunc_len = self.sa.ike_integ_alg.trunc_len
673             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
674             tlen = plen + len(ikev2.IKEv2())
675
676             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
677                                                  length=plen, load=encr)
678             header.length = tlen
679             res = header / sk_p
680
681             integ_data = raw(res)
682             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
683                                              self.sa.my_authkey, integ_data)
684             res = res / Raw(hmac_data[:trunc_len])
685         assert(len(res) == tlen)
686         return res
687
688     def verify_udp_encap(self, ipsec_sa):
689         e = VppEnum.vl_api_ipsec_sad_flags_t
690         if self.sa.udp_encap or self.sa.natt:
691             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
692         else:
693             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
694
695     def verify_ipsec_sas(self, is_rekey=False):
696         sas = self.vapi.ipsec_sa_dump()
697         if is_rekey:
698             # after rekey there is a short period of time in which old
699             # inbound SA is still present
700             sa_count = 3
701         else:
702             sa_count = 2
703         self.assertEqual(len(sas), sa_count)
704         if self.sa.is_initiator:
705             if is_rekey:
706                 sa0 = sas[0].entry
707                 sa1 = sas[2].entry
708             else:
709                 sa0 = sas[0].entry
710                 sa1 = sas[1].entry
711         else:
712             if is_rekey:
713                 sa0 = sas[2].entry
714                 sa1 = sas[0].entry
715             else:
716                 sa1 = sas[0].entry
717                 sa0 = sas[1].entry
718
719         c = self.sa.child_sas[0]
720
721         self.verify_udp_encap(sa0)
722         self.verify_udp_encap(sa1)
723         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
724         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
725         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
726
727         if self.sa.esp_integ is None:
728             vpp_integ_alg = 0
729         else:
730             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
731         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
732         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
733
734         # verify crypto keys
735         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
736         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
737         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
738         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
739
740         # verify integ keys
741         if vpp_integ_alg:
742             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
743             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
744             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
745             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
746         else:
747             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
748             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
749
750     def verify_keymat(self, api_keys, keys, name):
751         km = getattr(keys, name)
752         api_km = getattr(api_keys, name)
753         api_km_len = getattr(api_keys, name + '_len')
754         self.assertEqual(len(km), api_km_len)
755         self.assertEqual(km, api_km[:api_km_len])
756
757     def verify_id(self, api_id, exp_id):
758         self.assertEqual(api_id.type, IDType.value(exp_id.type))
759         self.assertEqual(api_id.data_len, exp_id.data_len)
760         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
761
762     def verify_ike_sas(self):
763         r = self.vapi.ikev2_sa_dump()
764         self.assertEqual(len(r), 1)
765         sa = r[0].sa
766         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
767         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
768         if self.ip6:
769             if self.sa.is_initiator:
770                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
771                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
772             else:
773                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
774                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
775         else:
776             if self.sa.is_initiator:
777                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
778                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
779             else:
780                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
781                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
782         self.verify_keymat(sa.keys, self.sa, 'sk_d')
783         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
784         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
785         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
786         self.verify_keymat(sa.keys, self.sa, 'sk_er')
787         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
788         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
789
790         self.assertEqual(sa.i_id.type, self.sa.id_type)
791         self.assertEqual(sa.r_id.type, self.sa.id_type)
792         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
793         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
794         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
795         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
796
797         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
798         self.assertEqual(len(r), 1)
799         csa = r[0].child_sa
800         self.assertEqual(csa.sa_index, sa.sa_index)
801         c = self.sa.child_sas[0]
802         if hasattr(c, 'sk_ai'):
803             self.verify_keymat(csa.keys, c, 'sk_ai')
804             self.verify_keymat(csa.keys, c, 'sk_ar')
805         self.verify_keymat(csa.keys, c, 'sk_ei')
806         self.verify_keymat(csa.keys, c, 'sk_er')
807         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
808         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
809
810         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
811         tsi = tsi[0]
812         tsr = tsr[0]
813         r = self.vapi.ikev2_traffic_selector_dump(
814                 is_initiator=True, sa_index=sa.sa_index,
815                 child_sa_index=csa.child_sa_index)
816         self.assertEqual(len(r), 1)
817         ts = r[0].ts
818         self.verify_ts(r[0].ts, tsi[0], True)
819
820         r = self.vapi.ikev2_traffic_selector_dump(
821                 is_initiator=False, sa_index=sa.sa_index,
822                 child_sa_index=csa.child_sa_index)
823         self.assertEqual(len(r), 1)
824         self.verify_ts(r[0].ts, tsr[0], False)
825
826         n = self.vapi.ikev2_nonce_get(is_initiator=True,
827                                       sa_index=sa.sa_index)
828         self.verify_nonce(n, self.sa.i_nonce)
829         n = self.vapi.ikev2_nonce_get(is_initiator=False,
830                                       sa_index=sa.sa_index)
831         self.verify_nonce(n, self.sa.r_nonce)
832
833     def verify_nonce(self, api_nonce, nonce):
834         self.assertEqual(api_nonce.data_len, len(nonce))
835         self.assertEqual(api_nonce.nonce, nonce)
836
837     def verify_ts(self, api_ts, ts, is_initiator):
838         if is_initiator:
839             self.assertTrue(api_ts.is_local)
840         else:
841             self.assertFalse(api_ts.is_local)
842
843         if self.p.ts_is_ip4:
844             self.assertEqual(api_ts.start_addr,
845                              IPv4Address(ts.starting_address_v4))
846             self.assertEqual(api_ts.end_addr,
847                              IPv4Address(ts.ending_address_v4))
848         else:
849             self.assertEqual(api_ts.start_addr,
850                              IPv6Address(ts.starting_address_v6))
851             self.assertEqual(api_ts.end_addr,
852                              IPv6Address(ts.ending_address_v6))
853         self.assertEqual(api_ts.start_port, ts.start_port)
854         self.assertEqual(api_ts.end_port, ts.end_port)
855         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
856
857
class TemplateInitiator(IkePeer):
    """ initiator test template """

    def initiate_del_sa_from_initiator(self):
        """Make VPP delete the IKE SA and acknowledge its request.

        Calls the ``ikev2_initiate_del_ike_sa`` API, checks the captured
        request (flags, SPIs and a Delete payload with proto 1) and
        answers with an empty encrypted INFORMATIONAL response, after
        which no further packets are expected.
        """
        ispi = int.from_bytes(self.sa.ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Response', exch_type='INFORMATIONAL',
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, b'', None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check that ``packet`` is an empty INFORMATIONAL response.

        The message id must equal the current ``self.sa.msg_id``, both
        the Response and the Initiator flags must be set and the
        decrypted content must be empty.
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def initiate_del_sa_from_responder(self):
        """Send an encrypted Delete request and verify the reply.

        The request is an INFORMATIONAL message with a fresh message id
        carrying a Delete payload for proto 'IKEv2'; the single captured
        answer is checked by ``verify_del_sa``.
        """
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    @staticmethod
    def find_notify_payload(packet, notify_type):
        """Return the Notify payload of ``notify_type`` or None.

        :param packet: scapy packet to search
        :param notify_type: numeric notify message type to look for
        :returns: the first matching Notify payload, or None when the
            packet holds no such payload
        """
        notify_cls = ikev2.IKEv2_payload_Notify
        # getlayer() yields None (instead of raising) when no Notify
        # payload is present at all
        n = packet.getlayer(notify_cls)
        # a scapy payload chain is terminated by a falsy NoPayload
        # object rather than by None, so test truthiness
        while n:
            if isinstance(n, notify_cls) and n.type == notify_type:
                return n
            n = n.payload
        return None

    def verify_nat_detection(self, packet):
        """Verify both NAT detection notifies of an SA_INIT request.

        The hashes are recomputed from the addresses and ports found in
        the packet itself (with a zeroed 8 byte value passed as the last
        argument) and compared with the notify data.
        """
        # pick the address family matching the IP header in use, an
        # IPv6 address can not be converted with AF_INET
        if self.ip6:
            iph = packet[IPv6]
            family = socket.AF_INET6
        else:
            iph = packet[IP]
            family = socket.AF_INET
        udp = packet[UDP]

        # NAT_DETECTION_SOURCE_IP
        s = self.find_notify_payload(packet, 16388)
        self.assertIsNotNone(s)
        src_sha = self.sa.compute_nat_sha1(
                inet_pton(family, iph.src), udp.sport, b'\x00' * 8)
        self.assertEqual(s.load, src_sha)

        # NAT_DETECTION_DESTINATION_IP
        s = self.find_notify_payload(packet, 16389)
        self.assertIsNotNone(s)
        dst_sha = self.sa.compute_nat_sha1(
                inet_pton(family, iph.dst), udp.dport, b'\x00' * 8)
        self.assertEqual(s.load, dst_sha)

    def verify_sa_init_request(self, packet):
        """Validate the SA_INIT request sent by VPP and derive keys.

        Stores the initiator SPI, nonce and DH data taken from the
        request in ``self.sa``, checks the proposed transforms against
        the profile, verifies NAT detection and finally sets the IKE/ESP
        properties and calculates the keys.
        """
        ih = packet[ikev2.IKEv2]
        self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
        self.assertEqual(ih.exch_type, 34)  # SA_INIT
        self.sa.ispi = ih.init_SPI
        self.assertEqual(ih.resp_SPI, 8 * b'\x00')
        self.assertIn('Initiator', ih.flags)
        self.assertNotIn('Response', ih.flags)
        self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load

        prop = packet[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.proto, 1)  # proto = ikev2
        self.assertEqual(prop.proposal, 1)
        self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
        self.assertEqual(prop.trans[0].transform_id,
                         self.p.ike_transforms['crypto_alg'])
        self.assertEqual(prop.trans[1].transform_type, 2)  # prf
        self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
        self.assertEqual(prop.trans[2].transform_type, 4)  # dh
        self.assertEqual(prop.trans[2].transform_id,
                         self.p.ike_transforms['dh_group'])

        self.verify_nat_detection(packet)
        self.sa.set_ike_props(
                    crypto='AES-GCM-16ICV', crypto_key_len=32,
                    integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
        self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
                              integ='SHA2-256-128')
        self.sa.generate_dh_data()
        self.sa.complete_dh_data()
        self.sa.calc_keys()

    def update_esp_transforms(self, trans, sa):
        """Copy ESP crypto/integrity algorithms from ``trans`` to ``sa``.

        Walks the chain of transform payloads starting at ``trans`` and
        translates the transform ids through ``CRYPTO_IDS`` and
        ``INTEG_IDS``.
        """
        while trans:
            if trans.transform_type == 1:  # encryption
                sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
            elif trans.transform_type == 3:  # integrity
                sa.esp_integ = INTEG_IDS[trans.transform_id]
            trans = trans.payload

    def verify_sa_auth_req(self, packet):
        """Validate the IKE_AUTH request sent by VPP.

        Checks header, UDP and message id (which is advanced by one),
        decrypts the message, compares both identities and records the
        child SA initiator SPI and the proposed ESP transforms.
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
        self.assertIn('Initiator', ih.flags)
        self.assertNotIn('Response', ih.flags)

        udp = packet[UDP]
        self.verify_udp(udp)
        self.assertEqual(ih.id, self.sa.msg_id + 1)
        self.sa.msg_id += 1
        plain = self.sa.hmac_and_decrypt(ih)
        idi = ikev2.IKEv2_payload_IDi(plain)
        idr = ikev2.IKEv2_payload_IDr(idi.payload)
        self.assertEqual(idi.load, self.sa.i_id)
        self.assertEqual(idr.load, self.sa.r_id)
        prop = idi[ikev2.IKEv2_payload_Proposal]
        c = self.sa.child_sas[0]
        c.ispi = prop.SPI
        self.update_esp_transforms(
                prop[ikev2.IKEv2_payload_Transform], self.sa)

    def send_init_response(self):
        """Send the IKE_SA_INIT response and verify the IKE_AUTH request.

        The response (kept in ``self.sa.init_resp_packet``) carries one
        proposal with four transforms, the KE and the Nonce payload.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))
        self.sa.init_resp_packet = (
            ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                        exch_type='IKE_SA_INIT', flags='Response') /
            ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
            ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                   group=self.sa.ike_dh,
                                   load=self.sa.my_dh_pub_key) /
            ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))

        ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
                                     self.sa.sport, self.sa.dport,
                                     self.sa.natt, self.ip6)
        self.pg_send(self.pg0, ike_msg)
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_req(capture[0])

    def initiate_sa_init(self):
        """Make VPP start the exchange and answer its SA_INIT request."""
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)

        capture = self.pg0.get_capture(1)
        self.verify_sa_init_request(capture[0])
        self.send_init_response()

    def send_auth_response(self):
        """Build, encrypt and send the IKE_AUTH response.

        The message holds both identities, the AUTH payload, an ESP
        proposal using the responder child SPI, the traffic selectors
        and an INITIAL_CONTACT notify.
        """
        tr_attr = self.sa.esp_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        c = self.sa.child_sas[0]
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                 IDtype=self.sa.id_type, load=self.sa.i_id) /
                 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                 IDtype=self.sa.id_type, load=self.sa.r_id) /
                 ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi),
                 traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload='Notify',
                 number_of_TSs=len(tsr),
                 traffic_selector=tsr) /
                 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))

        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Response', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg_send(self.pg0, packet)

    def test_initiator(self):
        # full exchange with VPP acting as the initiator: SA_INIT,
        # IKE_AUTH and a final check of the SAs reported by the API
        self.initiate_sa_init()
        self.sa.auth_init()
        self.sa.calc_child_keys()
        self.send_auth_response()
        self.verify_ike_sas()
1088
1089
class TemplateResponder(IkePeer):
    """ responder test template """

    def initiate_del_sa_from_responder(self):
        """Make VPP delete the IKE SA and acknowledge its request.

        Calls the ``ikev2_initiate_del_ike_sa`` API, checks the captured
        INFORMATIONAL request (no Response/Initiator flag, Delete
        payload with proto 1, matching SPIs) and answers with an empty
        encrypted response, after which no further packets are expected.
        """
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(
                ispi=int.from_bytes(self.sa.ispi, 'little'))
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn('Response', ih.flags)
        self.assertNotIn('Initiator', ih.flags)
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator+Response',
                             exch_type='INFORMATIONAL',
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, b'', None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check that ``packet`` is an empty INFORMATIONAL response.

        The message id must equal the current ``self.sa.msg_id``, the
        Response flag must be set while the Initiator flag is cleared,
        the SPIs must match and the decrypted content must be empty.
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn('Response', ih.flags)
        self.assertNotIn('Initiator', ih.flags)
        self.assertEqual(ih.next_payload, 46)  # Encrypted
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def initiate_del_sa_from_initiator(self):
        """Send an encrypted Delete request and verify the reply.

        The request is an INFORMATIONAL message with the Initiator flag
        and a fresh message id carrying a Delete payload for proto
        'IKEv2'; the single captured answer is checked by
        ``verify_del_sa``.
        """
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator', exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    def send_sa_init_req(self, behind_nat=False):
        """Send the IKE_SA_INIT request and verify the response.

        :param behind_nat: when true the NAT_DETECTION_SOURCE_IP hash is
            computed from 10.10.10.1 instead of the pg0 remote IPv4
            address

        The request (kept in ``self.sa.init_req_packet``) carries one
        proposal with four transforms, the KE and Nonce payloads and
        both NAT detection notifies.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))

        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))

        self.sa.init_req_packet = (
                ikev2.IKEv2(init_SPI=self.sa.ispi,
                            flags='Initiator', exch_type='IKE_SA_INIT') /
                ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
                ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                       group=self.sa.ike_dh,
                                       load=self.sa.my_dh_pub_key) /
                ikev2.IKEv2_payload_Nonce(next_payload='Notify',
                                          load=self.sa.i_nonce))

        if behind_nat:
            src_address = b'\x0a\x0a\x0a\x01'
        else:
            src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)

        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
        dst_nat = self.sa.compute_nat_sha1(
                inet_pton(socket.AF_INET, self.pg0.local_ip4),
                self.sa.dport)
        nat_src_detection = ikev2.IKEv2_payload_Notify(
                type='NAT_DETECTION_SOURCE_IP', load=src_nat,
                next_payload='Notify')
        nat_dst_detection = ikev2.IKEv2_payload_Notify(
                type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
        self.sa.init_req_packet = (self.sa.init_req_packet /
                                   nat_src_detection /
                                   nat_dst_detection)

        ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
                                     self.sa.sport, self.sa.dport,
                                     self.sa.natt, self.ip6)
        self.pg0.add_stream(ike_msg)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_init(capture[0])

    def generate_auth_payload(self, last_payload=None, is_rekey=False):
        """Build the plain payload chain of an AUTH or rekey request.

        :param last_payload: ``next_payload`` value of the TSr payload,
            'Notify' is used when empty
        :param is_rekey: when true the chain is prefixed with a Nonce
            and followed by a REKEY_SA notify, otherwise it is prefixed
            with the IDi/IDr payloads
        :returns: tuple (payload chain, name of the first payload)
        """
        tr_attr = self.sa.esp_crypto_attr()
        last_payload = last_payload or 'Notify'
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        c = self.sa.child_sas[0]
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi), traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
                 number_of_TSs=len(tsr), traffic_selector=tsr))

        if is_rekey:
            first_payload = 'Nonce'
            plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
                     next_payload='SA') / plain /
                     ikev2.IKEv2_payload_Notify(type='REKEY_SA',
                     proto='ESP', SPI=c.ispi))
        else:
            first_payload = 'IDi'
            ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                   IDtype=self.sa.id_type, load=self.sa.i_id) /
                   ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                   IDtype=self.sa.id_type, load=self.sa.r_id))
            plain = ids / plain
        return plain, first_payload

    def send_sa_auth(self):
        """Send the encrypted IKE_AUTH request and verify the response.

        The payload from ``generate_auth_payload`` is extended with an
        INITIAL_CONTACT notify.
        """
        plain, first_payload = self.generate_auth_payload(
                    last_payload='Notify')
        plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Initiator', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_resp(capture[0])

    def verify_sa_init(self, packet):
        """Validate the IKE_SA_INIT response and derive the keys.

        Stores the responder SPI, nonce and DH data in ``self.sa``.

        :raises IndexError: when the SA, Nonce or KE payload is missing
            (the reply is logged before the exception is re-raised)
        """
        ih = self.get_ike_header(packet)

        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 34)
        self.assertIn('Response', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        # SPIs are 8 byte strings, so compare against 8 zero bytes
        # (an integer 0 would never be equal and the check never fail)
        self.assertNotEqual(ih.resp_SPI, 8 * b'\x00')
        self.sa.rspi = ih.resp_SPI
        try:
            # the SA payload is only looked up to make sure it exists
            ih[ikev2.IKEv2_payload_SA]
            self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
        except IndexError:
            self.logger.error(
                "unexpected reply: SA/Nonce/KE payload not found!")
            # show() prints and returns None unless dump=True is given
            self.logger.error(ih.show(dump=True))
            raise
        self.sa.complete_dh_data()
        self.sa.calc_keys()
        self.sa.auth_init()

    def verify_sa_auth_resp(self, packet):
        """Validate the IKE_AUTH response and derive the child keys.

        Checks UDP and message id, decrypts the message and records the
        responder SPI of the child SA taken from the ESP proposal.
        """
        ike = self.get_ike_header(packet)
        udp = packet[UDP]
        self.verify_udp(udp)
        self.assertEqual(ike.id, self.sa.msg_id)
        plain = self.sa.hmac_and_decrypt(ike)
        idr = ikev2.IKEv2_payload_IDr(plain)
        prop = idr[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.SPIsize, 4)
        self.sa.child_sas[0].rspi = prop.SPI
        self.sa.calc_child_keys()

    def test_responder(self):
        # full exchange with VPP acting as the responder: SA_INIT,
        # IKE_AUTH and a final check of the resulting IPsec and IKE SAs
        self.send_sa_init_req(self.sa.natt)
        self.send_sa_auth()
        self.verify_ipsec_sas()
        self.verify_ike_sas()
1294
1295
# Mixin creating the IKEv2 profile (self.p) and the peer side SA
# description (self.sa) used by the test templates.
class Ikev2Params(object):
    def config_params(self, params=None):
        """Create the VPP profile and the matching test side IKEv2 SA.

        :param params: optional dict of overrides; recognised keys are
            'dpd_disabled', 'del_sa_from_responder', 'natt', 'ip6',
            'auth' (with 'server-key', 'server-cert', 'client-cert' and
            'client-key' when set to 'rsa-sig'), 'is_initiator',
            'loc_ts', 'rem_ts', 'responder', 'ike_transforms',
            'esp_transforms', 'udp_encap', 'ike-crypto', 'ike-integ',
            'ike-dh', 'esp-crypto' and 'esp-integ'

        Sets ``self.vpp_enums``, ``self.del_sa_from_responder``,
        ``self.p``, ``self.ip6``, ``self.sa`` and, for 'rsa-sig'
        authentication, ``self.peer_cert``.
        """
        # a None default avoids sharing one mutable dict between calls
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        # dead peer detection is switched off unless requested otherwise
        if params.get('dpd_disabled', True):
            self.vapi.cli('ikev2 dpd disable')
        self.del_sa_from_responder = params.get(
                'del_sa_from_responder', False)
        is_natt = params.get('natt') or False
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # context managers make sure both files get closed again
            with open(work_dir + params['server-cert']) as pem_file:
                server_pem = pem_file.read()
            with open(work_dir + params['client-key']) as key_file:
                client_priv = key_file.read()
            client_priv = load_pem_private_key(str.encode(client_priv), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        is_init = params.get('is_initiator', True)

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        if is_init:
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
        else:
            self.p.add_local_id(**idi)
            self.p.add_remote_id(**idr)

        loc_ts = params.get(
                'loc_ts',
                {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
                'rem_ts',
                {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        udp_encap = params.get('udp_encap', False)
        if udp_encap:
            self.p.set_udp_encap(True)

        # the SA describes the peer of VPP, hence the profile's remote
        # traffic selectors are its local ones and vice versa
        self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'], natt=is_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data, udp_encap=udp_encap,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
        if is_init:
            ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
            ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
            ike_dh = params.get('ike-dh', '2048MODPgr')

            esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
            esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

            self.sa.set_ike_props(
                    crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                    integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
            self.sa.set_esp_props(
                    crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                    integ=esp_integ)
1400
1401
1402 class TestApi(VppTestCase):
1403     """ Test IKEV2 API """
1404     @classmethod
1405     def setUpClass(cls):
1406         super(TestApi, cls).setUpClass()
1407
1408     @classmethod
1409     def tearDownClass(cls):
1410         super(TestApi, cls).tearDownClass()
1411
1412     def tearDown(self):
1413         super(TestApi, self).tearDown()
1414         self.p1.remove_vpp_config()
1415         self.p2.remove_vpp_config()
1416         r = self.vapi.ikev2_profile_dump()
1417         self.assertEqual(len(r), 0)
1418
1419     def configure_profile(self, cfg):
1420         p = Profile(self, cfg['name'])
1421         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1422         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1423         p.add_local_ts(**cfg['loc_ts'])
1424         p.add_remote_ts(**cfg['rem_ts'])
1425         p.add_responder(cfg['responder'])
1426         p.add_ike_transforms(cfg['ike_ts'])
1427         p.add_esp_transforms(cfg['esp_ts'])
1428         p.add_auth(**cfg['auth'])
1429         p.set_udp_encap(cfg['udp_encap'])
1430         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1431         if 'lifetime_data' in cfg:
1432             p.set_lifetime_data(cfg['lifetime_data'])
1433         if 'tun_itf' in cfg:
1434             p.set_tunnel_interface(cfg['tun_itf'])
1435         if 'natt_disabled' in cfg and cfg['natt_disabled']:
1436             p.disable_natt()
1437         p.add_vpp_config()
1438         return p
1439
1440     def test_profile_api(self):
1441         """ test profile dump API """
1442         loc_ts4 = {
1443                     'proto': 8,
1444                     'start_port': 1,
1445                     'end_port': 19,
1446                     'start_addr': '3.3.3.2',
1447                     'end_addr': '3.3.3.3',
1448                 }
1449         rem_ts4 = {
1450                     'proto': 9,
1451                     'start_port': 10,
1452                     'end_port': 119,
1453                     'start_addr': '4.5.76.80',
1454                     'end_addr': '2.3.4.6',
1455                 }
1456
1457         loc_ts6 = {
1458                     'proto': 8,
1459                     'start_port': 1,
1460                     'end_port': 19,
1461                     'start_addr': 'ab::1',
1462                     'end_addr': 'ab::4',
1463                 }
1464         rem_ts6 = {
1465                     'proto': 9,
1466                     'start_port': 10,
1467                     'end_port': 119,
1468                     'start_addr': 'cd::12',
1469                     'end_addr': 'cd::13',
1470                 }
1471
1472         conf = {
1473             'p1': {
1474                 'name': 'p1',
1475                 'natt_disabled': True,
1476                 'loc_id': ('fqdn', b'vpp.home'),
1477                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1478                 'loc_ts': loc_ts4,
1479                 'rem_ts': rem_ts4,
1480                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1481                 'ike_ts': {
1482                         'crypto_alg': 20,
1483                         'crypto_key_size': 32,
1484                         'integ_alg': 1,
1485                         'dh_group': 1},
1486                 'esp_ts': {
1487                         'crypto_alg': 13,
1488                         'crypto_key_size': 24,
1489                         'integ_alg': 2},
1490                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1491                 'udp_encap': True,
1492                 'ipsec_over_udp_port': 4501,
1493                 'lifetime_data': {
1494                     'lifetime': 123,
1495                     'lifetime_maxdata': 20192,
1496                     'lifetime_jitter': 9,
1497                     'handover': 132},
1498             },
1499             'p2': {
1500                 'name': 'p2',
1501                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1502                 'rem_id': ('ip6-addr', b'abcd::1'),
1503                 'loc_ts': loc_ts6,
1504                 'rem_ts': rem_ts6,
1505                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1506                 'ike_ts': {
1507                         'crypto_alg': 12,
1508                         'crypto_key_size': 16,
1509                         'integ_alg': 3,
1510                         'dh_group': 3},
1511                 'esp_ts': {
1512                         'crypto_alg': 9,
1513                         'crypto_key_size': 24,
1514                         'integ_alg': 4},
1515                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1516                 'udp_encap': False,
1517                 'ipsec_over_udp_port': 4600,
1518                 'tun_itf': 0}
1519         }
1520         self.p1 = self.configure_profile(conf['p1'])
1521         self.p2 = self.configure_profile(conf['p2'])
1522
1523         r = self.vapi.ikev2_profile_dump()
1524         self.assertEqual(len(r), 2)
1525         self.verify_profile(r[0].profile, conf['p1'])
1526         self.verify_profile(r[1].profile, conf['p2'])
1527
1528     def verify_id(self, api_id, cfg_id):
1529         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1530         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1531
1532     def verify_ts(self, api_ts, cfg_ts):
1533         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1534         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1535         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1536         self.assertEqual(api_ts.start_addr,
1537                          ip_address(text_type(cfg_ts['start_addr'])))
1538         self.assertEqual(api_ts.end_addr,
1539                          ip_address(text_type(cfg_ts['end_addr'])))
1540
1541     def verify_responder(self, api_r, cfg_r):
1542         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1543         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1544
1545     def verify_transforms(self, api_ts, cfg_ts):
1546         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1547         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1548         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1549
1550     def verify_ike_transforms(self, api_ts, cfg_ts):
1551         self.verify_transforms(api_ts, cfg_ts)
1552         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1553
    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check ESP transforms by delegating to verify_transforms()."""
        self.verify_transforms(api_ts, cfg_ts)
1556
1557     def verify_auth(self, api_auth, cfg_auth):
1558         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1559         self.assertEqual(api_auth.data, cfg_auth['data'])
1560         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1561
1562     def verify_lifetime_data(self, p, ld):
1563         self.assertEqual(p.lifetime, ld['lifetime'])
1564         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1565         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1566         self.assertEqual(p.handover, ld['handover'])
1567
1568     def verify_profile(self, ap, cp):
1569         self.assertEqual(ap.name, cp['name'])
1570         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1571         self.verify_id(ap.loc_id, cp['loc_id'])
1572         self.verify_id(ap.rem_id, cp['rem_id'])
1573         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1574         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1575         self.verify_responder(ap.responder, cp['responder'])
1576         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1577         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1578         self.verify_auth(ap.auth, cp['auth'])
1579         natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1580         self.assertTrue(natt_dis == ap.natt_disabled)
1581
1582         if 'lifetime_data' in cp:
1583             self.verify_lifetime_data(ap, cp['lifetime_data'])
1584         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1585         if 'tun_itf' in cp:
1586             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1587         else:
1588             self.assertEqual(ap.tun_itf, 0xffffffff)
1589
1590
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        # the peer that VPP will contact: the remote end of pg0
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        })
1613
1614
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Reply to a captured rekey request by echoing its SA payload.

        :param req: captured packet carrying the encrypted request
        :param update_child_sa_data: when true, the test-side SA state
            (nonces, child SA SPIs, child keys) is refreshed from the
            request before the reply is sent
        """
        ih = self.get_ike_header(req)
        plain = self.sa.hmac_and_decrypt(ih)
        sa = ikev2.IKEv2_payload_SA(plain)
        if update_child_sa_data:
            prop = sa[ikev2.IKEv2_payload_Proposal]
            # the nonce of the request is used for both sides
            self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            # both child SPIs are set to the SPI proposed in the request
            self.sa.child_sas[0].ispi = prop.SPI
            self.sa.child_sas[0].rspi = prop.SPI
            self.sa.calc_child_keys()

        # the response reuses the message id of the request and carries
        # the very same SA payload back (exchange type 36)
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Response', exch_type=36,
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, sa, 'SA')
        packet = self.create_packet(self.pg0, resp, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, packet)

    def test_initiator(self):
        # establish the SAs first using the inherited test
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        # trigger two rekeys of the same child SA back to back and expect
        # two packets on the wire
        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        capture = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(capture[1], True)
        self.rekey_respond(capture[0], False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1654
1655
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Trigger a child SA rekey in VPP and answer the request.

        The captured request is checked to be a non-response message
        sent by the initiator, the test-side SA state is refreshed from
        it, and the request's SA payload is sent back as the response.
        """
        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertEqual(ih.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        plain = self.sa.hmac_and_decrypt(ih)
        sa = ikev2.IKEv2_payload_SA(plain)
        prop = sa[ikev2.IKEv2_payload_Proposal]
        # the nonce of the request is used for both sides
        self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        self.sa.child_sas[0].ispi = prop.SPI
        self.sa.child_sas[0].rspi = prop.SPI
        self.sa.calc_child_keys()
        # the response reuses the message id of the request
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Response', exch_type=36,
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, sa, 'SA')
        packet = self.create_packet(self.pg0, resp, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, packet)

    def test_initiator(self):
        # establish the SAs with the inherited test, rekey, then verify
        # the IKE and IPsec SAs again
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1691
1692
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        # the peer that VPP will contact: the remote end of pg0
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            'del_sa_from_responder': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        })
1716
1717
class TestResponderNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - nat traversal """
    def config_tc(self):
        # only the 'natt' flag is overridden for this test case
        natt_params = {'natt': True}
        self.config_params(natt_params)
1723
1724
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # no overrides: config_params() is called without arguments
        self.config_params()
1729
1730
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """
    def config_tc(self):
        # explicitly pass dpd_disabled=False to the parameter setup
        dpd_params = {'dpd_disabled': False}
        self.config_params(dpd_params)

    def tearDown(self):
        # deliberately empty override of the inherited tearDown
        # NOTE(review): presumably because the SAs are already gone when
        # the test ends - confirm against the template's tearDown
        pass

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # capture empty request but don't reply
        pkts = self.pg0.get_capture(expected_count=1, timeout=5)
        ike_hdr = self.get_ike_header(pkts[0])
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        payload = self.sa.hmac_and_decrypt(ike_hdr)
        self.assertEqual(payload, b'')
        # wait for SA expiration
        time.sleep(3)
        # neither IKE SAs nor IPsec SAs may be left afterwards
        remaining_ike_sas = self.vapi.ikev2_sa_dump()
        self.assertEqual(len(remaining_ike_sas), 0)
        remaining_ipsec_sas = self.vapi.ipsec_sa_dump()
        self.assertEqual(len(remaining_ipsec_sas), 0)
1758
1759
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Send a rekey request and store the nonce and SPI of the reply."""
        request = self.create_rekey_request()
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        # establish the SAs with the inherited test, rekey, recompute the
        # child keys and verify the IKE and IPsec SAs again
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1783
1784
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        # key and certificate file names handed to the parameter setup
        credentials = {
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        params = {'udp_encap': True, 'auth': 'rsa-sig'}
        params.update(credentials)
        self.config_params(params)
1795
1796
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        # crypto entries are (algorithm name, key size) pairs
        ike_params = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
        }
        esp_params = {
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
        }
        params = dict(ike_params)
        params.update(esp_params)
        params['ike-dh'] = '2048MODPgr'
        self.config_params(params)
1809
1810
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        # NOTE(review): the class name and docstring say AES_CBC_128 but
        # the IKE key size passed here is 32, while the MODP2048 test
        # case in this file uses 16 for AES_CBC_128 - confirm which one
        # is intended
        ike_params = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
        }
        esp_params = {
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
        }
        params = dict(ike_params)
        params.update(esp_params)
        params['ike-dh'] = '3072MODPgr'
        self.config_params(params)
1823
1824
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """
    def config_tc(self):
        # IPv6 address ranges for the local and remote traffic selectors
        local_ts = {'start_addr': 'ab:cd::0', 'end_addr': 'ab:cd::10'}
        remote_ts = {'start_addr': '11::0', 'end_addr': '11::100'}
        self.config_params({
            'del_sa_from_responder': True,
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_ts,
            'rem_ts': remote_ts,
        })
1841
1842
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request and expect an empty reply with our id."""
        request = self.create_empty_request()
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        # the reply must carry the message id tracked by the test-side SA
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        self.assertEqual(decrypted, b'')

    def test_initiator(self):
        # establish the SAs first, then exchange the keepalive
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
1862
1863
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # deliberately empty override of the inherited tearDown
        pass

    def config_tc(self):
        # no overrides: config_params() is called without arguments
        self.config_params()

    def assert_counter(self, count, name, version='ip4'):
        """Assert that the ikev2 error counter ``name`` equals ``count``."""
        counter_path = ('/err/ikev2-%s/' % version) + name
        actual = self.statistics.get_err_counter(counter_path)
        self.assertEqual(count, actual)

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet for pg0.

        :param length: value for the IKEv2 header's length field
        :param payload: optional payload appended after the header
        """
        ike_msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                              flags='Initiator', exch_type='IKE_SA_INIT')
        if payload is not None:
            ike_msg /= payload
        return self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                  self.sa.dport)

    def verify_bad_packet_length(self):
        """Send headers with a bogus length; expect the length counter."""
        bad_msg = self.create_ike_init_msg(length=0xdead)
        burst = bad_msg * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, 'Bad packet length')

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with a bogus length; expect the malformed
        packet counter."""
        bad_payload = ikev2.IKEv2_payload_SA(length=0xdead)
        bad_msg = self.create_ike_init_msg(payload=bad_payload)
        burst = bad_msg * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, 'Malformed packet')

    def test_responder(self):
        # number of malformed packets sent by each check below
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
1900
1901
if __name__ == '__main__':
    # imported locally: the import block at the top of this file does not
    # bring in unittest, so running the file as a script would otherwise
    # fail with a NameError
    import unittest

    unittest.main(testRunner=VppTestRunner)