tests: add ikev2 test framework with basic test case
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from cryptography.hazmat.backends import default_backend
3 from cryptography.hazmat.primitives import hashes, hmac
4 from cryptography.hazmat.primitives.asymmetric import dh
5 from cryptography.hazmat.primitives.ciphers import (
6     Cipher,
7     algorithms,
8     modes,
9 )
10 from scapy.layers.inet import IP, UDP, Ether
11 from scapy.packet import raw, Raw
12 from scapy.utils import long_converter
13 from framework import VppTestCase, VppTestRunner
14 from vpp_ikev2 import Profile, IDType
15
16
17 KEY_PAD = b"Key Pad for IKEv2"
18
19
20 # defined in rfc3526
21 # tuple structure is (p, g, key_len)
22 DH = {
23     '2048MODPgr': (long_converter("""
24     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
25     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
26     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
27     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
28     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
29     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
30     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
31     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
32     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
33     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
34     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256)
35 }
36
37
38 class CryptoAlgo(object):
39     def __init__(self, name, cipher, mode):
40         self.name = name
41         self.cipher = cipher
42         self.mode = mode
43         if self.cipher is not None:
44             self.bs = self.cipher.block_size // 8
45
46     def encrypt(self, data, key):
47         iv = os.urandom(self.bs)
48         encryptor = Cipher(self.cipher(key), self.mode(iv),
49                            default_backend()).encryptor()
50         return iv + encryptor.update(data) + encryptor.finalize()
51
52     def decrypt(self, data, key, icv=None):
53         iv = data[:self.bs]
54         ct = data[self.bs:]
55         decryptor = Cipher(algorithms.AES(key),
56                            modes.CBC(iv),
57                            default_backend()).decryptor()
58         return decryptor.update(ct) + decryptor.finalize()
59
60     def pad(self, data):
61         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
62         data = data + b'\x00' * (pad_len - 1)
63         return data + bytes([pad_len])
64
65
66 class AuthAlgo(object):
67     def __init__(self, name, mac, mod, key_len, trunc_len=None):
68         self.name = name
69         self.mac = mac
70         self.mod = mod
71         self.key_len = key_len
72         self.trunc_len = trunc_len or key_len
73
74
75 CRYPTO_ALGOS = {
76     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
77     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
78 }
79
80 AUTH_ALGOS = {
81     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
82     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
83 }
84
85 PRF_ALGOS = {
86     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
87     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
88                                   hashes.SHA256, 32),
89 }
90
91
92 class IKEv2ChildSA(object):
93     def __init__(self, local_ts, remote_ts, spi=None):
94         self.spi = spi or os.urandom(4)
95         self.local_ts = local_ts
96         self.remote_ts = remote_ts
97
98
99 class IKEv2SA(object):
100     def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
101                  id=None, id_type='fqdn', nonce=None, auth_data=None,
102                  local_ts=None, remote_ts=None, auth_method='shared-key'):
103         self.dh_params = None
104         self.test = test
105         self.is_initiator = is_initiator
106         nonce = nonce or os.urandom(32)
107         self.auth_data = auth_data
108         if isinstance(id_type, str):
109             self.id_type = IDType.value(id_type)
110         else:
111             self.id_type = id_type
112         self.auth_method = auth_method
113         if self.is_initiator:
114             self.rspi = None
115             self.ispi = spi
116             self.i_id = id
117             self.i_nonce = nonce
118         else:
119             self.rspi = spi
120             self.ispi = None
121             self.r_id = id
122             self.r_nonce = None
123         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
124
125     def dh_pub_key(self):
126         return self.i_dh_data
127
128     def compute_secret(self):
129         priv = self.dh_private_key
130         peer = self.r_dh_data
131         p, g, l = self.ike_group
132         return pow(int.from_bytes(peer, 'big'),
133                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
134
135     def generate_dh_data(self):
136         # generate DH keys
137         if self.is_initiator:
138             if self.ike_dh not in DH:
139                 raise NotImplementedError('%s not in DH group' % self.ike_dh)
140             if self.dh_params is None:
141                 dhg = DH[self.ike_dh]
142                 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
143                 self.dh_params = pn.parameters(default_backend())
144             priv = self.dh_params.generate_private_key()
145             pub = priv.public_key()
146             x = priv.private_numbers().x
147             self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
148             y = pub.public_numbers().y
149             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
150
151     def complete_dh_data(self):
152         self.dh_shared_secret = self.compute_secret()
153
154     def calc_child_keys(self):
155         prf = self.ike_prf_alg.mod()
156         s = self.i_nonce + self.r_nonce
157         c = self.child_sas[0]
158
159         encr_key_len = self.esp_crypto_key_len
160         integ_key_len = self.ike_integ_alg.key_len
161         l = (integ_key_len * 2 +
162              encr_key_len * 2)
163         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
164
165         pos = 0
166         c.sk_ei = keymat[pos:pos+encr_key_len]
167         pos += encr_key_len
168
169         c.sk_ai = keymat[pos:pos+integ_key_len]
170         pos += integ_key_len
171
172         c.sk_er = keymat[pos:pos+encr_key_len]
173         pos += encr_key_len
174
175         c.sk_ar = keymat[pos:pos+integ_key_len]
176         pos += integ_key_len
177
178     def calc_prfplus(self, prf, key, seed, length):
179         r = b''
180         t = None
181         x = 1
182         while len(r) < length and x < 255:
183             if t is not None:
184                 s = t
185             else:
186                 s = b''
187             s = s + seed + bytes([x])
188             t = self.calc_prf(prf, key, s)
189             r = r + t
190             x = x + 1
191
192         if x == 255:
193             return None
194         return r
195
196     def calc_prf(self, prf, key, data):
197         h = self.ike_integ_alg.mac(key, prf, backend=default_backend())
198         h.update(data)
199         return h.finalize()
200
201     def calc_keys(self):
202         prf = self.ike_prf_alg.mod()
203         # SKEYSEED = prf(Ni | Nr, g^ir)
204         s = self.i_nonce + self.r_nonce
205         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
206
207         # calculate S = Ni | Nr | SPIi SPIr
208         s = s + self.ispi + self.rspi
209
210         prf_key_trunc = self.ike_prf_alg.trunc_len
211         encr_key_len = self.ike_crypto_key_len
212         tr_prf_key_len = self.ike_prf_alg.key_len
213         integ_key_len = self.ike_integ_alg.key_len
214         l = (prf_key_trunc +
215              integ_key_len * 2 +
216              encr_key_len * 2 +
217              tr_prf_key_len * 2)
218         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
219
220         pos = 0
221         self.sk_d = keymat[:pos+prf_key_trunc]
222         pos += prf_key_trunc
223
224         self.sk_ai = keymat[pos:pos+integ_key_len]
225         pos += integ_key_len
226         self.sk_ar = keymat[pos:pos+integ_key_len]
227         pos += integ_key_len
228
229         self.sk_ei = keymat[pos:pos+encr_key_len]
230         pos += encr_key_len
231         self.sk_er = keymat[pos:pos+encr_key_len]
232         pos += encr_key_len
233
234         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
235         pos += tr_prf_key_len
236         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
237
238     def generate_authmsg(self, prf, packet):
239         if self.is_initiator:
240             id = self.i_id
241             nonce = self.r_nonce
242             key = self.sk_pi
243         data = bytes([self.id_type, 0, 0, 0]) + id
244         id_hash = self.calc_prf(prf, key, data)
245         return packet + nonce + id_hash
246
247     def auth_init(self):
248         prf = self.ike_prf_alg.mod()
249         authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
250         psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
251         self.auth_data = self.calc_prf(prf, psk, authmsg)
252
253     def encrypt(self, data):
254         data = self.ike_crypto_alg.pad(data)
255         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey)
256
257     @property
258     def peer_authkey(self):
259         if self.is_initiator:
260             return self.sk_ar
261         return self.sk_ai
262
263     @property
264     def my_authkey(self):
265         if self.is_initiator:
266             return self.sk_ai
267         return self.sk_ar
268
269     @property
270     def my_cryptokey(self):
271         if self.is_initiator:
272             return self.sk_ei
273         return self.sk_er
274
275     @property
276     def peer_cryptokey(self):
277         if self.is_initiator:
278             return self.sk_er
279         return self.sk_ei
280
281     def verify_hmac(self, ikemsg):
282         integ_trunc = self.ike_integ_alg.trunc_len
283         exp_hmac = ikemsg[-integ_trunc:]
284         data = ikemsg[:-integ_trunc]
285         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
286                                           self.peer_authkey, data)
287         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
288
289     def compute_hmac(self, integ, key, data):
290         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
291         h.update(data)
292         return h.finalize()
293
294     def decrypt(self, data):
295         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey)
296
297     def hmac_and_decrypt(self, ike):
298         ep = ike[ikev2.IKEv2_payload_Encrypted]
299         self.verify_hmac(raw(ike))
300         integ_trunc = self.ike_integ_alg.trunc_len
301
302         # remove ICV and decrypt payload
303         ct = ep.load[:-integ_trunc]
304         return self.decrypt(ct)
305
306     def generate_ts(self):
307         c = self.child_sas[0]
308         ts1 = ikev2.IPv4TrafficSelector(
309                 IP_protocol_ID=0,
310                 starting_address_v4=c.local_ts['start_addr'],
311                 ending_address_v4=c.local_ts['end_addr'])
312         ts2 = ikev2.IPv4TrafficSelector(
313                 IP_protocol_ID=0,
314                 starting_address_v4=c.remote_ts['start_addr'],
315                 ending_address_v4=c.remote_ts['end_addr'])
316         return ([ts1], [ts2])
317
318     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
319         if crypto not in CRYPTO_ALGOS:
320             raise TypeError('unsupported encryption algo %r' % crypto)
321         self.ike_crypto = crypto
322         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
323         self.ike_crypto_key_len = crypto_key_len
324
325         if integ not in AUTH_ALGOS:
326             raise TypeError('unsupported auth algo %r' % integ)
327         self.ike_integ = integ
328         self.ike_integ_alg = AUTH_ALGOS[integ]
329
330         if prf not in PRF_ALGOS:
331             raise TypeError('unsupported prf algo %r' % prf)
332         self.ike_prf = prf
333         self.ike_prf_alg = PRF_ALGOS[prf]
334         self.ike_dh = dh
335         self.ike_group = DH[self.ike_dh]
336
337     def set_esp_props(self, crypto, crypto_key_len, integ):
338         self.esp_crypto_key_len = crypto_key_len
339         if crypto not in CRYPTO_ALGOS:
340             raise TypeError('unsupported encryption algo %r' % crypto)
341         self.esp_crypto = crypto
342         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
343
344         if integ not in AUTH_ALGOS:
345             raise TypeError('unsupported auth algo %r' % integ)
346         self.esp_integ = integ
347         self.esp_integ_alg = AUTH_ALGOS[integ]
348
349     def crypto_attr(self, key_len):
350         if self.ike_crypto in ['AES-CBC', 'AES-GCM']:
351             return (0x800e << 16 | key_len << 3, 12)
352         else:
353             raise Exception('unsupported attribute type')
354
355     def ike_crypto_attr(self):
356         return self.crypto_attr(self.ike_crypto_key_len)
357
358     def esp_crypto_attr(self):
359         return self.crypto_attr(self.esp_crypto_key_len)
360
361
362 class TestResponder(VppTestCase):
363     """ responder test """
364
365     @classmethod
366     def setUpClass(cls):
367         import scapy.contrib.ikev2 as _ikev2
368         globals()['ikev2'] = _ikev2
369         super(TestResponder, cls).setUpClass()
370         cls.create_pg_interfaces(range(2))
371         for i in cls.pg_interfaces:
372             i.admin_up()
373             i.config_ip4()
374             i.resolve_arp()
375
376     @classmethod
377     def tearDownClass(cls):
378         super(TestResponder, cls).tearDownClass()
379
380     def setUp(self):
381         super(TestResponder, self).setUp()
382         self.config_tc()
383
384     def config_tc(self):
385         self.p = Profile(self, 'pr1')
386         self.p.add_auth(method='shared-key', data=b'$3cr3tpa$$w0rd')
387         self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
388         self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
389         self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
390         self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
391         self.p.add_vpp_config()
392
393         self.sa = IKEv2SA(self, id=self.p.remote_id['data'], is_initiator=True,
394                           auth_data=self.p.auth['data'],
395                           id_type=self.p.local_id['id_type'],
396                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
397
398         self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
399                               integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
400                               dh='2048MODPgr')
401         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
402                               integ='HMAC-SHA1-96')
403         self.sa.generate_dh_data()
404
405     def create_ike_msg(self, src_if, msg, sport=500, dport=500):
406         return (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
407                 IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
408                 UDP(sport=sport, dport=dport) / msg)
409
410     def send_sa_init(self):
411         tr_attr = self.sa.ike_crypto_attr()
412         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
413                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
414                  key_length=tr_attr[0]) /
415                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
416                  transform_id=self.sa.ike_integ) /
417                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
418                  transform_id=self.sa.ike_prf_alg.name) /
419                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
420                  transform_id=self.sa.ike_dh))
421
422         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
423                  trans_nb=4, trans=trans))
424
425         self.sa.init_req_packet = (
426                 ikev2.IKEv2(init_SPI=self.sa.ispi,
427                             flags='Initiator', exch_type='IKE_SA_INIT') /
428                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
429                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
430                                        group=self.sa.ike_dh,
431                                        load=self.sa.dh_pub_key()) /
432                 ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce))
433
434         ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet)
435         self.pg0.add_stream(ike_msg)
436         self.pg0.enable_capture()
437         self.pg_start()
438         capture = self.pg0.get_capture(1)
439         self.verify_sa_init(capture[0])
440
441     def send_sa_auth(self):
442         tr_attr = self.sa.esp_crypto_attr()
443         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
444                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
445                  key_length=tr_attr[0]) /
446                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
447                  transform_id=self.sa.esp_integ) /
448                  ikev2.IKEv2_payload_Transform(
449                  transform_type='Extended Sequence Number',
450                  transform_id='No ESN') /
451                  ikev2.IKEv2_payload_Transform(
452                  transform_type='Extended Sequence Number',
453                  transform_id='ESN'))
454
455         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
456                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
457
458         tsi, tsr = self.sa.generate_ts()
459         plain = (ikev2.IKEv2_payload_IDi(next_payload='AUTH',
460                  IDtype=self.sa.id_type, load=self.sa.i_id) /
461                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
462                  auth_type=2, load=self.sa.auth_data) /
463                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
464                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
465                  number_of_TSs=len(tsi),
466                  traffic_selector=tsi) /
467                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
468                  number_of_TSs=len(tsr),
469                  traffic_selector=tsr) /
470                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
471         encr = self.sa.encrypt(raw(plain))
472
473         trunc_len = self.sa.ike_integ_alg.trunc_len
474         plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
475         tlen = plen + len(ikev2.IKEv2())
476
477         sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
478                                              length=plen, load=encr)
479         sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
480                    length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1))
481         sa_auth /= sk_p
482
483         integ_data = raw(sa_auth)
484         hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
485                                          self.sa.my_authkey, integ_data)
486         sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
487         assert(len(sa_auth) == tlen)
488
489         packet = self.create_ike_msg(self.pg0, sa_auth)
490         self.pg0.add_stream(packet)
491         self.pg0.enable_capture()
492         self.pg_start()
493         capture = self.pg0.get_capture(1)
494         self.verify_sa_auth(capture[0])
495
496     def verify_sa_init(self, packet):
497         ih = packet[ikev2.IKEv2]
498         self.assertEqual(ih.exch_type, 34)
499         self.assertTrue('Response' in ih.flags)
500         self.assertEqual(ih.init_SPI, self.sa.ispi)
501         self.assertNotEqual(ih.resp_SPI, 0)
502         self.sa.rspi = ih.resp_SPI
503         try:
504             sa = ih[ikev2.IKEv2_payload_SA]
505             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
506             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
507         except AttributeError as e:
508             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
509             raise
510         self.sa.complete_dh_data()
511         self.sa.calc_keys()
512         self.sa.auth_init()
513
514     def verify_sa_auth(self, packet):
515         try:
516             ike = packet[ikev2.IKEv2]
517             ep = packet[ikev2.IKEv2_payload_Encrypted]
518         except KeyError as e:
519             self.logger.error("unexpected reply: no IKEv2/Encrypt payload!")
520             raise
521         plain = self.sa.hmac_and_decrypt(ike)
522         self.sa.calc_child_keys()
523
524     def verify_child_sas(self):
525         sas = self.vapi.ipsec_sa_dump()
526         self.assertEqual(len(sas), 2)
527         sa0 = sas[0].entry
528         sa1 = sas[1].entry
529         c = self.sa.child_sas[0]
530
531         # verify crypto keys
532         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
533         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
534         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
535         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
536
537         # verify integ keys
538         self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
539         self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
540         self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
541         self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
542
543     def test_responder(self):
544         self.send_sa_init()
545         self.send_sa_auth()
546         self.verify_child_sas()
547
548
549 if __name__ == '__main__':
550     unittest.main(testRunner=VppTestRunner)