ikev2: add more ikev2 tests
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
8     Cipher,
9     algorithms,
10     modes,
11 )
12 from scapy.layers.ipsec import ESP
13 from scapy.layers.inet import IP, UDP, Ether
14 from scapy.packet import raw, Raw
15 from scapy.utils import long_converter
16 from framework import VppTestCase, VppTestRunner
17 from vpp_ikev2 import Profile, IDType, AuthMethod
18 from vpp_papi import VppEnum
19
20
21 KEY_PAD = b"Key Pad for IKEv2"
22
23
24 # defined in rfc3526
25 # tuple structure is (p, g, key_len)
26 DH = {
27     '2048MODPgr': (long_converter("""
28     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
29     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
30     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
31     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
32     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
33     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
34     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
35     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
36     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
37     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
38     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
39
40     '3072MODPgr': (long_converter("""
41     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
42     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
43     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
44     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
45     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
46     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
47     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
48     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
49     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
50     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
51     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
52     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
53     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
54     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
55     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
56     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
57 }
58
59
60 class CryptoAlgo(object):
61     def __init__(self, name, cipher, mode):
62         self.name = name
63         self.cipher = cipher
64         self.mode = mode
65         if self.cipher is not None:
66             self.bs = self.cipher.block_size // 8
67
68     def encrypt(self, data, key):
69         iv = os.urandom(self.bs)
70         encryptor = Cipher(self.cipher(key), self.mode(iv),
71                            default_backend()).encryptor()
72         return iv + encryptor.update(data) + encryptor.finalize()
73
74     def decrypt(self, data, key, icv=None):
75         iv = data[:self.bs]
76         ct = data[self.bs:]
77         decryptor = Cipher(algorithms.AES(key),
78                            modes.CBC(iv),
79                            default_backend()).decryptor()
80         return decryptor.update(ct) + decryptor.finalize()
81
82     def pad(self, data):
83         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
84         data = data + b'\x00' * (pad_len - 1)
85         return data + bytes([pad_len])
86
87
88 class AuthAlgo(object):
89     def __init__(self, name, mac, mod, key_len, trunc_len=None):
90         self.name = name
91         self.mac = mac
92         self.mod = mod
93         self.key_len = key_len
94         self.trunc_len = trunc_len or key_len
95
96
97 CRYPTO_ALGOS = {
98     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
99     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
100     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
101                                 mode=modes.GCM),
102 }
103
104 AUTH_ALGOS = {
105     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
106     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
107     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
108     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
109     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
110 }
111
112 PRF_ALGOS = {
113     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
114     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
115                                   hashes.SHA256, 32),
116 }
117
118
119 class IKEv2ChildSA(object):
120     def __init__(self, local_ts, remote_ts, spi=None):
121         self.spi = spi or os.urandom(4)
122         self.local_ts = local_ts
123         self.remote_ts = remote_ts
124
125
126 class IKEv2SA(object):
127     def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
128                  i_id=None, r_id=None, id_type='fqdn', nonce=None,
129                  auth_data=None, local_ts=None, remote_ts=None,
130                  auth_method='shared-key', priv_key=None, natt=False):
131         self.natt = natt
132         if natt:
133             self.sport = 4500
134             self.dport = 4500
135         else:
136             self.sport = 500
137             self.dport = 500
138         self.dh_params = None
139         self.test = test
140         self.priv_key = priv_key
141         self.is_initiator = is_initiator
142         nonce = nonce or os.urandom(32)
143         self.auth_data = auth_data
144         self.i_id = i_id
145         self.r_id = r_id
146         if isinstance(id_type, str):
147             self.id_type = IDType.value(id_type)
148         else:
149             self.id_type = id_type
150         self.auth_method = auth_method
151         if self.is_initiator:
152             self.rspi = None
153             self.ispi = spi
154             self.i_nonce = nonce
155         else:
156             self.rspi = spi
157             self.ispi = None
158             self.r_nonce = None
159         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
160
161     def dh_pub_key(self):
162         return self.i_dh_data
163
164     def compute_secret(self):
165         priv = self.dh_private_key
166         peer = self.r_dh_data
167         p, g, l = self.ike_group
168         return pow(int.from_bytes(peer, 'big'),
169                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
170
171     def generate_dh_data(self):
172         # generate DH keys
173         if self.is_initiator:
174             if self.ike_dh not in DH:
175                 raise NotImplementedError('%s not in DH group' % self.ike_dh)
176             if self.dh_params is None:
177                 dhg = DH[self.ike_dh]
178                 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
179                 self.dh_params = pn.parameters(default_backend())
180             priv = self.dh_params.generate_private_key()
181             pub = priv.public_key()
182             x = priv.private_numbers().x
183             self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
184             y = pub.public_numbers().y
185             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
186
187     def complete_dh_data(self):
188         self.dh_shared_secret = self.compute_secret()
189
190     def calc_child_keys(self):
191         prf = self.ike_prf_alg.mod()
192         s = self.i_nonce + self.r_nonce
193         c = self.child_sas[0]
194
195         encr_key_len = self.esp_crypto_key_len
196         integ_key_len = self.esp_integ_alg.key_len
197         salt_len = 0 if integ_key_len else 4
198
199         l = (integ_key_len * 2 +
200              encr_key_len * 2 +
201              salt_len * 2)
202         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
203
204         pos = 0
205         c.sk_ei = keymat[pos:pos+encr_key_len]
206         pos += encr_key_len
207
208         if integ_key_len:
209             c.sk_ai = keymat[pos:pos+integ_key_len]
210             pos += integ_key_len
211         else:
212             c.salt_ei = keymat[pos:pos+salt_len]
213             pos += salt_len
214
215         c.sk_er = keymat[pos:pos+encr_key_len]
216         pos += encr_key_len
217
218         if integ_key_len:
219             c.sk_ar = keymat[pos:pos+integ_key_len]
220             pos += integ_key_len
221         else:
222             c.salt_er = keymat[pos:pos+salt_len]
223             pos += salt_len
224
225     def calc_prfplus(self, prf, key, seed, length):
226         r = b''
227         t = None
228         x = 1
229         while len(r) < length and x < 255:
230             if t is not None:
231                 s = t
232             else:
233                 s = b''
234             s = s + seed + bytes([x])
235             t = self.calc_prf(prf, key, s)
236             r = r + t
237             x = x + 1
238
239         if x == 255:
240             return None
241         return r
242
243     def calc_prf(self, prf, key, data):
244         h = self.ike_integ_alg.mac(key, prf, backend=default_backend())
245         h.update(data)
246         return h.finalize()
247
248     def calc_keys(self):
249         prf = self.ike_prf_alg.mod()
250         # SKEYSEED = prf(Ni | Nr, g^ir)
251         s = self.i_nonce + self.r_nonce
252         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
253
254         # calculate S = Ni | Nr | SPIi SPIr
255         s = s + self.ispi + self.rspi
256
257         prf_key_trunc = self.ike_prf_alg.trunc_len
258         encr_key_len = self.ike_crypto_key_len
259         tr_prf_key_len = self.ike_prf_alg.key_len
260         integ_key_len = self.ike_integ_alg.key_len
261         l = (prf_key_trunc +
262              integ_key_len * 2 +
263              encr_key_len * 2 +
264              tr_prf_key_len * 2)
265         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
266
267         pos = 0
268         self.sk_d = keymat[:pos+prf_key_trunc]
269         pos += prf_key_trunc
270
271         self.sk_ai = keymat[pos:pos+integ_key_len]
272         pos += integ_key_len
273         self.sk_ar = keymat[pos:pos+integ_key_len]
274         pos += integ_key_len
275
276         self.sk_ei = keymat[pos:pos+encr_key_len]
277         pos += encr_key_len
278         self.sk_er = keymat[pos:pos+encr_key_len]
279         pos += encr_key_len
280
281         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
282         pos += tr_prf_key_len
283         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
284
285     def generate_authmsg(self, prf, packet):
286         if self.is_initiator:
287             id = self.i_id
288             nonce = self.r_nonce
289             key = self.sk_pi
290         data = bytes([self.id_type, 0, 0, 0]) + id
291         id_hash = self.calc_prf(prf, key, data)
292         return packet + nonce + id_hash
293
294     def auth_init(self):
295         prf = self.ike_prf_alg.mod()
296         authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
297         if self.auth_method == 'shared-key':
298             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
299             self.auth_data = self.calc_prf(prf, psk, authmsg)
300         elif self.auth_method == 'rsa-sig':
301             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
302                                                 hashes.SHA1())
303         else:
304             raise TypeError('unknown auth method type!')
305
306     def encrypt(self, data):
307         data = self.ike_crypto_alg.pad(data)
308         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey)
309
310     @property
311     def peer_authkey(self):
312         if self.is_initiator:
313             return self.sk_ar
314         return self.sk_ai
315
316     @property
317     def my_authkey(self):
318         if self.is_initiator:
319             return self.sk_ai
320         return self.sk_ar
321
322     @property
323     def my_cryptokey(self):
324         if self.is_initiator:
325             return self.sk_ei
326         return self.sk_er
327
328     @property
329     def peer_cryptokey(self):
330         if self.is_initiator:
331             return self.sk_er
332         return self.sk_ei
333
334     def concat(self, alg, key_len):
335         return alg + '-' + str(key_len * 8)
336
337     @property
338     def vpp_ike_cypto_alg(self):
339         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
340
341     @property
342     def vpp_esp_cypto_alg(self):
343         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
344
345     def verify_hmac(self, ikemsg):
346         integ_trunc = self.ike_integ_alg.trunc_len
347         exp_hmac = ikemsg[-integ_trunc:]
348         data = ikemsg[:-integ_trunc]
349         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
350                                           self.peer_authkey, data)
351         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
352
353     def compute_hmac(self, integ, key, data):
354         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
355         h.update(data)
356         return h.finalize()
357
358     def decrypt(self, data):
359         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey)
360
361     def hmac_and_decrypt(self, ike):
362         ep = ike[ikev2.IKEv2_payload_Encrypted]
363         self.verify_hmac(raw(ike))
364         integ_trunc = self.ike_integ_alg.trunc_len
365
366         # remove ICV and decrypt payload
367         ct = ep.load[:-integ_trunc]
368         return self.decrypt(ct)
369
370     def generate_ts(self):
371         c = self.child_sas[0]
372         ts1 = ikev2.IPv4TrafficSelector(
373                 IP_protocol_ID=0,
374                 starting_address_v4=c.local_ts['start_addr'],
375                 ending_address_v4=c.local_ts['end_addr'])
376         ts2 = ikev2.IPv4TrafficSelector(
377                 IP_protocol_ID=0,
378                 starting_address_v4=c.remote_ts['start_addr'],
379                 ending_address_v4=c.remote_ts['end_addr'])
380         return ([ts1], [ts2])
381
382     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
383         if crypto not in CRYPTO_ALGOS:
384             raise TypeError('unsupported encryption algo %r' % crypto)
385         self.ike_crypto = crypto
386         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
387         self.ike_crypto_key_len = crypto_key_len
388
389         if integ not in AUTH_ALGOS:
390             raise TypeError('unsupported auth algo %r' % integ)
391         self.ike_integ = integ
392         self.ike_integ_alg = AUTH_ALGOS[integ]
393
394         if prf not in PRF_ALGOS:
395             raise TypeError('unsupported prf algo %r' % prf)
396         self.ike_prf = prf
397         self.ike_prf_alg = PRF_ALGOS[prf]
398         self.ike_dh = dh
399         self.ike_group = DH[self.ike_dh]
400
401     def set_esp_props(self, crypto, crypto_key_len, integ):
402         self.esp_crypto_key_len = crypto_key_len
403         if crypto not in CRYPTO_ALGOS:
404             raise TypeError('unsupported encryption algo %r' % crypto)
405         self.esp_crypto = crypto
406         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
407
408         if integ not in AUTH_ALGOS:
409             raise TypeError('unsupported auth algo %r' % integ)
410         self.esp_integ = None if integ == 'NULL' else integ
411         self.esp_integ_alg = AUTH_ALGOS[integ]
412
413     def crypto_attr(self, key_len):
414         if self.ike_crypto in ['AES-CBC', 'AES-GCM']:
415             return (0x800e << 16 | key_len << 3, 12)
416         else:
417             raise Exception('unsupported attribute type')
418
419     def ike_crypto_attr(self):
420         return self.crypto_attr(self.ike_crypto_key_len)
421
422     def esp_crypto_attr(self):
423         return self.crypto_attr(self.esp_crypto_key_len)
424
425     def compute_nat_sha1(self, ip, port):
426         data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
427         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
428         digest.update(data)
429         return digest.finalize()
430
431
432 class TemplateResponder(VppTestCase):
433     """ responder test template """
434
435     @classmethod
436     def setUpClass(cls):
437         import scapy.contrib.ikev2 as _ikev2
438         globals()['ikev2'] = _ikev2
439         super(TemplateResponder, cls).setUpClass()
440         cls.create_pg_interfaces(range(2))
441         for i in cls.pg_interfaces:
442             i.admin_up()
443             i.config_ip4()
444             i.resolve_arp()
445
446     @classmethod
447     def tearDownClass(cls):
448         super(TemplateResponder, cls).tearDownClass()
449
450     def setUp(self):
451         super(TemplateResponder, self).setUp()
452         self.config_tc()
453         self.p.add_vpp_config()
454         self.sa.generate_dh_data()
455
456     def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
457         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
458                IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
459                UDP(sport=sport, dport=dport))
460         if natt:
461             # insert non ESP marker
462             res = res / Raw(b'\x00' * 4)
463         return res / msg
464
465     def send_sa_init(self, behind_nat=False):
466         tr_attr = self.sa.ike_crypto_attr()
467         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
468                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
469                  key_length=tr_attr[0]) /
470                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
471                  transform_id=self.sa.ike_integ) /
472                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
473                  transform_id=self.sa.ike_prf_alg.name) /
474                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
475                  transform_id=self.sa.ike_dh))
476
477         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
478                  trans_nb=4, trans=trans))
479
480         if behind_nat:
481             next_payload = 'Notify'
482         else:
483             next_payload = None
484
485         self.sa.init_req_packet = (
486                 ikev2.IKEv2(init_SPI=self.sa.ispi,
487                             flags='Initiator', exch_type='IKE_SA_INIT') /
488                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
489                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
490                                        group=self.sa.ike_dh,
491                                        load=self.sa.dh_pub_key()) /
492                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
493                                           load=self.sa.i_nonce))
494
495         if behind_nat:
496             src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
497                                                self.sa.sport)
498             nat_detection = ikev2.IKEv2_payload_Notify(
499                     type='NAT_DETECTION_SOURCE_IP',
500                     load=src_nat)
501             self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
502
503         ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
504                                       self.sa.sport, self.sa.dport,
505                                       self.sa.natt)
506         self.pg0.add_stream(ike_msg)
507         self.pg0.enable_capture()
508         self.pg_start()
509         capture = self.pg0.get_capture(1)
510         self.verify_sa_init(capture[0])
511
512     def send_sa_auth(self):
513         tr_attr = self.sa.esp_crypto_attr()
514         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
515                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
516                  key_length=tr_attr[0]) /
517                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
518                  transform_id=self.sa.esp_integ) /
519                  ikev2.IKEv2_payload_Transform(
520                  transform_type='Extended Sequence Number',
521                  transform_id='No ESN') /
522                  ikev2.IKEv2_payload_Transform(
523                  transform_type='Extended Sequence Number',
524                  transform_id='ESN'))
525
526         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
527                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
528
529         tsi, tsr = self.sa.generate_ts()
530         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
531                  IDtype=self.sa.id_type, load=self.sa.i_id) /
532                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
533                  IDtype=self.sa.id_type, load=self.sa.r_id) /
534                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
535                  auth_type=AuthMethod.value(self.sa.auth_method),
536                  load=self.sa.auth_data) /
537                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
538                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
539                  number_of_TSs=len(tsi),
540                  traffic_selector=tsi) /
541                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
542                  number_of_TSs=len(tsr),
543                  traffic_selector=tsr) /
544                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
545         encr = self.sa.encrypt(raw(plain))
546
547         trunc_len = self.sa.ike_integ_alg.trunc_len
548         plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
549         tlen = plen + len(ikev2.IKEv2())
550
551         sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
552                                              length=plen, load=encr)
553         sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
554                    length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1))
555         sa_auth /= sk_p
556
557         integ_data = raw(sa_auth)
558         hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
559                                          self.sa.my_authkey, integ_data)
560         sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
561         assert(len(sa_auth) == tlen)
562
563         packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
564                                      self.sa.dport, self.sa.natt)
565         self.pg0.add_stream(packet)
566         self.pg0.enable_capture()
567         self.pg_start()
568         capture = self.pg0.get_capture(1)
569         self.verify_sa_auth(capture[0])
570
571     def get_ike_header(self, packet):
572         try:
573             ih = packet[ikev2.IKEv2]
574         except IndexError as e:
575             # this is a workaround for getting IKEv2 layer as both ikev2 and
576             # ipsec register for port 4500
577             esp = packet[ESP]
578             ih = self.verify_and_remove_non_esp_marker(esp)
579         return ih
580
581     def verify_sa_init(self, packet):
582         ih = self.get_ike_header(packet)
583
584         self.assertEqual(ih.exch_type, 34)
585         self.assertTrue('Response' in ih.flags)
586         self.assertEqual(ih.init_SPI, self.sa.ispi)
587         self.assertNotEqual(ih.resp_SPI, 0)
588         self.sa.rspi = ih.resp_SPI
589         try:
590             sa = ih[ikev2.IKEv2_payload_SA]
591             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
592             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
593         except AttributeError as e:
594             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
595             raise
596         self.sa.complete_dh_data()
597         self.sa.calc_keys()
598         self.sa.auth_init()
599
600     def verify_and_remove_non_esp_marker(self, packet):
601         if self.sa.natt:
602             # if we are in nat traversal mode check for non esp marker
603             # and remove it
604             data = raw(packet)
605             self.assertEqual(data[:4], b'\x00' * 4)
606             return ikev2.IKEv2(data[4:])
607         else:
608             return packet
609
610     def verify_udp(self, udp):
611         self.assertEqual(udp.sport, self.sa.sport)
612         self.assertEqual(udp.dport, self.sa.dport)
613
614     def verify_sa_auth(self, packet):
615         ike = self.get_ike_header(packet)
616         udp = packet[UDP]
617         self.verify_udp(udp)
618         plain = self.sa.hmac_and_decrypt(ike)
619         self.sa.calc_child_keys()
620
621     def verify_child_sas(self):
622         sas = self.vapi.ipsec_sa_dump()
623         self.assertEqual(len(sas), 2)
624         sa0 = sas[0].entry
625         sa1 = sas[1].entry
626         c = self.sa.child_sas[0]
627
628         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
629         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
630         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
631
632         if self.sa.esp_integ is None:
633             vpp_integ_alg = 0
634         else:
635             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
636         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
637         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
638
639         # verify crypto keys
640         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
641         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
642         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
643         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
644
645         # verify integ keys
646         if vpp_integ_alg:
647             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
648             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
649             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
650             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
651         else:
652             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
653             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
654
655     def test_responder(self):
656         self.send_sa_init(self.sa.natt)
657         self.send_sa_auth()
658         self.verify_child_sas()
659
660
661 class Ikev2Params(object):
662     def config_params(self, params={}):
663         ec = VppEnum.vl_api_ipsec_crypto_alg_t
664         ei = VppEnum.vl_api_ipsec_integ_alg_t
665         self.vpp_enums = {
666                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
667                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
668                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
669                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
670                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
671                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
672
673                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
674                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
675                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
676                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
677
678         is_natt = 'natt' in params and params['natt'] or False
679         self.p = Profile(self, 'pr1')
680
681         if 'auth' in params and params['auth'] == 'rsa-sig':
682             auth_method = 'rsa-sig'
683             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
684             self.vapi.ikev2_set_local_key(
685                     key_file=work_dir + params['server-key'])
686
687             client_file = work_dir + params['client-cert']
688             server_pem = open(work_dir + params['server-cert']).read()
689             client_priv = open(work_dir + params['client-key']).read()
690             client_priv = load_pem_private_key(str.encode(client_priv), None,
691                                                default_backend())
692             self.peer_cert = x509.load_pem_x509_certificate(
693                     str.encode(server_pem),
694                     default_backend())
695             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
696             auth_data = None
697         else:
698             auth_data = b'$3cr3tpa$$w0rd'
699             self.p.add_auth(method='shared-key', data=auth_data)
700             auth_method = 'shared-key'
701             client_priv = None
702
703         self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
704         self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
705         self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
706         self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
707
708         self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
709                           r_id=self.p.local_id['data'],
710                           id_type=self.p.local_id['id_type'], natt=is_natt,
711                           priv_key=client_priv, auth_method=auth_method,
712                           auth_data=auth_data,
713                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
714
715         ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
716             params['ike-crypto']
717         ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
718             params['ike-integ']
719         ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
720
721         esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
722             params['esp-crypto']
723         esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
724             params['esp-integ']
725
726         self.sa.set_ike_props(
727                 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
728                 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
729         self.sa.set_esp_props(
730                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
731                 integ=esp_integ)
732
733
734 class TestResponderNATT(TemplateResponder, Ikev2Params):
735     """ test ikev2 responder - nat traversal """
736     def config_tc(self):
737         self.config_params(
738                 {'natt': True})
739
740
741 class TestResponderPsk(TemplateResponder, Ikev2Params):
742     """ test ikev2 responder - pre shared key auth """
743     def config_tc(self):
744         self.config_params()
745
746
747 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
748     """ test ikev2 responder - cert based auth """
749     def config_tc(self):
750         self.config_params({
751             'auth': 'rsa-sig',
752             'server-key': 'server-key.pem',
753             'client-key': 'client-key.pem',
754             'client-cert': 'client-cert.pem',
755             'server-cert': 'server-cert.pem'})
756
757
758 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
759         (TemplateResponder, Ikev2Params):
760     """
761     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
762     """
763     def config_tc(self):
764         self.config_params({
765             'ike-crypto': ('AES-CBC', 16),
766             'ike-integ': 'SHA2-256-128',
767             'esp-crypto': ('AES-CBC', 24),
768             'esp-integ': 'SHA2-384-192',
769             'ike-dh': '2048MODPgr'})
770
771
772 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
773         (TemplateResponder, Ikev2Params):
774     """
775     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
776     """
777     def config_tc(self):
778         self.config_params({
779             'ike-crypto': ('AES-CBC', 32),
780             'ike-integ': 'SHA2-256-128',
781             'esp-crypto': ('AES-GCM-16ICV', 32),
782             'esp-integ': 'NULL',
783             'ike-dh': '3072MODPgr'})
784
785
786 if __name__ == '__main__':
787     unittest.main(testRunner=VppTestRunner)