ikev2: add per SA stats
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import VppTestCase, VppTestRunner
22 from vpp_ikev2 import Profile, IDType, AuthMethod
23 from vpp_papi import VppEnum
24
# Python 2/3 compatibility shim: the name ``unicode`` only exists on
# Python 2, so fall back to ``str`` when looking it up raises NameError.
try:
    text_type = unicode
except NameError:
    text_type = str

# constant run through the PRF (keyed with the shared secret) when the
# 'shared-key' authentication data is derived in IKEv2SA.auth_init
KEY_PAD = b"Key Pad for IKEv2"
# AES-GCM sizes in bytes: salt appended to the key, integrity check value
# (tag) appended to the ciphertext and IV sent in front of the ciphertext
SALT_SIZE = 4
GCM_ICV_SIZE = 16
GCM_IV_SIZE = 8
34
35
# Diffie-Hellman MODP groups, defined in RFC 3526.
# Each group name maps to a tuple (p, g, key_len):
#   p       - prime modulus of the group
#   g       - generator
#   key_len - number of bytes used to serialize the shared secret
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
70
71
class CryptoAlgo(object):
    """Symmetric cipher helper used to protect IKEv2 payloads.

    :param name: algorithm name, e.g. 'AES-CBC' or 'AES-GCM-16ICV'
    :param cipher: cipher class taking the key (None for the NULL algo)
    :param mode: mode class taking the IV/nonce (None for the NULL algo)

    When ``cipher`` is None neither ``bs`` nor ``iv_len`` is set, so the
    NULL algorithm cannot be used to pad, encrypt or decrypt.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is divided by 8 to obtain the block size in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` under ``key`` using a freshly generated IV.

        :param data: plaintext, already padded by the caller
        :param key: encryption key; in AEAD mode the last SALT_SIZE bytes
                    are the salt
        :param aad: additional authenticated data; passing a value
                    selects AEAD processing
        :returns: IV + ciphertext, followed by the ICV in AEAD mode
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            # the nonce is the salt taken from the key tail plus the IV
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                               default_backend()).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` (IV followed by ciphertext) under ``key``.

        :param data: IV + ciphertext, without the ICV
        :param key: decryption key; in AEAD mode the last SALT_SIZE bytes
                    are the salt
        :param aad: additional authenticated data; passing a value
                    selects AEAD processing
        :param icv: integrity check value (tag), required in AEAD mode
        :returns: the padded plaintext
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            # use the configured cipher, mirroring encrypt(), instead of a
            # hard-coded AES
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + data[:GCM_IV_SIZE]
            ct = data[GCM_IV_SIZE:]
            key = key[:-SALT_SIZE]
            decryptor = Cipher(self.cipher(key),
                               self.mode(nonce, icv, len(icv)),
                               default_backend()).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` to a multiple of the cipher block size.

        Zero bytes are appended followed by a single byte holding the
        number of those zero bytes; at least that length byte is always
        added, so input that is already block aligned grows by one block.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
124
125
class AuthAlgo(object):
    """Description of an integrity or PRF algorithm.

    :param name: algorithm name
    :param mac: MAC constructor (None for the NULL algo)
    :param mod: hash constructor handed to the MAC (None for the NULL algo)
    :param key_len: key length
    :param trunc_len: truncated output length; a falsy value (None or 0)
                      falls back to ``key_len``
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
133
134
# encryption algorithms known to the test peer, keyed by algorithm name;
# 'NULL' carries no cipher and no mode
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}
141
# integrity algorithms known to the test peer, keyed by algorithm name;
# positional arguments are (name, mac, hash, key_len, trunc_len).
# Each HMAC variant uses the hash its name refers to, so the 384 and 512
# bit variants are backed by SHA384 and SHA512 respectively.
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}
149
# pseudo-random functions known to the test peer, keyed by name; no
# trunc_len is given for PRF_HMAC_SHA2_256, so it defaults to key_len
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}

# numeric id -> key into CRYPTO_ALGOS
# NOTE(review): presumably the IKEv2 encryption transform IDs - confirm
CRYPTO_IDS = {
    12: 'AES-CBC',
    20: 'AES-GCM-16ICV',
}

