ikev2: add SA dump API
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
8     Cipher,
9     algorithms,
10     modes,
11 )
12 from ipaddress import IPv4Address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.packet import raw, Raw
16 from scapy.utils import long_converter
17 from framework import VppTestCase, VppTestRunner
18 from vpp_ikev2 import Profile, IDType, AuthMethod
19 from vpp_papi import VppEnum
20
21
# pad string from RFC 7296 section 2.15; IKEv2SA.auth_init() feeds it through
# the PRF together with the shared secret before the auth message is signed
KEY_PAD = b"Key Pad for IKEv2"
# number of trailing key bytes CryptoAlgo splits off as the AEAD salt
SALT_SIZE = 4
# bytes of GCM tag CryptoAlgo appends to / expects after the ciphertext
GCM_ICV_SIZE = 16
# bytes of per-message IV carried in front of a GCM ciphertext
GCM_IV_SIZE = 8


# defined in rfc3526
# tuple structure is (p, g, key_len)
# p is the group prime, g the generator and key_len the byte length that
# IKEv2SA.compute_secret() uses for the serialized shared secret
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
62
63
class CryptoAlgo(object):
    """Cipher helper used to protect and open IKEv2 encrypted payloads.

    When ``aad`` is passed to encrypt()/decrypt() the cipher is driven as an
    AEAD: the last SALT_SIZE bytes of the key are split off as salt and
    prepended to the per-message IV to build the nonce.  Without ``aad`` the
    whole key is used and the IV is handed to the mode unchanged.
    """

    def __init__(self, name, cipher, mode):
        """
        :param name: algorithm name, e.g. 'AES-CBC'
        :param cipher: cipher class taking the key, or None (no block size
                       or IV length is recorded in that case)
        :param mode: mode class taking the IV/nonce, or None
        """
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is expressed in bits
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` under a fresh random IV.

        :param data: plaintext, already padded by the caller
        :param key: cipher key; with ``aad`` its last SALT_SIZE bytes are
                    the salt
        :param aad: additional authenticated data, selects the AEAD path
        :returns: IV + ciphertext, followed by the GCM_ICV_SIZE byte tag
                  when ``aad`` is given
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()

        salt = key[-SALT_SIZE:]
        nonce = salt + iv
        encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                           default_backend()).encryptor()
        encryptor.authenticate_additional_data(aad)
        data = encryptor.update(data) + encryptor.finalize()
        data += encryptor.tag[:GCM_ICV_SIZE]
        return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` (IV followed by ciphertext).

        :param key: cipher key, laid out as for encrypt()
        :param aad: additional authenticated data, selects the AEAD path
        :param icv: authentication tag, needed when ``aad`` is given
        :returns: plaintext; on the AEAD path the padding and the Pad
                  Length octet are already stripped, otherwise they are
                  still attached
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            # use the configured cipher, exactly as encrypt() does
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()

        salt = key[-SALT_SIZE:]
        nonce = salt + data[:GCM_IV_SIZE]
        ct = data[GCM_IV_SIZE:]
        key = key[:-SALT_SIZE]
        decryptor = Cipher(self.cipher(key),
                           self.mode(nonce, icv, len(icv)),
                           default_backend()).decryptor()
        decryptor.authenticate_additional_data(aad)
        pt = decryptor.update(ct) + decryptor.finalize()
        # the last octet is Pad Length: drop it together with the padding
        pad_len = pt[-1] + 1
        return pt[:-pad_len]

    def pad(self, data):
        """Pad ``data`` up to the next multiple of the block size.

        At least one octet is always added: zero padding followed by the
        Pad Length octet, which holds the number of padding octets in front
        of it (it does not count itself).
        """
        # octets to append, including the Pad Length octet itself
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
118
119
class AuthAlgo(object):
    """Plain record describing an integrity or PRF algorithm."""

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        """
        :param name: algorithm name
        :param mac: MAC class (None for 'NULL')
        :param mod: hash class handed to the MAC (None for 'NULL')
        :param key_len: key length in bytes
        :param trunc_len: truncated output length; a missing or zero value
                          falls back to ``key_len``
        """
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
127
128
# lookup tables keyed by the algorithm names the test cases configure

# encryption algorithms usable for the IKE SA and for child SAs
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}

# integrity algorithms: AuthAlgo(name, mac, hash, key_len, trunc_len) with
# both lengths in bytes; each HMAC must run on the hash its name refers to
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}

# pseudo random functions; trunc_len defaults to key_len here
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}
149
150
class IKEv2ChildSA(object):
    """Holds the SPI and the traffic selectors of one child SA."""

    def __init__(self, local_ts, remote_ts, spi=None):
        """
        :param local_ts: local traffic selector
        :param remote_ts: remote traffic selector
        :param spi: SPI; four random bytes are drawn when it is missing
                    or empty
        """
        self.local_ts = local_ts
        self.remote_ts = remote_ts
        self.spi = spi if spi else os.urandom(4)
