ikev2: support ipv6 traffic selectors & overlay
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
8     Cipher,
9     algorithms,
10     modes,
11 )
12 from ipaddress import IPv4Address, IPv6Address, ip_address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.layers.inet6 import IPv6
16 from scapy.packet import raw, Raw
17 from scapy.utils import long_converter
18 from framework import VppTestCase, VppTestRunner
19 from vpp_ikev2 import Profile, IDType, AuthMethod
20 from vpp_papi import VppEnum
21
# Compatibility shim: use ``unicode`` where that name exists, otherwise fall
# back to ``str``.
try:
    text_type = unicode
except NameError:
    text_type = str

# Constant string keyed with the shared secret in IKEv2SA.auth_init().
KEY_PAD = b"Key Pad for IKEv2"
# Number of trailing key bytes CryptoAlgo treats as the AEAD salt.
SALT_SIZE = 4
# Length in bytes of the GCM integrity check value appended to ciphertext.
GCM_ICV_SIZE = 16
# Length in bytes of the explicit IV that precedes GCM ciphertext.
GCM_IV_SIZE = 8
31
32
# Diffie-Hellman MODP groups, defined in rfc3526.
# Keys are the group names handed to IKEv2SA.set_ike_props(); each value is a
# tuple (p, g, key_len):
#   p       - prime modulus (parsed from the hex dump by long_converter)
#   g       - generator
#   key_len - length in bytes used when serialising the shared secret
#             (see IKEv2SA.compute_secret)
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
67
68
class CryptoAlgo(object):
    """Symmetric cipher helper used to protect IKEv2 payloads in the tests.

    :param name: algorithm name; 'AES-GCM-16ICV' selects the GCM IV length
    :param cipher: cipher class from ``cryptography`` (or None for NULL)
    :param mode: mode class from ``cryptography`` (or None for NULL)

    When ``cipher`` is None neither ``bs`` nor ``iv_len`` is set, so the
    instance cannot be used for encrypt/decrypt/pad.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block size in bytes (block_size is divided by 8)
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` and return ``iv + ciphertext`` (plus ICV for AEAD).

        :param data: plaintext bytes (already padded by the caller)
        :param key: cipher key; in AEAD mode the last SALT_SIZE bytes are
            the salt
        :param aad: additional authenticated data; None selects the
            non-AEAD path
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()

        # AEAD: nonce is salt (tail of the key) followed by the explicit IV
        salt = key[-SALT_SIZE:]
        nonce = salt + iv
        encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                           default_backend()).encryptor()
        encryptor.authenticate_additional_data(aad)
        data = encryptor.update(data) + encryptor.finalize()
        data += encryptor.tag[:GCM_ICV_SIZE]
        return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` (IV followed by ciphertext).

        :param data: IV + ciphertext, without the trailing ICV
        :param key: cipher key; in AEAD mode the last SALT_SIZE bytes are
            the salt
        :param aad: additional authenticated data; None selects the
            non-AEAD path
        :param icv: authentication tag, required in AEAD mode
        :returns: plaintext; in AEAD mode the trailing padding and the
            pad-length byte are stripped, in non-AEAD mode they are kept
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            # use the configured cipher, mirroring encrypt()
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()

        salt = key[-SALT_SIZE:]
        nonce = salt + data[:GCM_IV_SIZE]
        ct = data[GCM_IV_SIZE:]
        key = key[:-SALT_SIZE]
        decryptor = Cipher(self.cipher(key),
                           self.mode(nonce, icv, len(icv)),
                           default_backend()).decryptor()
        decryptor.authenticate_additional_data(aad)
        pt = decryptor.update(ct) + decryptor.finalize()
        # last byte holds the number of padding bytes preceding it
        pad_len = pt[-1] + 1
        return pt[:-pad_len]

    def pad(self, data):
        """Pad ``data`` with zero bytes up to the next block boundary.

        At least one byte is always appended: the final byte stores the
        number of zero padding bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
