ikev2: test responder behind NAT
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import VppTestCase, VppTestRunner
22 from vpp_ikev2 import Profile, IDType, AuthMethod
23 from vpp_papi import VppEnum
24
25 try:
26     text_type = unicode
27 except NameError:
28     text_type = str
29
30 KEY_PAD = b"Key Pad for IKEv2"
31 SALT_SIZE = 4
32 GCM_ICV_SIZE = 16
33 GCM_IV_SIZE = 8
34
35
36 # defined in rfc3526
37 # tuple structure is (p, g, key_len)
38 DH = {
39     '2048MODPgr': (long_converter("""
40     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
41     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
42     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
43     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
44     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
45     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
46     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
47     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
48     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
49     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
50     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
51
52     '3072MODPgr': (long_converter("""
53     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
54     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
55     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
56     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
57     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
58     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
59     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
60     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
61     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
62     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
63     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
64     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
65     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
66     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
67     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
68     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
69 }
70
71
72 class CryptoAlgo(object):
73     def __init__(self, name, cipher, mode):
74         self.name = name
75         self.cipher = cipher
76         self.mode = mode
77         if self.cipher is not None:
78             self.bs = self.cipher.block_size // 8
79
80             if self.name == 'AES-GCM-16ICV':
81                 self.iv_len = GCM_IV_SIZE
82             else:
83                 self.iv_len = self.bs
84
85     def encrypt(self, data, key, aad=None):
86         iv = os.urandom(self.iv_len)
87         if aad is None:
88             encryptor = Cipher(self.cipher(key), self.mode(iv),
89                                default_backend()).encryptor()
90             return iv + encryptor.update(data) + encryptor.finalize()
91         else:
92             salt = key[-SALT_SIZE:]
93             nonce = salt + iv
94             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
95                                default_backend()).encryptor()
96             encryptor.authenticate_additional_data(aad)
97             data = encryptor.update(data) + encryptor.finalize()
98             data += encryptor.tag[:GCM_ICV_SIZE]
99             return iv + data
100
101     def decrypt(self, data, key, aad=None, icv=None):
102         if aad is None:
103             iv = data[:self.iv_len]
104             ct = data[self.iv_len:]
105             decryptor = Cipher(algorithms.AES(key),
106                                self.mode(iv),
107                                default_backend()).decryptor()
108             return decryptor.update(ct) + decryptor.finalize()
109         else:
110             salt = key[-SALT_SIZE:]
111             nonce = salt + data[:GCM_IV_SIZE]
112             ct = data[GCM_IV_SIZE:]
113             key = key[:-SALT_SIZE]
114             decryptor = Cipher(algorithms.AES(key),
115                                self.mode(nonce, icv, len(icv)),
116                                default_backend()).decryptor()
117             decryptor.authenticate_additional_data(aad)
118             return decryptor.update(ct) + decryptor.finalize()
119
120     def pad(self, data):
121         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122         data = data + b'\x00' * (pad_len - 1)
123         return data + bytes([pad_len - 1])
124
125
126 class AuthAlgo(object):
127     def __init__(self, name, mac, mod, key_len, trunc_len=None):
128         self.name = name
129         self.mac = mac
130         self.mod = mod
131         self.key_len = key_len
132         self.trunc_len = trunc_len or key_len
133
134
135 CRYPTO_ALGOS = {
136     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
139                                 mode=modes.GCM),
140 }
141
142 AUTH_ALGOS = {
143     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
148 }
149
150 PRF_ALGOS = {
151     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
153                                   hashes.SHA256, 32),
154 }
155
156 CRYPTO_IDS = {
157     12: 'AES-CBC',
158     20: 'AES-GCM-16ICV',
159 }
160
161 INTEG_IDS = {
162     2: 'HMAC-SHA1-96',
163     12: 'SHA2-256-128',
164     13: 'SHA2-384-192',
165     14: 'SHA2-512-256',
166 }
167
168
169 class IKEv2ChildSA(object):
170     def __init__(self, local_ts, remote_ts, is_initiator):
171         spi = os.urandom(4)
172         if is_initiator:
173             self.ispi = spi
174             self.rspi = None
175         else:
176             self.rspi = spi
177             self.ispi = None
178         self.local_ts = local_ts
179         self.remote_ts = remote_ts
180
181
182 class IKEv2SA(object):
183     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
184                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
185                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
186                  auth_method='shared-key', priv_key=None, i_natt=False,
187                  r_natt=False, udp_encap=False):
188         self.udp_encap = udp_encap
189         self.i_natt = i_natt
190         self.r_natt = r_natt
191         if i_natt or r_natt:
192             self.sport = 4500
193             self.dport = 4500
194         else:
195             self.sport = 500
196             self.dport = 500
197         self.msg_id = 0
198         self.dh_params = None
199         self.test = test
200         self.priv_key = priv_key
201         self.is_initiator = is_initiator
202         nonce = nonce or os.urandom(32)
203         self.auth_data = auth_data
204         self.i_id = i_id
205         self.r_id = r_id
206         if isinstance(id_type, str):
207             self.id_type = IDType.value(id_type)
208         else:
209             self.id_type = id_type
210         self.auth_method = auth_method
211         if self.is_initiator:
212             self.rspi = 8 * b'\x00'
213             self.ispi = spi
214             self.i_nonce = nonce
215         else:
216             self.rspi = spi
217             self.ispi = 8 * b'\x00'
218             self.r_nonce = nonce
219         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
220                           self.is_initiator)]
221
222     def new_msg_id(self):
223         self.msg_id += 1
224         return self.msg_id
225
226     @property
227     def my_dh_pub_key(self):
228         if self.is_initiator:
229             return self.i_dh_data
230         return self.r_dh_data
231
232     @property
233     def peer_dh_pub_key(self):
234         if self.is_initiator:
235             return self.r_dh_data
236         return self.i_dh_data
237
238     @property
239     def natt(self):
240         return self.i_natt or self.r_natt
241
242     def compute_secret(self):
243         priv = self.dh_private_key
244         peer = self.peer_dh_pub_key
245         p, g, l = self.ike_group
246         return pow(int.from_bytes(peer, 'big'),
247                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
248
249     def generate_dh_data(self):
250         # generate DH keys
251         if self.ike_dh not in DH:
252             raise NotImplementedError('%s not in DH group' % self.ike_dh)
253
254         if self.dh_params is None:
255             dhg = DH[self.ike_dh]
256             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
257             self.dh_params = pn.parameters(default_backend())
258
259         priv = self.dh_params.generate_private_key()
260         pub = priv.public_key()
261         x = priv.private_numbers().x
262         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
263         y = pub.public_numbers().y
264
265         if self.is_initiator:
266             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
267         else:
268             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
269
270     def complete_dh_data(self):
271         self.dh_shared_secret = self.compute_secret()
272
273     def calc_child_keys(self):
274         prf = self.ike_prf_alg.mod()
275         s = self.i_nonce + self.r_nonce
276         c = self.child_sas[0]
277
278         encr_key_len = self.esp_crypto_key_len
279         integ_key_len = self.esp_integ_alg.key_len
280         salt_len = 0 if integ_key_len else 4
281
282         l = (integ_key_len * 2 +
283              encr_key_len * 2 +
284              salt_len * 2)
285         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
286
287         pos = 0
288         c.sk_ei = keymat[pos:pos+encr_key_len]
289         pos += encr_key_len
290
291         if integ_key_len:
292             c.sk_ai = keymat[pos:pos+integ_key_len]
293             pos += integ_key_len
294         else:
295             c.salt_ei = keymat[pos:pos+salt_len]
296             pos += salt_len
297
298         c.sk_er = keymat[pos:pos+encr_key_len]
299         pos += encr_key_len
300
301         if integ_key_len:
302             c.sk_ar = keymat[pos:pos+integ_key_len]
303             pos += integ_key_len
304         else:
305             c.salt_er = keymat[pos:pos+salt_len]
306             pos += salt_len
307
308     def calc_prfplus(self, prf, key, seed, length):
309         r = b''
310         t = None
311         x = 1
312         while len(r) < length and x < 255:
313             if t is not None:
314                 s = t
315             else:
316                 s = b''
317             s = s + seed + bytes([x])
318             t = self.calc_prf(prf, key, s)
319             r = r + t
320             x = x + 1
321
322         if x == 255:
323             return None
324         return r
325
326     def calc_prf(self, prf, key, data):
327         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
328         h.update(data)
329         return h.finalize()
330
331     def calc_keys(self):
332         prf = self.ike_prf_alg.mod()
333         # SKEYSEED = prf(Ni | Nr, g^ir)
334         s = self.i_nonce + self.r_nonce
335         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
336
337         # calculate S = Ni | Nr | SPIi SPIr
338         s = s + self.ispi + self.rspi
339
340         prf_key_trunc = self.ike_prf_alg.trunc_len
341         encr_key_len = self.ike_crypto_key_len
342         tr_prf_key_len = self.ike_prf_alg.key_len
343         integ_key_len = self.ike_integ_alg.key_len
344         if integ_key_len == 0:
345             salt_size = 4
346         else:
347             salt_size = 0
348
349         l = (prf_key_trunc +
350              integ_key_len * 2 +
351              encr_key_len * 2 +
352              tr_prf_key_len * 2 +
353              salt_size * 2)
354         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
355
356         pos = 0
357         self.sk_d = keymat[:pos+prf_key_trunc]
358         pos += prf_key_trunc
359
360         self.sk_ai = keymat[pos:pos+integ_key_len]
361         pos += integ_key_len
362         self.sk_ar = keymat[pos:pos+integ_key_len]
363         pos += integ_key_len
364
365         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
366         pos += encr_key_len + salt_size
367         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
368         pos += encr_key_len + salt_size
369
370         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
371         pos += tr_prf_key_len
372         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
373
374     def generate_authmsg(self, prf, packet):
375         if self.is_initiator:
376             id = self.i_id
377             nonce = self.r_nonce
378             key = self.sk_pi
379         else:
380             id = self.r_id
381             nonce = self.i_nonce
382             key = self.sk_pr
383         data = bytes([self.id_type, 0, 0, 0]) + id
384         id_hash = self.calc_prf(prf, key, data)
385         return packet + nonce + id_hash
386
387     def auth_init(self):
388         prf = self.ike_prf_alg.mod()
389         if self.is_initiator:
390             packet = self.init_req_packet
391         else:
392             packet = self.init_resp_packet
393         authmsg = self.generate_authmsg(prf, raw(packet))
394         if self.auth_method == 'shared-key':
395             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
396             self.auth_data = self.calc_prf(prf, psk, authmsg)
397         elif self.auth_method == 'rsa-sig':
398             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
399                                                 hashes.SHA1())
400         else:
401             raise TypeError('unknown auth method type!')
402
403     def encrypt(self, data, aad=None):
404         data = self.ike_crypto_alg.pad(data)
405         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
406
407     @property
408     def peer_authkey(self):
409         if self.is_initiator:
410             return self.sk_ar
411         return self.sk_ai
412
413     @property
414     def my_authkey(self):
415         if self.is_initiator:
416             return self.sk_ai
417         return self.sk_ar
418
419     @property
420     def my_cryptokey(self):
421         if self.is_initiator:
422             return self.sk_ei
423         return self.sk_er
424
425     @property
426     def peer_cryptokey(self):
427         if self.is_initiator:
428             return self.sk_er
429         return self.sk_ei
430
431     def concat(self, alg, key_len):
432         return alg + '-' + str(key_len * 8)
433
434     @property
435     def vpp_ike_cypto_alg(self):
436         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
437
438     @property
439     def vpp_esp_cypto_alg(self):
440         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
441
442     def verify_hmac(self, ikemsg):
443         integ_trunc = self.ike_integ_alg.trunc_len
444         exp_hmac = ikemsg[-integ_trunc:]
445         data = ikemsg[:-integ_trunc]
446         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
447                                           self.peer_authkey, data)
448         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
449
450     def compute_hmac(self, integ, key, data):
451         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
452         h.update(data)
453         return h.finalize()
454
455     def decrypt(self, data, aad=None, icv=None):
456         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
457
458     def hmac_and_decrypt(self, ike):
459         ep = ike[ikev2.IKEv2_payload_Encrypted]
460         if self.ike_crypto == 'AES-GCM-16ICV':
461             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
462             ct = ep.load[:-GCM_ICV_SIZE]
463             tag = ep.load[-GCM_ICV_SIZE:]
464             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
465         else:
466             self.verify_hmac(raw(ike))
467             integ_trunc = self.ike_integ_alg.trunc_len
468
469             # remove ICV and decrypt payload
470             ct = ep.load[:-integ_trunc]
471             plain = self.decrypt(ct)
472         # remove padding
473         pad_len = plain[-1]
474         return plain[:-pad_len - 1]
475
476     def build_ts_addr(self, ts, version):
477         return {'starting_address_v' + version: ts['start_addr'],
478                 'ending_address_v' + version: ts['end_addr']}
479
480     def generate_ts(self, is_ip4):
481         c = self.child_sas[0]
482         ts_data = {'IP_protocol_ID': 0,
483                    'start_port': 0,
484                    'end_port': 0xffff}
485         if is_ip4:
486             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
487             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
488             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
489             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
490         else:
491             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
492             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
493             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
494             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
495
496         if self.is_initiator:
497             return ([ts1], [ts2])
498         return ([ts2], [ts1])
499
500     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
501         if crypto not in CRYPTO_ALGOS:
502             raise TypeError('unsupported encryption algo %r' % crypto)
503         self.ike_crypto = crypto
504         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
505         self.ike_crypto_key_len = crypto_key_len
506
507         if integ not in AUTH_ALGOS:
508             raise TypeError('unsupported auth algo %r' % integ)
509         self.ike_integ = None if integ == 'NULL' else integ
510         self.ike_integ_alg = AUTH_ALGOS[integ]
511
512         if prf not in PRF_ALGOS:
513             raise TypeError('unsupported prf algo %r' % prf)
514         self.ike_prf = prf
515         self.ike_prf_alg = PRF_ALGOS[prf]
516         self.ike_dh = dh
517         self.ike_group = DH[self.ike_dh]
518
519     def set_esp_props(self, crypto, crypto_key_len, integ):
520         self.esp_crypto_key_len = crypto_key_len
521         if crypto not in CRYPTO_ALGOS:
522             raise TypeError('unsupported encryption algo %r' % crypto)
523         self.esp_crypto = crypto
524         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
525
526         if integ not in AUTH_ALGOS:
527             raise TypeError('unsupported auth algo %r' % integ)
528         self.esp_integ = None if integ == 'NULL' else integ
529         self.esp_integ_alg = AUTH_ALGOS[integ]
530
531     def crypto_attr(self, key_len):
532         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
533             return (0x800e << 16 | key_len << 3, 12)
534         else:
535             raise Exception('unsupported attribute type')
536
537     def ike_crypto_attr(self):
538         return self.crypto_attr(self.ike_crypto_key_len)
539
540     def esp_crypto_attr(self):
541         return self.crypto_attr(self.esp_crypto_key_len)
542
543     def compute_nat_sha1(self, ip, port, rspi=None):
544         if rspi is None:
545             rspi = self.rspi
546         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
547         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
548         digest.update(data)
549         return digest.finalize()
550
551
552 class IkePeer(VppTestCase):
553     """ common class for initiator and responder """
554
555     @classmethod
556     def setUpClass(cls):
557         import scapy.contrib.ikev2 as _ikev2
558         globals()['ikev2'] = _ikev2
559         super(IkePeer, cls).setUpClass()
560         cls.create_pg_interfaces(range(2))
561         for i in cls.pg_interfaces:
562             i.admin_up()
563             i.config_ip4()
564             i.resolve_arp()
565             i.config_ip6()
566             i.resolve_ndp()
567
568     @classmethod
569     def tearDownClass(cls):
570         super(IkePeer, cls).tearDownClass()
571
572     def tearDown(self):
573         super(IkePeer, self).tearDown()
574         if self.del_sa_from_responder:
575             self.initiate_del_sa_from_responder()
576         else:
577             self.initiate_del_sa_from_initiator()
578         r = self.vapi.ikev2_sa_dump()
579         self.assertEqual(len(r), 0)
580         sas = self.vapi.ipsec_sa_dump()
581         self.assertEqual(len(sas), 0)
582         self.p.remove_vpp_config()
583         self.assertIsNone(self.p.query_vpp_config())
584
585     def setUp(self):
586         super(IkePeer, self).setUp()
587         self.config_tc()
588         self.p.add_vpp_config()
589         self.assertIsNotNone(self.p.query_vpp_config())
590         if self.sa.is_initiator:
591             self.sa.generate_dh_data()
592         self.vapi.cli('ikev2 set logging level 4')
593         self.vapi.cli('event-lo clear')
594
595     def create_rekey_request(self):
596         sa, first_payload = self.generate_auth_payload(is_rekey=True)
597         header = ikev2.IKEv2(
598                 init_SPI=self.sa.ispi,
599                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
600                 flags='Initiator', exch_type='CREATE_CHILD_SA')
601
602         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
603         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
604                                   self.sa.dport, self.sa.natt, self.ip6)
605
606     def create_empty_request(self):
607         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
608                              id=self.sa.new_msg_id(), flags='Initiator',
609                              exch_type='INFORMATIONAL',
610                              next_payload='Encrypted')
611
612         msg = self.encrypt_ike_msg(header, b'', None)
613         return self.create_packet(self.pg0, msg, self.sa.sport,
614                                   self.sa.dport, self.sa.natt, self.ip6)
615
616     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
617                       use_ip6=False):
618         if use_ip6:
619             src_ip = src_if.remote_ip6
620             dst_ip = src_if.local_ip6
621             ip_layer = IPv6
622         else:
623             src_ip = src_if.remote_ip4
624             dst_ip = src_if.local_ip4
625             ip_layer = IP
626         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
627                ip_layer(src=src_ip, dst=dst_ip) /
628                UDP(sport=sport, dport=dport))
629         if natt:
630             # insert non ESP marker
631             res = res / Raw(b'\x00' * 4)
632         return res / msg
633
634     def verify_udp(self, udp):
635         self.assertEqual(udp.sport, self.sa.sport)
636         self.assertEqual(udp.dport, self.sa.dport)
637
638     def get_ike_header(self, packet):
639         try:
640             ih = packet[ikev2.IKEv2]
641             ih = self.verify_and_remove_non_esp_marker(ih)
642         except IndexError as e:
643             # this is a workaround for getting IKEv2 layer as both ikev2 and
644             # ipsec register for port 4500
645             esp = packet[ESP]
646             ih = self.verify_and_remove_non_esp_marker(esp)
647         self.assertEqual(ih.version, 0x20)
648         self.assertNotIn('Version', ih.flags)
649         return ih
650
651     def verify_and_remove_non_esp_marker(self, packet):
652         if self.sa.natt:
653             # if we are in nat traversal mode check for non esp marker
654             # and remove it
655             data = raw(packet)
656             self.assertEqual(data[:4], b'\x00' * 4)
657             return ikev2.IKEv2(data[4:])
658         else:
659             return packet
660
661     def encrypt_ike_msg(self, header, plain, first_payload):
662         if self.sa.ike_crypto == 'AES-GCM-16ICV':
663             data = self.sa.ike_crypto_alg.pad(raw(plain))
664             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
665                 len(ikev2.IKEv2_payload_Encrypted())
666             tlen = plen + len(ikev2.IKEv2())
667
668             # prepare aad data
669             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
670                                                  length=plen)
671             header.length = tlen
672             res = header / sk_p
673             encr = self.sa.encrypt(raw(plain), raw(res))
674             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
675                                                  length=plen, load=encr)
676             res = header / sk_p
677         else:
678             encr = self.sa.encrypt(raw(plain))
679             trunc_len = self.sa.ike_integ_alg.trunc_len
680             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
681             tlen = plen + len(ikev2.IKEv2())
682
683             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
684                                                  length=plen, load=encr)
685             header.length = tlen
686             res = header / sk_p
687
688             integ_data = raw(res)
689             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
690                                              self.sa.my_authkey, integ_data)
691             res = res / Raw(hmac_data[:trunc_len])
692         assert(len(res) == tlen)
693         return res
694
695     def verify_udp_encap(self, ipsec_sa):
696         e = VppEnum.vl_api_ipsec_sad_flags_t
697         if self.sa.udp_encap or self.sa.natt:
698             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
699         else:
700             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
701
702     def verify_ipsec_sas(self, is_rekey=False):
703         sas = self.vapi.ipsec_sa_dump()
704         if is_rekey:
705             # after rekey there is a short period of time in which old
706             # inbound SA is still present
707             sa_count = 3
708         else:
709             sa_count = 2
710         self.assertEqual(len(sas), sa_count)
711         if self.sa.is_initiator:
712             if is_rekey:
713                 sa0 = sas[0].entry
714                 sa1 = sas[2].entry
715             else:
716                 sa0 = sas[0].entry
717                 sa1 = sas[1].entry
718         else:
719             if is_rekey:
720                 sa0 = sas[2].entry
721                 sa1 = sas[0].entry
722             else:
723                 sa1 = sas[0].entry
724                 sa0 = sas[1].entry
725
726         c = self.sa.child_sas[0]
727
728         self.verify_udp_encap(sa0)
729         self.verify_udp_encap(sa1)
730         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
731         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
732         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
733
734         if self.sa.esp_integ is None:
735             vpp_integ_alg = 0
736         else:
737             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
738         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
739         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
740
741         # verify crypto keys
742         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
743         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
744         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
745         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
746
747         # verify integ keys
748         if vpp_integ_alg:
749             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
750             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
751             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
752             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
753         else:
754             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
755             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
756
757     def verify_keymat(self, api_keys, keys, name):
758         km = getattr(keys, name)
759         api_km = getattr(api_keys, name)
760         api_km_len = getattr(api_keys, name + '_len')
761         self.assertEqual(len(km), api_km_len)
762         self.assertEqual(km, api_km[:api_km_len])
763
764     def verify_id(self, api_id, exp_id):
765         self.assertEqual(api_id.type, IDType.value(exp_id.type))
766         self.assertEqual(api_id.data_len, exp_id.data_len)
767         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
768
769     def verify_ike_sas(self):
770         r = self.vapi.ikev2_sa_dump()
771         self.assertEqual(len(r), 1)
772         sa = r[0].sa
773         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
774         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
775         if self.ip6:
776             if self.sa.is_initiator:
777                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
778                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
779             else:
780                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
781                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
782         else:
783             if self.sa.is_initiator:
784                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
785                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
786             else:
787                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
788                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
789         self.verify_keymat(sa.keys, self.sa, 'sk_d')
790         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
791         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
792         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
793         self.verify_keymat(sa.keys, self.sa, 'sk_er')
794         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
795         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
796
797         self.assertEqual(sa.i_id.type, self.sa.id_type)
798         self.assertEqual(sa.r_id.type, self.sa.id_type)
799         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
800         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
801         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
802         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
803
804         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
805         self.assertEqual(len(r), 1)
806         csa = r[0].child_sa
807         self.assertEqual(csa.sa_index, sa.sa_index)
808         c = self.sa.child_sas[0]
809         if hasattr(c, 'sk_ai'):
810             self.verify_keymat(csa.keys, c, 'sk_ai')
811             self.verify_keymat(csa.keys, c, 'sk_ar')
812         self.verify_keymat(csa.keys, c, 'sk_ei')
813         self.verify_keymat(csa.keys, c, 'sk_er')
814         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
815         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
816
817         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
818         tsi = tsi[0]
819         tsr = tsr[0]
820         r = self.vapi.ikev2_traffic_selector_dump(
821                 is_initiator=True, sa_index=sa.sa_index,
822                 child_sa_index=csa.child_sa_index)
823         self.assertEqual(len(r), 1)
824         ts = r[0].ts
825         self.verify_ts(r[0].ts, tsi[0], True)
826
827         r = self.vapi.ikev2_traffic_selector_dump(
828                 is_initiator=False, sa_index=sa.sa_index,
829                 child_sa_index=csa.child_sa_index)
830         self.assertEqual(len(r), 1)
831         self.verify_ts(r[0].ts, tsr[0], False)
832
833         n = self.vapi.ikev2_nonce_get(is_initiator=True,
834                                       sa_index=sa.sa_index)
835         self.verify_nonce(n, self.sa.i_nonce)
836         n = self.vapi.ikev2_nonce_get(is_initiator=False,
837                                       sa_index=sa.sa_index)
838         self.verify_nonce(n, self.sa.r_nonce)
839
840     def verify_nonce(self, api_nonce, nonce):
841         self.assertEqual(api_nonce.data_len, len(nonce))
842         self.assertEqual(api_nonce.nonce, nonce)
843
844     def verify_ts(self, api_ts, ts, is_initiator):
845         if is_initiator:
846             self.assertTrue(api_ts.is_local)
847         else:
848             self.assertFalse(api_ts.is_local)
849
850         if self.p.ts_is_ip4:
851             self.assertEqual(api_ts.start_addr,
852                              IPv4Address(ts.starting_address_v4))
853             self.assertEqual(api_ts.end_addr,
854                              IPv4Address(ts.ending_address_v4))
855         else:
856             self.assertEqual(api_ts.start_addr,
857                              IPv6Address(ts.starting_address_v6))
858             self.assertEqual(api_ts.end_addr,
859                              IPv6Address(ts.ending_address_v6))
860         self.assertEqual(api_ts.start_port, ts.start_port)
861         self.assertEqual(api_ts.end_port, ts.end_port)
862         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
863
864
865 class TemplateInitiator(IkePeer):
866     """ initiator test template """
867
868     def initiate_del_sa_from_initiator(self):
869         ispi = int.from_bytes(self.sa.ispi, 'little')
870         self.pg0.enable_capture()
871         self.pg_start()
872         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
873         capture = self.pg0.get_capture(1)
874         ih = self.get_ike_header(capture[0])
875         self.assertNotIn('Response', ih.flags)
876         self.assertIn('Initiator', ih.flags)
877         self.assertEqual(ih.init_SPI, self.sa.ispi)
878         self.assertEqual(ih.resp_SPI, self.sa.rspi)
879         plain = self.sa.hmac_and_decrypt(ih)
880         d = ikev2.IKEv2_payload_Delete(plain)
881         self.assertEqual(d.proto, 1)  # proto=IKEv2
882         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
883                              flags='Response', exch_type='INFORMATIONAL',
884                              id=ih.id, next_payload='Encrypted')
885         resp = self.encrypt_ike_msg(header, b'', None)
886         self.send_and_assert_no_replies(self.pg0, resp)
887
888     def verify_del_sa(self, packet):
889         ih = self.get_ike_header(packet)
890         self.assertEqual(ih.id, self.sa.msg_id)
891         self.assertEqual(ih.exch_type, 37)  # exchange informational
892         self.assertIn('Response', ih.flags)
893         self.assertIn('Initiator', ih.flags)
894         plain = self.sa.hmac_and_decrypt(ih)
895         self.assertEqual(plain, b'')
896
897     def initiate_del_sa_from_responder(self):
898         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
899                              exch_type='INFORMATIONAL',
900                              id=self.sa.new_msg_id())
901         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
902         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
903         packet = self.create_packet(self.pg0, ike_msg,
904                                     self.sa.sport, self.sa.dport,
905                                     self.sa.natt, self.ip6)
906         self.pg0.add_stream(packet)
907         self.pg0.enable_capture()
908         self.pg_start()
909         capture = self.pg0.get_capture(1)
910         self.verify_del_sa(capture[0])
911
912     @staticmethod
913     def find_notify_payload(packet, notify_type):
914         n = packet[ikev2.IKEv2_payload_Notify]
915         while n is not None:
916             if n.type == notify_type:
917                 return n
918             n = n.payload
919         return None
920
921     def verify_nat_detection(self, packet):
922         if self.ip6:
923             iph = packet[IPv6]
924         else:
925             iph = packet[IP]
926         udp = packet[UDP]
927
928         # NAT_DETECTION_SOURCE_IP
929         s = self.find_notify_payload(packet, 16388)
930         self.assertIsNotNone(s)
931         src_sha = self.sa.compute_nat_sha1(
932                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
933         self.assertEqual(s.load, src_sha)
934
935         # NAT_DETECTION_DESTINATION_IP
936         s = self.find_notify_payload(packet, 16389)
937         self.assertIsNotNone(s)
938         dst_sha = self.sa.compute_nat_sha1(
939                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
940         self.assertEqual(s.load, dst_sha)
941
942     def verify_sa_init_request(self, packet):
943         udp = packet[UDP]
944         self.sa.dport = udp.sport
945         ih = packet[ikev2.IKEv2]
946         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
947         self.assertEqual(ih.exch_type, 34)  # SA_INIT
948         self.sa.ispi = ih.init_SPI
949         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
950         self.assertIn('Initiator', ih.flags)
951         self.assertNotIn('Response', ih.flags)
952         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
953         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
954
955         prop = packet[ikev2.IKEv2_payload_Proposal]
956         self.assertEqual(prop.proto, 1)  # proto = ikev2
957         self.assertEqual(prop.proposal, 1)
958         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
959         self.assertEqual(prop.trans[0].transform_id,
960                          self.p.ike_transforms['crypto_alg'])
961         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
962         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
963         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
964         self.assertEqual(prop.trans[2].transform_id,
965                          self.p.ike_transforms['dh_group'])
966
967         self.verify_nat_detection(packet)
968         self.sa.set_ike_props(
969                     crypto='AES-GCM-16ICV', crypto_key_len=32,
970                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
971         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
972                               integ='SHA2-256-128')
973         self.sa.generate_dh_data()
974         self.sa.complete_dh_data()
975         self.sa.calc_keys()
976
977     def update_esp_transforms(self, trans, sa):
978         while trans:
979             if trans.transform_type == 1:  # ecryption
980                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
981             elif trans.transform_type == 3:  # integrity
982                 sa.esp_integ = INTEG_IDS[trans.transform_id]
983             trans = trans.payload
984
985     def verify_sa_auth_req(self, packet):
986         udp = packet[UDP]
987         self.sa.dport = udp.sport
988         ih = self.get_ike_header(packet)
989         self.assertEqual(ih.resp_SPI, self.sa.rspi)
990         self.assertEqual(ih.init_SPI, self.sa.ispi)
991         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
992         self.assertIn('Initiator', ih.flags)
993         self.assertNotIn('Response', ih.flags)
994
995         udp = packet[UDP]
996         self.verify_udp(udp)
997         self.assertEqual(ih.id, self.sa.msg_id + 1)
998         self.sa.msg_id += 1
999         plain = self.sa.hmac_and_decrypt(ih)
1000         idi = ikev2.IKEv2_payload_IDi(plain)
1001         idr = ikev2.IKEv2_payload_IDr(idi.payload)
1002         self.assertEqual(idi.load, self.sa.i_id)
1003         self.assertEqual(idr.load, self.sa.r_id)
1004         prop = idi[ikev2.IKEv2_payload_Proposal]
1005         c = self.sa.child_sas[0]
1006         c.ispi = prop.SPI
1007         self.update_esp_transforms(
1008                 prop[ikev2.IKEv2_payload_Transform], self.sa)
1009
1010     def send_init_response(self):
1011         tr_attr = self.sa.ike_crypto_attr()
1012         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1013                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1014                  key_length=tr_attr[0]) /
1015                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1016                  transform_id=self.sa.ike_integ) /
1017                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1018                  transform_id=self.sa.ike_prf_alg.name) /
1019                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1020                  transform_id=self.sa.ike_dh))
1021         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1022                  trans_nb=4, trans=trans))
1023
1024         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1025         if self.sa.natt:
1026             dst_address = b'\x0a\x0a\x0a\x0a'
1027         else:
1028             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1029         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1030         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1031
1032         self.sa.init_resp_packet = (
1033             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1034                         exch_type='IKE_SA_INIT', flags='Response') /
1035             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1036             ikev2.IKEv2_payload_KE(next_payload='Nonce',
1037                                    group=self.sa.ike_dh,
1038                                    load=self.sa.my_dh_pub_key) /
1039             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1040                                       next_payload='Notify') /
1041             ikev2.IKEv2_payload_Notify(
1042                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1043                     next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1044                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1045
1046         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1047                                      self.sa.sport, self.sa.dport,
1048                                      False, self.ip6)
1049         self.pg_send(self.pg0, ike_msg)
1050         capture = self.pg0.get_capture(1)
1051         self.verify_sa_auth_req(capture[0])
1052
1053     def initiate_sa_init(self):
1054         self.pg0.enable_capture()
1055         self.pg_start()
1056         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1057
1058         capture = self.pg0.get_capture(1)
1059         self.verify_sa_init_request(capture[0])
1060         self.send_init_response()
1061
1062     def send_auth_response(self):
1063         tr_attr = self.sa.esp_crypto_attr()
1064         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1065                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1066                  key_length=tr_attr[0]) /
1067                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1068                  transform_id=self.sa.esp_integ) /
1069                  ikev2.IKEv2_payload_Transform(
1070                  transform_type='Extended Sequence Number',
1071                  transform_id='No ESN') /
1072                  ikev2.IKEv2_payload_Transform(
1073                  transform_type='Extended Sequence Number',
1074                  transform_id='ESN'))
1075
1076         c = self.sa.child_sas[0]
1077         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1078                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1079
1080         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1081         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1082                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1083                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1084                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1085                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1086                  auth_type=AuthMethod.value(self.sa.auth_method),
1087                  load=self.sa.auth_data) /
1088                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1089                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1090                  number_of_TSs=len(tsi),
1091                  traffic_selector=tsi) /
1092                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1093                  number_of_TSs=len(tsr),
1094                  traffic_selector=tsr) /
1095                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1096
1097         header = ikev2.IKEv2(
1098                 init_SPI=self.sa.ispi,
1099                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1100                 flags='Response', exch_type='IKE_AUTH')
1101
1102         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1103         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1104                                     self.sa.dport, self.sa.natt, self.ip6)
1105         self.pg_send(self.pg0, packet)
1106
1107     def test_initiator(self):
1108         self.initiate_sa_init()
1109         self.sa.auth_init()
1110         self.sa.calc_child_keys()
1111         self.send_auth_response()
1112         self.verify_ike_sas()
1113
1114
1115 class TemplateResponder(IkePeer):
1116     """ responder test template """
1117
1118     def initiate_del_sa_from_responder(self):
1119         self.pg0.enable_capture()
1120         self.pg_start()
1121         self.vapi.ikev2_initiate_del_ike_sa(
1122                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1123         capture = self.pg0.get_capture(1)
1124         ih = self.get_ike_header(capture[0])
1125         self.assertNotIn('Response', ih.flags)
1126         self.assertNotIn('Initiator', ih.flags)
1127         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1128         plain = self.sa.hmac_and_decrypt(ih)
1129         d = ikev2.IKEv2_payload_Delete(plain)
1130         self.assertEqual(d.proto, 1)  # proto=IKEv2
1131         self.assertEqual(ih.init_SPI, self.sa.ispi)
1132         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1133         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1134                              flags='Initiator+Response',
1135                              exch_type='INFORMATIONAL',
1136                              id=ih.id, next_payload='Encrypted')
1137         resp = self.encrypt_ike_msg(header, b'', None)
1138         self.send_and_assert_no_replies(self.pg0, resp)
1139
1140     def verify_del_sa(self, packet):
1141         ih = self.get_ike_header(packet)
1142         self.assertEqual(ih.id, self.sa.msg_id)
1143         self.assertEqual(ih.exch_type, 37)  # exchange informational
1144         self.assertIn('Response', ih.flags)
1145         self.assertNotIn('Initiator', ih.flags)
1146         self.assertEqual(ih.next_payload, 46)  # Encrypted
1147         self.assertEqual(ih.init_SPI, self.sa.ispi)
1148         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1149         plain = self.sa.hmac_and_decrypt(ih)
1150         self.assertEqual(plain, b'')
1151
1152     def initiate_del_sa_from_initiator(self):
1153         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1154                              flags='Initiator', exch_type='INFORMATIONAL',
1155                              id=self.sa.new_msg_id())
1156         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1157         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1158         packet = self.create_packet(self.pg0, ike_msg,
1159                                     self.sa.sport, self.sa.dport,
1160                                     self.sa.natt, self.ip6)
1161         self.pg0.add_stream(packet)
1162         self.pg0.enable_capture()
1163         self.pg_start()
1164         capture = self.pg0.get_capture(1)
1165         self.verify_del_sa(capture[0])
1166
1167     def send_sa_init_req(self):
1168         tr_attr = self.sa.ike_crypto_attr()
1169         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1170                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1171                  key_length=tr_attr[0]) /
1172                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1173                  transform_id=self.sa.ike_integ) /
1174                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1175                  transform_id=self.sa.ike_prf_alg.name) /
1176                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1177                  transform_id=self.sa.ike_dh))
1178
1179         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1180                  trans_nb=4, trans=trans))
1181
1182         next_payload = None if self.ip6 else 'Notify'
1183
1184         self.sa.init_req_packet = (
1185                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1186                             flags='Initiator', exch_type='IKE_SA_INIT') /
1187                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1188                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1189                                        group=self.sa.ike_dh,
1190                                        load=self.sa.my_dh_pub_key) /
1191                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1192                                           load=self.sa.i_nonce))
1193
1194         if not self.ip6:
1195             if self.sa.i_natt:
1196                 src_address = b'\x0a\x0a\x0a\x01'
1197             else:
1198                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1199
1200             if self.sa.r_natt:
1201                 dst_address = b'\x0a\x0a\x0a\x0a'
1202             else:
1203                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1204
1205             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1206             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1207             nat_src_detection = ikev2.IKEv2_payload_Notify(
1208                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1209                     next_payload='Notify')
1210             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1211                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1212             self.sa.init_req_packet = (self.sa.init_req_packet /
1213                                        nat_src_detection /
1214                                        nat_dst_detection)
1215
1216         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1217                                      self.sa.sport, self.sa.dport,
1218                                      self.sa.natt, self.ip6)
1219         self.pg0.add_stream(ike_msg)
1220         self.pg0.enable_capture()
1221         self.pg_start()
1222         capture = self.pg0.get_capture(1)
1223         self.verify_sa_init(capture[0])
1224
1225     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1226         tr_attr = self.sa.esp_crypto_attr()
1227         last_payload = last_payload or 'Notify'
1228         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1229                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1230                  key_length=tr_attr[0]) /
1231                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1232                  transform_id=self.sa.esp_integ) /
1233                  ikev2.IKEv2_payload_Transform(
1234                  transform_type='Extended Sequence Number',
1235                  transform_id='No ESN') /
1236                  ikev2.IKEv2_payload_Transform(
1237                  transform_type='Extended Sequence Number',
1238                  transform_id='ESN'))
1239
1240         c = self.sa.child_sas[0]
1241         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1242                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1243
1244         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1245         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1246                  auth_type=AuthMethod.value(self.sa.auth_method),
1247                  load=self.sa.auth_data) /
1248                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1249                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1250                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1251                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1252                  number_of_TSs=len(tsr), traffic_selector=tsr))
1253
1254         if is_rekey:
1255             first_payload = 'Nonce'
1256             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1257                      next_payload='SA') / plain /
1258                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1259                      proto='ESP', SPI=c.ispi))
1260         else:
1261             first_payload = 'IDi'
1262             ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1263                    IDtype=self.sa.id_type, load=self.sa.i_id) /
1264                    ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1265                    IDtype=self.sa.id_type, load=self.sa.r_id))
1266             plain = ids / plain
1267         return plain, first_payload
1268
1269     def send_sa_auth(self):
1270         plain, first_payload = self.generate_auth_payload(
1271                     last_payload='Notify')
1272         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1273         header = ikev2.IKEv2(
1274                 init_SPI=self.sa.ispi,
1275                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1276                 flags='Initiator', exch_type='IKE_AUTH')
1277
1278         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1279         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1280                                     self.sa.dport, self.sa.natt, self.ip6)
1281         self.pg0.add_stream(packet)
1282         self.pg0.enable_capture()
1283         self.pg_start()
1284         capture = self.pg0.get_capture(1)
1285         self.verify_sa_auth_resp(capture[0])
1286
1287     def verify_sa_init(self, packet):
1288         ih = self.get_ike_header(packet)
1289
1290         self.assertEqual(ih.id, self.sa.msg_id)
1291         self.assertEqual(ih.exch_type, 34)
1292         self.assertIn('Response', ih.flags)
1293         self.assertEqual(ih.init_SPI, self.sa.ispi)
1294         self.assertNotEqual(ih.resp_SPI, 0)
1295         self.sa.rspi = ih.resp_SPI
1296         try:
1297             sa = ih[ikev2.IKEv2_payload_SA]
1298             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1299             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1300         except IndexError as e:
1301             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1302             self.logger.error(ih.show())
1303             raise
1304         self.sa.complete_dh_data()
1305         self.sa.calc_keys()
1306         self.sa.auth_init()
1307
1308     def verify_sa_auth_resp(self, packet):
1309         ike = self.get_ike_header(packet)
1310         udp = packet[UDP]
1311         self.verify_udp(udp)
1312         self.assertEqual(ike.id, self.sa.msg_id)
1313         plain = self.sa.hmac_and_decrypt(ike)
1314         idr = ikev2.IKEv2_payload_IDr(plain)
1315         prop = idr[ikev2.IKEv2_payload_Proposal]
1316         self.assertEqual(prop.SPIsize, 4)
1317         self.sa.child_sas[0].rspi = prop.SPI
1318         self.sa.calc_child_keys()
1319
1320     def test_responder(self):
1321         self.send_sa_init_req()
1322         self.send_sa_auth()
1323         self.verify_ipsec_sas()
1324         self.verify_ike_sas()
1325
1326
1327 class Ikev2Params(object):
1328     def config_params(self, params={}):
1329         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1330         ei = VppEnum.vl_api_ipsec_integ_alg_t
1331         self.vpp_enums = {
1332                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1333                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1334                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1335                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1336                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1337                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1338
1339                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1340                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1341                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1342                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1343
1344         dpd_disabled = True if 'dpd_disabled' not in params else\
1345             params['dpd_disabled']
1346         if dpd_disabled:
1347             self.vapi.cli('ikev2 dpd disable')
1348         self.del_sa_from_responder = False if 'del_sa_from_responder'\
1349             not in params else params['del_sa_from_responder']
1350         i_natt = False if 'i_natt' not in params else params['i_natt']
1351         r_natt = False if 'r_natt' not in params else params['r_natt']
1352         self.p = Profile(self, 'pr1')
1353         self.ip6 = False if 'ip6' not in params else params['ip6']
1354
1355         if 'auth' in params and params['auth'] == 'rsa-sig':
1356             auth_method = 'rsa-sig'
1357             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1358             self.vapi.ikev2_set_local_key(
1359                     key_file=work_dir + params['server-key'])
1360
1361             client_file = work_dir + params['client-cert']
1362             server_pem = open(work_dir + params['server-cert']).read()
1363             client_priv = open(work_dir + params['client-key']).read()
1364             client_priv = load_pem_private_key(str.encode(client_priv), None,
1365                                                default_backend())
1366             self.peer_cert = x509.load_pem_x509_certificate(
1367                     str.encode(server_pem),
1368                     default_backend())
1369             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1370             auth_data = None
1371         else:
1372             auth_data = b'$3cr3tpa$$w0rd'
1373             self.p.add_auth(method='shared-key', data=auth_data)
1374             auth_method = 'shared-key'
1375             client_priv = None
1376
1377         is_init = True if 'is_initiator' not in params else\
1378             params['is_initiator']
1379
1380         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1381         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1382         if is_init:
1383             self.p.add_local_id(**idr)
1384             self.p.add_remote_id(**idi)
1385         else:
1386             self.p.add_local_id(**idi)
1387             self.p.add_remote_id(**idr)
1388
1389         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1390             'loc_ts' not in params else params['loc_ts']
1391         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1392             'rem_ts' not in params else params['rem_ts']
1393         self.p.add_local_ts(**loc_ts)
1394         self.p.add_remote_ts(**rem_ts)
1395         if 'responder' in params:
1396             self.p.add_responder(params['responder'])
1397         if 'ike_transforms' in params:
1398             self.p.add_ike_transforms(params['ike_transforms'])
1399         if 'esp_transforms' in params:
1400             self.p.add_esp_transforms(params['esp_transforms'])
1401
1402         udp_encap = False if 'udp_encap' not in params else\
1403             params['udp_encap']
1404         if udp_encap:
1405             self.p.set_udp_encap(True)
1406
1407         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1408                           is_initiator=is_init,
1409                           id_type=self.p.local_id['id_type'],
1410                           i_natt=i_natt, r_natt=r_natt,
1411                           priv_key=client_priv, auth_method=auth_method,
1412                           auth_data=auth_data, udp_encap=udp_encap,
1413                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1414         if is_init:
1415             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1416                 params['ike-crypto']
1417             ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1418                 params['ike-integ']
1419             ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1420                 params['ike-dh']
1421
1422             esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1423                 params['esp-crypto']
1424             esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1425                 params['esp-integ']
1426
1427             self.sa.set_ike_props(
1428                     crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1429                     integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1430             self.sa.set_esp_props(
1431                     crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1432                     integ=esp_integ)
1433
1434
1435 class TestApi(VppTestCase):
1436     """ Test IKEV2 API """
1437     @classmethod
1438     def setUpClass(cls):
1439         super(TestApi, cls).setUpClass()
1440
1441     @classmethod
1442     def tearDownClass(cls):
1443         super(TestApi, cls).tearDownClass()
1444
1445     def tearDown(self):
1446         super(TestApi, self).tearDown()
1447         self.p1.remove_vpp_config()
1448         self.p2.remove_vpp_config()
1449         r = self.vapi.ikev2_profile_dump()
1450         self.assertEqual(len(r), 0)
1451
1452     def configure_profile(self, cfg):
1453         p = Profile(self, cfg['name'])
1454         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1455         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1456         p.add_local_ts(**cfg['loc_ts'])
1457         p.add_remote_ts(**cfg['rem_ts'])
1458         p.add_responder(cfg['responder'])
1459         p.add_ike_transforms(cfg['ike_ts'])
1460         p.add_esp_transforms(cfg['esp_ts'])
1461         p.add_auth(**cfg['auth'])
1462         p.set_udp_encap(cfg['udp_encap'])
1463         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1464         if 'lifetime_data' in cfg:
1465             p.set_lifetime_data(cfg['lifetime_data'])
1466         if 'tun_itf' in cfg:
1467             p.set_tunnel_interface(cfg['tun_itf'])
1468         if 'natt_disabled' in cfg and cfg['natt_disabled']:
1469             p.disable_natt()
1470         p.add_vpp_config()
1471         return p
1472
1473     def test_profile_api(self):
1474         """ test profile dump API """
1475         loc_ts4 = {
1476                     'proto': 8,
1477                     'start_port': 1,
1478                     'end_port': 19,
1479                     'start_addr': '3.3.3.2',
1480                     'end_addr': '3.3.3.3',
1481                 }
1482         rem_ts4 = {
1483                     'proto': 9,
1484                     'start_port': 10,
1485                     'end_port': 119,
1486                     'start_addr': '4.5.76.80',
1487                     'end_addr': '2.3.4.6',
1488                 }
1489
1490         loc_ts6 = {
1491                     'proto': 8,
1492                     'start_port': 1,
1493                     'end_port': 19,
1494                     'start_addr': 'ab::1',
1495                     'end_addr': 'ab::4',
1496                 }
1497         rem_ts6 = {
1498                     'proto': 9,
1499                     'start_port': 10,
1500                     'end_port': 119,
1501                     'start_addr': 'cd::12',
1502                     'end_addr': 'cd::13',
1503                 }
1504
1505         conf = {
1506             'p1': {
1507                 'name': 'p1',
1508                 'natt_disabled': True,
1509                 'loc_id': ('fqdn', b'vpp.home'),
1510                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1511                 'loc_ts': loc_ts4,
1512                 'rem_ts': rem_ts4,
1513                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1514                 'ike_ts': {
1515                         'crypto_alg': 20,
1516                         'crypto_key_size': 32,
1517                         'integ_alg': 1,
1518                         'dh_group': 1},
1519                 'esp_ts': {
1520                         'crypto_alg': 13,
1521                         'crypto_key_size': 24,
1522                         'integ_alg': 2},
1523                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1524                 'udp_encap': True,
1525                 'ipsec_over_udp_port': 4501,
1526                 'lifetime_data': {
1527                     'lifetime': 123,
1528                     'lifetime_maxdata': 20192,
1529                     'lifetime_jitter': 9,
1530                     'handover': 132},
1531             },
1532             'p2': {
1533                 'name': 'p2',
1534                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1535                 'rem_id': ('ip6-addr', b'abcd::1'),
1536                 'loc_ts': loc_ts6,
1537                 'rem_ts': rem_ts6,
1538                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1539                 'ike_ts': {
1540                         'crypto_alg': 12,
1541                         'crypto_key_size': 16,
1542                         'integ_alg': 3,
1543                         'dh_group': 3},
1544                 'esp_ts': {
1545                         'crypto_alg': 9,
1546                         'crypto_key_size': 24,
1547                         'integ_alg': 4},
1548                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1549                 'udp_encap': False,
1550                 'ipsec_over_udp_port': 4600,
1551                 'tun_itf': 0}
1552         }
1553         self.p1 = self.configure_profile(conf['p1'])
1554         self.p2 = self.configure_profile(conf['p2'])
1555
1556         r = self.vapi.ikev2_profile_dump()
1557         self.assertEqual(len(r), 2)
1558         self.verify_profile(r[0].profile, conf['p1'])
1559         self.verify_profile(r[1].profile, conf['p2'])
1560
1561     def verify_id(self, api_id, cfg_id):
1562         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1563         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1564
1565     def verify_ts(self, api_ts, cfg_ts):
1566         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1567         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1568         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1569         self.assertEqual(api_ts.start_addr,
1570                          ip_address(text_type(cfg_ts['start_addr'])))
1571         self.assertEqual(api_ts.end_addr,
1572                          ip_address(text_type(cfg_ts['end_addr'])))
1573
1574     def verify_responder(self, api_r, cfg_r):
1575         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1576         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1577
1578     def verify_transforms(self, api_ts, cfg_ts):
1579         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1580         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1581         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1582
1583     def verify_ike_transforms(self, api_ts, cfg_ts):
1584         self.verify_transforms(api_ts, cfg_ts)
1585         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1586
1587     def verify_esp_transforms(self, api_ts, cfg_ts):
1588         self.verify_transforms(api_ts, cfg_ts)
1589
1590     def verify_auth(self, api_auth, cfg_auth):
1591         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1592         self.assertEqual(api_auth.data, cfg_auth['data'])
1593         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1594
1595     def verify_lifetime_data(self, p, ld):
1596         self.assertEqual(p.lifetime, ld['lifetime'])
1597         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1598         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1599         self.assertEqual(p.handover, ld['handover'])
1600
1601     def verify_profile(self, ap, cp):
1602         self.assertEqual(ap.name, cp['name'])
1603         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1604         self.verify_id(ap.loc_id, cp['loc_id'])
1605         self.verify_id(ap.rem_id, cp['rem_id'])
1606         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1607         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1608         self.verify_responder(ap.responder, cp['responder'])
1609         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1610         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1611         self.verify_auth(ap.auth, cp['auth'])
1612         natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1613         self.assertTrue(natt_dis == ap.natt_disabled)
1614
1615         if 'lifetime_data' in cp:
1616             self.verify_lifetime_data(ap, cp['lifetime_data'])
1617         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1618         if 'tun_itf' in cp:
1619             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1620         else:
1621             self.assertEqual(ap.tun_itf, 0xffffffff)
1622
1623
1624 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1625     """ test responder - responder behind NAT """
1626
1627     def config_tc(self):
1628         self.config_params({'r_natt': True})
1629
1630
1631 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1632     """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1633
1634     def config_tc(self):
1635         self.config_params({
1636             'i_natt': True,
1637             'is_initiator': False,  # seen from test case perspective
1638                                     # thus vpp is initiator
1639             'responder': {'sw_if_index': self.pg0.sw_if_index,
1640                            'addr': self.pg0.remote_ip4},
1641             'ike-crypto': ('AES-GCM-16ICV', 32),
1642             'ike-integ': 'NULL',
1643             'ike-dh': '3072MODPgr',
1644             'ike_transforms': {
1645                 'crypto_alg': 20,  # "aes-gcm-16"
1646                 'crypto_key_size': 256,
1647                 'dh_group': 15,  # "modp-3072"
1648             },
1649             'esp_transforms': {
1650                 'crypto_alg': 12,  # "aes-cbc"
1651                 'crypto_key_size': 256,
1652                 # "hmac-sha2-256-128"
1653                 'integ_alg': 12}})
1654
1655
1656 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1657     """ test ikev2 initiator - pre shared key auth """
1658
1659     def config_tc(self):
1660         self.config_params({
1661             'is_initiator': False,  # seen from test case perspective
1662                                     # thus vpp is initiator
1663             'responder': {'sw_if_index': self.pg0.sw_if_index,
1664                            'addr': self.pg0.remote_ip4},
1665             'ike-crypto': ('AES-GCM-16ICV', 32),
1666             'ike-integ': 'NULL',
1667             'ike-dh': '3072MODPgr',
1668             'ike_transforms': {
1669                 'crypto_alg': 20,  # "aes-gcm-16"
1670                 'crypto_key_size': 256,
1671                 'dh_group': 15,  # "modp-3072"
1672             },
1673             'esp_transforms': {
1674                 'crypto_alg': 12,  # "aes-cbc"
1675                 'crypto_key_size': 256,
1676                 # "hmac-sha2-256-128"
1677                 'integ_alg': 12}})
1678
1679
1680 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1681     """ test initiator - request window size (1) """
1682
1683     def rekey_respond(self, req, update_child_sa_data):
1684         ih = self.get_ike_header(req)
1685         plain = self.sa.hmac_and_decrypt(ih)
1686         sa = ikev2.IKEv2_payload_SA(plain)
1687         if update_child_sa_data:
1688             prop = sa[ikev2.IKEv2_payload_Proposal]
1689             self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1690             self.sa.r_nonce = self.sa.i_nonce
1691             self.sa.child_sas[0].ispi = prop.SPI
1692             self.sa.child_sas[0].rspi = prop.SPI
1693             self.sa.calc_child_keys()
1694
1695         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1696                              flags='Response', exch_type=36,
1697                              id=ih.id, next_payload='Encrypted')
1698         resp = self.encrypt_ike_msg(header, sa, 'SA')
1699         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1700                                     self.sa.dport, self.sa.natt, self.ip6)
1701         self.send_and_assert_no_replies(self.pg0, packet)
1702
1703     def test_initiator(self):
1704         super(TestInitiatorRequestWindowSize, self).test_initiator()
1705         self.pg0.enable_capture()
1706         self.pg_start()
1707         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1708         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1709         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1710         capture = self.pg0.get_capture(2)
1711
1712         # reply in reverse order
1713         self.rekey_respond(capture[1], True)
1714         self.rekey_respond(capture[0], False)
1715
1716         # verify that only the second request was accepted
1717         self.verify_ike_sas()
1718         self.verify_ipsec_sas(is_rekey=True)
1719
1720
1721 class TestInitiatorRekey(TestInitiatorPsk):
1722     """ test ikev2 initiator - rekey """
1723
1724     def rekey_from_initiator(self):
1725         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1726         self.pg0.enable_capture()
1727         self.pg_start()
1728         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1729         capture = self.pg0.get_capture(1)
1730         ih = self.get_ike_header(capture[0])
1731         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1732         self.assertNotIn('Response', ih.flags)
1733         self.assertIn('Initiator', ih.flags)
1734         plain = self.sa.hmac_and_decrypt(ih)
1735         sa = ikev2.IKEv2_payload_SA(plain)
1736         prop = sa[ikev2.IKEv2_payload_Proposal]
1737         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1738         self.sa.r_nonce = self.sa.i_nonce
1739         # update new responder SPI
1740         self.sa.child_sas[0].ispi = prop.SPI
1741         self.sa.child_sas[0].rspi = prop.SPI
1742         self.sa.calc_child_keys()
1743         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1744                              flags='Response', exch_type=36,
1745                              id=ih.id, next_payload='Encrypted')
1746         resp = self.encrypt_ike_msg(header, sa, 'SA')
1747         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1748                                     self.sa.dport, self.sa.natt, self.ip6)
1749         self.send_and_assert_no_replies(self.pg0, packet)
1750
1751     def test_initiator(self):
1752         super(TestInitiatorRekey, self).test_initiator()
1753         self.rekey_from_initiator()
1754         self.verify_ike_sas()
1755         self.verify_ipsec_sas(is_rekey=True)
1756
1757
1758 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1759     """ test ikev2 initiator - delete IKE SA from responder """
1760
1761     def config_tc(self):
1762         self.config_params({
1763             'del_sa_from_responder': True,
1764             'is_initiator': False,  # seen from test case perspective
1765                                     # thus vpp is initiator
1766             'responder': {'sw_if_index': self.pg0.sw_if_index,
1767                            'addr': self.pg0.remote_ip4},
1768             'ike-crypto': ('AES-GCM-16ICV', 32),
1769             'ike-integ': 'NULL',
1770             'ike-dh': '3072MODPgr',
1771             'ike_transforms': {
1772                 'crypto_alg': 20,  # "aes-gcm-16"
1773                 'crypto_key_size': 256,
1774                 'dh_group': 15,  # "modp-3072"
1775             },
1776             'esp_transforms': {
1777                 'crypto_alg': 12,  # "aes-cbc"
1778                 'crypto_key_size': 256,
1779                 # "hmac-sha2-256-128"
1780                 'integ_alg': 12}})
1781
1782
1783 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1784     """ test ikev2 responder - initiator behind NAT """
1785     def config_tc(self):
1786         self.config_params(
1787                 {'i_natt': True})
1788
1789
1790 class TestResponderPsk(TemplateResponder, Ikev2Params):
1791     """ test ikev2 responder - pre shared key auth """
1792     def config_tc(self):
1793         self.config_params()
1794
1795
1796 class TestResponderDpd(TestResponderPsk):
1797     """
1798     Dead peer detection test
1799     """
1800     def config_tc(self):
1801         self.config_params({'dpd_disabled': False})
1802
1803     def tearDown(self):
1804         pass
1805
1806     def test_responder(self):
1807         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1808         super(TestResponderDpd, self).test_responder()
1809         self.pg0.enable_capture()
1810         self.pg_start()
1811         # capture empty request but don't reply
1812         capture = self.pg0.get_capture(expected_count=1, timeout=5)
1813         ih = self.get_ike_header(capture[0])
1814         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1815         plain = self.sa.hmac_and_decrypt(ih)
1816         self.assertEqual(plain, b'')
1817         # wait for SA expiration
1818         time.sleep(3)
1819         ike_sas = self.vapi.ikev2_sa_dump()
1820         self.assertEqual(len(ike_sas), 0)
1821         ipsec_sas = self.vapi.ipsec_sa_dump()
1822         self.assertEqual(len(ipsec_sas), 0)
1823
1824
1825 class TestResponderRekey(TestResponderPsk):
1826     """ test ikev2 responder - rekey """
1827
1828     def rekey_from_initiator(self):
1829         packet = self.create_rekey_request()
1830         self.pg0.add_stream(packet)
1831         self.pg0.enable_capture()
1832         self.pg_start()
1833         capture = self.pg0.get_capture(1)
1834         ih = self.get_ike_header(capture[0])
1835         plain = self.sa.hmac_and_decrypt(ih)
1836         sa = ikev2.IKEv2_payload_SA(plain)
1837         prop = sa[ikev2.IKEv2_payload_Proposal]
1838         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1839         # update new responder SPI
1840         self.sa.child_sas[0].rspi = prop.SPI
1841
1842     def test_responder(self):
1843         super(TestResponderRekey, self).test_responder()
1844         self.rekey_from_initiator()
1845         self.sa.calc_child_keys()
1846         self.verify_ike_sas()
1847         self.verify_ipsec_sas(is_rekey=True)
1848
1849
1850 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1851     """ test ikev2 responder - cert based auth """
1852     def config_tc(self):
1853         self.config_params({
1854             'udp_encap': True,
1855             'auth': 'rsa-sig',
1856             'server-key': 'server-key.pem',
1857             'client-key': 'client-key.pem',
1858             'client-cert': 'client-cert.pem',
1859             'server-cert': 'server-cert.pem'})
1860
1861
1862 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1863         (TemplateResponder, Ikev2Params):
1864     """
1865     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1866     """
1867     def config_tc(self):
1868         self.config_params({
1869             'ike-crypto': ('AES-CBC', 16),
1870             'ike-integ': 'SHA2-256-128',
1871             'esp-crypto': ('AES-CBC', 24),
1872             'esp-integ': 'SHA2-384-192',
1873             'ike-dh': '2048MODPgr'})
1874
1875
1876 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1877         (TemplateResponder, Ikev2Params):
1878     """
1879     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1880     """
1881     def config_tc(self):
1882         self.config_params({
1883             'ike-crypto': ('AES-CBC', 32),
1884             'ike-integ': 'SHA2-256-128',
1885             'esp-crypto': ('AES-GCM-16ICV', 32),
1886             'esp-integ': 'NULL',
1887             'ike-dh': '3072MODPgr'})
1888
1889
1890 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1891     """
1892     IKE:AES_GCM_16_256
1893     """
1894     def config_tc(self):
1895         self.config_params({
1896             'del_sa_from_responder': True,
1897             'ip6': True,
1898             'natt': True,
1899             'ike-crypto': ('AES-GCM-16ICV', 32),
1900             'ike-integ': 'NULL',
1901             'ike-dh': '2048MODPgr',
1902             'loc_ts': {'start_addr': 'ab:cd::0',
1903                        'end_addr': 'ab:cd::10'},
1904             'rem_ts': {'start_addr': '11::0',
1905                        'end_addr': '11::100'}})
1906
1907
1908 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1909     """
1910     Test for keep alive messages
1911     """
1912
1913     def send_empty_req_from_responder(self):
1914         packet = self.create_empty_request()
1915         self.pg0.add_stream(packet)
1916         self.pg0.enable_capture()
1917         self.pg_start()
1918         capture = self.pg0.get_capture(1)
1919         ih = self.get_ike_header(capture[0])
1920         self.assertEqual(ih.id, self.sa.msg_id)
1921         plain = self.sa.hmac_and_decrypt(ih)
1922         self.assertEqual(plain, b'')
1923
1924     def test_initiator(self):
1925         super(TestInitiatorKeepaliveMsg, self).test_initiator()
1926         self.send_empty_req_from_responder()
1927
1928
1929 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1930     """ malformed packet test """
1931
1932     def tearDown(self):
1933         pass
1934
1935     def config_tc(self):
1936         self.config_params()
1937
1938     def assert_counter(self, count, name, version='ip4'):
1939         node_name = '/err/ikev2-%s/' % version + name
1940         self.assertEqual(count, self.statistics.get_err_counter(node_name))
1941
1942     def create_ike_init_msg(self, length=None, payload=None):
1943         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1944                           flags='Initiator', exch_type='IKE_SA_INIT')
1945         if payload is not None:
1946             msg /= payload
1947         return self.create_packet(self.pg0, msg, self.sa.sport,
1948                                   self.sa.dport)
1949
1950     def verify_bad_packet_length(self):
1951         ike_msg = self.create_ike_init_msg(length=0xdead)
1952         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1953         self.assert_counter(self.pkt_count, 'Bad packet length')
1954
1955     def verify_bad_sa_payload_length(self):
1956         p = ikev2.IKEv2_payload_SA(length=0xdead)
1957         ike_msg = self.create_ike_init_msg(payload=p)
1958         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1959         self.assert_counter(self.pkt_count, 'Malformed packet')
1960
1961     def test_responder(self):
1962         self.pkt_count = 254
1963         self.verify_bad_packet_length()
1964         self.verify_bad_sa_payload_length()
1965
1966
1967 if __name__ == '__main__':
1968     unittest.main(testRunner=VppTestRunner)