156
157
158 class IKEv2SA(object):
159     def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
160                  i_id=None, r_id=None, id_type='fqdn', nonce=None,
161                  auth_data=None, local_ts=None, remote_ts=None,
162                  auth_method='shared-key', priv_key=None, natt=False):
163         self.natt = natt
164         if natt:
165             self.sport = 4500
166             self.dport = 4500
167         else:
168             self.sport = 500
169             self.dport = 500
170         self.dh_params = None
171         self.test = test
172         self.priv_key = priv_key
173         self.is_initiator = is_initiator
174         nonce = nonce or os.urandom(32)
175         self.auth_data = auth_data
176         self.i_id = i_id
177         self.r_id = r_id
178         if isinstance(id_type, str):
179             self.id_type = IDType.value(id_type)
180         else:
181             self.id_type = id_type
182         self.auth_method = auth_method
183         if self.is_initiator:
184             self.rspi = None
185             self.ispi = spi
186             self.i_nonce = nonce
187         else:
188             self.rspi = spi
189             self.ispi = None
190             self.r_nonce = None
191         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
192
193     def dh_pub_key(self):
194         return self.i_dh_data
195
196     def compute_secret(self):
197         priv = self.dh_private_key
198         peer = self.r_dh_data
199         p, g, l = self.ike_group
200         return pow(int.from_bytes(peer, 'big'),
201                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
202
203     def generate_dh_data(self):
204         # generate DH keys
205         if self.is_initiator:
206             if self.ike_dh not in DH:
207                 raise NotImplementedError('%s not in DH group' % self.ike_dh)
208             if self.dh_params is None:
209                 dhg = DH[self.ike_dh]
210                 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
211                 self.dh_params = pn.parameters(default_backend())
212             priv = self.dh_params.generate_private_key()
213             pub = priv.public_key()
214             x = priv.private_numbers().x
215             self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
216             y = pub.public_numbers().y
217             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
218
219     def complete_dh_data(self):
220         self.dh_shared_secret = self.compute_secret()
221
222     def calc_child_keys(self):
223         prf = self.ike_prf_alg.mod()
224         s = self.i_nonce + self.r_nonce
225         c = self.child_sas[0]
226
227         encr_key_len = self.esp_crypto_key_len
228         integ_key_len = self.esp_integ_alg.key_len
229         salt_len = 0 if integ_key_len else 4
230
231         l = (integ_key_len * 2 +
232              encr_key_len * 2 +
233              salt_len * 2)
234         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
235
236         pos = 0
237         c.sk_ei = keymat[pos:pos+encr_key_len]
238         pos += encr_key_len
239
240         if integ_key_len:
241             c.sk_ai = keymat[pos:pos+integ_key_len]
242             pos += integ_key_len
243         else:
244             c.salt_ei = keymat[pos:pos+salt_len]
245             pos += salt_len
246
247         c.sk_er = keymat[pos:pos+encr_key_len]
248         pos += encr_key_len
249
250         if integ_key_len:
251             c.sk_ar = keymat[pos:pos+integ_key_len]
252             pos += integ_key_len
253         else:
254             c.salt_er = keymat[pos:pos+salt_len]
255             pos += salt_len
256
257     def calc_prfplus(self, prf, key, seed, length):
258         r = b''
259         t = None
260         x = 1
261         while len(r) < length and x < 255:
262             if t is not None:
263                 s = t
264             else:
265                 s = b''
266             s = s + seed + bytes([x])
267             t = self.calc_prf(prf, key, s)
268             r = r + t
269             x = x + 1
270
271         if x == 255:
272             return None
273         return r
274
275     def calc_prf(self, prf, key, data):
276         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
277         h.update(data)
278         return h.finalize()
279
280     def calc_keys(self):
281         prf = self.ike_prf_alg.mod()
282         # SKEYSEED = prf(Ni | Nr, g^ir)
283         s = self.i_nonce + self.r_nonce
284         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
285
286         # calculate S = Ni | Nr | SPIi SPIr
287         s = s + self.ispi + self.rspi
288
289         prf_key_trunc = self.ike_prf_alg.trunc_len
290         encr_key_len = self.ike_crypto_key_len
291         tr_prf_key_len = self.ike_prf_alg.key_len
292         integ_key_len = self.ike_integ_alg.key_len
293         if integ_key_len == 0:
294             salt_size = 4
295         else:
296             salt_size = 0
297
298         l = (prf_key_trunc +
299              integ_key_len * 2 +
300              encr_key_len * 2 +
301              tr_prf_key_len * 2 +
302              salt_size * 2)
303         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
304
305         pos = 0
306         self.sk_d = keymat[:pos+prf_key_trunc]
307         pos += prf_key_trunc
308
309         self.sk_ai = keymat[pos:pos+integ_key_len]
310         pos += integ_key_len
311         self.sk_ar = keymat[pos:pos+integ_key_len]
312         pos += integ_key_len
313
314         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
315         pos += encr_key_len + salt_size
316         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
317         pos += encr_key_len + salt_size
318
319         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
320         pos += tr_prf_key_len
321         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
322
323     def generate_authmsg(self, prf, packet):
324         if self.is_initiator:
325             id = self.i_id
326             nonce = self.r_nonce
327             key = self.sk_pi
328         data = bytes([self.id_type, 0, 0, 0]) + id
329         id_hash = self.calc_prf(prf, key, data)
330         return packet + nonce + id_hash
331
332     def auth_init(self):
333         prf = self.ike_prf_alg.mod()
334         authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
335         if self.auth_method == 'shared-key':
336             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
337             self.auth_data = self.calc_prf(prf, psk, authmsg)
338         elif self.auth_method == 'rsa-sig':
339             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
340                                                 hashes.SHA1())
341         else:
342             raise TypeError('unknown auth method type!')
343
344     def encrypt(self, data, aad=None):
345         data = self.ike_crypto_alg.pad(data)
346         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
347
348     @property
349     def peer_authkey(self):
350         if self.is_initiator:
351             return self.sk_ar
352         return self.sk_ai
353
354     @property
355     def my_authkey(self):
356         if self.is_initiator:
357             return self.sk_ai
358         return self.sk_ar
359
360     @property
361     def my_cryptokey(self):
362         if self.is_initiator:
363             return self.sk_ei
364         return self.sk_er
365
366     @property
367     def peer_cryptokey(self):
368         if self.is_initiator:
369             return self.sk_er
370         return self.sk_ei
371
372     def concat(self, alg, key_len):
373         return alg + '-' + str(key_len * 8)
374
375     @property
376     def vpp_ike_cypto_alg(self):
377         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
378
379     @property
380     def vpp_esp_cypto_alg(self):
381         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
382
383     def verify_hmac(self, ikemsg):
384         integ_trunc = self.ike_integ_alg.trunc_len
385         exp_hmac = ikemsg[-integ_trunc:]
386         data = ikemsg[:-integ_trunc]
387         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
388                                           self.peer_authkey, data)
389         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
390
391     def compute_hmac(self, integ, key, data):
392         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
393         h.update(data)
394         return h.finalize()
395
396     def decrypt(self, data, aad=None, icv=None):
397         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
398
399     def hmac_and_decrypt(self, ike):
400         ep = ike[ikev2.IKEv2_payload_Encrypted]
401         if self.ike_crypto == 'AES-GCM-16ICV':
402             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
403             ct = ep.load[:-GCM_ICV_SIZE]
404             tag = ep.load[-GCM_ICV_SIZE:]
405             return self.decrypt(ct, raw(ike)[:aad_len], tag)
406         else:
407             self.verify_hmac(raw(ike))
408             integ_trunc = self.ike_integ_alg.trunc_len
409
410             # remove ICV and decrypt payload
411             ct = ep.load[:-integ_trunc]
412             return self.decrypt(ct)
413
414     def generate_ts(self):
415         c = self.child_sas[0]
416         ts1 = ikev2.IPv4TrafficSelector(
417                 IP_protocol_ID=0,
418                 start_port=0,
419                 end_port=0xffff,
420                 starting_address_v4=c.local_ts['start_addr'],
421                 ending_address_v4=c.local_ts['end_addr'])
422         ts2 = ikev2.IPv4TrafficSelector(
423                 IP_protocol_ID=0,
424                 starting_address_v4=c.remote_ts['start_addr'],
425                 ending_address_v4=c.remote_ts['end_addr'])
426         return ([ts1], [ts2])
427
428     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
429         if crypto not in CRYPTO_ALGOS:
430             raise TypeError('unsupported encryption algo %r' % crypto)
431         self.ike_crypto = crypto
432         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
433         self.ike_crypto_key_len = crypto_key_len
434
435         if integ not in AUTH_ALGOS:
436             raise TypeError('unsupported auth algo %r' % integ)
437         self.ike_integ = None if integ == 'NULL' else integ
438         self.ike_integ_alg = AUTH_ALGOS[integ]
439
440         if prf not in PRF_ALGOS:
441             raise TypeError('unsupported prf algo %r' % prf)
442         self.ike_prf = prf
443         self.ike_prf_alg = PRF_ALGOS[prf]
444         self.ike_dh = dh
445         self.ike_group = DH[self.ike_dh]
446
447     def set_esp_props(self, crypto, crypto_key_len, integ):
448         self.esp_crypto_key_len = crypto_key_len
449         if crypto not in CRYPTO_ALGOS:
450             raise TypeError('unsupported encryption algo %r' % crypto)
451         self.esp_crypto = crypto
452         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
453
454         if integ not in AUTH_ALGOS:
455             raise TypeError('unsupported auth algo %r' % integ)
456         self.esp_integ = None if integ == 'NULL' else integ
457         self.esp_integ_alg = AUTH_ALGOS[integ]
458
459     def crypto_attr(self, key_len):
460         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
461             return (0x800e << 16 | key_len << 3, 12)
462         else:
463             raise Exception('unsupported attribute type')
464
465     def ike_crypto_attr(self):
466         return self.crypto_attr(self.ike_crypto_key_len)
467
468     def esp_crypto_attr(self):
469         return self.crypto_attr(self.esp_crypto_key_len)
470
471     def compute_nat_sha1(self, ip, port):
472         data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
473         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
474         digest.update(data)
475         return digest.finalize()
476
477
478 class TemplateResponder(VppTestCase):
479     """ responder test template """
480
481     @classmethod
482     def setUpClass(cls):
483         import scapy.contrib.ikev2 as _ikev2
484         globals()['ikev2'] = _ikev2
485         super(TemplateResponder, cls).setUpClass()
486         cls.create_pg_interfaces(range(2))
487         for i in cls.pg_interfaces:
488             i.admin_up()
489             i.config_ip4()
490             i.resolve_arp()
491
492     @classmethod
493     def tearDownClass(cls):
494         super(TemplateResponder, cls).tearDownClass()
495
496     def setUp(self):
497         super(TemplateResponder, self).setUp()
498         self.config_tc()
499         self.p.add_vpp_config()
500         self.assertIsNotNone(self.p.query_vpp_config())
501         self.sa.generate_dh_data()
502
503     def tearDown(self):
504         super(TemplateResponder, self).tearDown()
505         self.p.remove_vpp_config()
506         self.assertIsNone(self.p.query_vpp_config())
507
508     def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
509         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
510                IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
511                UDP(sport=sport, dport=dport))
512         if natt:
513             # insert non ESP marker
514             res = res / Raw(b'\x00' * 4)
515         return res / msg
516
517     def send_sa_init(self, behind_nat=False):
518         tr_attr = self.sa.ike_crypto_attr()
519         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
520                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
521                  key_length=tr_attr[0]) /
522                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
523                  transform_id=self.sa.ike_integ) /
524                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
525                  transform_id=self.sa.ike_prf_alg.name) /
526                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
527                  transform_id=self.sa.ike_dh))
528
529         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
530                  trans_nb=4, trans=trans))
531
532         if behind_nat:
533             next_payload = 'Notify'
534         else:
535             next_payload = None
536
537         self.sa.init_req_packet = (
538                 ikev2.IKEv2(init_SPI=self.sa.ispi,
539                             flags='Initiator', exch_type='IKE_SA_INIT') /
540                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
541                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
542                                        group=self.sa.ike_dh,
543                                        load=self.sa.dh_pub_key()) /
544                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
545                                           load=self.sa.i_nonce))
546
547         if behind_nat:
548             src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
549                                                self.sa.sport)
550             nat_detection = ikev2.IKEv2_payload_Notify(
551                     type='NAT_DETECTION_SOURCE_IP',
552                     load=src_nat)
553             self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
554
555         ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
556                                       self.sa.sport, self.sa.dport,
557                                       self.sa.natt)
558         self.pg0.add_stream(ike_msg)
559         self.pg0.enable_capture()
560         self.pg_start()
561         capture = self.pg0.get_capture(1)
562         self.verify_sa_init(capture[0])
563
564     def send_sa_auth(self):
565         tr_attr = self.sa.esp_crypto_attr()
566         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
567                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
568                  key_length=tr_attr[0]) /
569                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
570                  transform_id=self.sa.esp_integ) /
571                  ikev2.IKEv2_payload_Transform(
572                  transform_type='Extended Sequence Number',
573                  transform_id='No ESN') /
574                  ikev2.IKEv2_payload_Transform(
575                  transform_type='Extended Sequence Number',
576                  transform_id='ESN'))
577
578         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
579                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
580
581         tsi, tsr = self.sa.generate_ts()
582         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
583                  IDtype=self.sa.id_type, load=self.sa.i_id) /
584                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
585                  IDtype=self.sa.id_type, load=self.sa.r_id) /
586                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
587                  auth_type=AuthMethod.value(self.sa.auth_method),
588                  load=self.sa.auth_data) /
589                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
590                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
591                  number_of_TSs=len(tsi),
592                  traffic_selector=tsi) /
593                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
594                  number_of_TSs=len(tsr),
595                  traffic_selector=tsr) /
596                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
597
598         if self.sa.ike_crypto == 'AES-GCM-16ICV':
599             data = self.sa.ike_crypto_alg.pad(raw(plain))
600             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
601                 len(ikev2.IKEv2_payload_Encrypted())
602             tlen = plen + len(ikev2.IKEv2())
603
604             # prepare aad data
605             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
606                                                  length=plen)
607             sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
608                        resp_SPI=self.sa.rspi, id=1,
609                        length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
610             sa_auth /= sk_p
611
612             encr = self.sa.encrypt(raw(plain), raw(sa_auth))
613             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
614                                                  length=plen, load=encr)
615             sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
616                        resp_SPI=self.sa.rspi, id=1,
617                        length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
618             sa_auth /= sk_p
619         else:
620             encr = self.sa.encrypt(raw(plain))
621             trunc_len = self.sa.ike_integ_alg.trunc_len
622             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
623             tlen = plen + len(ikev2.IKEv2())
624
625             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
626                                                  length=plen, load=encr)
627             sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
628                        resp_SPI=self.sa.rspi, id=1,
629                        length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
630             sa_auth /= sk_p
631
632             integ_data = raw(sa_auth)
633             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
634                                              self.sa.my_authkey, integ_data)
635             sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
636
637         assert(len(sa_auth) == tlen)
638         packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
639                                      self.sa.dport, self.sa.natt)
640         self.pg0.add_stream(packet)
641         self.pg0.enable_capture()
642         self.pg_start()
643         capture = self.pg0.get_capture(1)
644         self.verify_sa_auth(capture[0])
645
646     def get_ike_header(self, packet):
647         try:
648             ih = packet[ikev2.IKEv2]
649         except IndexError as e:
650             # this is a workaround for getting IKEv2 layer as both ikev2 and
651             # ipsec register for port 4500
652             esp = packet[ESP]
653             ih = self.verify_and_remove_non_esp_marker(esp)
654         return ih
655
656     def verify_sa_init(self, packet):
657         ih = self.get_ike_header(packet)
658
659         self.assertEqual(ih.exch_type, 34)
660         self.assertTrue('Response' in ih.flags)
661         self.assertEqual(ih.init_SPI, self.sa.ispi)
662         self.assertNotEqual(ih.resp_SPI, 0)
663         self.sa.rspi = ih.resp_SPI
664         try:
665             sa = ih[ikev2.IKEv2_payload_SA]
666             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
667             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
668         except IndexError as e:
669             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
670             self.logger.error(ih.show())
671             raise
672         self.sa.complete_dh_data()
673         self.sa.calc_keys()
674         self.sa.auth_init()
675
676     def verify_and_remove_non_esp_marker(self, packet):
677         if self.sa.natt:
678             # if we are in nat traversal mode check for non esp marker
679             # and remove it
680             data = raw(packet)
681             self.assertEqual(data[:4], b'\x00' * 4)
682             return ikev2.IKEv2(data[4:])
683         else:
684             return packet
685
686     def verify_udp(self, udp):
687         self.assertEqual(udp.sport, self.sa.sport)
688         self.assertEqual(udp.dport, self.sa.dport)
689
690     def verify_sa_auth(self, packet):
691         ike = self.get_ike_header(packet)
692         udp = packet[UDP]
693         self.verify_udp(udp)
694         plain = self.sa.hmac_and_decrypt(ike)
695         self.sa.calc_child_keys()
696
697     def verify_ipsec_sas(self):
698         sas = self.vapi.ipsec_sa_dump()
699         self.assertEqual(len(sas), 2)
700         sa0 = sas[0].entry
701         sa1 = sas[1].entry
702         c = self.sa.child_sas[0]
703
704         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
705         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
706         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
707
708         if self.sa.esp_integ is None:
709             vpp_integ_alg = 0
710         else:
711             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
712         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
713         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
714
715         # verify crypto keys
716         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
717         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
718         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
719         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
720
721         # verify integ keys
722         if vpp_integ_alg:
723             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
724             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
725             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
726             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
727         else:
728             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
729             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
730
731     def verify_keymat(self, api_keys, keys, name):
732         km = getattr(keys, name)
733         api_km = getattr(api_keys, name)
734         api_km_len = getattr(api_keys, name + '_len')
735         self.assertEqual(len(km), api_km_len)
736         self.assertEqual(km, api_km[:api_km_len])
737
738     def verify_id(self, api_id, exp_id):
739         self.assertEqual(api_id.type, IDType.value(exp_id.type))
740         self.assertEqual(api_id.data_len, exp_id.data_len)
741         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
742
def verify_ike_sas(self):
    """Cross-check the IKE SA, child SA, traffic selectors and nonces
    reported by the dump APIs against the state held in ``self.sa``.

    Exactly one IKE SA, one child SA and one traffic selector per
    direction are expected.
    """
    sa_dump = self.vapi.ikev2_sa_dump()
    self.assertEqual(len(sa_dump), 1)
    api_sa = sa_dump[0].sa

    # NOTE(review): the initiator SPI is converted little-endian while
    # the responder SPI is converted big-endian - confirm this asymmetry
    # is intended.
    self.assertEqual(self.sa.ispi, (api_sa.ispi).to_bytes(8, 'little'))
    self.assertEqual(self.sa.rspi, (api_sa.rspi).to_bytes(8, 'big'))
    self.assertEqual(api_sa.iaddr, IPv4Address(self.pg0.remote_ip4))
    self.assertEqual(api_sa.raddr, IPv4Address(self.pg0.local_ip4))

    for key_name in ('sk_d', 'sk_ai', 'sk_ar', 'sk_ei', 'sk_er',
                     'sk_pi', 'sk_pr'):
        self.verify_keymat(api_sa.keys, self.sa, key_name)

    self.assertEqual(api_sa.i_id.type, self.sa.id_type)
    self.assertEqual(api_sa.r_id.type, self.sa.id_type)
    self.assertEqual(api_sa.i_id.data_len, len(self.sa.i_id))
    self.assertEqual(api_sa.r_id.data_len, len(self.sa.r_id))
    self.assertEqual(bytes(api_sa.i_id.data, 'ascii'), self.sa.i_id)
    self.assertEqual(bytes(api_sa.r_id.data, 'ascii'), self.sa.r_id)

    child_dump = self.vapi.ikev2_child_sa_dump(sa_index=api_sa.sa_index)
    self.assertEqual(len(child_dump), 1)
    api_child = child_dump[0].child_sa
    self.assertEqual(api_child.sa_index, api_sa.sa_index)
    child = self.sa.child_sas[0]
    # Integrity keys are only checked when the child SA carries them.
    child_key_names = ['sk_ei', 'sk_er']
    if hasattr(child, 'sk_ai'):
        child_key_names = ['sk_ai', 'sk_ar'] + child_key_names
    for key_name in child_key_names:
        self.verify_keymat(api_child.keys, child, key_name)

    tsi, tsr = self.sa.generate_ts()
    tsi = tsi[0]
    tsr = tsr[0]
    # Initiator selectors first, then responder selectors.
    for is_initiator, selector in ((True, tsi), (False, tsr)):
        ts_dump = self.vapi.ikev2_traffic_selector_dump(
                is_initiator=is_initiator, sa_index=api_sa.sa_index,
                child_sa_index=api_child.child_sa_index)
        self.assertEqual(len(ts_dump), 1)
        self.verify_ts(ts_dump[0].ts, selector[0], is_initiator)

    for is_initiator, nonce_attr in ((True, 'i_nonce'),
                                     (False, 'r_nonce')):
        api_nonce = self.vapi.ikev2_nonce_get(is_initiator=is_initiator,
                                              sa_index=api_sa.sa_index)
        self.verify_nonce(api_nonce, getattr(self.sa, nonce_attr))
