tests: ikev2: add nat traversal & cert based auth test
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
8     Cipher,
9     algorithms,
10     modes,
11 )
12 from scapy.layers.ipsec import ESP
13 from scapy.layers.inet import IP, UDP, Ether
14 from scapy.packet import raw, Raw
15 from scapy.utils import long_converter
16 from framework import VppTestCase, VppTestRunner
17 from vpp_ikev2 import Profile, IDType, AuthMethod
18
19
20 KEY_PAD = b"Key Pad for IKEv2"
21
22
23 # defined in rfc3526
24 # tuple structure is (p, g, key_len)
25 DH = {
26     '2048MODPgr': (long_converter("""
27     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
28     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
29     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
30     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
31     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
32     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
33     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
34     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
35     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
36     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
37     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256)
38 }
39
40
41 class CryptoAlgo(object):
42     def __init__(self, name, cipher, mode):
43         self.name = name
44         self.cipher = cipher
45         self.mode = mode
46         if self.cipher is not None:
47             self.bs = self.cipher.block_size // 8
48
49     def encrypt(self, data, key):
50         iv = os.urandom(self.bs)
51         encryptor = Cipher(self.cipher(key), self.mode(iv),
52                            default_backend()).encryptor()
53         return iv + encryptor.update(data) + encryptor.finalize()
54
55     def decrypt(self, data, key, icv=None):
56         iv = data[:self.bs]
57         ct = data[self.bs:]
58         decryptor = Cipher(algorithms.AES(key),
59                            modes.CBC(iv),
60                            default_backend()).decryptor()
61         return decryptor.update(ct) + decryptor.finalize()
62
63     def pad(self, data):
64         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
65         data = data + b'\x00' * (pad_len - 1)
66         return data + bytes([pad_len])
67
68
69 class AuthAlgo(object):
70     def __init__(self, name, mac, mod, key_len, trunc_len=None):
71         self.name = name
72         self.mac = mac
73         self.mod = mod
74         self.key_len = key_len
75         self.trunc_len = trunc_len or key_len
76
77
78 CRYPTO_ALGOS = {
79     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
80     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
81 }
82
83 AUTH_ALGOS = {
84     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
85     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
86 }
87
88 PRF_ALGOS = {
89     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
90     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
91                                   hashes.SHA256, 32),
92 }
93
94
95 class IKEv2ChildSA(object):
96     def __init__(self, local_ts, remote_ts, spi=None):
97         self.spi = spi or os.urandom(4)
98         self.local_ts = local_ts
99         self.remote_ts = remote_ts
100
101
102 class IKEv2SA(object):
103     def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
104                  i_id=None, r_id=None, id_type='fqdn', nonce=None,
105                  auth_data=None, local_ts=None, remote_ts=None,
106                  auth_method='shared-key', priv_key=None, natt=False):
107         self.natt = natt
108         if natt:
109             self.sport = 4500
110             self.dport = 4500
111         else:
112             self.sport = 500
113             self.dport = 500
114         self.dh_params = None
115         self.test = test
116         self.priv_key = priv_key
117         self.is_initiator = is_initiator
118         nonce = nonce or os.urandom(32)
119         self.auth_data = auth_data
120         self.i_id = i_id
121         self.r_id = r_id
122         if isinstance(id_type, str):
123             self.id_type = IDType.value(id_type)
124         else:
125             self.id_type = id_type
126         self.auth_method = auth_method
127         if self.is_initiator:
128             self.rspi = None
129             self.ispi = spi
130             self.i_nonce = nonce
131         else:
132             self.rspi = spi
133             self.ispi = None
134             self.r_nonce = None
135         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
136
137     def dh_pub_key(self):
138         return self.i_dh_data
139
140     def compute_secret(self):
141         priv = self.dh_private_key
142         peer = self.r_dh_data
143         p, g, l = self.ike_group
144         return pow(int.from_bytes(peer, 'big'),
145                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
146
147     def generate_dh_data(self):
148         # generate DH keys
149         if self.is_initiator:
150             if self.ike_dh not in DH:
151                 raise NotImplementedError('%s not in DH group' % self.ike_dh)
152             if self.dh_params is None:
153                 dhg = DH[self.ike_dh]
154                 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
155                 self.dh_params = pn.parameters(default_backend())
156             priv = self.dh_params.generate_private_key()
157             pub = priv.public_key()
158             x = priv.private_numbers().x
159             self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
160             y = pub.public_numbers().y
161             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
162
163     def complete_dh_data(self):
164         self.dh_shared_secret = self.compute_secret()
165
166     def calc_child_keys(self):
167         prf = self.ike_prf_alg.mod()
168         s = self.i_nonce + self.r_nonce
169         c = self.child_sas[0]
170
171         encr_key_len = self.esp_crypto_key_len
172         integ_key_len = self.ike_integ_alg.key_len
173         l = (integ_key_len * 2 +
174              encr_key_len * 2)
175         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
176
177         pos = 0
178         c.sk_ei = keymat[pos:pos+encr_key_len]
179         pos += encr_key_len
180
181         c.sk_ai = keymat[pos:pos+integ_key_len]
182         pos += integ_key_len
183
184         c.sk_er = keymat[pos:pos+encr_key_len]
185         pos += encr_key_len
186
187         c.sk_ar = keymat[pos:pos+integ_key_len]
188         pos += integ_key_len
189
190     def calc_prfplus(self, prf, key, seed, length):
191         r = b''
192         t = None
193         x = 1
194         while len(r) < length and x < 255:
195             if t is not None:
196                 s = t
197             else:
198                 s = b''
199             s = s + seed + bytes([x])
200             t = self.calc_prf(prf, key, s)
201             r = r + t
202             x = x + 1
203
204         if x == 255:
205             return None
206         return r
207
208     def calc_prf(self, prf, key, data):
209         h = self.ike_integ_alg.mac(key, prf, backend=default_backend())
210         h.update(data)
211         return h.finalize()
212
213     def calc_keys(self):
214         prf = self.ike_prf_alg.mod()
215         # SKEYSEED = prf(Ni | Nr, g^ir)
216         s = self.i_nonce + self.r_nonce
217         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
218
219         # calculate S = Ni | Nr | SPIi SPIr
220         s = s + self.ispi + self.rspi
221
222         prf_key_trunc = self.ike_prf_alg.trunc_len
223         encr_key_len = self.ike_crypto_key_len
224         tr_prf_key_len = self.ike_prf_alg.key_len
225         integ_key_len = self.ike_integ_alg.key_len
226         l = (prf_key_trunc +
227              integ_key_len * 2 +
228              encr_key_len * 2 +
229              tr_prf_key_len * 2)
230         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
231
232         pos = 0
233         self.sk_d = keymat[:pos+prf_key_trunc]
234         pos += prf_key_trunc
235
236         self.sk_ai = keymat[pos:pos+integ_key_len]
237         pos += integ_key_len
238         self.sk_ar = keymat[pos:pos+integ_key_len]
239         pos += integ_key_len
240
241         self.sk_ei = keymat[pos:pos+encr_key_len]
242         pos += encr_key_len
243         self.sk_er = keymat[pos:pos+encr_key_len]
244         pos += encr_key_len
245
246         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
247         pos += tr_prf_key_len
248         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
249
250     def generate_authmsg(self, prf, packet):
251         if self.is_initiator:
252             id = self.i_id
253             nonce = self.r_nonce
254             key = self.sk_pi
255         data = bytes([self.id_type, 0, 0, 0]) + id
256         id_hash = self.calc_prf(prf, key, data)
257         return packet + nonce + id_hash
258
259     def auth_init(self):
260         prf = self.ike_prf_alg.mod()
261         authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
262         if self.auth_method == 'shared-key':
263             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
264             self.auth_data = self.calc_prf(prf, psk, authmsg)
265         elif self.auth_method == 'rsa-sig':
266             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
267                                                 hashes.SHA1())
268         else:
269             raise TypeError('unknown auth method type!')
270
271     def encrypt(self, data):
272         data = self.ike_crypto_alg.pad(data)
273         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey)
274
275     @property
276     def peer_authkey(self):
277         if self.is_initiator:
278             return self.sk_ar
279         return self.sk_ai
280
281     @property
282     def my_authkey(self):
283         if self.is_initiator:
284             return self.sk_ai
285         return self.sk_ar
286
287     @property
288     def my_cryptokey(self):
289         if self.is_initiator:
290             return self.sk_ei
291         return self.sk_er
292
293     @property
294     def peer_cryptokey(self):
295         if self.is_initiator:
296             return self.sk_er
297         return self.sk_ei
298
299     def verify_hmac(self, ikemsg):
300         integ_trunc = self.ike_integ_alg.trunc_len
301         exp_hmac = ikemsg[-integ_trunc:]
302         data = ikemsg[:-integ_trunc]
303         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
304                                           self.peer_authkey, data)
305         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
306
307     def compute_hmac(self, integ, key, data):
308         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
309         h.update(data)
310         return h.finalize()
311
312     def decrypt(self, data):
313         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey)
314
315     def hmac_and_decrypt(self, ike):
316         ep = ike[ikev2.IKEv2_payload_Encrypted]
317         self.verify_hmac(raw(ike))
318         integ_trunc = self.ike_integ_alg.trunc_len
319
320         # remove ICV and decrypt payload
321         ct = ep.load[:-integ_trunc]
322         return self.decrypt(ct)
323
324     def generate_ts(self):
325         c = self.child_sas[0]
326         ts1 = ikev2.IPv4TrafficSelector(
327                 IP_protocol_ID=0,
328                 starting_address_v4=c.local_ts['start_addr'],
329                 ending_address_v4=c.local_ts['end_addr'])
330         ts2 = ikev2.IPv4TrafficSelector(
331                 IP_protocol_ID=0,
332                 starting_address_v4=c.remote_ts['start_addr'],
333                 ending_address_v4=c.remote_ts['end_addr'])
334         return ([ts1], [ts2])
335
336     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
337         if crypto not in CRYPTO_ALGOS:
338             raise TypeError('unsupported encryption algo %r' % crypto)
339         self.ike_crypto = crypto
340         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
341         self.ike_crypto_key_len = crypto_key_len
342
343         if integ not in AUTH_ALGOS:
344             raise TypeError('unsupported auth algo %r' % integ)
345         self.ike_integ = integ
346         self.ike_integ_alg = AUTH_ALGOS[integ]
347
348         if prf not in PRF_ALGOS:
349             raise TypeError('unsupported prf algo %r' % prf)
350         self.ike_prf = prf
351         self.ike_prf_alg = PRF_ALGOS[prf]
352         self.ike_dh = dh
353         self.ike_group = DH[self.ike_dh]
354
355     def set_esp_props(self, crypto, crypto_key_len, integ):
356         self.esp_crypto_key_len = crypto_key_len
357         if crypto not in CRYPTO_ALGOS:
358             raise TypeError('unsupported encryption algo %r' % crypto)
359         self.esp_crypto = crypto
360         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
361
362         if integ not in AUTH_ALGOS:
363             raise TypeError('unsupported auth algo %r' % integ)
364         self.esp_integ = integ
365         self.esp_integ_alg = AUTH_ALGOS[integ]
366
367     def crypto_attr(self, key_len):
368         if self.ike_crypto in ['AES-CBC', 'AES-GCM']:
369             return (0x800e << 16 | key_len << 3, 12)
370         else:
371             raise Exception('unsupported attribute type')
372
373     def ike_crypto_attr(self):
374         return self.crypto_attr(self.ike_crypto_key_len)
375
376     def esp_crypto_attr(self):
377         return self.crypto_attr(self.esp_crypto_key_len)
378
379     def compute_nat_sha1(self, ip, port):
380         data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
381         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
382         digest.update(data)
383         return digest.finalize()
384
385
386 class TemplateResponder(VppTestCase):
387     """ responder test """
388
389     @classmethod
390     def setUpClass(cls):
391         import scapy.contrib.ikev2 as _ikev2
392         globals()['ikev2'] = _ikev2
393         super(TemplateResponder, cls).setUpClass()
394         cls.create_pg_interfaces(range(2))
395         for i in cls.pg_interfaces:
396             i.admin_up()
397             i.config_ip4()
398             i.resolve_arp()
399
400     @classmethod
401     def tearDownClass(cls):
402         super(TemplateResponder, cls).tearDownClass()
403
404     def setUp(self):
405         super(TemplateResponder, self).setUp()
406         self.config_tc()
407         self.p.add_vpp_config()
408         self.sa.generate_dh_data()
409
410     def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
411         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
412                IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
413                UDP(sport=sport, dport=dport))
414         if natt:
415             # insert non ESP marker
416             res = res / Raw(b'\x00' * 4)
417         return res / msg
418
419     def send_sa_init(self, behind_nat=False):
420         tr_attr = self.sa.ike_crypto_attr()
421         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
422                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
423                  key_length=tr_attr[0]) /
424                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
425                  transform_id=self.sa.ike_integ) /
426                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
427                  transform_id=self.sa.ike_prf_alg.name) /
428                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
429                  transform_id=self.sa.ike_dh))
430
431         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
432                  trans_nb=4, trans=trans))
433
434         if behind_nat:
435             next_payload = 'Notify'
436         else:
437             next_payload = None
438
439         self.sa.init_req_packet = (
440                 ikev2.IKEv2(init_SPI=self.sa.ispi,
441                             flags='Initiator', exch_type='IKE_SA_INIT') /
442                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
443                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
444                                        group=self.sa.ike_dh,
445                                        load=self.sa.dh_pub_key()) /
446                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
447                                           load=self.sa.i_nonce))
448
449         if behind_nat:
450             src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
451                                                self.sa.sport)
452             nat_detection = ikev2.IKEv2_payload_Notify(
453                     type='NAT_DETECTION_SOURCE_IP',
454                     load=src_nat)
455             self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
456
457         ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
458                                       self.sa.sport, self.sa.dport,
459                                       self.sa.natt)
460         self.pg0.add_stream(ike_msg)
461         self.pg0.enable_capture()
462         self.pg_start()
463         capture = self.pg0.get_capture(1)
464         self.verify_sa_init(capture[0])
465
466     def send_sa_auth(self):
467         tr_attr = self.sa.esp_crypto_attr()
468         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
469                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
470                  key_length=tr_attr[0]) /
471                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
472                  transform_id=self.sa.esp_integ) /
473                  ikev2.IKEv2_payload_Transform(
474                  transform_type='Extended Sequence Number',
475                  transform_id='No ESN') /
476                  ikev2.IKEv2_payload_Transform(
477                  transform_type='Extended Sequence Number',
478                  transform_id='ESN'))
479
480         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
481                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
482
483         tsi, tsr = self.sa.generate_ts()
484         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
485                  IDtype=self.sa.id_type, load=self.sa.i_id) /
486                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
487                  IDtype=self.sa.id_type, load=self.sa.r_id) /
488                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
489                  auth_type=AuthMethod.value(self.sa.auth_method),
490                  load=self.sa.auth_data) /
491                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
492                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
493                  number_of_TSs=len(tsi),
494                  traffic_selector=tsi) /
495                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
496                  number_of_TSs=len(tsr),
497                  traffic_selector=tsr) /
498                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
499         encr = self.sa.encrypt(raw(plain))
500
501         trunc_len = self.sa.ike_integ_alg.trunc_len
502         plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
503         tlen = plen + len(ikev2.IKEv2())
504
505         sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
506                                              length=plen, load=encr)
507         sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
508                    length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1))
509         sa_auth /= sk_p
510
511         integ_data = raw(sa_auth)
512         hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
513                                          self.sa.my_authkey, integ_data)
514         sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
515         assert(len(sa_auth) == tlen)
516
517         packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
518                                      self.sa.dport, self.sa.natt)
519         self.pg0.add_stream(packet)
520         self.pg0.enable_capture()
521         self.pg_start()
522         capture = self.pg0.get_capture(1)
523         self.verify_sa_auth(capture[0])
524
525     def get_ike_header(self, packet):
526         try:
527             ih = packet[ikev2.IKEv2]
528         except IndexError as e:
529             # this is a workaround for getting IKEv2 layer as both ikev2 and
530             # ipsec register for port 4500
531             esp = packet[ESP]
532             ih = self.verify_and_remove_non_esp_marker(esp)
533         return ih
534
535     def verify_sa_init(self, packet):
536         ih = self.get_ike_header(packet)
537
538         self.assertEqual(ih.exch_type, 34)
539         self.assertTrue('Response' in ih.flags)
540         self.assertEqual(ih.init_SPI, self.sa.ispi)
541         self.assertNotEqual(ih.resp_SPI, 0)
542         self.sa.rspi = ih.resp_SPI
543         try:
544             sa = ih[ikev2.IKEv2_payload_SA]
545             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
546             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
547         except AttributeError as e:
548             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
549             raise
550         self.sa.complete_dh_data()
551         self.sa.calc_keys()
552         self.sa.auth_init()
553
554     def verify_and_remove_non_esp_marker(self, packet):
555         if self.sa.natt:
556             # if we are in nat traversal mode check for non esp marker
557             # and remove it
558             data = raw(packet)
559             self.assertEqual(data[:4], b'\x00' * 4)
560             return ikev2.IKEv2(data[4:])
561         else:
562             return packet
563
564     def verify_udp(self, udp):
565         self.assertEqual(udp.sport, self.sa.sport)
566         self.assertEqual(udp.dport, self.sa.dport)
567
568     def verify_sa_auth(self, packet):
569         ike = self.get_ike_header(packet)
570         udp = packet[UDP]
571         self.verify_udp(udp)
572         plain = self.sa.hmac_and_decrypt(ike)
573         self.sa.calc_child_keys()
574
575     def verify_child_sas(self):
576         sas = self.vapi.ipsec_sa_dump()
577         self.assertEqual(len(sas), 2)
578         sa0 = sas[0].entry
579         sa1 = sas[1].entry
580         c = self.sa.child_sas[0]
581
582         # verify crypto keys
583         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
584         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
585         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
586         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
587
588         # verify integ keys
589         self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
590         self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
591         self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
592         self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
593
594     def test_responder(self):
595         self.send_sa_init(self.sa.natt)
596         self.send_sa_auth()
597         self.verify_child_sas()
598
599
600 class Ikev2Params(object):
601     def config_params(self, params={}):
602         is_natt = 'natt' in params and params['natt'] or False
603         self.p = Profile(self, 'pr1')
604
605         if 'auth' in params and params['auth'] == 'rsa-sig':
606             auth_method = 'rsa-sig'
607             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
608             self.vapi.ikev2_set_local_key(
609                     key_file=work_dir + params['server-key'])
610
611             client_file = work_dir + params['client-cert']
612             server_pem = open(work_dir + params['server-cert']).read()
613             client_priv = open(work_dir + params['client-key']).read()
614             client_priv = load_pem_private_key(str.encode(client_priv), None,
615                                                default_backend())
616             self.peer_cert = x509.load_pem_x509_certificate(
617                     str.encode(server_pem),
618                     default_backend())
619             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
620             auth_data = None
621         else:
622             auth_data = b'$3cr3tpa$$w0rd'
623             self.p.add_auth(method='shared-key', data=auth_data)
624             auth_method = 'shared-key'
625             client_priv = None
626
627         self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
628         self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
629         self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
630         self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
631
632         self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
633                           r_id=self.p.local_id['data'],
634                           id_type=self.p.local_id['id_type'], natt=is_natt,
635                           priv_key=client_priv, auth_method=auth_method,
636                           auth_data=auth_data,
637                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
638
639         self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
640                               integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
641                               dh='2048MODPgr')
642         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
643                               integ='HMAC-SHA1-96')
644
645
646 class TestResponderNATT(TemplateResponder, Ikev2Params):
647     """ test ikev2 responder - nat traversal """
648     def config_tc(self):
649         self.config_params(
650                 {'natt': True})
651
652
653 class TestResponderPsk(TemplateResponder, Ikev2Params):
654     """ test ikev2 responder - pre shared key auth """
655     def config_tc(self):
656         self.config_params()
657
658
659 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
660     """ test ikev2 responder - cert based auth """
661     def config_tc(self):
662         self.config_params({
663             'auth': 'rsa-sig',
664             'server-key': 'server-key.pem',
665             'client-key': 'client-key.pem',
666             'client-cert': 'client-cert.pem',
667             'server-cert': 'server-cert.pem'})
668
669 if __name__ == '__main__':
670     unittest.main(testRunner=VppTestRunner)