ikev2: refactor and test profile dump API
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
8     Cipher,
9     algorithms,
10     modes,
11 )
12 from ipaddress import IPv4Address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.packet import raw, Raw
16 from scapy.utils import long_converter
17 from framework import VppTestCase, VppTestRunner
18 from vpp_ikev2 import Profile, IDType, AuthMethod
19 from vpp_papi import VppEnum
20
21
# constant string mixed into the pre-shared key before it is used for
# authentication (see IKEv2SA.auth_init)
KEY_PAD = b"Key Pad for IKEv2"
# number of trailing key bytes that CryptoAlgo treats as the AEAD salt
SALT_SIZE = 4
# number of authentication tag bytes appended to AES-GCM ciphertext
GCM_ICV_SIZE = 16
# length in bytes of the explicit IV used with 'AES-GCM-16ICV'
GCM_IV_SIZE = 8
26
27
# MODP Diffie-Hellman groups as defined in rfc3526, keyed by group name.
# tuple structure is (p, g, key_len):
#   p       - the group prime
#   g       - the generator
#   key_len - length in bytes used when serialising the shared secret
#             (see IKEv2SA.compute_secret)
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
62
63
class CryptoAlgo(object):
    """ Symmetric cipher helper used to protect IKEv2 payloads

    :param name: algorithm name, e.g. 'AES-CBC' or 'AES-GCM-16ICV'
    :param cipher: cipher class (e.g. algorithms.AES) or None for 'NULL'
    :param mode: cipher mode class (e.g. modes.CBC) or None for 'NULL'

    When cipher is None neither ``bs`` nor ``iv_len`` is set, so the
    object cannot be used to encrypt, decrypt or pad.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is expressed in bits
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """ Encrypt already padded data with a freshly generated IV

        :param data: padded plaintext
        :param key: cipher key; in AEAD mode the trailing SALT_SIZE bytes
                    are the salt
        :param aad: additional authenticated data; selects AEAD mode when
                    not None
        :returns: iv + ciphertext, followed by the tag in AEAD mode
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()

        salt = key[-SALT_SIZE:]
        nonce = salt + iv
        encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                           default_backend()).encryptor()
        encryptor.authenticate_additional_data(aad)
        data = encryptor.update(data) + encryptor.finalize()
        data += encryptor.tag[:GCM_ICV_SIZE]
        return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """ Decrypt data that starts with the IV

        :param data: iv + ciphertext (without the ICV/tag)
        :param key: cipher key; in AEAD mode the trailing SALT_SIZE bytes
                    are the salt
        :param aad: additional authenticated data; selects AEAD mode when
                    not None
        :param icv: authentication tag, used in AEAD mode only
        :returns: in AEAD mode the plaintext with padding removed,
                  otherwise the raw decryptor output (padding included)
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            # use the configured cipher, as encrypt() does
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()

        salt = key[-SALT_SIZE:]
        nonce = salt + data[:GCM_IV_SIZE]
        ct = data[GCM_IV_SIZE:]
        key = key[:-SALT_SIZE]
        decryptor = Cipher(self.cipher(key),
                           self.mode(nonce, icv, len(icv)),
                           default_backend()).decryptor()
        decryptor.authenticate_additional_data(aad)
        pt = decryptor.update(ct) + decryptor.finalize()
        # last octet holds the number of padding octets preceding it
        pad_len = pt[-1] + 1
        return pt[:-pad_len]

    def pad(self, data):
        """ Pad data up to the next multiple of the cipher block size

        At least one octet is always appended: zero or more zero octets
        followed by the pad-length octet, which counts the padding octets
        in front of it (the convention decrypt() relies on).

        :param data: plaintext to pad
        :returns: padded plaintext
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
118
119
class AuthAlgo(object):
    """ Description of an integrity / PRF algorithm

    :param name: algorithm name
    :param mac: MAC constructor (or None)
    :param mod: hash constructor handed to the MAC (or None)
    :param key_len: key length
    :param trunc_len: truncated output length; key_len is used when this
                      is None or 0
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        # any falsy trunc_len falls back to the key length
        self.trunc_len = trunc_len if trunc_len else key_len
127
128
# encryption algorithms supported by the test, keyed by name
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}

# integrity algorithms supported by the test, keyed by name;
# arguments are (name, mac, hash, key_len, trunc_len).
# Each SHA2 variant must use its own digest: a 384/512 bit key and
# 192/256 bit truncation belong to SHA-384/SHA-512 respectively.
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}

# pseudo-random functions supported by the test, keyed by name
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}
149
150
class IKEv2ChildSA(object):
    """ Child SA state: SPI plus local and remote traffic selectors

    :param local_ts: local traffic selector
    :param remote_ts: remote traffic selector
    :param spi: SPI to use; 4 random bytes are generated when falsy
    """

    def __init__(self, local_ts, remote_ts, spi=None):
        if not spi:
            spi = os.urandom(4)
        self.spi = spi
        self.local_ts, self.remote_ts = local_ts, remote_ts
156
157
158 class IKEv2SA(object):
159     def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
160                  i_id=None, r_id=None, id_type='fqdn', nonce=None,
161                  auth_data=None, local_ts=None, remote_ts=None,
162                  auth_method='shared-key', priv_key=None, natt=False):
163         self.natt = natt
164         if natt:
165             self.sport = 4500
166             self.dport = 4500
167         else:
168             self.sport = 500
169             self.dport = 500
170         self.dh_params = None
171         self.test = test
172         self.priv_key = priv_key
173         self.is_initiator = is_initiator
174         nonce = nonce or os.urandom(32)
175         self.auth_data = auth_data
176         self.i_id = i_id
177         self.r_id = r_id
178         if isinstance(id_type, str):
179             self.id_type = IDType.value(id_type)
180         else:
181             self.id_type = id_type
182         self.auth_method = auth_method
183         if self.is_initiator:
184             self.rspi = None
185             self.ispi = spi
186             self.i_nonce = nonce
187         else:
188             self.rspi = spi
189             self.ispi = None
190             self.r_nonce = None
191         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
192
193     def dh_pub_key(self):
194         return self.i_dh_data
195
196     def compute_secret(self):
197         priv = self.dh_private_key
198         peer = self.r_dh_data
199         p, g, l = self.ike_group
200         return pow(int.from_bytes(peer, 'big'),
201                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
202
203     def generate_dh_data(self):
204         # generate DH keys
205         if self.is_initiator:
206             if self.ike_dh not in DH:
207                 raise NotImplementedError('%s not in DH group' % self.ike_dh)
208             if self.dh_params is None:
209                 dhg = DH[self.ike_dh]
210                 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
211                 self.dh_params = pn.parameters(default_backend())
212             priv = self.dh_params.generate_private_key()
213             pub = priv.public_key()
214             x = priv.private_numbers().x
215             self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
216             y = pub.public_numbers().y
217             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
218
219     def complete_dh_data(self):
220         self.dh_shared_secret = self.compute_secret()
221
222     def calc_child_keys(self):
223         prf = self.ike_prf_alg.mod()
224         s = self.i_nonce + self.r_nonce
225         c = self.child_sas[0]
226
227         encr_key_len = self.esp_crypto_key_len
228         integ_key_len = self.esp_integ_alg.key_len
229         salt_len = 0 if integ_key_len else 4
230
231         l = (integ_key_len * 2 +
232              encr_key_len * 2 +
233              salt_len * 2)
234         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
235
236         pos = 0
237         c.sk_ei = keymat[pos:pos+encr_key_len]
238         pos += encr_key_len
239
240         if integ_key_len:
241             c.sk_ai = keymat[pos:pos+integ_key_len]
242             pos += integ_key_len
243         else:
244             c.salt_ei = keymat[pos:pos+salt_len]
245             pos += salt_len
246
247         c.sk_er = keymat[pos:pos+encr_key_len]
248         pos += encr_key_len
249
250         if integ_key_len:
251             c.sk_ar = keymat[pos:pos+integ_key_len]
252             pos += integ_key_len
253         else:
254             c.salt_er = keymat[pos:pos+salt_len]
255             pos += salt_len
256
257     def calc_prfplus(self, prf, key, seed, length):
258         r = b''
259         t = None
260         x = 1
261         while len(r) < length and x < 255:
262             if t is not None:
263                 s = t
264             else:
265                 s = b''
266             s = s + seed + bytes([x])
267             t = self.calc_prf(prf, key, s)
268             r = r + t
269             x = x + 1
270
271         if x == 255:
272             return None
273         return r
274
275     def calc_prf(self, prf, key, data):
276         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
277         h.update(data)
278         return h.finalize()
279
280     def calc_keys(self):
281         prf = self.ike_prf_alg.mod()
282         # SKEYSEED = prf(Ni | Nr, g^ir)
283         s = self.i_nonce + self.r_nonce
284         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
285
286         # calculate S = Ni | Nr | SPIi SPIr
287         s = s + self.ispi + self.rspi
288
289         prf_key_trunc = self.ike_prf_alg.trunc_len
290         encr_key_len = self.ike_crypto_key_len
291         tr_prf_key_len = self.ike_prf_alg.key_len
292         integ_key_len = self.ike_integ_alg.key_len
293         if integ_key_len == 0:
294             salt_size = 4
295         else:
296             salt_size = 0
297
298         l = (prf_key_trunc +
299              integ_key_len * 2 +
300              encr_key_len * 2 +
301              tr_prf_key_len * 2 +
302              salt_size * 2)
303         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
304
305         pos = 0
306         self.sk_d = keymat[:pos+prf_key_trunc]
307         pos += prf_key_trunc
308
309         self.sk_ai = keymat[pos:pos+integ_key_len]
310         pos += integ_key_len
311         self.sk_ar = keymat[pos:pos+integ_key_len]
312         pos += integ_key_len
313
314         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
315         pos += encr_key_len + salt_size
316         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
317         pos += encr_key_len + salt_size
318
319         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
320         pos += tr_prf_key_len
321         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
322
323     def generate_authmsg(self, prf, packet):
324         if self.is_initiator:
325             id = self.i_id
326             nonce = self.r_nonce
327             key = self.sk_pi
328         data = bytes([self.id_type, 0, 0, 0]) + id
329         id_hash = self.calc_prf(prf, key, data)
330         return packet + nonce + id_hash
331
332     def auth_init(self):
333         prf = self.ike_prf_alg.mod()
334         authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
335         if self.auth_method == 'shared-key':
336             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
337             self.auth_data = self.calc_prf(prf, psk, authmsg)
338         elif self.auth_method == 'rsa-sig':
339             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
340                                                 hashes.SHA1())
341         else:
342             raise TypeError('unknown auth method type!')
343
344     def encrypt(self, data, aad=None):
345         data = self.ike_crypto_alg.pad(data)
346         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
347
348     @property
349     def peer_authkey(self):
350         if self.is_initiator:
351             return self.sk_ar
352         return self.sk_ai
353
354     @property
355     def my_authkey(self):
356         if self.is_initiator:
357             return self.sk_ai
358         return self.sk_ar
359
360     @property
361     def my_cryptokey(self):
362         if self.is_initiator:
363             return self.sk_ei
364         return self.sk_er
365
366     @property
367     def peer_cryptokey(self):
368         if self.is_initiator:
369             return self.sk_er
370         return self.sk_ei
371
372     def concat(self, alg, key_len):
373         return alg + '-' + str(key_len * 8)
374
375     @property
376     def vpp_ike_cypto_alg(self):
377         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
378
379     @property
380     def vpp_esp_cypto_alg(self):
381         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
382
383     def verify_hmac(self, ikemsg):
384         integ_trunc = self.ike_integ_alg.trunc_len
385         exp_hmac = ikemsg[-integ_trunc:]
386         data = ikemsg[:-integ_trunc]
387         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
388                                           self.peer_authkey, data)
389         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
390
391     def compute_hmac(self, integ, key, data):
392         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
393         h.update(data)
394         return h.finalize()
395
396     def decrypt(self, data, aad=None, icv=None):
397         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
398
399     def hmac_and_decrypt(self, ike):
400         ep = ike[ikev2.IKEv2_payload_Encrypted]
401         if self.ike_crypto == 'AES-GCM-16ICV':
402             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
403             ct = ep.load[:-GCM_ICV_SIZE]
404             tag = ep.load[-GCM_ICV_SIZE:]
405             return self.decrypt(ct, raw(ike)[:aad_len], tag)
406         else:
407             self.verify_hmac(raw(ike))
408             integ_trunc = self.ike_integ_alg.trunc_len
409
410             # remove ICV and decrypt payload
411             ct = ep.load[:-integ_trunc]
412             return self.decrypt(ct)
413
414     def generate_ts(self):
415         c = self.child_sas[0]
416         ts1 = ikev2.IPv4TrafficSelector(
417                 IP_protocol_ID=0,
418                 starting_address_v4=c.local_ts['start_addr'],
419                 ending_address_v4=c.local_ts['end_addr'])
420         ts2 = ikev2.IPv4TrafficSelector(
421                 IP_protocol_ID=0,
422                 starting_address_v4=c.remote_ts['start_addr'],
423                 ending_address_v4=c.remote_ts['end_addr'])
424         return ([ts1], [ts2])
425
426     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
427         if crypto not in CRYPTO_ALGOS:
428             raise TypeError('unsupported encryption algo %r' % crypto)
429         self.ike_crypto = crypto
430         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
431         self.ike_crypto_key_len = crypto_key_len
432
433         if integ not in AUTH_ALGOS:
434             raise TypeError('unsupported auth algo %r' % integ)
435         self.ike_integ = None if integ == 'NULL' else integ
436         self.ike_integ_alg = AUTH_ALGOS[integ]
437
438         if prf not in PRF_ALGOS:
439             raise TypeError('unsupported prf algo %r' % prf)
440         self.ike_prf = prf
441         self.ike_prf_alg = PRF_ALGOS[prf]
442         self.ike_dh = dh
443         self.ike_group = DH[self.ike_dh]
444
445     def set_esp_props(self, crypto, crypto_key_len, integ):
446         self.esp_crypto_key_len = crypto_key_len
447         if crypto not in CRYPTO_ALGOS:
448             raise TypeError('unsupported encryption algo %r' % crypto)
449         self.esp_crypto = crypto
450         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
451
452         if integ not in AUTH_ALGOS:
453             raise TypeError('unsupported auth algo %r' % integ)
454         self.esp_integ = None if integ == 'NULL' else integ
455         self.esp_integ_alg = AUTH_ALGOS[integ]
456
457     def crypto_attr(self, key_len):
458         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
459             return (0x800e << 16 | key_len << 3, 12)
460         else:
461             raise Exception('unsupported attribute type')
462
463     def ike_crypto_attr(self):
464         return self.crypto_attr(self.ike_crypto_key_len)
465
466     def esp_crypto_attr(self):
467         return self.crypto_attr(self.esp_crypto_key_len)
468
469     def compute_nat_sha1(self, ip, port):
470         data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
471         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
472         digest.update(data)
473         return digest.finalize()
474
475
476 class TemplateResponder(VppTestCase):
477     """ responder test template """
478
479     @classmethod
480     def setUpClass(cls):
481         import scapy.contrib.ikev2 as _ikev2
482         globals()['ikev2'] = _ikev2
483         super(TemplateResponder, cls).setUpClass()
484         cls.create_pg_interfaces(range(2))
485         for i in cls.pg_interfaces:
486             i.admin_up()
487             i.config_ip4()
488             i.resolve_arp()
489
490     @classmethod
491     def tearDownClass(cls):
492         super(TemplateResponder, cls).tearDownClass()
493
494     def setUp(self):
495         super(TemplateResponder, self).setUp()
496         self.config_tc()
497         self.p.add_vpp_config()
498         self.assertIsNotNone(self.p.query_vpp_config())
499         self.sa.generate_dh_data()
500
501     def tearDown(self):
502         super(TemplateResponder, self).tearDown()
503         self.p.remove_vpp_config()
504         self.assertIsNone(self.p.query_vpp_config())
505
506     def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
507         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
508                IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
509                UDP(sport=sport, dport=dport))
510         if natt:
511             # insert non ESP marker
512             res = res / Raw(b'\x00' * 4)
513         return res / msg
514
515     def send_sa_init(self, behind_nat=False):
516         tr_attr = self.sa.ike_crypto_attr()
517         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
518                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
519                  key_length=tr_attr[0]) /
520                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
521                  transform_id=self.sa.ike_integ) /
522                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
523                  transform_id=self.sa.ike_prf_alg.name) /
524                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
525                  transform_id=self.sa.ike_dh))
526
527         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
528                  trans_nb=4, trans=trans))
529
530         if behind_nat:
531             next_payload = 'Notify'
532         else:
533             next_payload = None
534
535         self.sa.init_req_packet = (
536                 ikev2.IKEv2(init_SPI=self.sa.ispi,
537                             flags='Initiator', exch_type='IKE_SA_INIT') /
538                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
539                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
540                                        group=self.sa.ike_dh,
541                                        load=self.sa.dh_pub_key()) /
542                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
543                                           load=self.sa.i_nonce))
544
545         if behind_nat:
546             src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
547                                                self.sa.sport)
548             nat_detection = ikev2.IKEv2_payload_Notify(
549                     type='NAT_DETECTION_SOURCE_IP',
550                     load=src_nat)
551             self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
552
553         ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
554                                       self.sa.sport, self.sa.dport,
555                                       self.sa.natt)
556         self.pg0.add_stream(ike_msg)
557         self.pg0.enable_capture()
558         self.pg_start()
559         capture = self.pg0.get_capture(1)
560         self.verify_sa_init(capture[0])
561
562     def send_sa_auth(self):
563         tr_attr = self.sa.esp_crypto_attr()
564         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
565                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
566                  key_length=tr_attr[0]) /
567                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
568                  transform_id=self.sa.esp_integ) /
569                  ikev2.IKEv2_payload_Transform(
570                  transform_type='Extended Sequence Number',
571                  transform_id='No ESN') /
572                  ikev2.IKEv2_payload_Transform(
573                  transform_type='Extended Sequence Number',
574                  transform_id='ESN'))
575
576         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
577                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
578
579         tsi, tsr = self.sa.generate_ts()
580         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
581                  IDtype=self.sa.id_type, load=self.sa.i_id) /
582                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
583                  IDtype=self.sa.id_type, load=self.sa.r_id) /
584                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
585                  auth_type=AuthMethod.value(self.sa.auth_method),
586                  load=self.sa.auth_data) /
587                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
588                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
589                  number_of_TSs=len(tsi),
590                  traffic_selector=tsi) /
591                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
592                  number_of_TSs=len(tsr),
593                  traffic_selector=tsr) /
594                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
595
596         if self.sa.ike_crypto == 'AES-GCM-16ICV':
597             data = self.sa.ike_crypto_alg.pad(raw(plain))
598             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
599                 len(ikev2.IKEv2_payload_Encrypted())
600             tlen = plen + len(ikev2.IKEv2())
601
602             # prepare aad data
603             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
604                                                  length=plen)
605             sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
606                        resp_SPI=self.sa.rspi, id=1,
607                        length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
608             sa_auth /= sk_p
609
610             encr = self.sa.encrypt(raw(plain), raw(sa_auth))
611             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
612                                                  length=plen, load=encr)
613             sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
614                        resp_SPI=self.sa.rspi, id=1,
615                        length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
616             sa_auth /= sk_p
617         else:
618             encr = self.sa.encrypt(raw(plain))
619             trunc_len = self.sa.ike_integ_alg.trunc_len
620             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
621             tlen = plen + len(ikev2.IKEv2())
622
623             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
624                                                  length=plen, load=encr)
625             sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
626                        resp_SPI=self.sa.rspi, id=1,
627                        length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
628             sa_auth /= sk_p
629
630             integ_data = raw(sa_auth)
631             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
632                                              self.sa.my_authkey, integ_data)
633             sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
634
635         assert(len(sa_auth) == tlen)
636         packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
637                                      self.sa.dport, self.sa.natt)
638         self.pg0.add_stream(packet)
639         self.pg0.enable_capture()
640         self.pg_start()
641         capture = self.pg0.get_capture(1)
642         self.verify_sa_auth(capture[0])
643
644     def get_ike_header(self, packet):
645         try:
646             ih = packet[ikev2.IKEv2]
647         except IndexError as e:
648             # this is a workaround for getting IKEv2 layer as both ikev2 and
649             # ipsec register for port 4500
650             esp = packet[ESP]
651             ih = self.verify_and_remove_non_esp_marker(esp)
652         return ih
653
654     def verify_sa_init(self, packet):
655         ih = self.get_ike_header(packet)
656
657         self.assertEqual(ih.exch_type, 34)
658         self.assertTrue('Response' in ih.flags)
659         self.assertEqual(ih.init_SPI, self.sa.ispi)
660         self.assertNotEqual(ih.resp_SPI, 0)
661         self.sa.rspi = ih.resp_SPI
662         try:
663             sa = ih[ikev2.IKEv2_payload_SA]
664             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
665             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
666         except IndexError as e:
667             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
668             self.logger.error(ih.show())
669             raise
670         self.sa.complete_dh_data()
671         self.sa.calc_keys()
672         self.sa.auth_init()
673
674     def verify_and_remove_non_esp_marker(self, packet):
675         if self.sa.natt:
676             # if we are in nat traversal mode check for non esp marker
677             # and remove it
678             data = raw(packet)
679             self.assertEqual(data[:4], b'\x00' * 4)
680             return ikev2.IKEv2(data[4:])
681         else:
682             return packet
683
684     def verify_udp(self, udp):
685         self.assertEqual(udp.sport, self.sa.sport)
686         self.assertEqual(udp.dport, self.sa.dport)
687
688     def verify_sa_auth(self, packet):
689         ike = self.get_ike_header(packet)
690         udp = packet[UDP]
691         self.verify_udp(udp)
692         plain = self.sa.hmac_and_decrypt(ike)
693         self.sa.calc_child_keys()
694
695     def verify_child_sas(self):
696         sas = self.vapi.ipsec_sa_dump()
697         self.assertEqual(len(sas), 2)
698         sa0 = sas[0].entry
699         sa1 = sas[1].entry
700         c = self.sa.child_sas[0]
701
702         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
703         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
704         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
705
706         if self.sa.esp_integ is None:
707             vpp_integ_alg = 0
708         else:
709             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
710         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
711         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
712
713         # verify crypto keys
714         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
715         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
716         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
717         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
718
719         # verify integ keys
720         if vpp_integ_alg:
721             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
722             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
723             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
724             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
725         else:
726             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
727             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
728
729     def test_responder(self):
730         self.send_sa_init(self.sa.natt)
731         self.send_sa_auth()
732         self.verify_child_sas()
733
734
class Ikev2Params(object):
    """Mixin that builds the IKEv2 profile and peer SA used by a test case.

    The host class has to provide ``self.vapi`` (used on the rsa-sig path);
    in this file the mixin is combined with ``TemplateResponder``.
    """

    def config_params(self, params=None):
        """Create ``self.p`` (a ``Profile``) and ``self.sa`` (an ``IKEv2SA``).

        Also stores the name -> VPP enum lookup table in ``self.vpp_enums``
        and, for rsa-sig auth, the peer certificate in ``self.peer_cert``.

        :param params: optional dict of overrides.  Recognised keys:

            * ``natt`` - truthy to request NAT traversal on the SA
            * ``auth`` - ``'rsa-sig'`` selects certificate auth and then
              requires the file names ``server-key``, ``server-cert``,
              ``client-key`` and ``client-cert``; anything else (or no
              key at all) selects shared-key auth
            * ``ike-crypto`` / ``esp-crypto`` - ``(name, key_len)`` tuples,
              default ``('AES-CBC', 32)``
            * ``ike-integ`` / ``esp-integ`` - default ``'HMAC-SHA1-96'``
            * ``ike-dh`` - default ``'2048MODPgr'``
        :raises KeyError: if rsa-sig auth is requested and one of the
            certificate/key file names is missing from ``params``.
        """
        # Use None rather than a mutable ``{}`` default shared by all calls.
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        # A missing or falsy 'natt' entry collapses to False.
        is_natt = params.get('natt') or False
        self.p = Profile(self, 'pr1')

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            # The certificate directory is located relative to the BR
            # environment variable; if BR is unset the concatenation below
            # fails because os.getenv() returns None.
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # Read the PEM files through context managers so the handles
            # are closed deterministically.
            with open(work_dir + params['server-cert']) as pem_file:
                server_pem = pem_file.read()
            with open(work_dir + params['client-key']) as pem_file:
                client_key_pem = pem_file.read()
            client_priv = load_pem_private_key(str.encode(client_key_pem),
                                               None, default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            # The profile is handed the path of the client certificate,
            # not its contents.
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
        self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
        self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
        self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')

        # The SA is built with the profile's local and remote settings
        # swapped: the profile's remote id becomes i_id, its local id
        # becomes r_id, and the traffic selectors are exchanged likewise.
        self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
                          r_id=self.p.local_id['data'],
                          id_type=self.p.local_id['id_type'], natt=is_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)

        ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
        ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
        ike_dh = params.get('ike-dh', '2048MODPgr')

        esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
        esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

        self.sa.set_ike_props(
                crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
        self.sa.set_esp_props(
                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                integ=esp_integ)
806
807
class TestApi(VppTestCase):
    """ Test IKEV2 API """
    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        """Remove the profiles the test created and check none are left."""
        super(TestApi, self).tearDown()
        # tearDown also runs when the test body raised, in which case p1
        # and/or p2 may never have been assigned.  Only remove what exists
        # so a secondary AttributeError does not pile onto the real failure.
        for attr_name in ('p1', 'p2'):
            profile = getattr(self, attr_name, None)
            if profile is not None:
                profile.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Build a ``Profile`` from ``cfg``, push it to VPP and return it.

        :param cfg: dict with the mandatory keys ``name``, ``loc_id``,
            ``rem_id``, ``loc_ts``, ``rem_ts``, ``responder``, ``ike_ts``,
            ``esp_ts``, ``auth``, ``udp_encap`` and ``ipsec_over_udp_port``;
            ``lifetime_data`` and ``tun_itf`` are applied only if present.
        :returns: the configured ``Profile``.
        """
        p = Profile(self, cfg['name'])
        p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
        p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
        p.add_local_ts(**cfg['loc_ts'])
        p.add_remote_ts(**cfg['rem_ts'])
        p.add_responder(cfg['responder'])
        p.add_ike_transforms(cfg['ike_ts'])
        p.add_esp_transforms(cfg['esp_ts'])
        p.add_auth(**cfg['auth'])
        p.set_udp_encap(cfg['udp_encap'])
        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            p.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            p.set_tunnel_interface(cfg['tun_itf'])
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """ test profile dump API """
        # Both profiles share the same traffic selectors.  The test only
        # checks that the dumped values equal what was configured.
        loc_ts = {
                    'proto': 8,
                    'start_port': 1,
                    'end_port': 19,
                    'start_addr': '3.3.3.2',
                    'end_addr': '3.3.3.3',
                }
        rem_ts = {
                    'proto': 9,
                    'start_port': 10,
                    'end_port': 119,
                    'start_addr': '4.5.76.80',
                    'end_addr': '2.3.4.6',
                }

        conf = {
            # p1 sets lifetime data but no tunnel interface.
            'p1': {
                'name': 'p1',
                'loc_id': ('fqdn', b'vpp.home'),
                'rem_id': ('fqdn', b'roadwarrior.example.com'),
                'loc_ts': loc_ts,
                'rem_ts': rem_ts,
                'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
                'ike_ts': {
                        'crypto_alg': 20,
                        'crypto_key_size': 32,
                        'integ_alg': 1,
                        'dh_group': 1},
                'esp_ts': {
                        'crypto_alg': 13,
                        'crypto_key_size': 24,
                        'integ_alg': 2},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': True,
                'ipsec_over_udp_port': 4501,
                'lifetime_data': {
                    'lifetime': 123,
                    'lifetime_maxdata': 20192,
                    'lifetime_jitter': 9,
                    'handover': 132},
            },
            # p2 sets a tunnel interface but no lifetime data.
            'p2': {
                'name': 'p2',
                'loc_id': ('ip4-addr', b'192.168.2.1'),
                'rem_id': ('ip4-addr', b'192.168.2.2'),
                'loc_ts': loc_ts,
                'rem_ts': rem_ts,
                'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
                'ike_ts': {
                        'crypto_alg': 12,
                        'crypto_key_size': 16,
                        'integ_alg': 3,
                        'dh_group': 3},
                'esp_ts': {
                        'crypto_alg': 9,
                        'crypto_key_size': 24,
                        'integ_alg': 4},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': False,
                'ipsec_over_udp_port': 4600,
                'tun_itf': 0}
        }
        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        # The dump is compared positionally: first entry p1, second p2.
        self.verify_profile(r[0].profile, conf['p1'])
        self.verify_profile(r[1].profile, conf['p2'])

    def verify_id(self, api_id, cfg_id):
        """Check a dumped id against a ``(type_name, data_bytes)`` tuple."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        # The dumped data is converted to ASCII bytes before comparison.
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Check a dumped traffic selector against its config dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
        self.assertEqual(api_ts.start_addr, IPv4Address(cfg_ts['start_addr']))
        self.assertEqual(api_ts.end_addr, IPv4Address(cfg_ts['end_addr']))

    def verify_responder(self, api_r, cfg_r):
        """Check the dumped responder's interface index and IPv4 address."""
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.ip4, IPv4Address(cfg_r['ip4']))

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the transform fields shared by IKE and ESP."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check the common transform fields plus the IKE-only DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check the ESP transforms (only the common fields)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Check the dumped auth method, data and data length."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, cfg_auth['data'])
        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))

    def verify_lifetime_data(self, p, ld):
        """Check the lifetime fields of dumped profile ``p`` against ``ld``."""
        self.assertEqual(p.lifetime, ld['lifetime'])
        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
        self.assertEqual(p.handover, ld['handover'])

    def verify_profile(self, ap, cp):
        """Check a dumped profile ``ap`` against its config dict ``cp``."""
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        if 'tun_itf' in cp:
            self.assertEqual(ap.tun_itf, cp['tun_itf'])
        else:
            # A profile configured without a tunnel interface is expected
            # to dump tun_itf as 0xffffffff.
            self.assertEqual(ap.tun_itf, 0xffffffff)