# numeric id -> key into AUTH_ALGOS
# NOTE(review): presumably the IKEv2 integrity transform IDs - confirm
INTEG_IDS = {
    2: 'HMAC-SHA1-96',
    12: 'SHA2-256-128',
    13: 'SHA2-384-192',
    14: 'SHA2-512-256',
}
167
168
class IKEv2ChildSA(object):
    """State of a single child SA as seen by the test peer.

    A random 4 byte SPI is generated for the side the test plays
    (``ispi`` for an initiator, ``rspi`` for a responder); the other
    SPI starts out as None.
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        own_spi = os.urandom(4)
        self.ispi = own_spi if is_initiator else None
        self.rspi = None if is_initiator else own_spi
        self.local_ts, self.remote_ts = local_ts, remote_ts
180
181
182 class IKEv2SA(object):
183     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
184                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
185                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
186                  auth_method='shared-key', priv_key=None, i_natt=False,
187                  r_natt=False, udp_encap=False):
188         self.udp_encap = udp_encap
189         self.i_natt = i_natt
190         self.r_natt = r_natt
191         if i_natt or r_natt:
192             self.sport = 4500
193             self.dport = 4500
194         else:
195             self.sport = 500
196             self.dport = 500
197         self.msg_id = 0
198         self.dh_params = None
199         self.test = test
200         self.priv_key = priv_key
201         self.is_initiator = is_initiator
202         nonce = nonce or os.urandom(32)
203         self.auth_data = auth_data
204         self.i_id = i_id
205         self.r_id = r_id
206         if isinstance(id_type, str):
207             self.id_type = IDType.value(id_type)
208         else:
209             self.id_type = id_type
210         self.auth_method = auth_method
211         if self.is_initiator:
212             self.rspi = 8 * b'\x00'
213             self.ispi = spi
214             self.i_nonce = nonce
215         else:
216             self.rspi = spi
217             self.ispi = 8 * b'\x00'
218             self.r_nonce = nonce
219         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
220                           self.is_initiator)]
221
222     def new_msg_id(self):
223         self.msg_id += 1
224         return self.msg_id
225
226     @property
227     def my_dh_pub_key(self):
228         if self.is_initiator:
229             return self.i_dh_data
230         return self.r_dh_data
231
232     @property
233     def peer_dh_pub_key(self):
234         if self.is_initiator:
235             return self.r_dh_data
236         return self.i_dh_data
237
238     @property
239     def natt(self):
240         return self.i_natt or self.r_natt
241
242     def compute_secret(self):
243         priv = self.dh_private_key
244         peer = self.peer_dh_pub_key
245         p, g, l = self.ike_group
246         return pow(int.from_bytes(peer, 'big'),
247                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
248
249     def generate_dh_data(self):
250         # generate DH keys
251         if self.ike_dh not in DH:
252             raise NotImplementedError('%s not in DH group' % self.ike_dh)
253
254         if self.dh_params is None:
255             dhg = DH[self.ike_dh]
256             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
257             self.dh_params = pn.parameters(default_backend())
258
259         priv = self.dh_params.generate_private_key()
260         pub = priv.public_key()
261         x = priv.private_numbers().x
262         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
263         y = pub.public_numbers().y
264
265         if self.is_initiator:
266             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
267         else:
268             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
269
270     def complete_dh_data(self):
271         self.dh_shared_secret = self.compute_secret()
272
273     def calc_child_keys(self):
274         prf = self.ike_prf_alg.mod()
275         s = self.i_nonce + self.r_nonce
276         c = self.child_sas[0]
277
278         encr_key_len = self.esp_crypto_key_len
279         integ_key_len = self.esp_integ_alg.key_len
280         salt_len = 0 if integ_key_len else 4
281
282         l = (integ_key_len * 2 +
283              encr_key_len * 2 +
284              salt_len * 2)
285         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
286
287         pos = 0
288         c.sk_ei = keymat[pos:pos+encr_key_len]
289         pos += encr_key_len
290
291         if integ_key_len:
292             c.sk_ai = keymat[pos:pos+integ_key_len]
293             pos += integ_key_len
294         else:
295             c.salt_ei = keymat[pos:pos+salt_len]
296             pos += salt_len
297
298         c.sk_er = keymat[pos:pos+encr_key_len]
299         pos += encr_key_len
300
301         if integ_key_len:
302             c.sk_ar = keymat[pos:pos+integ_key_len]
303             pos += integ_key_len
304         else:
305             c.salt_er = keymat[pos:pos+salt_len]
306             pos += salt_len
307
308     def calc_prfplus(self, prf, key, seed, length):
309         r = b''
310         t = None
311         x = 1
312         while len(r) < length and x < 255:
313             if t is not None:
314                 s = t
315             else:
316                 s = b''
317             s = s + seed + bytes([x])
318             t = self.calc_prf(prf, key, s)
319             r = r + t
320             x = x + 1
321
322         if x == 255:
323             return None
324         return r
325
326     def calc_prf(self, prf, key, data):
327         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
328         h.update(data)
329         return h.finalize()
330
331     def calc_keys(self):
332         prf = self.ike_prf_alg.mod()
333         # SKEYSEED = prf(Ni | Nr, g^ir)
334         s = self.i_nonce + self.r_nonce
335         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
336
337         # calculate S = Ni | Nr | SPIi SPIr
338         s = s + self.ispi + self.rspi
339
340         prf_key_trunc = self.ike_prf_alg.trunc_len
341         encr_key_len = self.ike_crypto_key_len
342         tr_prf_key_len = self.ike_prf_alg.key_len
343         integ_key_len = self.ike_integ_alg.key_len
344         if integ_key_len == 0:
345             salt_size = 4
346         else:
347             salt_size = 0
348
349         l = (prf_key_trunc +
350              integ_key_len * 2 +
351              encr_key_len * 2 +
352              tr_prf_key_len * 2 +
353              salt_size * 2)
354         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
355
356         pos = 0
357         self.sk_d = keymat[:pos+prf_key_trunc]
358         pos += prf_key_trunc
359
360         self.sk_ai = keymat[pos:pos+integ_key_len]
361         pos += integ_key_len
362         self.sk_ar = keymat[pos:pos+integ_key_len]
363         pos += integ_key_len
364
365         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
366         pos += encr_key_len + salt_size
367         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
368         pos += encr_key_len + salt_size
369
370         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
371         pos += tr_prf_key_len
372         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
373
374     def generate_authmsg(self, prf, packet):
375         if self.is_initiator:
376             id = self.i_id
377             nonce = self.r_nonce
378             key = self.sk_pi
379         else:
380             id = self.r_id
381             nonce = self.i_nonce
382             key = self.sk_pr
383         data = bytes([self.id_type, 0, 0, 0]) + id
384         id_hash = self.calc_prf(prf, key, data)
385         return packet + nonce + id_hash
386
387     def auth_init(self):
388         prf = self.ike_prf_alg.mod()
389         if self.is_initiator:
390             packet = self.init_req_packet
391         else:
392             packet = self.init_resp_packet
393         authmsg = self.generate_authmsg(prf, raw(packet))
394         if self.auth_method == 'shared-key':
395             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
396             self.auth_data = self.calc_prf(prf, psk, authmsg)
397         elif self.auth_method == 'rsa-sig':
398             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
399                                                 hashes.SHA1())
400         else:
401             raise TypeError('unknown auth method type!')
402
403     def encrypt(self, data, aad=None):
404         data = self.ike_crypto_alg.pad(data)
405         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
406
407     @property
408     def peer_authkey(self):
409         if self.is_initiator:
410             return self.sk_ar
411         return self.sk_ai
412
413     @property
414     def my_authkey(self):
415         if self.is_initiator:
416             return self.sk_ai
417         return self.sk_ar
418
419     @property
420     def my_cryptokey(self):
421         if self.is_initiator:
422             return self.sk_ei
423         return self.sk_er
424
425     @property
426     def peer_cryptokey(self):
427         if self.is_initiator:
428             return self.sk_er
429         return self.sk_ei
430
431     def concat(self, alg, key_len):
432         return alg + '-' + str(key_len * 8)
433
434     @property
435     def vpp_ike_cypto_alg(self):
436         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
437
438     @property
439     def vpp_esp_cypto_alg(self):
440         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
441
442     def verify_hmac(self, ikemsg):
443         integ_trunc = self.ike_integ_alg.trunc_len
444         exp_hmac = ikemsg[-integ_trunc:]
445         data = ikemsg[:-integ_trunc]
446         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
447                                           self.peer_authkey, data)
448         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
449
450     def compute_hmac(self, integ, key, data):
451         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
452         h.update(data)
453         return h.finalize()
454
455     def decrypt(self, data, aad=None, icv=None):
456         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
457
458     def hmac_and_decrypt(self, ike):
459         ep = ike[ikev2.IKEv2_payload_Encrypted]
460         if self.ike_crypto == 'AES-GCM-16ICV':
461             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
462             ct = ep.load[:-GCM_ICV_SIZE]
463             tag = ep.load[-GCM_ICV_SIZE:]
464             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
465         else:
466             self.verify_hmac(raw(ike))
467             integ_trunc = self.ike_integ_alg.trunc_len
468
469             # remove ICV and decrypt payload
470             ct = ep.load[:-integ_trunc]
471             plain = self.decrypt(ct)
472         # remove padding
473         pad_len = plain[-1]
474         return plain[:-pad_len - 1]
475
476     def build_ts_addr(self, ts, version):
477         return {'starting_address_v' + version: ts['start_addr'],
478                 'ending_address_v' + version: ts['end_addr']}
479
480     def generate_ts(self, is_ip4):
481         c = self.child_sas[0]
482         ts_data = {'IP_protocol_ID': 0,
483                    'start_port': 0,
484                    'end_port': 0xffff}
485         if is_ip4:
486             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
487             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
488             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
489             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
490         else:
491             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
492             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
493             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
494             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
495
496         if self.is_initiator:
497             return ([ts1], [ts2])
498         return ([ts2], [ts1])
499
500     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
501         if crypto not in CRYPTO_ALGOS:
502             raise TypeError('unsupported encryption algo %r' % crypto)
503         self.ike_crypto = crypto
504         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
505         self.ike_crypto_key_len = crypto_key_len
506
507         if integ not in AUTH_ALGOS:
508             raise TypeError('unsupported auth algo %r' % integ)
509         self.ike_integ = None if integ == 'NULL' else integ
510         self.ike_integ_alg = AUTH_ALGOS[integ]
511
512         if prf not in PRF_ALGOS:
513             raise TypeError('unsupported prf algo %r' % prf)
514         self.ike_prf = prf
515         self.ike_prf_alg = PRF_ALGOS[prf]
516         self.ike_dh = dh
517         self.ike_group = DH[self.ike_dh]
518
519     def set_esp_props(self, crypto, crypto_key_len, integ):
520         self.esp_crypto_key_len = crypto_key_len
521         if crypto not in CRYPTO_ALGOS:
522             raise TypeError('unsupported encryption algo %r' % crypto)
523         self.esp_crypto = crypto
524         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
525
526         if integ not in AUTH_ALGOS:
527             raise TypeError('unsupported auth algo %r' % integ)
528         self.esp_integ = None if integ == 'NULL' else integ
529         self.esp_integ_alg = AUTH_ALGOS[integ]
530
531     def crypto_attr(self, key_len):
532         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
533             return (0x800e << 16 | key_len << 3, 12)
534         else:
535             raise Exception('unsupported attribute type')
536
537     def ike_crypto_attr(self):
538         return self.crypto_attr(self.ike_crypto_key_len)
539
540     def esp_crypto_attr(self):
541         return self.crypto_attr(self.esp_crypto_key_len)
542
543     def compute_nat_sha1(self, ip, port, rspi=None):
544         if rspi is None:
545             rspi = self.rspi
546         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
547         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
548         digest.update(data)
549         return digest.finalize()
550
551
552 class IkePeer(VppTestCase):
553     """ common class for initiator and responder """
554
    @classmethod
    def setUpClass(cls):
        """Import scapy's IKEv2 layer and bring up two pg interfaces."""
        # the scapy contrib module is imported here rather than at module
        # level and published as the global name ``ikev2`` used by the
        # rest of this file
        import scapy.contrib.ikev2 as _ikev2
        globals()['ikev2'] = _ikev2
        super(IkePeer, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        # every interface gets IPv4 and IPv6 configured and resolved
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
567
    @classmethod
    def tearDownClass(cls):
        """Delegate class level cleanup to the base class."""
        super(IkePeer, cls).tearDownClass()
571
    def tearDown(self):
        """Delete the SA and check that no state is left behind."""
        super(IkePeer, self).tearDown()
        # del_sa_from_responder is not set in the visible part of this
        # class; presumably it comes from the test configuration
        if self.del_sa_from_responder:
            self.initiate_del_sa_from_responder()
        else:
            self.initiate_del_sa_from_initiator()
        # once the SA is deleted neither IKE nor IPsec SAs may remain
        r = self.vapi.ikev2_sa_dump()
        self.assertEqual(len(r), 0)
        sas = self.vapi.ipsec_sa_dump()
        self.assertEqual(len(sas), 0)
        # finally remove the profile and make sure it is really gone
        self.p.remove_vpp_config()
        self.assertIsNone(self.p.query_vpp_config())
584
    def setUp(self):
        """Configure the test case and install the profile."""
        super(IkePeer, self).setUp()
        # config_tc() is defined outside the visible part of this class;
        # presumably it creates self.p and self.sa used below
        self.config_tc()
        self.p.add_vpp_config()
        self.assertIsNotNone(self.p.query_vpp_config())
        # an initiating test side generates its DH key pair up front
        if self.sa.is_initiator:
            self.sa.generate_dh_data()
        self.vapi.cli('ikev2 set logging level 4')
        # NOTE(review): 'event-lo' looks like an abbreviation of
        # 'event-logger' - confirm the CLI accepts it
        self.vapi.cli('event-lo clear')
594
595     def assert_counter(self, count, name, version='ip4'):
596         node_name = '/err/ikev2-%s/' % version + name
597         self.assertEqual(count, self.statistics.get_err_counter(node_name))
598
599     def create_rekey_request(self):
600         sa, first_payload = self.generate_auth_payload(is_rekey=True)
601         header = ikev2.IKEv2(
602                 init_SPI=self.sa.ispi,
603                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
604                 flags='Initiator', exch_type='CREATE_CHILD_SA')
605
606         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
607         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
608                                   self.sa.dport, self.sa.natt, self.ip6)
609
610     def create_empty_request(self):
611         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
612                              id=self.sa.new_msg_id(), flags='Initiator',
613                              exch_type='INFORMATIONAL',
614                              next_payload='Encrypted')
615
616         msg = self.encrypt_ike_msg(header, b'', None)
617         return self.create_packet(self.pg0, msg, self.sa.sport,
618                                   self.sa.dport, self.sa.natt, self.ip6)
619
620     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
621                       use_ip6=False):
622         if use_ip6:
623             src_ip = src_if.remote_ip6
624             dst_ip = src_if.local_ip6
625             ip_layer = IPv6
626         else:
627             src_ip = src_if.remote_ip4
628             dst_ip = src_if.local_ip4
629             ip_layer = IP
630         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
631                ip_layer(src=src_ip, dst=dst_ip) /
632                UDP(sport=sport, dport=dport))
633         if natt:
634             # insert non ESP marker
635             res = res / Raw(b'\x00' * 4)
636         return res / msg
637
638     def verify_udp(self, udp):
639         self.assertEqual(udp.sport, self.sa.sport)
640         self.assertEqual(udp.dport, self.sa.dport)
641
642     def get_ike_header(self, packet):
643         try:
644             ih = packet[ikev2.IKEv2]
645             ih = self.verify_and_remove_non_esp_marker(ih)
646         except IndexError as e:
647             # this is a workaround for getting IKEv2 layer as both ikev2 and
648             # ipsec register for port 4500
649             esp = packet[ESP]
650             ih = self.verify_and_remove_non_esp_marker(esp)
651         self.assertEqual(ih.version, 0x20)
652         self.assertNotIn('Version', ih.flags)
653         return ih
654
655     def verify_and_remove_non_esp_marker(self, packet):
656         if self.sa.natt:
657             # if we are in nat traversal mode check for non esp marker
658             # and remove it
659             data = raw(packet)
660             self.assertEqual(data[:4], b'\x00' * 4)
661             return ikev2.IKEv2(data[4:])
662         else:
663             return packet
664
    def encrypt_ike_msg(self, header, plain, first_payload):
        """Encrypt ``plain`` and return the complete protected message.

        :param header: IKEv2 header; its length field is set here
        :param plain: payload(s) to protect
        :param first_payload: value of the Encrypted payload's
                              next_payload field
        :returns: header / Encrypted payload, followed by the truncated
                  HMAC when AES-GCM is not used
        """
        if self.sa.ike_crypto == 'AES-GCM-16ICV':
            # pad here only to learn the final length; the actual padding
            # of the data is done again inside self.sa.encrypt()
            data = self.sa.ike_crypto_alg.pad(raw(plain))
            plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
                len(ikev2.IKEv2_payload_Encrypted())
            tlen = plen + len(ikev2.IKEv2())

            # prepare aad data
            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen)
            header.length = tlen
            res = header / sk_p
            encr = self.sa.encrypt(raw(plain), raw(res))
            # rebuild the payload, this time carrying the encrypted data
            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen, load=encr)
            res = header / sk_p
        else:
            encr = self.sa.encrypt(raw(plain))
            trunc_len = self.sa.ike_integ_alg.trunc_len
            # the payload length also covers the ICV appended below
            plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
            tlen = plen + len(ikev2.IKEv2())

            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen, load=encr)
            header.length = tlen
            res = header / sk_p

            # the HMAC covers everything up to, but excluding, the ICV
            integ_data = raw(res)
            hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
                                             self.sa.my_authkey, integ_data)
            res = res / Raw(hmac_data[:trunc_len])
        # sanity check of the length arithmetic above
        assert(len(res) == tlen)
        return res