123
124
class AuthAlgo(object):
    """Description of an integrity or PRF algorithm.

    :param name: algorithm name
    :param mac: MAC constructor (e.g. hmac.HMAC) or None
    :param mod: hash class passed to ``mac`` or None
    :param key_len: key length in bytes
    :param trunc_len: length in bytes the MAC output is truncated to;
        defaults to ``key_len`` when omitted
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name = name
        self.mac = mac
        self.mod = mod
        self.key_len = key_len
        # only fall back when trunc_len was not given; an explicit 0 is kept
        self.trunc_len = key_len if trunc_len is None else trunc_len
132
133
# Encryption algorithms available to the tests, keyed by algorithm name.
CRYPTO_ALGOS = {
    algo.name: algo for algo in (
        CryptoAlgo('NULL', cipher=None, mode=None),
        CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
        CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES, mode=modes.GCM),
    )
}
140
# Integrity algorithms available to the tests, keyed by algorithm name.
# Positional arguments are (name, mac, hash, key_len, trunc_len).
# The SHA2-384/512 variants must use the matching hash function; building
# them on SHA256 yields a MAC that cannot match a peer using SHA-384/512.
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}
148
# Pseudo-random functions available to the tests, keyed by algorithm name.
PRF_ALGOS = {
    algo.name: algo for algo in (
        AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
        AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC, hashes.SHA256, 32),
    )
}
154
155
class IKEv2ChildSA(object):
    """Container for one child SA: its SPI and both traffic selectors.

    :param local_ts: local traffic selector
    :param remote_ts: remote traffic selector
    :param spi: SPI bytes; 4 random bytes are generated when not supplied
    """

    def __init__(self, local_ts, remote_ts, spi=None):
        self.local_ts = local_ts
        self.remote_ts = remote_ts
        # any falsy value (None, empty bytes) is replaced by a random SPI
        self.spi = spi if spi else os.urandom(4)
161
162
class IKEv2SA(object):
    """Test-side state of one IKEv2 security association.

    Tracks SPIs, nonces, Diffie-Hellman data and the selected IKE/ESP
    algorithms, derives the SK_* keys and child SA key material, and
    encrypts, decrypts and authenticates IKE messages.
    """

    def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
                 auth_method='shared-key', priv_key=None, natt=False):
        """
        :param test: test case object used for assertions
        :param is_initiator: True when this side initiates the exchange
        :param i_id: initiator identification data
        :param r_id: responder identification data
        :param spi: SPI of this side (8 bytes); the peer SPI starts as zeros
        :param id_type: ID type, either a name resolved through IDType or
            an already resolved value
        :param nonce: nonce of this side; 32 random bytes when omitted
        :param auth_data: shared secret (replaced by the computed AUTH
            payload data in auth_init)
        :param local_ts: local traffic selector of the first child SA
        :param remote_ts: remote traffic selector of the first child SA
        :param auth_method: 'shared-key' or 'rsa-sig'
        :param priv_key: private key used for 'rsa-sig'
        :param natt: use UDP port 4500 instead of 500
        """
        self.natt = natt
        if natt:
            self.sport = 4500
            self.dport = 4500
        else:
            self.sport = 500
            self.dport = 500
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        if isinstance(id_type, str):
            self.id_type = IDType.value(id_type)
        else:
            self.id_type = id_type
        self.auth_method = auth_method
        if self.is_initiator:
            self.rspi = 8 * b'\x00'
            self.ispi = spi
            self.i_nonce = nonce
        else:
            self.rspi = spi
            self.ispi = 8 * b'\x00'
            # keep our own nonce; it was previously discarded (set to None)
            self.r_nonce = nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]

    def new_msg_id(self):
        """Increment the message ID counter and return the new value."""
        self.msg_id += 1
        return self.msg_id

    def dh_pub_key(self):
        """Return the initiator public DH value set by generate_dh_data()."""
        return self.i_dh_data

    def compute_secret(self):
        """Return the DH shared secret as big-endian bytes.

        Computes ``peer ** priv mod p`` from the responder DH data and the
        local private key; the result is serialised to the group key length.
        """
        p, _g, key_len = self.ike_group
        peer = int.from_bytes(self.r_dh_data, 'big')
        priv = int.from_bytes(self.dh_private_key, 'big')
        return pow(peer, priv, p).to_bytes(key_len, 'big')

    def generate_dh_data(self):
        """Generate the local DH key pair; does nothing for a responder.

        :raises NotImplementedError: when the configured group is not in DH
        """
        # generate DH keys
        if self.is_initiator:
            if self.ike_dh not in DH:
                raise NotImplementedError('%s not in DH group' % self.ike_dh)
            if self.dh_params is None:
                dhg = DH[self.ike_dh]
                pn = dh.DHParameterNumbers(dhg[0], dhg[1])
                self.dh_params = pn.parameters(default_backend())
            priv = self.dh_params.generate_private_key()
            pub = priv.public_key()
            x = priv.private_numbers().x
            self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
            y = pub.public_numbers().y
            self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self):
        """Derive encryption/integrity keys (or salts) of the first child SA.

        Key material is prf+(SK_d, Ni | Nr) and is consumed in the order
        initiator encryption key, initiator integrity key or salt, responder
        encryption key, responder integrity key or salt.
        """
        prf = self.ike_prf_alg.mod()
        s = self.i_nonce + self.r_nonce
        c = self.child_sas[0]

        encr_key_len = self.esp_crypto_key_len
        integ_key_len = self.esp_integ_alg.key_len
        # a 4 byte salt is derived only when no integrity algorithm is used
        salt_len = 0 if integ_key_len else 4

        total_len = (integ_key_len * 2 +
                     encr_key_len * 2 +
                     salt_len * 2)
        keymat = self.calc_prfplus(prf, self.sk_d, s, total_len)

        pos = 0
        c.sk_ei = keymat[pos:pos+encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ai = keymat[pos:pos+integ_key_len]
            pos += integ_key_len
        else:
            c.salt_ei = keymat[pos:pos+salt_len]
            pos += salt_len

        c.sk_er = keymat[pos:pos+encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ar = keymat[pos:pos+integ_key_len]
            pos += integ_key_len
        else:
            c.salt_er = keymat[pos:pos+salt_len]
            pos += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Expand ``key`` into at least ``length`` bytes of key material.

        Each round computes T(n) = prf(key, T(n-1) | seed | n) with a one
        byte counter n starting at 1 and appends it to the output.

        :returns: the concatenated output (possibly longer than ``length``),
            or None when 255 rounds do not produce enough bytes
        """
        out = b''
        prev = b''
        counter = 1
        # the counter is a single octet, so at most 255 rounds are possible
        while len(out) < length and counter <= 255:
            prev = self.calc_prf(prf, key, prev + seed + bytes([counter]))
            out += prev
            counter += 1

        if len(out) < length:
            return None
        return out

    def calc_prf(self, prf, key, data):
        """Return the MAC of ``data`` under ``key`` using the IKE PRF."""
        h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        h.update(data)
        return h.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the SK_d/ai/ar/ei/er/pi/pr keys of the IKE SA.

        When no integrity algorithm is used, 4 extra salt bytes are kept at
        the end of each encryption key.
        """
        prf = self.ike_prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        s = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        s = s + self.ispi + self.rspi

        prf_key_trunc = self.ike_prf_alg.trunc_len
        encr_key_len = self.ike_crypto_key_len
        tr_prf_key_len = self.ike_prf_alg.key_len
        integ_key_len = self.ike_integ_alg.key_len
        if integ_key_len == 0:
            salt_size = 4
        else:
            salt_size = 0

        total_len = (prf_key_trunc +
                     integ_key_len * 2 +
                     encr_key_len * 2 +
                     tr_prf_key_len * 2 +
                     salt_size * 2)
        keymat = self.calc_prfplus(prf, self.skeyseed, s, total_len)

        pos = 0
        self.sk_d = keymat[:pos+prf_key_trunc]
        pos += prf_key_trunc

        self.sk_ai = keymat[pos:pos+integ_key_len]
        pos += integ_key_len
        self.sk_ar = keymat[pos:pos+integ_key_len]
        pos += integ_key_len

        self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
        pos += encr_key_len + salt_size
        self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
        pos += encr_key_len + salt_size

        self.sk_pi = keymat[pos:pos+tr_prf_key_len]
        pos += tr_prf_key_len
        self.sk_pr = keymat[pos:pos+tr_prf_key_len]

    def generate_authmsg(self, prf, packet):
        """Build the octets that are signed/MACed for the AUTH payload.

        The result is ``packet | peer nonce | prf(SK_p, ID payload body)``
        where the ID, nonce and SK_p key are chosen by the role of this
        side. Previously only the initiator role was handled and a
        responder hit an UnboundLocalError.
        """
        if self.is_initiator:
            id_data = self.i_id
            nonce = self.r_nonce
            key = self.sk_pi
        else:
            id_data = self.r_id
            nonce = self.i_nonce
            key = self.sk_pr
        # ID payload body: type byte, three reserved zero bytes, then data
        data = bytes([self.id_type, 0, 0, 0]) + id_data
        id_hash = self.calc_prf(prf, key, data)
        return packet + nonce + id_hash

    def auth_init(self):
        """Compute the AUTH payload data and store it in ``auth_data``.

        :raises TypeError: for an unknown authentication method
        """
        prf = self.ike_prf_alg.mod()
        authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
        if self.auth_method == 'shared-key':
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif self.auth_method == 'rsa-sig':
            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
                                                hashes.SHA1())
        else:
            raise TypeError('unknown auth method type!')

    def encrypt(self, data, aad=None):
        """Pad and encrypt ``data`` with this side's encryption key."""
        data = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key used by the peer."""
        if self.is_initiator:
            return self.sk_ar
        return self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key used by this side."""
        if self.is_initiator:
            return self.sk_ai
        return self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key used by this side."""
        if self.is_initiator:
            return self.sk_ei
        return self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key used by the peer."""
        if self.is_initiator:
            return self.sk_er
        return self.sk_ei

    def concat(self, alg, key_len):
        """Return ``<alg>-<key length in bits>``, e.g. 'AES-CBC-256'."""
        return alg + '-' + str(key_len * 8)

    @property
    def vpp_ike_cypto_alg(self):
        """IKE cipher name combined with its key length in bits."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP cipher name combined with its key length in bits."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of ``ikemsg`` matches the peer key."""
        integ_trunc = self.ike_integ_alg.trunc_len
        exp_hmac = ikemsg[-integ_trunc:]
        data = ikemsg[:-integ_trunc]
        computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
                                          self.peer_authkey, data)
        self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated MAC of ``data`` using the IKE integrity
        algorithm."""
        h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        h.update(data)
        return h.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt ``data`` with the peer's encryption key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Authenticate an IKE message and return its decrypted payload.

        For AES-GCM the IKE header plus encrypted payload header are used
        as AAD and the last GCM_ICV_SIZE bytes as tag; otherwise the HMAC
        is verified first and the ICV is stripped before decryption.
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto == 'AES-GCM-16ICV':
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            return self.decrypt(ct, raw(ike)[:aad_len], tag)

        self.verify_hmac(raw(ike))
        integ_trunc = self.ike_integ_alg.trunc_len

        # remove ICV and decrypt payload
        ct = ep.load[:-integ_trunc]
        return self.decrypt(ct)

    def build_ts_addr(self, ts, version):
        """Map a traffic selector dict to the keyword arguments holding the
        start/end address for IP version ``version`` ('4' or '6')."""
        return {'starting_address_v' + version: ts['start_addr'],
                'ending_address_v' + version: ts['end_addr']}

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        :param is_ip4: build IPv4 selectors when true, IPv6 otherwise
        :returns: tuple ``([local selector], [remote selector])`` covering
            all protocols and ports
        """
        c = self.child_sas[0]
        ts_data = {'IP_protocol_ID': 0,
                   'start_port': 0,
                   'end_port': 0xffff}
        if is_ip4:
            version, ts_cls = '4', ikev2.IPv4TrafficSelector
        else:
            version, ts_cls = '6', ikev2.IPv6TrafficSelector
        ts_data.update(self.build_ts_addr(c.local_ts, version))
        ts1 = ts_cls(**ts_data)
        ts_data.update(self.build_ts_addr(c.remote_ts, version))
        ts2 = ts_cls(**ts_data)
        return ([ts1], [ts2])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the IKE SA algorithms.

        :param crypto: key of CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key of AUTH_ALGOS ('NULL' is stored as None)
        :param prf: key of PRF_ALGOS
        :param dh: key of DH
        :raises TypeError: for an unsupported crypto, integ or prf name
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.ike_integ = None if integ == 'NULL' else integ
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError('unsupported prf algo %r' % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the ESP (child SA) algorithms.

        :param crypto: key of CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key of AUTH_ALGOS ('NULL' is stored as None)
        :raises TypeError: for an unsupported crypto or integ name
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.esp_integ = None if integ == 'NULL' else integ
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len, crypto=None):
        """Return ``(attribute, transform length)`` for a key length.

        :param key_len: key length in bytes
        :param crypto: cipher name to validate; defaults to the IKE cipher
        :returns: tuple of the encoded key-length attribute
            (``0x800e << 16 | key length in bits``) and the value 12
        :raises Exception: when the cipher takes no key length attribute
        """
        if crypto is None:
            crypto = self.ike_crypto
        if crypto in ['AES-CBC', 'AES-GCM-16ICV']:
            return (0x800e << 16 | key_len << 3, 12)
        else:
            raise Exception('unsupported attribute type')

    def ike_crypto_attr(self):
        """Key-length attribute of the IKE cipher."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """Key-length attribute of the ESP cipher.

        Validated against the ESP cipher (it used to be checked against
        the IKE cipher).
        """
        return self.crypto_attr(self.esp_crypto_key_len, self.esp_crypto)

    def compute_nat_sha1(self, ip, port):
        """Return SHA1(SPIi | SPIr | ip | port) for NAT detection payloads.

        :param ip: address as bytes
        :param port: port number, encoded as 2 big-endian bytes
        """
        data = self.ispi + self.rspi + ip + (port).to_bytes(2, 'big')
        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
        digest.update(data)
        return digest.finalize()
493
494
495 class TemplateResponder(VppTestCase):
496     """ responder test template """
497
498     @classmethod
499     def setUpClass(cls):
500         import scapy.contrib.ikev2 as _ikev2
501         globals()['ikev2'] = _ikev2
502         super(TemplateResponder, cls).setUpClass()
503         cls.create_pg_interfaces(range(2))
504         for i in cls.pg_interfaces:
505             i.admin_up()
506             i.config_ip4()
507             i.resolve_arp()
508             i.config_ip6()
509             i.resolve_ndp()
510
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super(TemplateResponder, cls).tearDownClass()
514
515     def setUp(self):
516         super(TemplateResponder, self).setUp()
517         self.config_tc()
518         self.p.add_vpp_config()
519         self.assertIsNotNone(self.p.query_vpp_config())
520         self.sa.generate_dh_data()
521         self.vapi.cli('ikev2 set logging level 4')
522         self.vapi.cli('event-lo clear')
523
524     def tearDown(self):
525         super(TemplateResponder, self).tearDown()
526         if self.sa.is_initiator:
527             self.initiate_del_sa()
528             r = self.vapi.ikev2_sa_dump()
529             self.assertEqual(len(r), 0)
530
531         self.p.remove_vpp_config()
532         self.assertIsNone(self.p.query_vpp_config())
533
534     def verify_del_sa(self, packet):
535         ih = self.get_ike_header(packet)
536         self.assertEqual(ih.id, self.sa.msg_id)
537         self.assertEqual(ih.exch_type, 37)  # exchange informational
538
539     def initiate_del_sa(self):
540         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
541                              flags='Initiator', exch_type='INFORMATIONAL',
542                              id=self.sa.new_msg_id())
543         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
544         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
545         packet = self.create_packet(self.pg0, ike_msg,
546                                     self.sa.sport, self.sa.dport,
547                                     self.sa.natt, self.ip6)
548         self.pg0.add_stream(packet)
549         self.pg0.enable_capture()
550         self.pg_start()
551         capture = self.pg0.get_capture(1)
552         self.verify_del_sa(capture[0])
553
554     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
555                       use_ip6=False):
556         if use_ip6:
557             src_ip = src_if.remote_ip6
558             dst_ip = src_if.local_ip6
559             ip_layer = IPv6
560         else:
561             src_ip = src_if.remote_ip4
562             dst_ip = src_if.local_ip4
563             ip_layer = IP
564         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
565                ip_layer(src=src_ip, dst=dst_ip) /
566                UDP(sport=sport, dport=dport))
567         if natt:
568             # insert non ESP marker
569             res = res / Raw(b'\x00' * 4)
570         return res / msg
571
572     def send_sa_init(self, behind_nat=False):
573         tr_attr = self.sa.ike_crypto_attr()
574         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
575                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
576                  key_length=tr_attr[0]) /
577                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
578                  transform_id=self.sa.ike_integ) /
579                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
580                  transform_id=self.sa.ike_prf_alg.name) /
581                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
582                  transform_id=self.sa.ike_dh))
583
584         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
585                  trans_nb=4, trans=trans))
586
587         if behind_nat:
588             next_payload = 'Notify'
589         else:
590             next_payload = None
591
592         self.sa.init_req_packet = (
593                 ikev2.IKEv2(init_SPI=self.sa.ispi,
594                             flags='Initiator', exch_type='IKE_SA_INIT') /
595                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
596                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
597                                        group=self.sa.ike_dh,
598                                        load=self.sa.dh_pub_key()) /
599                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
600                                           load=self.sa.i_nonce))
601
602         if behind_nat:
603             src_address = b'\x0a\x0a\x0a\x01'
604         else:
605             src_address = bytes(self.pg0.local_ip4, 'ascii')
606
607         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
608         dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
609                                            self.sa.sport)
610         nat_src_detection = ikev2.IKEv2_payload_Notify(
611                 type='NAT_DETECTION_SOURCE_IP', load=src_nat)
612         nat_dst_detection = ikev2.IKEv2_payload_Notify(
613                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
614         self.sa.init_req_packet = (self.sa.init_req_packet /
615                                    nat_src_detection /
616                                    nat_dst_detection)
617
618         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
619                                      self.sa.sport, self.sa.dport,
620                                      self.sa.natt, self.ip6)
621         self.pg0.add_stream(ike_msg)
622         self.pg0.enable_capture()
623         self.pg_start()
624         capture = self.pg0.get_capture(1)
625         self.verify_sa_init(capture[0])
626
627     def encrypt_ike_msg(self, header, plain, first_payload):
628         if self.sa.ike_crypto == 'AES-GCM-16ICV':
629             data = self.sa.ike_crypto_alg.pad(raw(plain))
630             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
631                 len(ikev2.IKEv2_payload_Encrypted())
632             tlen = plen + len(ikev2.IKEv2())
633
634             # prepare aad data
635             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
636                                                  length=plen)
637             header.length = tlen
638             res = header / sk_p
639             encr = self.sa.encrypt(raw(plain), raw(res))
640             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
641                                                  length=plen, load=encr)
642             res = header / sk_p
643         else:
644             encr = self.sa.encrypt(raw(plain))
645             trunc_len = self.sa.ike_integ_alg.trunc_len
646             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
647             tlen = plen + len(ikev2.IKEv2())
648
649             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
650                                                  length=plen, load=encr)
651             header.length = tlen
652             res = header / sk_p
653
654             integ_data = raw(res)
655             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
656                                              self.sa.my_authkey, integ_data)
657             res = res / Raw(hmac_data[:trunc_len])
658         assert(len(res) == tlen)
659         return res
660
661     def send_sa_auth(self):
662         tr_attr = self.sa.esp_crypto_attr()
663         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
664                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
665                  key_length=tr_attr[0]) /
666                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
667                  transform_id=self.sa.esp_integ) /
668                  ikev2.IKEv2_payload_Transform(
669                  transform_type='Extended Sequence Number',
670                  transform_id='No ESN') /
671                  ikev2.IKEv2_payload_Transform(
672                  transform_type='Extended Sequence Number',
673                  transform_id='ESN'))
674
675         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
676                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
677
678         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
679         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
680                  IDtype=self.sa.id_type, load=self.sa.i_id) /
681                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
682                  IDtype=self.sa.id_type, load=self.sa.r_id) /
683                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
684                  auth_type=AuthMethod.value(self.sa.auth_method),
685                  load=self.sa.auth_data) /
686                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
687                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
688                  number_of_TSs=len(tsi),
689                  traffic_selector=tsi) /
690                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
691                  number_of_TSs=len(tsr),
692                  traffic_selector=tsr) /
693                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
694
695         header = ikev2.IKEv2(
696                 init_SPI=self.sa.ispi,
697                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
698                 flags='Initiator', exch_type='IKE_AUTH')
699
700         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
701         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
702                                     self.sa.dport, self.sa.natt, self.ip6)
703         self.pg0.add_stream(packet)
704         self.pg0.enable_capture()
705         self.pg_start()
706         capture = self.pg0.get_capture(1)
707         self.verify_sa_auth(capture[0])
708
709     def get_ike_header(self, packet):
710         try:
711             ih = packet[ikev2.IKEv2]
712         except IndexError as e:
713             # this is a workaround for getting IKEv2 layer as both ikev2 and
714             # ipsec register for port 4500
715             esp = packet[ESP]
716             ih = self.verify_and_remove_non_esp_marker(esp)
717
718         self.assertEqual(ih.version, 0x20)
719         return ih
720
721     def verify_sa_init(self, packet):
722         ih = self.get_ike_header(packet)
723
724         self.assertEqual(ih.id, self.sa.msg_id)
725         self.assertEqual(ih.exch_type, 34)
726         self.assertTrue('Response' in ih.flags)
727         self.assertEqual(ih.init_SPI, self.sa.ispi)
728         self.assertNotEqual(ih.resp_SPI, 0)
729         self.sa.rspi = ih.resp_SPI
730         try:
731             sa = ih[ikev2.IKEv2_payload_SA]
732             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
733             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
734         except IndexError as e:
735             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
736             self.logger.error(ih.show())
737             raise
738         self.sa.complete_dh_data()
739         self.sa.calc_keys()
740         self.sa.auth_init()
741
742     def verify_and_remove_non_esp_marker(self, packet):
743         if self.sa.natt:
744             # if we are in nat traversal mode check for non esp marker
745             # and remove it
746             data = raw(packet)
747             self.assertEqual(data[:4], b'\x00' * 4)
748             return ikev2.IKEv2(data[4:])
749         else:
750             return packet
751
752     def verify_udp(self, udp):
753         self.assertEqual(udp.sport, self.sa.sport)
754         self.assertEqual(udp.dport, self.sa.dport)
755
756     def verify_sa_auth(self, packet):
757         ike = self.get_ike_header(packet)
758         udp = packet[UDP]
759         self.verify_udp(udp)
760         self.assertEqual(ike.id, self.sa.msg_id)
761         plain = self.sa.hmac_and_decrypt(ike)
762         self.sa.calc_child_keys()
763
764     def verify_ipsec_sas(self):
765         sas = self.vapi.ipsec_sa_dump()
766         self.assertEqual(len(sas), 2)
767         sa0 = sas[0].entry
768         sa1 = sas[1].entry
769         c = self.sa.child_sas[0]
770
771         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
772         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
773         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
774
775         if self.sa.esp_integ is None:
776             vpp_integ_alg = 0
777         else:
778             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
779         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
780         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
781
782         # verify crypto keys
783         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
784         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
785         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
786         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
787
788         # verify integ keys
789         if vpp_integ_alg:
790             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
791             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
792             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
793             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
794         else:
795             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
796             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
797
798     def verify_keymat(self, api_keys, keys, name):
799         km = getattr(keys, name)
800         api_km = getattr(api_keys, name)
801         api_km_len = getattr(api_keys, name + '_len')
802         self.assertEqual(len(km), api_km_len)
803         self.assertEqual(km, api_km[:api_km_len])
804
805     def verify_id(self, api_id, exp_id):
806         self.assertEqual(api_id.type, IDType.value(exp_id.type))
807         self.assertEqual(api_id.data_len, exp_id.data_len)
808         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
809
810     def verify_ike_sas(self):
811         r = self.vapi.ikev2_sa_dump()
812         self.assertEqual(len(r), 1)
813         sa = r[0].sa
814         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
815         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
816         if self.ip6:
817             self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
818             self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
819         else:
820             self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
821             self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
822         self.verify_keymat(sa.keys, self.sa, 'sk_d')
823         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
824         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
825         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
826         self.verify_keymat(sa.keys, self.sa, 'sk_er')
827         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
828         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
829
830         self.assertEqual(sa.i_id.type, self.sa.id_type)
831         self.assertEqual(sa.r_id.type, self.sa.id_type)
832         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
833         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
834         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
835         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
836
837         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
838         self.assertEqual(len(r), 1)
839         csa = r[0].child_sa
840         self.assertEqual(csa.sa_index, sa.sa_index)
841         c = self.sa.child_sas[0]
842         if hasattr(c, 'sk_ai'):
843             self.verify_keymat(csa.keys, c, 'sk_ai')
844             self.verify_keymat(csa.keys, c, 'sk_ar')
845         self.verify_keymat(csa.keys, c, 'sk_ei')
846         self.verify_keymat(csa.keys, c, 'sk_er')
847
848         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
849         tsi = tsi[0]
850         tsr = tsr[0]
851         r = self.vapi.ikev2_traffic_selector_dump(
852                 is_initiator=True, sa_index=sa.sa_index,
853                 child_sa_index=csa.child_sa_index)
854         self.assertEqual(len(r), 1)
855         ts = r[0].ts
856         self.verify_ts(r[0].ts, tsi[0], True)
857
858         r = self.vapi.ikev2_traffic_selector_dump(
859                 is_initiator=False, sa_index=sa.sa_index,
860                 child_sa_index=csa.child_sa_index)
861         self.assertEqual(len(r), 1)
862         self.verify_ts(r[0].ts, tsr[0], False)
863
864         n = self.vapi.ikev2_nonce_get(is_initiator=True,
865                                       sa_index=sa.sa_index)
866         self.verify_nonce(n, self.sa.i_nonce)
867         n = self.vapi.ikev2_nonce_get(is_initiator=False,
868                                       sa_index=sa.sa_index)
869         self.verify_nonce(n, self.sa.r_nonce)
870
871     def verify_nonce(self, api_nonce, nonce):
872         self.assertEqual(api_nonce.data_len, len(nonce))
873         self.assertEqual(api_nonce.nonce, nonce)
874
875     def verify_ts(self, api_ts, ts, is_initiator):
876         if is_initiator:
877             self.assertTrue(api_ts.is_local)
878         else:
879             self.assertFalse(api_ts.is_local)
880
881         if self.p.ts_is_ip4:
882             self.assertEqual(api_ts.start_addr,
883                              IPv4Address(ts.starting_address_v4))
884             self.assertEqual(api_ts.end_addr,
885                              IPv4Address(ts.ending_address_v4))
886         else:
887             self.assertEqual(api_ts.start_addr,
888                              IPv6Address(ts.starting_address_v6))
889             self.assertEqual(api_ts.end_addr,
890                              IPv6Address(ts.ending_address_v6))
891         self.assertEqual(api_ts.start_port, ts.start_port)
892         self.assertEqual(api_ts.end_port, ts.end_port)
893         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
894
895     def test_responder(self):
896         self.send_sa_init(self.sa.natt)
897         self.send_sa_auth()
898         self.verify_ipsec_sas()
899         self.verify_ike_sas()
900
901
class Ikev2Params(object):
    """Mixin that prepares the IKEv2 profile and the local SA for a test.

    It is combined with a test-case class that provides ``self.vapi``.
    """

    def config_params(self, params=None):
        """Create ``self.p``, ``self.sa``, ``self.ip6`` and ``self.vpp_enums``.

        :param params: optional dict of overrides.  Recognised keys:

            * ``'natt'`` - NAT traversal flag handed to the SA
            * ``'ip6'`` - stored as ``self.ip6`` (default ``False``)
            * ``'auth'`` - ``'rsa-sig'`` selects certificate based auth and
              then requires ``'server-key'``, ``'server-cert'``,
              ``'client-key'`` and ``'client-cert'`` (file names inside the
              certs directory located via the ``BR`` environment variable);
              anything else selects a shared key
            * ``'loc_ts'`` / ``'rem_ts'`` - traffic selector keyword dicts
            * ``'ike-crypto'`` / ``'esp-crypto'`` - ``(name, key_len)``
            * ``'ike-integ'`` / ``'esp-integ'`` / ``'ike-dh'`` - names

        ``None`` (the default) means "no overrides"; a fresh dict is used so
        that no mutable default is shared between calls.
        """
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        # algorithm names used by the tests -> VPP API enum values
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        # a missing or falsy 'natt' entry always ends up as plain False
        is_natt = params.get('natt') or False
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # read the PEM files through context managers so the file
            # handles are closed right away instead of being leaked
            with open(work_dir + params['server-cert']) as pem_file:
                server_pem = pem_file.read()
            with open(work_dir + params['client-key']) as pem_file:
                client_pem = pem_file.read()
            client_priv = load_pem_private_key(str.encode(client_pem), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
        self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
        loc_ts = params.get(
                'loc_ts',
                {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
                'rem_ts',
                {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)

        # The SA takes the profile's values with local and remote swapped:
        # the profile's remote id/ts become the SA's initiator id/local ts.
        self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
                          r_id=self.p.local_id['data'],
                          id_type=self.p.local_id['id_type'], natt=is_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)

        ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
        ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
        ike_dh = params.get('ike-dh', '2048MODPgr')

        esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
        esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

        self.sa.set_ike_props(
                crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
        self.sa.set_esp_props(
                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                integ=esp_integ)
978
979
class TestApi(VppTestCase):
    """ Test IKEV2 API """
    # Configures two IKEv2 profiles through the API and checks that the
    # profile dump reports every configured field back unchanged.

    @classmethod
    def setUpClass(cls):
        # nothing class specific to prepare, defer to the framework
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class specific to release, defer to the framework
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        # framework teardown runs first, then both profiles are removed and
        # the profile dump has to come back empty
        super(TestApi, self).tearDown()
        for attr in ('p1', 'p2'):
            getattr(self, attr).remove_vpp_config()
        remaining = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(remaining), 0)

    def configure_profile(self, cfg):
        """Build a Profile from *cfg*, add it to VPP and return it.

        ``'lifetime_data'`` and ``'tun_itf'`` are optional keys of *cfg*;
        every other key read below is mandatory.
        """
        profile = Profile(self, cfg['name'])
        loc_id = cfg['loc_id']
        rem_id = cfg['rem_id']
        profile.add_local_id(id_type=loc_id[0], data=loc_id[1])
        profile.add_remote_id(id_type=rem_id[0], data=rem_id[1])
        profile.add_local_ts(**cfg['loc_ts'])
        profile.add_remote_ts(**cfg['rem_ts'])
        profile.add_responder(cfg['responder'])
        profile.add_ike_transforms(cfg['ike_ts'])
        profile.add_esp_transforms(cfg['esp_ts'])
        profile.add_auth(**cfg['auth'])
        profile.set_udp_encap(cfg['udp_encap'])
        profile.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        # optional settings are applied only when present in the config
        for key, setter_name in (('lifetime_data', 'set_lifetime_data'),
                                 ('tun_itf', 'set_tunnel_interface')):
            if key in cfg:
                getattr(profile, setter_name)(cfg[key])
        profile.add_vpp_config()
        return profile

    def test_profile_api(self):
        """ test profile dump API """
        # IPv4 traffic selectors, used by profile p1
        loc_ts4 = dict(proto=8, start_port=1, end_port=19,
                       start_addr='3.3.3.2', end_addr='3.3.3.3')
        rem_ts4 = dict(proto=9, start_port=10, end_port=119,
                       start_addr='4.5.76.80', end_addr='2.3.4.6')

        # IPv6 traffic selectors, used by profile p2
        loc_ts6 = dict(proto=8, start_port=1, end_port=19,
                       start_addr='ab::1', end_addr='ab::4')
        rem_ts6 = dict(proto=9, start_port=10, end_port=119,
                       start_addr='cd::12', end_addr='cd::13')

        p1_cfg = {
            'name': 'p1',
            'loc_id': ('fqdn', b'vpp.home'),
            'rem_id': ('fqdn', b'roadwarrior.example.com'),
            'loc_ts': loc_ts4,
            'rem_ts': rem_ts4,
            'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
            'ike_ts': dict(crypto_alg=20, crypto_key_size=32,
                           integ_alg=1, dh_group=1),
            'esp_ts': dict(crypto_alg=13, crypto_key_size=24, integ_alg=2),
            'auth': dict(method='shared-key', data=b'sharedkeydata'),
            'udp_encap': True,
            'ipsec_over_udp_port': 4501,
            'lifetime_data': dict(lifetime=123, lifetime_maxdata=20192,
                                  lifetime_jitter=9, handover=132),
        }
        # p2 has no lifetime data but does set a tunnel interface
        p2_cfg = {
            'name': 'p2',
            'loc_id': ('ip4-addr', b'192.168.2.1'),
            'rem_id': ('ip6-addr', b'abcd::1'),
            'loc_ts': loc_ts6,
            'rem_ts': rem_ts6,
            'responder': {'sw_if_index': 4, 'addr': 'def::10'},
            'ike_ts': dict(crypto_alg=12, crypto_key_size=16,
                           integ_alg=3, dh_group=3),
            'esp_ts': dict(crypto_alg=9, crypto_key_size=24, integ_alg=4),
            'auth': dict(method='shared-key', data=b'sharedkeydata'),
            'udp_encap': False,
            'ipsec_over_udp_port': 4600,
            'tun_itf': 0,
        }

        self.p1 = self.configure_profile(p1_cfg)
        self.p2 = self.configure_profile(p2_cfg)

        dump = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(dump), 2)
        self.verify_profile(dump[0].profile, p1_cfg)
        self.verify_profile(dump[1].profile, p2_cfg)

    def verify_id(self, api_id, cfg_id):
        """Check the type and payload of an identity; *cfg_id* is the
        ``(type name, data)`` pair from the configuration."""
        type_name, data = cfg_id[0], cfg_id[1]
        self.assertEqual(api_id.type, IDType.value(type_name))
        self.assertEqual(bytes(api_id.data, 'ascii'), data)

    def verify_ts(self, api_ts, cfg_ts):
        """Check protocol, ports and addresses of a traffic selector."""
        for api_field, cfg_key in (('protocol_id', 'proto'),
                                   ('start_port', 'start_port'),
                                   ('end_port', 'end_port')):
            self.assertEqual(getattr(api_ts, api_field), cfg_ts[cfg_key])
        # configured addresses are strings, the API side is compared
        # against the parsed address object
        for field in ('start_addr', 'end_addr'):
            self.assertEqual(getattr(api_ts, field),
                             ip_address(text_type(cfg_ts[field])))

    def verify_responder(self, api_r, cfg_r):
        """Check the responder interface index and address."""
        expected_addr = ip_address(cfg_r['addr'])
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.addr, expected_addr)

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the transform fields shared by IKE and ESP."""
        for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
            self.assertEqual(getattr(api_ts, field), cfg_ts[field])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check the IKE transforms: the shared fields plus the DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check the ESP transforms (shared fields only)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Check the auth method, auth data and its reported length."""
        data = cfg_auth['data']
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, data)
        self.assertEqual(api_auth.data_len, len(data))

    def verify_lifetime_data(self, p, ld):
        """Check the lifetime fields of profile *p* against dict *ld*."""
        for field in ('lifetime', 'lifetime_maxdata', 'lifetime_jitter',
                      'handover'):
            self.assertEqual(getattr(p, field), ld[field])

    def verify_profile(self, ap, cp):
        """Check a dumped profile *ap* against its configuration *cp*.

        When *cp* has no ``'tun_itf'`` entry the dumped value is expected
        to be ``0xffffffff``.
        """
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        # API attribute names and config keys coincide for these fields
        for verify, field in ((self.verify_id, 'loc_id'),
                              (self.verify_id, 'rem_id'),
                              (self.verify_ts, 'loc_ts'),
                              (self.verify_ts, 'rem_ts'),
                              (self.verify_responder, 'responder'),
                              (self.verify_ike_transforms, 'ike_ts'),
                              (self.verify_esp_transforms, 'esp_ts'),
                              (self.verify_auth, 'auth')):
            verify(getattr(ap, field), cp[field])
        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        self.assertEqual(ap.tun_itf, cp.get('tun_itf', 0xffffffff))
1161
1162
class TestResponderNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - nat traversal """
    def config_tc(self):
        # single override: switch NAT traversal on, keep all other defaults
        overrides = {'natt': True}
        self.config_params(overrides)