799
def verify_nonce(self, api_nonce, nonce):
    """Check that the API nonce has the expected length and content.

    :param api_nonce: API reply; ``data_len`` and ``nonce`` are read.
    :param nonce: expected nonce value.
    """
    reported_len = api_nonce.data_len
    self.assertEqual(reported_len, len(nonce))
    reported = api_nonce.nonce
    self.assertEqual(reported, nonce)
803
def verify_ts(self, api_ts, ts, is_initiator):
    """Check a traffic selector from the API against the expected one.

    :param api_ts: selector returned by the API.
    :param ts: expected selector; ``starting_address_v4``,
        ``ending_address_v4``, ``start_port``, ``end_port`` and
        ``IP_protocol_ID`` are read.
    :param is_initiator: when true the API selector must be flagged
        local, otherwise it must not be.
    """
    expect_local = self.assertTrue if is_initiator else self.assertFalse
    expect_local(api_ts.is_local)

    same = self.assertEqual
    same(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
    same(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
    same(api_ts.start_port, ts.start_port)
    same(api_ts.end_port, ts.end_port)
    same(api_ts.protocol_id, ts.IP_protocol_ID)
816
def test_responder(self):
    # Run the two exchanges in order: SA_INIT (with the NAT-T setting
    # taken from the test-side SA), then AUTH.
    natt = self.sa.natt
    self.send_sa_init(natt)
    self.send_sa_auth()

    # With the exchanges done, check the resulting IPsec SAs and then
    # the IKE SA state reported by the dump APIs.
    self.verify_ipsec_sas()
    self.verify_ike_sas()
822
823
class Ikev2Params(object):
    """Mixin that builds the IKEv2 profile and the matching test-side SA."""

    def config_params(self, params=None):
        """Create ``self.p`` (profile 'pr1') and ``self.sa`` from *params*.

        :param params: optional dict of overrides.  Recognised keys:
            ``'natt'``; ``'auth'`` (``'rsa-sig'`` selects certificate
            authentication and then requires ``'server-key'``,
            ``'server-cert'``, ``'client-key'`` and ``'client-cert'``
            file names, anything else selects a shared key);
            ``'ike-crypto'`` and ``'esp-crypto'`` as
            ``(name, key_len)`` tuples; ``'ike-integ'``,
            ``'esp-integ'`` and ``'ike-dh'`` as names.
            Missing keys fall back to the defaults below.
        """
        # A fresh dict per call; a mutable default argument would be
        # shared between every call of this method.
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        # Algorithm name -> VPP API enum value lookup table.
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        # Missing or falsy 'natt' yields False, a truthy value is kept.
        is_natt = params.get('natt') or False
        self.p = Profile(self, 'pr1')

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            # Certificates live in the source tree, located relative to
            # the directory named by the BR environment variable.
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # Context managers close the PEM files deterministically.
            with open(work_dir + params['server-cert']) as pem_file:
                server_pem = pem_file.read()
            with open(work_dir + params['client-key']) as pem_file:
                client_pem = pem_file.read()
            client_priv = load_pem_private_key(str.encode(client_pem), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
        self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
        self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
        self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')

        # The test plays the peer, so the profile's remote id/ts become
        # the SA's initiator/local side and vice versa.
        self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
                          r_id=self.p.local_id['data'],
                          id_type=self.p.local_id['id_type'], natt=is_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)

        ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
        ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
        ike_dh = params.get('ike-dh', '2048MODPgr')

        esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
        esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

        self.sa.set_ike_props(
                crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
        self.sa.set_esp_props(
                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                integ=esp_integ)
895
896
class TestApi(VppTestCase):
    """ Test IKEV2 API """
    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def tearDown(self):
        # Framework teardown runs first; afterwards both profiles created
        # by the test are removed and the profile dump must be empty.
        super().tearDown()
        self.p1.remove_vpp_config()
        self.p2.remove_vpp_config()
        remaining = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(remaining), 0)

    def configure_profile(self, cfg):
        # Build a Profile from the config dict, push it to VPP and
        # return it.  'lifetime_data' and 'tun_itf' are optional keys.
        profile = Profile(self, cfg['name'])
        local_id = cfg['loc_id']
        profile.add_local_id(id_type=local_id[0], data=local_id[1])
        remote_id = cfg['rem_id']
        profile.add_remote_id(id_type=remote_id[0], data=remote_id[1])
        profile.add_local_ts(**cfg['loc_ts'])
        profile.add_remote_ts(**cfg['rem_ts'])
        profile.add_responder(cfg['responder'])
        profile.add_ike_transforms(cfg['ike_ts'])
        profile.add_esp_transforms(cfg['esp_ts'])
        profile.add_auth(**cfg['auth'])
        profile.set_udp_encap(cfg['udp_encap'])
        profile.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            profile.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            profile.set_tunnel_interface(cfg['tun_itf'])
        profile.add_vpp_config()
        return profile

    def test_profile_api(self):
        """ test profile dump API """
        # Both profiles share the same traffic selector dicts.
        loc_ts = dict(proto=8, start_port=1, end_port=19,
                      start_addr='3.3.3.2', end_addr='3.3.3.3')
        rem_ts = dict(proto=9, start_port=10, end_port=119,
                      start_addr='4.5.76.80', end_addr='2.3.4.6')

        # First profile: fqdn identities, UDP encapsulation and explicit
        # lifetime data; no tunnel interface.
        p1_cfg = {
            'name': 'p1',
            'loc_id': ('fqdn', b'vpp.home'),
            'rem_id': ('fqdn', b'roadwarrior.example.com'),
            'loc_ts': loc_ts,
            'rem_ts': rem_ts,
            'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
            'ike_ts': {'crypto_alg': 20, 'crypto_key_size': 32,
                       'integ_alg': 1, 'dh_group': 1},
            'esp_ts': {'crypto_alg': 13, 'crypto_key_size': 24,
                       'integ_alg': 2},
            'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
            'udp_encap': True,
            'ipsec_over_udp_port': 4501,
            'lifetime_data': {'lifetime': 123,
                              'lifetime_maxdata': 20192,
                              'lifetime_jitter': 9,
                              'handover': 132},
        }
        # Second profile: ip4-addr identities, no UDP encapsulation, a
        # tunnel interface and no lifetime data.
        p2_cfg = {
            'name': 'p2',
            'loc_id': ('ip4-addr', b'192.168.2.1'),
            'rem_id': ('ip4-addr', b'192.168.2.2'),
            'loc_ts': loc_ts,
            'rem_ts': rem_ts,
            'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
            'ike_ts': {'crypto_alg': 12, 'crypto_key_size': 16,
                       'integ_alg': 3, 'dh_group': 3},
            'esp_ts': {'crypto_alg': 9, 'crypto_key_size': 24,
                       'integ_alg': 4},
            'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
            'udp_encap': False,
            'ipsec_over_udp_port': 4600,
            'tun_itf': 0,
        }
        self.p1 = self.configure_profile(p1_cfg)
        self.p2 = self.configure_profile(p2_cfg)

        dumped = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(dumped), 2)
        self.verify_profile(dumped[0].profile, p1_cfg)
        self.verify_profile(dumped[1].profile, p2_cfg)

    def verify_id(self, api_id, cfg_id):
        # cfg_id is a (type name, data bytes) pair.
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        # Scalar fields first; the API attribute and the config key are
        # named differently for the protocol.
        for attr, key in (('protocol_id', 'proto'),
                          ('start_port', 'start_port'),
                          ('end_port', 'end_port')):
            self.assertEqual(getattr(api_ts, attr), cfg_ts[key])
        # Addresses are configured as strings and compared as
        # IPv4Address objects.
        for key in ('start_addr', 'end_addr'):
            self.assertEqual(getattr(api_ts, key), IPv4Address(cfg_ts[key]))

    def verify_responder(self, api_r, cfg_r):
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.ip4, IPv4Address(cfg_r['ip4']))

    def verify_transforms(self, api_ts, cfg_ts):
        # Fields common to IKE and ESP transform sets.
        for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
            self.assertEqual(getattr(api_ts, field), cfg_ts[field])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        # IKE transforms carry a DH group on top of the common fields.
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        secret = cfg_auth['data']
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, secret)
        self.assertEqual(api_auth.data_len, len(secret))

    def verify_lifetime_data(self, p, ld):
        # Lifetime fields are read directly from the dumped profile.
        for field in ('lifetime', 'lifetime_maxdata', 'lifetime_jitter',
                      'handover'):
            self.assertEqual(getattr(p, field), ld[field])

    def verify_profile(self, ap, cp):
        # ap: profile as dumped by the API, cp: config dict it was
        # created from.
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        # Without a configured tunnel interface the dump is expected to
        # report 0xffffffff.
        self.assertEqual(ap.tun_itf, cp.get('tun_itf', 0xffffffff))