698
699     def verify_udp_encap(self, ipsec_sa):
700         e = VppEnum.vl_api_ipsec_sad_flags_t
701         if self.sa.udp_encap or self.sa.natt:
702             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
703         else:
704             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
705
706     def verify_ipsec_sas(self, is_rekey=False):
707         sas = self.vapi.ipsec_sa_dump()
708         if is_rekey:
709             # after rekey there is a short period of time in which old
710             # inbound SA is still present
711             sa_count = 3
712         else:
713             sa_count = 2
714         self.assertEqual(len(sas), sa_count)
715         if self.sa.is_initiator:
716             if is_rekey:
717                 sa0 = sas[0].entry
718                 sa1 = sas[2].entry
719             else:
720                 sa0 = sas[0].entry
721                 sa1 = sas[1].entry
722         else:
723             if is_rekey:
724                 sa0 = sas[2].entry
725                 sa1 = sas[0].entry
726             else:
727                 sa1 = sas[0].entry
728                 sa0 = sas[1].entry
729
730         c = self.sa.child_sas[0]
731
732         self.verify_udp_encap(sa0)
733         self.verify_udp_encap(sa1)
734         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
735         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
736         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
737
738         if self.sa.esp_integ is None:
739             vpp_integ_alg = 0
740         else:
741             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
742         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
743         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
744
745         # verify crypto keys
746         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
747         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
748         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
749         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
750
751         # verify integ keys
752         if vpp_integ_alg:
753             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
754             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
755             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
756             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
757         else:
758             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
759             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
760
761     def verify_keymat(self, api_keys, keys, name):
762         km = getattr(keys, name)
763         api_km = getattr(api_keys, name)
764         api_km_len = getattr(api_keys, name + '_len')
765         self.assertEqual(len(km), api_km_len)
766         self.assertEqual(km, api_km[:api_km_len])
767
768     def verify_id(self, api_id, exp_id):
769         self.assertEqual(api_id.type, IDType.value(exp_id.type))
770         self.assertEqual(api_id.data_len, exp_id.data_len)
771         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
772
773     def verify_ike_sas(self):
774         r = self.vapi.ikev2_sa_dump()
775         self.assertEqual(len(r), 1)
776         sa = r[0].sa
777         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
778         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
779         if self.ip6:
780             if self.sa.is_initiator:
781                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
782                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
783             else:
784                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
785                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
786         else:
787             if self.sa.is_initiator:
788                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
789                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
790             else:
791                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
792                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
793         self.verify_keymat(sa.keys, self.sa, 'sk_d')
794         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
795         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
796         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
797         self.verify_keymat(sa.keys, self.sa, 'sk_er')
798         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
799         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
800
801         self.assertEqual(sa.i_id.type, self.sa.id_type)
802         self.assertEqual(sa.r_id.type, self.sa.id_type)
803         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
804         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
805         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
806         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
807
808         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
809         self.assertEqual(len(r), 1)
810         csa = r[0].child_sa
811         self.assertEqual(csa.sa_index, sa.sa_index)
812         c = self.sa.child_sas[0]
813         if hasattr(c, 'sk_ai'):
814             self.verify_keymat(csa.keys, c, 'sk_ai')
815             self.verify_keymat(csa.keys, c, 'sk_ar')
816         self.verify_keymat(csa.keys, c, 'sk_ei')
817         self.verify_keymat(csa.keys, c, 'sk_er')
818         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
819         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
820
821         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
822         tsi = tsi[0]
823         tsr = tsr[0]
824         r = self.vapi.ikev2_traffic_selector_dump(
825                 is_initiator=True, sa_index=sa.sa_index,
826                 child_sa_index=csa.child_sa_index)
827         self.assertEqual(len(r), 1)
828         ts = r[0].ts
829         self.verify_ts(r[0].ts, tsi[0], True)
830
831         r = self.vapi.ikev2_traffic_selector_dump(
832                 is_initiator=False, sa_index=sa.sa_index,
833                 child_sa_index=csa.child_sa_index)
834         self.assertEqual(len(r), 1)
835         self.verify_ts(r[0].ts, tsr[0], False)
836
837         n = self.vapi.ikev2_nonce_get(is_initiator=True,
838                                       sa_index=sa.sa_index)
839         self.verify_nonce(n, self.sa.i_nonce)
840         n = self.vapi.ikev2_nonce_get(is_initiator=False,
841                                       sa_index=sa.sa_index)
842         self.verify_nonce(n, self.sa.r_nonce)
843
844     def verify_nonce(self, api_nonce, nonce):
845         self.assertEqual(api_nonce.data_len, len(nonce))
846         self.assertEqual(api_nonce.nonce, nonce)
847
848     def verify_ts(self, api_ts, ts, is_initiator):
849         if is_initiator:
850             self.assertTrue(api_ts.is_local)
851         else:
852             self.assertFalse(api_ts.is_local)
853
854         if self.p.ts_is_ip4:
855             self.assertEqual(api_ts.start_addr,
856                              IPv4Address(ts.starting_address_v4))
857             self.assertEqual(api_ts.end_addr,
858                              IPv4Address(ts.ending_address_v4))
859         else:
860             self.assertEqual(api_ts.start_addr,
861                              IPv6Address(ts.starting_address_v6))
862             self.assertEqual(api_ts.end_addr,
863                              IPv6Address(ts.ending_address_v6))
864         self.assertEqual(api_ts.start_port, ts.start_port)
865         self.assertEqual(api_ts.end_port, ts.end_port)
866         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
867
868
class TemplateInitiator(IkePeer):
    """ initiator test template """

    def initiate_del_sa_from_initiator(self):
        """Trigger deletion of the IKE SA through the API and answer it.

        The captured request must carry the 'Initiator' flag, the SPIs of
        self.sa and a Delete payload for the IKE protocol.  An empty
        encrypted INFORMATIONAL response is sent back and no further
        packets are expected.
        """
        ispi = int.from_bytes(self.sa.ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Response', exch_type='INFORMATIONAL',
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, b'', None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check the response to a delete request sent by the test peer.

        :param packet: captured packet; it must be an INFORMATIONAL
                       exchange with the 'Response' and 'Initiator' flags
                       set and an empty decrypted payload
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def initiate_del_sa_from_responder(self):
        """Send an encrypted Delete payload for the IKE SA on pg0.

        The single captured reply is checked with verify_del_sa().
        """
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    @staticmethod
    def find_notify_payload(packet, notify_type):
        """Return the first Notify payload of the given type.

        The search starts at the first Notify layer of the packet and
        follows the ``payload`` chain from there.

        :param packet: packet holding at least one Notify payload
        :param notify_type: numeric notify message type to look for
        :returns: the matching payload, or None once the chain yields None
        """
        n = packet[ikev2.IKEv2_payload_Notify]
        while n is not None:
            if n.type == notify_type:
                return n
            n = n.payload
        return None

    def verify_nat_detection(self, packet):
        """Check both NAT detection notifies of an SA_INIT request.

        The notify data must equal the digest computed by
        self.sa.compute_nat_sha1() from the address and UDP port seen in
        the packet.

        :param packet: captured SA_INIT request
        """
        # the address family has to follow the IP header that is parsed,
        # an IPv6 address cannot be packed as AF_INET
        if self.ip6:
            iph = packet[IPv6]
            family = socket.AF_INET6
        else:
            iph = packet[IP]
            family = socket.AF_INET
        udp = packet[UDP]

        # NAT_DETECTION_SOURCE_IP
        s = self.find_notify_payload(packet, 16388)
        self.assertIsNotNone(s)
        src_sha = self.sa.compute_nat_sha1(
                inet_pton(family, iph.src), udp.sport, b'\x00' * 8)
        self.assertEqual(s.load, src_sha)

        # NAT_DETECTION_DESTINATION_IP
        s = self.find_notify_payload(packet, 16389)
        self.assertIsNotNone(s)
        dst_sha = self.sa.compute_nat_sha1(
                inet_pton(family, iph.dst), udp.dport, b'\x00' * 8)
        self.assertEqual(s.load, dst_sha)

    def verify_sa_init_request(self, packet):
        """Validate the SA_INIT request and derive the IKE keys.

        Header flags, SPIs and the proposed transforms are checked, the
        initiator SPI, nonce and DH data are stored in self.sa, and the
        local DH data and keys are computed afterwards.

        :param packet: captured SA_INIT request
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = packet[ikev2.IKEv2]
        self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
        self.assertEqual(ih.exch_type, 34)  # SA_INIT
        self.sa.ispi = ih.init_SPI
        self.assertEqual(ih.resp_SPI, 8 * b'\x00')
        self.assertIn('Initiator', ih.flags)
        self.assertNotIn('Response', ih.flags)
        self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load

        prop = packet[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.proto, 1)  # proto = ikev2
        self.assertEqual(prop.proposal, 1)
        self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
        self.assertEqual(prop.trans[0].transform_id,
                         self.p.ike_transforms['crypto_alg'])
        self.assertEqual(prop.trans[1].transform_type, 2)  # prf
        self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
        self.assertEqual(prop.trans[2].transform_type, 4)  # dh
        self.assertEqual(prop.trans[2].transform_id,
                         self.p.ike_transforms['dh_group'])

        self.verify_nat_detection(packet)
        self.sa.set_ike_props(
                    crypto='AES-GCM-16ICV', crypto_key_len=32,
                    integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
        self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
                              integ='SHA2-256-128')
        self.sa.generate_dh_data()
        self.sa.complete_dh_data()
        self.sa.calc_keys()

    def update_esp_transforms(self, trans, sa):
        """Copy the proposed ESP algorithms into the given SA.

        :param trans: first transform of the chain to walk
        :param sa: object whose esp_crypto / esp_integ get updated
        """
        while trans:
            if trans.transform_type == 1:  # encryption
                sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
            elif trans.transform_type == 3:  # integrity
                sa.esp_integ = INTEG_IDS[trans.transform_id]
            trans = trans.payload

    def verify_sa_auth_req(self, packet):
        """Validate the IKE_AUTH request sent by the initiator.

        Header, UDP encapsulation and both identities are checked.  The
        child SA initiator SPI and the proposed ESP transforms are taken
        over into self.sa.

        :param packet: captured IKE_AUTH request
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
        self.assertIn('Initiator', ih.flags)
        self.assertNotIn('Response', ih.flags)

        self.verify_udp(udp)
        self.assertEqual(ih.id, self.sa.msg_id + 1)
        self.sa.msg_id += 1
        plain = self.sa.hmac_and_decrypt(ih)
        idi = ikev2.IKEv2_payload_IDi(plain)
        idr = ikev2.IKEv2_payload_IDr(idi.payload)
        self.assertEqual(idi.load, self.sa.i_id)
        self.assertEqual(idr.load, self.sa.r_id)
        prop = idi[ikev2.IKEv2_payload_Proposal]
        c = self.sa.child_sas[0]
        c.ispi = prop.SPI
        self.update_esp_transforms(
                prop[ikev2.IKEv2_payload_Transform], self.sa)

    def send_init_response(self):
        """Send the SA_INIT response and verify the IKE_AUTH request.

        The response carries the SA, KE and Nonce payloads followed by
        the two NAT detection notifies.  It is stored in
        self.sa.init_resp_packet.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))

        src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
        if self.sa.natt:
            # an address other than the local one of pg0 is hashed here
            dst_address = b'\x0a\x0a\x0a\x0a'
        else:
            dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
        dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)

        self.sa.init_resp_packet = (
            ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                        exch_type='IKE_SA_INIT', flags='Response') /
            ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
            ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                   group=self.sa.ike_dh,
                                   load=self.sa.my_dh_pub_key) /
            ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
                                      next_payload='Notify') /
            ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
                    next_payload='Notify') / ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))

        ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
                                     self.sa.sport, self.sa.dport,
                                     False, self.ip6)
        self.pg_send(self.pg0, ike_msg)
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_req(capture[0])

    def initiate_sa_init(self):
        """Start the SA_INIT exchange through the API and answer it."""
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)

        capture = self.pg0.get_capture(1)
        self.verify_sa_init_request(capture[0])
        self.send_init_response()

    def send_auth_response(self):
        """Build, encrypt and send the IKE_AUTH response.

        The response carries both identities, the AUTH payload, the ESP
        proposal with the responder child SPI, the traffic selectors and
        an INITIAL_CONTACT notify.
        """
        tr_attr = self.sa.esp_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        c = self.sa.child_sas[0]
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                 IDtype=self.sa.id_type, load=self.sa.i_id) /
                 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                 IDtype=self.sa.id_type, load=self.sa.r_id) /
                 ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi),
                 traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload='Notify',
                 number_of_TSs=len(tsr),
                 traffic_selector=tsr) /
                 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))

        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Response', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg_send(self.pg0, packet)

    def test_initiator(self):
        # full exchange: SA_INIT, IKE_AUTH, then the SA state check
        self.initiate_sa_init()
        self.sa.auth_init()
        self.sa.calc_child_keys()
        self.send_auth_response()
        self.verify_ike_sas()
1117
1118
class TemplateResponder(IkePeer):
    """ responder test template """

    def initiate_del_sa_from_responder(self):
        """Trigger deletion of the IKE SA through the API and answer it.

        The captured INFORMATIONAL request must carry neither the
        'Response' nor the 'Initiator' flag and must hold a Delete payload
        for the IKE protocol.  An empty encrypted response is sent back
        and no further packets are expected.
        """
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(
                ispi=int.from_bytes(self.sa.ispi, 'little'))
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn('Response', ih.flags)
        self.assertNotIn('Initiator', ih.flags)
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator+Response',
                             exch_type='INFORMATIONAL',
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, b'', None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check the response to a delete request sent by the test peer.

        :param packet: captured packet; it must be an INFORMATIONAL
                       response without the 'Initiator' flag, with the
                       SPIs of self.sa and an empty decrypted payload
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn('Response', ih.flags)
        self.assertNotIn('Initiator', ih.flags)
        self.assertEqual(ih.next_payload, 46)  # Encrypted
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def initiate_del_sa_from_initiator(self):
        """Send an encrypted Delete payload for the IKE SA on pg0.

        The single captured reply is checked with verify_del_sa().
        """
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator', exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    def send_sa_init_req(self):
        """Send the SA_INIT request and verify the captured response.

        The request is stored in self.sa.init_req_packet.  The NAT
        detection notifies are appended for IPv4 only.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))

        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))

        # with IPv6 nothing follows the Nonce payload
        next_payload = None if self.ip6 else 'Notify'

        self.sa.init_req_packet = (
                ikev2.IKEv2(init_SPI=self.sa.ispi,
                            flags='Initiator', exch_type='IKE_SA_INIT') /
                ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
                ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                       group=self.sa.ike_dh,
                                       load=self.sa.my_dh_pub_key) /
                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
                                          load=self.sa.i_nonce))

        if not self.ip6:
            # a fixed address replaces the real one on the side flagged
            # with i_natt / r_natt
            if self.sa.i_natt:
                src_address = b'\x0a\x0a\x0a\x01'
            else:
                src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)

            if self.sa.r_natt:
                dst_address = b'\x0a\x0a\x0a\x0a'
            else:
                dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)

            src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
            dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
            nat_src_detection = ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
                    next_payload='Notify')
            nat_dst_detection = ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
            self.sa.init_req_packet = (self.sa.init_req_packet /
                                       nat_src_detection /
                                       nat_dst_detection)

        ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
                                     self.sa.sport, self.sa.dport,
                                     self.sa.natt, self.ip6)
        self.pg0.add_stream(ike_msg)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_init(capture[0])

    def generate_auth_payload(self, last_payload=None, is_rekey=False):
        """Build the plaintext payload chain of an auth or rekey request.

        :param last_payload: name of the payload following TSr; 'Notify'
                             is used when it is not given
        :param is_rekey: when true a Nonce payload is prepended and a
                         REKEY_SA notify appended, otherwise the IDi and
                         IDr payloads are prepended
        :returns: tuple of (payload chain, name of the first payload)
        """
        tr_attr = self.sa.esp_crypto_attr()
        last_payload = last_payload or 'Notify'
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        c = self.sa.child_sas[0]
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi), traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
                 number_of_TSs=len(tsr), traffic_selector=tsr))

        if is_rekey:
            first_payload = 'Nonce'
            plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
                     next_payload='SA') / plain /
                     ikev2.IKEv2_payload_Notify(type='REKEY_SA',
                     proto='ESP', SPI=c.ispi))
        else:
            first_payload = 'IDi'
            ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                   IDtype=self.sa.id_type, load=self.sa.i_id) /
                   ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                   IDtype=self.sa.id_type, load=self.sa.r_id))
            plain = ids / plain
        return plain, first_payload

    def send_sa_auth(self):
        """Send the IKE_AUTH request and verify the captured response.

        An INITIAL_CONTACT notify is appended to the generated payload.
        """
        plain, first_payload = self.generate_auth_payload(
                    last_payload='Notify')
        plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Initiator', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_resp(capture[0])

    def verify_sa_init(self, packet):
        """Validate the SA_INIT response and derive the IKE keys.

        The responder SPI, nonce and DH data are stored in self.sa before
        the DH exchange is completed and the keys are calculated.

        :param packet: captured SA_INIT response
        :raises IndexError: when the SA, Nonce or KE payload is missing
        """
        ih = self.get_ike_header(packet)

        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 34)
        self.assertIn('Response', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        # SPIs are compared as 8-byte strings throughout these tests, so
        # comparing with the integer 0 could never fail
        self.assertNotEqual(ih.resp_SPI, 8 * b'\x00')
        self.sa.rspi = ih.resp_SPI
        try:
            # looked up only to make sure the SA payload is present
            ih[ikev2.IKEv2_payload_SA]
            self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
        except IndexError:
            self.logger.error(
                "unexpected reply: SA/Nonce/KE payload not found!")
            # show() prints and returns None unless dump=True is given
            self.logger.error(ih.show(dump=True))
            raise
        self.sa.complete_dh_data()
        self.sa.calc_keys()
        self.sa.auth_init()

    def verify_sa_auth_resp(self, packet):
        """Validate the IKE_AUTH response and derive the child SA keys.

        The responder SPI of the child SA is taken from the proposal.

        :param packet: captured IKE_AUTH response
        """
        ike = self.get_ike_header(packet)
        udp = packet[UDP]
        self.verify_udp(udp)
        self.assertEqual(ike.id, self.sa.msg_id)
        plain = self.sa.hmac_and_decrypt(ike)
        idr = ikev2.IKEv2_payload_IDr(plain)
        prop = idr[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.SPIsize, 4)
        self.sa.child_sas[0].rspi = prop.SPI
        self.sa.calc_child_keys()

    # suffix handed to assert_counter() together with the counter name
    IKE_NODE_SUFFIX = 'ip4'

    def verify_counters(self):
        """Check the node counters and the per SA statistics.

        Two processed packets, one init request and one auth request are
        expected.
        """
        self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
        self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
        self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)

        r = self.vapi.ikev2_sa_dump()
        s = r[0].sa.stats
        self.assertEqual(1, s.n_sa_auth_req)
        self.assertEqual(1, s.n_sa_init_req)

    def test_responder(self):
        # full exchange: SA_INIT, IKE_AUTH, then the state checks
        self.send_sa_init_req()
        self.send_sa_auth()
        self.verify_ipsec_sas()
        self.verify_ike_sas()
        self.verify_counters()