1168
1169
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # No overrides: config_params() falls back to its defaults, which
        # select 'shared-key' authentication.
        self.config_params()
1174
1175
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        # select signature based auth; every PEM file is named after the
        # parameter key it is passed under
        params = {'auth': 'rsa-sig'}
        for role in ('server-key', 'client-key',
                     'client-cert', 'server-cert'):
            params[role] = role + '.pem'
        self.config_params(params)
1185
1186
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        # IKE SA proposal
        ike_params = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'ike-dh': '2048MODPgr',
        }
        # ESP (child SA) proposal
        esp_params = {
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
        }
        params = dict(ike_params)
        params.update(esp_params)
        self.config_params(params)
1199
1200
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        # NOTE(review): the class name and docstring say AES_CBC_128, yet
        # the IKE key length configured here is 32 while the other
        # AES_CBC_128 test case in this file passes 16 -- confirm which one
        # is intended.
        ike_params = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'ike-dh': '3072MODPgr',
        }
        # ESP uses AES-GCM with the integrity algorithm given as 'NULL'
        esp_params = {
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
        }
        params = dict(ike_params)
        params.update(esp_params)
        self.config_params(params)
1213
1214
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """
    def config_tc(self):
        # IPv6 traffic selectors for both ends
        local_ts = {'start_addr': 'ab:cd::0', 'end_addr': 'ab:cd::10'}
        remote_ts = {'start_addr': '11::0', 'end_addr': '11::100'}
        # IPv6 with NAT traversal; IKE uses AES-GCM with the integrity
        # algorithm given as 'NULL'
        params = {
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_ts,
            'rem_ts': remote_ts,
        }
        self.config_params(params)
1230
1231
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # Overrides the inherited tearDown with a no-op.
        # NOTE(review): presumably because no SA is ever established by
        # this test -- confirm.
        pass

    def config_tc(self):
        # default parameters are sufficient for this test
        self.config_params()

    def assert_counter(self, count, name, version='ip4'):
        """Assert that error counter *name* of node ``ikev2-<version>``
        equals *count*."""
        prefix = '/err/ikev2-%s/' % version
        actual = self.statistics.get_err_counter(prefix + name)
        self.assertEqual(count, actual)

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet on pg0.

        :param length: value for the IKE header length field
        :param payload: optional payload appended to the IKE header
        """
        header = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                             flags='Initiator', exch_type='IKE_SA_INIT')
        message = header if payload is None else header / payload
        return self.create_packet(self.pg0, message, self.sa.sport,
                                  self.sa.dport)

    def _send_and_expect_error(self, ike_msg, counter_name):
        # send ``self.pkt_count`` copies, expect no reply at all and the
        # named error counter to equal the number of packets sent
        self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
        self.assert_counter(self.pkt_count, counter_name)

    def verify_bad_packet_length(self):
        """Send IKE headers carrying a bogus length field (0xdead)."""
        self._send_and_expect_error(
                self.create_ike_init_msg(length=0xdead),
                'Bad packet length')

    def verify_bad_sa_payload_length(self):
        """Send an SA payload carrying a bogus length field (0xdead)."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        self._send_and_expect_error(
                self.create_ike_init_msg(payload=bad_sa),
                'Malformed packet')

    def test_responder(self):
        # number of copies sent for each malformed message
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
1268
1269
if __name__ == '__main__':
    # unittest is not imported at the top of this file; import it here so
    # that running the module directly does not fail with a NameError.
    import unittest
    unittest.main(testRunner=VppTestRunner)