1061
1062
class TestResponderNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - nat traversal """
    def config_tc(self):
        # Only NAT traversal is switched on; every other setting keeps
        # the config_params default.
        overrides = {'natt': True}
        self.config_params(overrides)
1068
1069
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # No overrides: config_params falls back to shared-key
        # authentication and its default algorithms.
        self.config_params()
1074
1075
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        # Certificate authentication needs the key and certificate file
        # names for both the server and the client side.
        overrides = {'auth': 'rsa-sig'}
        overrides['server-key'] = 'server-key.pem'
        overrides['client-key'] = 'client-key.pem'
        overrides['client-cert'] = 'client-cert.pem'
        overrides['server-cert'] = 'server-cert.pem'
        self.config_params(overrides)
1085
1086
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        # Crypto entries are (algorithm name, key length in bytes).
        ike = {'ike-crypto': ('AES-CBC', 16),
               'ike-integ': 'SHA2-256-128',
               'ike-dh': '2048MODPgr'}
        esp = {'esp-crypto': ('AES-CBC', 24),
               'esp-integ': 'SHA2-384-192'}
        overrides = dict(ike)
        overrides.update(esp)
        self.config_params(overrides)
1099
1100
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        # NOTE(review): the class name and docstring say AES_CBC_128 for
        # IKE, but the configured IKE key length is 32 bytes - confirm
        # which of the two is intended.
        ike = {'ike-crypto': ('AES-CBC', 32),
               'ike-integ': 'SHA2-256-128',
               'ike-dh': '3072MODPgr'}
        # ESP uses AES-GCM with the integrity algorithm set to 'NULL'.
        esp = {'esp-crypto': ('AES-GCM-16ICV', 32),
               'esp-integ': 'NULL'}
        overrides = dict(ike)
        overrides.update(esp)
        self.config_params(overrides)
1113
1114
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """
    def config_tc(self):
        # IKE uses AES-GCM with a 32-byte key and the integrity
        # algorithm set to 'NULL'; ESP settings keep the defaults.
        overrides = {'ike-crypto': ('AES-GCM-16ICV', 32)}
        overrides['ike-integ'] = 'NULL'
        overrides['ike-dh'] = '2048MODPgr'
        self.config_params(overrides)
1124
1125
if __name__ == '__main__':
    # ``unittest`` is not among this file's top-level imports, so it is
    # imported here; without it running the file as a script would raise
    # NameError.
    import unittest
    unittest.main(testRunner=VppTestRunner)