1342
1343
class Ikev2Params(object):
    """ configuration helper shared by the IKEv2 test cases """

    def config_params(self, params=None):
        """Create the profile and the peer SA used by a test case.

        Sets self.vpp_enums, self.del_sa_from_responder, self.p, self.ip6
        and self.sa; self.peer_cert is set for 'rsa-sig' only.

        :param params: optional dictionary of overrides.  Keys read:
            'dpd_disabled', 'del_sa_from_responder', 'i_natt', 'r_natt',
            'ip6', 'auth', 'server-key', 'client-cert', 'server-cert',
            'client-key', 'is_initiator', 'loc_ts', 'rem_ts', 'responder',
            'ike_transforms', 'esp_transforms', 'udp_encap', 'ike-crypto',
            'ike-integ', 'ike-dh', 'esp-crypto' and 'esp-integ'.
        """
        # None instead of a shared mutable default argument
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        # dead peer detection is switched off unless requested otherwise
        if params.get('dpd_disabled', True):
            self.vapi.cli('ikev2 dpd disable')
        self.del_sa_from_responder = params.get(
                'del_sa_from_responder', False)
        i_natt = params.get('i_natt', False)
        r_natt = params.get('r_natt', False)
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # context managers close the files right after reading
            with open(work_dir + params['server-cert']) as cert_file:
                server_pem = cert_file.read()
            with open(work_dir + params['client-key']) as key_file:
                client_pem = key_file.read()
            client_priv = load_pem_private_key(str.encode(client_pem), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        is_init = params.get('is_initiator', True)

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        if is_init:
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
        else:
            self.p.add_local_id(**idi)
            self.p.add_remote_id(**idr)

        loc_ts = params.get(
                'loc_ts',
                {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
                'rem_ts',
                {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        udp_encap = params.get('udp_encap', False)
        if udp_encap:
            self.p.set_udp_encap(True)

        # the traffic selectors are swapped: local for the profile is
        # remote for the peer SA and vice versa
        self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'],
                          i_natt=i_natt, r_natt=r_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data, udp_encap=udp_encap,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
        if is_init:
            ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
            ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
            ike_dh = params.get('ike-dh', '2048MODPgr')

            esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
            esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

            self.sa.set_ike_props(
                    crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                    integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
            self.sa.set_esp_props(
                    crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                    integ=esp_integ)
1450
1451
class TestApi(VppTestCase):
    """ Test IKEV2 API

    Configures IKEv2 profiles through the Profile helper and checks that
    ikev2_profile_dump reports the configured values back.
    """
    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        """Remove the profiles created by the test and check none remain."""
        super(TestApi, self).tearDown()
        # A test can fail before both profiles exist; only remove what was
        # really created so that the original failure is not joined by an
        # unrelated AttributeError raised from here.
        for attr in ('p1', 'p2'):
            profile = getattr(self, attr, None)
            if profile is not None:
                profile.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Create a Profile from cfg, add it to VPP and return it.

        :param cfg: dict with the mandatory keys 'name', 'loc_id', 'rem_id',
            'loc_ts', 'rem_ts', 'responder', 'ike_ts', 'esp_ts', 'auth',
            'udp_encap' and 'ipsec_over_udp_port', and the optional keys
            'lifetime_data', 'tun_itf' and 'natt_disabled'
        :returns: the configured Profile object
        """
        p = Profile(self, cfg['name'])
        p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
        p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
        p.add_local_ts(**cfg['loc_ts'])
        p.add_remote_ts(**cfg['rem_ts'])
        p.add_responder(cfg['responder'])
        p.add_ike_transforms(cfg['ike_ts'])
        p.add_esp_transforms(cfg['esp_ts'])
        p.add_auth(**cfg['auth'])
        p.set_udp_encap(cfg['udp_encap'])
        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            p.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            p.set_tunnel_interface(cfg['tun_itf'])
        if cfg.get('natt_disabled'):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """ test profile dump API """
        loc_ts4 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': '3.3.3.2',
            'end_addr': '3.3.3.3',
        }
        rem_ts4 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': '4.5.76.80',
            'end_addr': '2.3.4.6',
        }

        loc_ts6 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': 'ab::1',
            'end_addr': 'ab::4',
        }
        rem_ts6 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': 'cd::12',
            'end_addr': 'cd::13',
        }

        conf = {
            'p1': {
                'name': 'p1',
                'natt_disabled': True,
                'loc_id': ('fqdn', b'vpp.home'),
                'rem_id': ('fqdn', b'roadwarrior.example.com'),
                'loc_ts': loc_ts4,
                'rem_ts': rem_ts4,
                'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
                'ike_ts': {
                    'crypto_alg': 20,
                    'crypto_key_size': 32,
                    'integ_alg': 1,
                    'dh_group': 1},
                'esp_ts': {
                    'crypto_alg': 13,
                    'crypto_key_size': 24,
                    'integ_alg': 2},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': True,
                'ipsec_over_udp_port': 4501,
                'lifetime_data': {
                    'lifetime': 123,
                    'lifetime_maxdata': 20192,
                    'lifetime_jitter': 9,
                    'handover': 132},
            },
            'p2': {
                'name': 'p2',
                'loc_id': ('ip4-addr', b'192.168.2.1'),
                'rem_id': ('ip6-addr', b'abcd::1'),
                'loc_ts': loc_ts6,
                'rem_ts': rem_ts6,
                'responder': {'sw_if_index': 4, 'addr': 'def::10'},
                'ike_ts': {
                    'crypto_alg': 12,
                    'crypto_key_size': 16,
                    'integ_alg': 3,
                    'dh_group': 3},
                'esp_ts': {
                    'crypto_alg': 9,
                    'crypto_key_size': 24,
                    'integ_alg': 4},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': False,
                'ipsec_over_udp_port': 4600,
                'tun_itf': 0}
        }
        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        # NOTE(review): relies on the dump listing the profiles in the
        # order they were created - confirm against the API
        self.verify_profile(r[0].profile, conf['p1'])
        self.verify_profile(r[1].profile, conf['p2'])

    def verify_id(self, api_id, cfg_id):
        """Check a dumped identity against its (id_type, data) config."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Check a dumped traffic selector against its config dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
        self.assertEqual(api_ts.start_addr,
                         ip_address(text_type(cfg_ts['start_addr'])))
        self.assertEqual(api_ts.end_addr,
                         ip_address(text_type(cfg_ts['end_addr'])))

    def verify_responder(self, api_r, cfg_r):
        """Check the dumped responder interface index and address."""
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the transform fields shared by IKE and ESP transforms."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check the common transform fields plus the IKE-only DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check the ESP transforms (only the common fields apply)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Check the dumped auth method, data and data length."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, cfg_auth['data'])
        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))

    def verify_lifetime_data(self, p, ld):
        """Check the lifetime related fields of dumped profile p."""
        self.assertEqual(p.lifetime, ld['lifetime'])
        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
        self.assertEqual(p.handover, ld['handover'])

    def verify_profile(self, ap, cp):
        """Check a dumped profile against the dict it was configured from.

        :param ap: profile as returned by the dump API
        :param cp: config dict in the format taken by configure_profile
        """
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        # NAT traversal stays enabled unless the config disables it
        self.assertEqual(cp.get('natt_disabled', False), ap.natt_disabled)

        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        # 0xffffffff is what the dump is expected to report when no tunnel
        # interface was configured
        self.assertEqual(ap.tun_itf, cp.get('tun_itf', 0xffffffff))
1639
1640
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """ test responder - responder behind NAT """

    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        """Configure the test case with the 'r_natt' parameter enabled."""
        natt_params = {'r_natt': True}
        self.config_params(natt_params)
1648
1649
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - NAT traversal (initiator behind NAT) """

    def config_tc(self):
        """Assemble the parameters of this case and hand them over to
        config_params."""
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        params = {
            'i_natt': True,
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        }
        self.config_params(params)
1673
1674
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        """Assemble the parameters of this case and hand them over to
        config_params."""
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        params = {
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        }
        self.config_params(params)
1697
1698
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Reply to a captured rekey request with its own SA payload.

        :param req: captured packet holding the request
        :param update_child_sa_data: when true, take the nonce and the SPI
            of the request over into self.sa and recompute the child keys
            before replying
        """
        ike_hdr = self.get_ike_header(req)
        sa_payload = ikev2.IKEv2_payload_SA(
            self.sa.hmac_and_decrypt(ike_hdr))
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            # the nonce and the SPI of the request are used for both sides
            self.sa.i_nonce = nonce
            self.sa.r_nonce = nonce
            self.sa.child_sas[0].ispi = proposal.SPI
            self.sa.child_sas[0].rspi = proposal.SPI
            self.sa.calc_child_keys()

        # exch_type 36 is CREATE_CHILD_SA; the reply reuses the message id
        # of the request
        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
            flags='Response', exch_type=36,
            id=ike_hdr.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply = self.create_packet(
            self.pg0, encrypted, self.sa.sport, self.sa.dport,
            self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # Establish the SAs, then make VPP send two rekey requests for the
        # same child SA and answer them out of order.
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        requests = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(requests[1], True)
        self.rekey_respond(requests[0], False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1738
1739
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Trigger a child SA rekey on VPP and answer the request.

        The captured request must be a CREATE_CHILD_SA request sent by the
        initiator. Its nonce and SPI are taken over into self.sa, the child
        keys are recomputed and the SA payload is sent back as response.
        """
        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        request = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(request)
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', ike_hdr.flags)
        self.assertIn('Initiator', ike_hdr.flags)

        sa_payload = ikev2.IKEv2_payload_SA(
            self.sa.hmac_and_decrypt(ike_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_nonce = nonce
        self.sa.r_nonce = nonce
        # update new responder SPI
        self.sa.child_sas[0].ispi = proposal.SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        self.sa.calc_child_keys()

        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
            flags='Response', exch_type=36,
            id=ike_hdr.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply = self.create_packet(
            self.pg0, encrypted, self.sa.sport, self.sa.dport,
            self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # run the inherited scenario first, then rekey and check the SAs
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1775
1776
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        """Assemble the parameters of this case and hand them over to
        config_params."""
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        params = {
            'del_sa_from_responder': True,
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        }
        self.config_params(params)
1800
1801
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - initiator behind NAT """

    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        """Configure the test case with the 'i_natt' parameter enabled."""
        natt_params = {'i_natt': True}
        self.config_params(natt_params)
1810
1811
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        """Configure the test case without overriding any parameter."""
        self.config_params()
1816
1817
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """
    def config_tc(self):
        """Configure the test case with 'dpd_disabled' set to False."""
        dpd_params = {'dpd_disabled': False}
        self.config_params(dpd_params)

    def tearDown(self):
        # Deliberately skips the inherited tearDown.
        # NOTE(review): presumably because the SAs are already gone when
        # the test ends - confirm against the parent tearDown.
        pass

    def test_responder(self):
        # Establish the SAs, leave VPP's liveness check unanswered and
        # expect both the IKE SA and the IPsec SAs to be removed.
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # capture empty request but don't reply
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(probe)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
        # wait for SA expiration
        time.sleep(3)
        self.assertEqual(len(self.vapi.ikev2_sa_dump()), 0)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 0)
1845
1846
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Send a rekey request to VPP and process its response.

        The responder nonce and the new responder SPI found in the
        response are stored in self.sa.
        """
        self.pg0.add_stream(self.create_rekey_request())
        self.pg0.enable_capture()
        self.pg_start()
        response = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(response)
        sa_payload = ikev2.IKEv2_payload_SA(
            self.sa.hmac_and_decrypt(ike_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        # Run the inherited scenario, rekey the child SA, then check the
        # SAs as well as the global and the per SA rekey counters.
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
        self.assert_counter(1, 'rekey_req', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
1873
1874
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        """Configure 'rsa-sig' auth with the key and certificate files
        listed below, with 'udp_encap' enabled."""
        params = {
            'udp_encap': True,
            'auth': 'rsa-sig',
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        self.config_params(params)
1885
1886
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
        TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        """Select the IKE and ESP algorithms named in the class name."""
        # crypto entries are (algorithm, key length) pairs
        params = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
            'ike-dh': '2048MODPgr',
        }
        self.config_params(params)
1899
1900
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
        TemplateResponder, Ikev2Params):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        """Select the IKE and ESP algorithms of this test case."""
        # crypto entries are (algorithm, key length) pairs
        # NOTE(review): the class name says AES_CBC_128 but the IKE key
        # length given here is 32, while the AES_CBC_128 case above uses
        # 16 - confirm which one is intended
        params = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
            'ike-dh': '3072MODPgr',
        }
        self.config_params(params)
1914
1915
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    IKE_NODE_SUFFIX = 'ip6'

    def config_tc(self):
        """Configure an IPv6 case using AES-GCM for IKE, with IPv6 traffic
        selectors and 'del_sa_from_responder' enabled."""
        local_ts = {'start_addr': 'ab:cd::0',
                    'end_addr': 'ab:cd::10'}
        remote_ts = {'start_addr': '11::0',
                     'end_addr': '11::100'}
        params = {
            'del_sa_from_responder': True,
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_ts,
            'rem_ts': remote_ts,
        }
        self.config_params(params)
1935
1936
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request to VPP and check its reply.

        The reply must carry the current message id and an empty payload,
        and both the global and the per SA keepalive counters must be 1.
        """
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
        self.assert_counter(1, 'keepalive', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        # run the inherited scenario first, then exercise the keepalive
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
1959
1960
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # deliberately skips the inherited tearDown
        pass

    def config_tc(self):
        """Configure the test case without overriding any parameter."""
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet to be sent on pg0.

        :param length: value for the IKE header length field
        :param payload: optional payload appended to the IKE header
        :returns: the packet as built by create_packet
        """
        ike_hdr = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                              flags='Initiator', exch_type='IKE_SA_INIT')
        if payload is not None:
            ike_hdr /= payload
        return self.create_packet(
            self.pg0, ike_hdr, self.sa.sport, self.sa.dport)

    def verify_bad_packet_length(self):
        """Send packets with a bogus IKE header length and check that
        each one is counted as 'bad_length'."""
        bad_pkt = self.create_ike_init_msg(length=0xdead)
        self.send_and_assert_no_replies(self.pg0, bad_pkt * self.pkt_count)
        self.assert_counter(self.pkt_count, 'bad_length')

    def verify_bad_sa_payload_length(self):
        """Send packets whose SA payload has a bogus length and check that
        each one is counted as 'malformed_packet'."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        bad_pkt = self.create_ike_init_msg(payload=bad_sa)
        self.send_and_assert_no_replies(self.pg0, bad_pkt * self.pkt_count)
        self.assert_counter(self.pkt_count, 'malformed_packet')

    def test_responder(self):
        # number of copies of each malformed packet to send
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
1993
1994
# when executed as a script, run the tests of this file with the VPP runner
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)