972
973
class TestResponderNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - nat traversal """
    def config_tc(self):
        # Only the NAT traversal flag is overridden; every other setting
        # comes from the defaults in config_params.
        overrides = {'natt': True}
        self.config_params(overrides)
979
980
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # No overrides: without an 'auth' entry config_params takes the
        # shared-key branch.
        self.config_params()
985
986
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        # Select rsa-sig auth and name the key/certificate files that
        # config_params looks up in its certificate directory.
        cert_params = {
            'auth': 'rsa-sig',
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        self.config_params(cert_params)
996
997
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        # Override both the IKE and the ESP algorithm selection; the crypto
        # entries are (name, key_len) tuples.
        algo_params = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
            'ike-dh': '2048MODPgr',
        }
        self.config_params(algo_params)
1010
1011
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        # NOTE(review): the class name and docstring say AES_CBC_128, but
        # the IKE key length passed here is 32, whereas the MODP2048 sibling
        # test uses 16 for the same label.  One of the two looks wrong -
        # confirm which before changing either.
        algo_params = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
            'ike-dh': '3072MODPgr',
        }
        self.config_params(algo_params)
1024
1025
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """
    def config_tc(self):
        # Only the IKE side is overridden; the ESP settings fall back to
        # the defaults in config_params.
        ike_params = {
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
        }
        self.config_params(ike_params)
1035
1036
if __name__ == '__main__':
    # unittest is not part of this file's top-level import block, so it is
    # imported here; otherwise running the file as a script fails with
    # NameError on the next line.
    import unittest
    unittest.main(testRunner=VppTestRunner)