ikev2: use new counters data model & add more counters
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import VppTestCase, VppTestRunner
22 from vpp_ikev2 import Profile, IDType, AuthMethod
23 from vpp_papi import VppEnum
24
25 try:
26     text_type = unicode
27 except NameError:
28     text_type = str
29
30 KEY_PAD = b"Key Pad for IKEv2"
31 SALT_SIZE = 4
32 GCM_ICV_SIZE = 16
33 GCM_IV_SIZE = 8
34
35
36 # defined in rfc3526
37 # tuple structure is (p, g, key_len)
38 DH = {
39     '2048MODPgr': (long_converter("""
40     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
41     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
42     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
43     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
44     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
45     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
46     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
47     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
48     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
49     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
50     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
51
52     '3072MODPgr': (long_converter("""
53     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
54     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
55     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
56     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
57     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
58     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
59     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
60     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
61     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
62     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
63     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
64     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
65     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
66     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
67     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
68     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
69 }
70
71
72 class CryptoAlgo(object):
73     def __init__(self, name, cipher, mode):
74         self.name = name
75         self.cipher = cipher
76         self.mode = mode
77         if self.cipher is not None:
78             self.bs = self.cipher.block_size // 8
79
80             if self.name == 'AES-GCM-16ICV':
81                 self.iv_len = GCM_IV_SIZE
82             else:
83                 self.iv_len = self.bs
84
85     def encrypt(self, data, key, aad=None):
86         iv = os.urandom(self.iv_len)
87         if aad is None:
88             encryptor = Cipher(self.cipher(key), self.mode(iv),
89                                default_backend()).encryptor()
90             return iv + encryptor.update(data) + encryptor.finalize()
91         else:
92             salt = key[-SALT_SIZE:]
93             nonce = salt + iv
94             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
95                                default_backend()).encryptor()
96             encryptor.authenticate_additional_data(aad)
97             data = encryptor.update(data) + encryptor.finalize()
98             data += encryptor.tag[:GCM_ICV_SIZE]
99             return iv + data
100
101     def decrypt(self, data, key, aad=None, icv=None):
102         if aad is None:
103             iv = data[:self.iv_len]
104             ct = data[self.iv_len:]
105             decryptor = Cipher(algorithms.AES(key),
106                                self.mode(iv),
107                                default_backend()).decryptor()
108             return decryptor.update(ct) + decryptor.finalize()
109         else:
110             salt = key[-SALT_SIZE:]
111             nonce = salt + data[:GCM_IV_SIZE]
112             ct = data[GCM_IV_SIZE:]
113             key = key[:-SALT_SIZE]
114             decryptor = Cipher(algorithms.AES(key),
115                                self.mode(nonce, icv, len(icv)),
116                                default_backend()).decryptor()
117             decryptor.authenticate_additional_data(aad)
118             return decryptor.update(ct) + decryptor.finalize()
119
120     def pad(self, data):
121         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122         data = data + b'\x00' * (pad_len - 1)
123         return data + bytes([pad_len - 1])
124
125
126 class AuthAlgo(object):
127     def __init__(self, name, mac, mod, key_len, trunc_len=None):
128         self.name = name
129         self.mac = mac
130         self.mod = mod
131         self.key_len = key_len
132         self.trunc_len = trunc_len or key_len
133
134
135 CRYPTO_ALGOS = {
136     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
139                                 mode=modes.GCM),
140 }
141
142 AUTH_ALGOS = {
143     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
148 }
149
150 PRF_ALGOS = {
151     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
153                                   hashes.SHA256, 32),
154 }
155
156 CRYPTO_IDS = {
157     12: 'AES-CBC',
158     20: 'AES-GCM-16ICV',
159 }
160
161 INTEG_IDS = {
162     2: 'HMAC-SHA1-96',
163     12: 'SHA2-256-128',
164     13: 'SHA2-384-192',
165     14: 'SHA2-512-256',
166 }
167
168
169 class IKEv2ChildSA(object):
170     def __init__(self, local_ts, remote_ts, is_initiator):
171         spi = os.urandom(4)
172         if is_initiator:
173             self.ispi = spi
174             self.rspi = None
175         else:
176             self.rspi = spi
177             self.ispi = None
178         self.local_ts = local_ts
179         self.remote_ts = remote_ts
180
181
182 class IKEv2SA(object):
183     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
184                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
185                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
186                  auth_method='shared-key', priv_key=None, i_natt=False,
187                  r_natt=False, udp_encap=False):
188         self.udp_encap = udp_encap
189         self.i_natt = i_natt
190         self.r_natt = r_natt
191         if i_natt or r_natt:
192             self.sport = 4500
193             self.dport = 4500
194         else:
195             self.sport = 500
196             self.dport = 500
197         self.msg_id = 0
198         self.dh_params = None
199         self.test = test
200         self.priv_key = priv_key
201         self.is_initiator = is_initiator
202         nonce = nonce or os.urandom(32)
203         self.auth_data = auth_data
204         self.i_id = i_id
205         self.r_id = r_id
206         if isinstance(id_type, str):
207             self.id_type = IDType.value(id_type)
208         else:
209             self.id_type = id_type
210         self.auth_method = auth_method
211         if self.is_initiator:
212             self.rspi = 8 * b'\x00'
213             self.ispi = spi
214             self.i_nonce = nonce
215         else:
216             self.rspi = spi
217             self.ispi = 8 * b'\x00'
218             self.r_nonce = nonce
219         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
220                           self.is_initiator)]
221
222     def new_msg_id(self):
223         self.msg_id += 1
224         return self.msg_id
225
226     @property
227     def my_dh_pub_key(self):
228         if self.is_initiator:
229             return self.i_dh_data
230         return self.r_dh_data
231
232     @property
233     def peer_dh_pub_key(self):
234         if self.is_initiator:
235             return self.r_dh_data
236         return self.i_dh_data
237
238     @property
239     def natt(self):
240         return self.i_natt or self.r_natt
241
242     def compute_secret(self):
243         priv = self.dh_private_key
244         peer = self.peer_dh_pub_key
245         p, g, l = self.ike_group
246         return pow(int.from_bytes(peer, 'big'),
247                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
248
249     def generate_dh_data(self):
250         # generate DH keys
251         if self.ike_dh not in DH:
252             raise NotImplementedError('%s not in DH group' % self.ike_dh)
253
254         if self.dh_params is None:
255             dhg = DH[self.ike_dh]
256             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
257             self.dh_params = pn.parameters(default_backend())
258
259         priv = self.dh_params.generate_private_key()
260         pub = priv.public_key()
261         x = priv.private_numbers().x
262         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
263         y = pub.public_numbers().y
264
265         if self.is_initiator:
266             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
267         else:
268             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
269
270     def complete_dh_data(self):
271         self.dh_shared_secret = self.compute_secret()
272
273     def calc_child_keys(self):
274         prf = self.ike_prf_alg.mod()
275         s = self.i_nonce + self.r_nonce
276         c = self.child_sas[0]
277
278         encr_key_len = self.esp_crypto_key_len
279         integ_key_len = self.esp_integ_alg.key_len
280         salt_len = 0 if integ_key_len else 4
281
282         l = (integ_key_len * 2 +
283              encr_key_len * 2 +
284              salt_len * 2)
285         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
286
287         pos = 0
288         c.sk_ei = keymat[pos:pos+encr_key_len]
289         pos += encr_key_len
290
291         if integ_key_len:
292             c.sk_ai = keymat[pos:pos+integ_key_len]
293             pos += integ_key_len
294         else:
295             c.salt_ei = keymat[pos:pos+salt_len]
296             pos += salt_len
297
298         c.sk_er = keymat[pos:pos+encr_key_len]
299         pos += encr_key_len
300
301         if integ_key_len:
302             c.sk_ar = keymat[pos:pos+integ_key_len]
303             pos += integ_key_len
304         else:
305             c.salt_er = keymat[pos:pos+salt_len]
306             pos += salt_len
307
308     def calc_prfplus(self, prf, key, seed, length):
309         r = b''
310         t = None
311         x = 1
312         while len(r) < length and x < 255:
313             if t is not None:
314                 s = t
315             else:
316                 s = b''
317             s = s + seed + bytes([x])
318             t = self.calc_prf(prf, key, s)
319             r = r + t
320             x = x + 1
321
322         if x == 255:
323             return None
324         return r
325
326     def calc_prf(self, prf, key, data):
327         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
328         h.update(data)
329         return h.finalize()
330
331     def calc_keys(self):
332         prf = self.ike_prf_alg.mod()
333         # SKEYSEED = prf(Ni | Nr, g^ir)
334         s = self.i_nonce + self.r_nonce
335         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
336
337         # calculate S = Ni | Nr | SPIi SPIr
338         s = s + self.ispi + self.rspi
339
340         prf_key_trunc = self.ike_prf_alg.trunc_len
341         encr_key_len = self.ike_crypto_key_len
342         tr_prf_key_len = self.ike_prf_alg.key_len
343         integ_key_len = self.ike_integ_alg.key_len
344         if integ_key_len == 0:
345             salt_size = 4
346         else:
347             salt_size = 0
348
349         l = (prf_key_trunc +
350              integ_key_len * 2 +
351              encr_key_len * 2 +
352              tr_prf_key_len * 2 +
353              salt_size * 2)
354         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
355
356         pos = 0
357         self.sk_d = keymat[:pos+prf_key_trunc]
358         pos += prf_key_trunc
359
360         self.sk_ai = keymat[pos:pos+integ_key_len]
361         pos += integ_key_len
362         self.sk_ar = keymat[pos:pos+integ_key_len]
363         pos += integ_key_len
364
365         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
366         pos += encr_key_len + salt_size
367         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
368         pos += encr_key_len + salt_size
369
370         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
371         pos += tr_prf_key_len
372         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
373
374     def generate_authmsg(self, prf, packet):
375         if self.is_initiator:
376             id = self.i_id
377             nonce = self.r_nonce
378             key = self.sk_pi
379         else:
380             id = self.r_id
381             nonce = self.i_nonce
382             key = self.sk_pr
383         data = bytes([self.id_type, 0, 0, 0]) + id
384         id_hash = self.calc_prf(prf, key, data)
385         return packet + nonce + id_hash
386
387     def auth_init(self):
388         prf = self.ike_prf_alg.mod()
389         if self.is_initiator:
390             packet = self.init_req_packet
391         else:
392             packet = self.init_resp_packet
393         authmsg = self.generate_authmsg(prf, raw(packet))
394         if self.auth_method == 'shared-key':
395             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
396             self.auth_data = self.calc_prf(prf, psk, authmsg)
397         elif self.auth_method == 'rsa-sig':
398             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
399                                                 hashes.SHA1())
400         else:
401             raise TypeError('unknown auth method type!')
402
403     def encrypt(self, data, aad=None):
404         data = self.ike_crypto_alg.pad(data)
405         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
406
407     @property
408     def peer_authkey(self):
409         if self.is_initiator:
410             return self.sk_ar
411         return self.sk_ai
412
413     @property
414     def my_authkey(self):
415         if self.is_initiator:
416             return self.sk_ai
417         return self.sk_ar
418
419     @property
420     def my_cryptokey(self):
421         if self.is_initiator:
422             return self.sk_ei
423         return self.sk_er
424
425     @property
426     def peer_cryptokey(self):
427         if self.is_initiator:
428             return self.sk_er
429         return self.sk_ei
430
431     def concat(self, alg, key_len):
432         return alg + '-' + str(key_len * 8)
433
434     @property
435     def vpp_ike_cypto_alg(self):
436         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
437
438     @property
439     def vpp_esp_cypto_alg(self):
440         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
441
442     def verify_hmac(self, ikemsg):
443         integ_trunc = self.ike_integ_alg.trunc_len
444         exp_hmac = ikemsg[-integ_trunc:]
445         data = ikemsg[:-integ_trunc]
446         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
447                                           self.peer_authkey, data)
448         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
449
450     def compute_hmac(self, integ, key, data):
451         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
452         h.update(data)
453         return h.finalize()
454
455     def decrypt(self, data, aad=None, icv=None):
456         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
457
458     def hmac_and_decrypt(self, ike):
459         ep = ike[ikev2.IKEv2_payload_Encrypted]
460         if self.ike_crypto == 'AES-GCM-16ICV':
461             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
462             ct = ep.load[:-GCM_ICV_SIZE]
463             tag = ep.load[-GCM_ICV_SIZE:]
464             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
465         else:
466             self.verify_hmac(raw(ike))
467             integ_trunc = self.ike_integ_alg.trunc_len
468
469             # remove ICV and decrypt payload
470             ct = ep.load[:-integ_trunc]
471             plain = self.decrypt(ct)
472         # remove padding
473         pad_len = plain[-1]
474         return plain[:-pad_len - 1]
475
476     def build_ts_addr(self, ts, version):
477         return {'starting_address_v' + version: ts['start_addr'],
478                 'ending_address_v' + version: ts['end_addr']}
479
480     def generate_ts(self, is_ip4):
481         c = self.child_sas[0]
482         ts_data = {'IP_protocol_ID': 0,
483                    'start_port': 0,
484                    'end_port': 0xffff}
485         if is_ip4:
486             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
487             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
488             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
489             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
490         else:
491             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
492             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
493             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
494             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
495
496         if self.is_initiator:
497             return ([ts1], [ts2])
498         return ([ts2], [ts1])
499
500     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
501         if crypto not in CRYPTO_ALGOS:
502             raise TypeError('unsupported encryption algo %r' % crypto)
503         self.ike_crypto = crypto
504         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
505         self.ike_crypto_key_len = crypto_key_len
506
507         if integ not in AUTH_ALGOS:
508             raise TypeError('unsupported auth algo %r' % integ)
509         self.ike_integ = None if integ == 'NULL' else integ
510         self.ike_integ_alg = AUTH_ALGOS[integ]
511
512         if prf not in PRF_ALGOS:
513             raise TypeError('unsupported prf algo %r' % prf)
514         self.ike_prf = prf
515         self.ike_prf_alg = PRF_ALGOS[prf]
516         self.ike_dh = dh
517         self.ike_group = DH[self.ike_dh]
518
519     def set_esp_props(self, crypto, crypto_key_len, integ):
520         self.esp_crypto_key_len = crypto_key_len
521         if crypto not in CRYPTO_ALGOS:
522             raise TypeError('unsupported encryption algo %r' % crypto)
523         self.esp_crypto = crypto
524         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
525
526         if integ not in AUTH_ALGOS:
527             raise TypeError('unsupported auth algo %r' % integ)
528         self.esp_integ = None if integ == 'NULL' else integ
529         self.esp_integ_alg = AUTH_ALGOS[integ]
530
531     def crypto_attr(self, key_len):
532         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
533             return (0x800e << 16 | key_len << 3, 12)
534         else:
535             raise Exception('unsupported attribute type')
536
537     def ike_crypto_attr(self):
538         return self.crypto_attr(self.ike_crypto_key_len)
539
540     def esp_crypto_attr(self):
541         return self.crypto_attr(self.esp_crypto_key_len)
542
543     def compute_nat_sha1(self, ip, port, rspi=None):
544         if rspi is None:
545             rspi = self.rspi
546         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
547         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
548         digest.update(data)
549         return digest.finalize()
550
551
552 class IkePeer(VppTestCase):
553     """ common class for initiator and responder """
554
555     @classmethod
556     def setUpClass(cls):
557         import scapy.contrib.ikev2 as _ikev2
558         globals()['ikev2'] = _ikev2
559         super(IkePeer, cls).setUpClass()
560         cls.create_pg_interfaces(range(2))
561         for i in cls.pg_interfaces:
562             i.admin_up()
563             i.config_ip4()
564             i.resolve_arp()
565             i.config_ip6()
566             i.resolve_ndp()
567
568     @classmethod
569     def tearDownClass(cls):
570         super(IkePeer, cls).tearDownClass()
571
572     def tearDown(self):
573         super(IkePeer, self).tearDown()
574         if self.del_sa_from_responder:
575             self.initiate_del_sa_from_responder()
576         else:
577             self.initiate_del_sa_from_initiator()
578         r = self.vapi.ikev2_sa_dump()
579         self.assertEqual(len(r), 0)
580         sas = self.vapi.ipsec_sa_dump()
581         self.assertEqual(len(sas), 0)
582         self.p.remove_vpp_config()
583         self.assertIsNone(self.p.query_vpp_config())
584
585     def setUp(self):
586         super(IkePeer, self).setUp()
587         self.config_tc()
588         self.p.add_vpp_config()
589         self.assertIsNotNone(self.p.query_vpp_config())
590         if self.sa.is_initiator:
591             self.sa.generate_dh_data()
592         self.vapi.cli('ikev2 set logging level 4')
593         self.vapi.cli('event-lo clear')
594
595     def assert_counter(self, count, name, version='ip4'):
596         node_name = '/err/ikev2-%s/' % version + name
597         self.assertEqual(count, self.statistics.get_err_counter(node_name))
598
599     def create_rekey_request(self):
600         sa, first_payload = self.generate_auth_payload(is_rekey=True)
601         header = ikev2.IKEv2(
602                 init_SPI=self.sa.ispi,
603                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
604                 flags='Initiator', exch_type='CREATE_CHILD_SA')
605
606         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
607         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
608                                   self.sa.dport, self.sa.natt, self.ip6)
609
610     def create_empty_request(self):
611         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
612                              id=self.sa.new_msg_id(), flags='Initiator',
613                              exch_type='INFORMATIONAL',
614                              next_payload='Encrypted')
615
616         msg = self.encrypt_ike_msg(header, b'', None)
617         return self.create_packet(self.pg0, msg, self.sa.sport,
618                                   self.sa.dport, self.sa.natt, self.ip6)
619
620     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
621                       use_ip6=False):
622         if use_ip6:
623             src_ip = src_if.remote_ip6
624             dst_ip = src_if.local_ip6
625             ip_layer = IPv6
626         else:
627             src_ip = src_if.remote_ip4
628             dst_ip = src_if.local_ip4
629             ip_layer = IP
630         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
631                ip_layer(src=src_ip, dst=dst_ip) /
632                UDP(sport=sport, dport=dport))
633         if natt:
634             # insert non ESP marker
635             res = res / Raw(b'\x00' * 4)
636         return res / msg
637
638     def verify_udp(self, udp):
639         self.assertEqual(udp.sport, self.sa.sport)
640         self.assertEqual(udp.dport, self.sa.dport)
641
642     def get_ike_header(self, packet):
643         try:
644             ih = packet[ikev2.IKEv2]
645             ih = self.verify_and_remove_non_esp_marker(ih)
646         except IndexError as e:
647             # this is a workaround for getting IKEv2 layer as both ikev2 and
648             # ipsec register for port 4500
649             esp = packet[ESP]
650             ih = self.verify_and_remove_non_esp_marker(esp)
651         self.assertEqual(ih.version, 0x20)
652         self.assertNotIn('Version', ih.flags)
653         return ih
654
655     def verify_and_remove_non_esp_marker(self, packet):
656         if self.sa.natt:
657             # if we are in nat traversal mode check for non esp marker
658             # and remove it
659             data = raw(packet)
660             self.assertEqual(data[:4], b'\x00' * 4)
661             return ikev2.IKEv2(data[4:])
662         else:
663             return packet
664
665     def encrypt_ike_msg(self, header, plain, first_payload):
666         if self.sa.ike_crypto == 'AES-GCM-16ICV':
667             data = self.sa.ike_crypto_alg.pad(raw(plain))
668             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
669                 len(ikev2.IKEv2_payload_Encrypted())
670             tlen = plen + len(ikev2.IKEv2())
671
672             # prepare aad data
673             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
674                                                  length=plen)
675             header.length = tlen
676             res = header / sk_p
677             encr = self.sa.encrypt(raw(plain), raw(res))
678             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
679                                                  length=plen, load=encr)
680             res = header / sk_p
681         else:
682             encr = self.sa.encrypt(raw(plain))
683             trunc_len = self.sa.ike_integ_alg.trunc_len
684             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
685             tlen = plen + len(ikev2.IKEv2())
686
687             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
688                                                  length=plen, load=encr)
689             header.length = tlen
690             res = header / sk_p
691
692             integ_data = raw(res)
693             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
694                                              self.sa.my_authkey, integ_data)
695             res = res / Raw(hmac_data[:trunc_len])
696         assert(len(res) == tlen)
697         return res
698
699     def verify_udp_encap(self, ipsec_sa):
700         e = VppEnum.vl_api_ipsec_sad_flags_t
701         if self.sa.udp_encap or self.sa.natt:
702             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
703         else:
704             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
705
706     def verify_ipsec_sas(self, is_rekey=False):
707         sas = self.vapi.ipsec_sa_dump()
708         if is_rekey:
709             # after rekey there is a short period of time in which old
710             # inbound SA is still present
711             sa_count = 3
712         else:
713             sa_count = 2
714         self.assertEqual(len(sas), sa_count)
715         if self.sa.is_initiator:
716             if is_rekey:
717                 sa0 = sas[0].entry
718                 sa1 = sas[2].entry
719             else:
720                 sa0 = sas[0].entry
721                 sa1 = sas[1].entry
722         else:
723             if is_rekey:
724                 sa0 = sas[2].entry
725                 sa1 = sas[0].entry
726             else:
727                 sa1 = sas[0].entry
728                 sa0 = sas[1].entry
729
730         c = self.sa.child_sas[0]
731
732         self.verify_udp_encap(sa0)
733         self.verify_udp_encap(sa1)
734         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
735         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
736         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
737
738         if self.sa.esp_integ is None:
739             vpp_integ_alg = 0
740         else:
741             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
742         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
743         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
744
745         # verify crypto keys
746         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
747         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
748         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
749         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
750
751         # verify integ keys
752         if vpp_integ_alg:
753             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
754             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
755             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
756             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
757         else:
758             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
759             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
760
761     def verify_keymat(self, api_keys, keys, name):
762         km = getattr(keys, name)
763         api_km = getattr(api_keys, name)
764         api_km_len = getattr(api_keys, name + '_len')
765         self.assertEqual(len(km), api_km_len)
766         self.assertEqual(km, api_km[:api_km_len])
767
768     def verify_id(self, api_id, exp_id):
769         self.assertEqual(api_id.type, IDType.value(exp_id.type))
770         self.assertEqual(api_id.data_len, exp_id.data_len)
771         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
772
773     def verify_ike_sas(self):
774         r = self.vapi.ikev2_sa_dump()
775         self.assertEqual(len(r), 1)
776         sa = r[0].sa
777         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
778         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
779         if self.ip6:
780             if self.sa.is_initiator:
781                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
782                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
783             else:
784                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
785                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
786         else:
787             if self.sa.is_initiator:
788                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
789                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
790             else:
791                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
792                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
793         self.verify_keymat(sa.keys, self.sa, 'sk_d')
794         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
795         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
796         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
797         self.verify_keymat(sa.keys, self.sa, 'sk_er')
798         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
799         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
800
801         self.assertEqual(sa.i_id.type, self.sa.id_type)
802         self.assertEqual(sa.r_id.type, self.sa.id_type)
803         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
804         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
805         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
806         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
807
808         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
809         self.assertEqual(len(r), 1)
810         csa = r[0].child_sa
811         self.assertEqual(csa.sa_index, sa.sa_index)
812         c = self.sa.child_sas[0]
813         if hasattr(c, 'sk_ai'):
814             self.verify_keymat(csa.keys, c, 'sk_ai')
815             self.verify_keymat(csa.keys, c, 'sk_ar')
816         self.verify_keymat(csa.keys, c, 'sk_ei')
817         self.verify_keymat(csa.keys, c, 'sk_er')
818         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
819         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
820
821         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
822         tsi = tsi[0]
823         tsr = tsr[0]
824         r = self.vapi.ikev2_traffic_selector_dump(
825                 is_initiator=True, sa_index=sa.sa_index,
826                 child_sa_index=csa.child_sa_index)
827         self.assertEqual(len(r), 1)
828         ts = r[0].ts
829         self.verify_ts(r[0].ts, tsi[0], True)
830
831         r = self.vapi.ikev2_traffic_selector_dump(
832                 is_initiator=False, sa_index=sa.sa_index,
833                 child_sa_index=csa.child_sa_index)
834         self.assertEqual(len(r), 1)
835         self.verify_ts(r[0].ts, tsr[0], False)
836
837         n = self.vapi.ikev2_nonce_get(is_initiator=True,
838                                       sa_index=sa.sa_index)
839         self.verify_nonce(n, self.sa.i_nonce)
840         n = self.vapi.ikev2_nonce_get(is_initiator=False,
841                                       sa_index=sa.sa_index)
842         self.verify_nonce(n, self.sa.r_nonce)
843
844     def verify_nonce(self, api_nonce, nonce):
845         self.assertEqual(api_nonce.data_len, len(nonce))
846         self.assertEqual(api_nonce.nonce, nonce)
847
848     def verify_ts(self, api_ts, ts, is_initiator):
849         if is_initiator:
850             self.assertTrue(api_ts.is_local)
851         else:
852             self.assertFalse(api_ts.is_local)
853
854         if self.p.ts_is_ip4:
855             self.assertEqual(api_ts.start_addr,
856                              IPv4Address(ts.starting_address_v4))
857             self.assertEqual(api_ts.end_addr,
858                              IPv4Address(ts.ending_address_v4))
859         else:
860             self.assertEqual(api_ts.start_addr,
861                              IPv6Address(ts.starting_address_v6))
862             self.assertEqual(api_ts.end_addr,
863                              IPv6Address(ts.ending_address_v6))
864         self.assertEqual(api_ts.start_port, ts.start_port)
865         self.assertEqual(api_ts.end_port, ts.end_port)
866         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
867
868
869 class TemplateInitiator(IkePeer):
870     """ initiator test template """
871
872     def initiate_del_sa_from_initiator(self):
873         ispi = int.from_bytes(self.sa.ispi, 'little')
874         self.pg0.enable_capture()
875         self.pg_start()
876         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
877         capture = self.pg0.get_capture(1)
878         ih = self.get_ike_header(capture[0])
879         self.assertNotIn('Response', ih.flags)
880         self.assertIn('Initiator', ih.flags)
881         self.assertEqual(ih.init_SPI, self.sa.ispi)
882         self.assertEqual(ih.resp_SPI, self.sa.rspi)
883         plain = self.sa.hmac_and_decrypt(ih)
884         d = ikev2.IKEv2_payload_Delete(plain)
885         self.assertEqual(d.proto, 1)  # proto=IKEv2
886         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
887                              flags='Response', exch_type='INFORMATIONAL',
888                              id=ih.id, next_payload='Encrypted')
889         resp = self.encrypt_ike_msg(header, b'', None)
890         self.send_and_assert_no_replies(self.pg0, resp)
891
892     def verify_del_sa(self, packet):
893         ih = self.get_ike_header(packet)
894         self.assertEqual(ih.id, self.sa.msg_id)
895         self.assertEqual(ih.exch_type, 37)  # exchange informational
896         self.assertIn('Response', ih.flags)
897         self.assertIn('Initiator', ih.flags)
898         plain = self.sa.hmac_and_decrypt(ih)
899         self.assertEqual(plain, b'')
900
901     def initiate_del_sa_from_responder(self):
902         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
903                              exch_type='INFORMATIONAL',
904                              id=self.sa.new_msg_id())
905         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
906         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
907         packet = self.create_packet(self.pg0, ike_msg,
908                                     self.sa.sport, self.sa.dport,
909                                     self.sa.natt, self.ip6)
910         self.pg0.add_stream(packet)
911         self.pg0.enable_capture()
912         self.pg_start()
913         capture = self.pg0.get_capture(1)
914         self.verify_del_sa(capture[0])
915
916     @staticmethod
917     def find_notify_payload(packet, notify_type):
918         n = packet[ikev2.IKEv2_payload_Notify]
919         while n is not None:
920             if n.type == notify_type:
921                 return n
922             n = n.payload
923         return None
924
925     def verify_nat_detection(self, packet):
926         if self.ip6:
927             iph = packet[IPv6]
928         else:
929             iph = packet[IP]
930         udp = packet[UDP]
931
932         # NAT_DETECTION_SOURCE_IP
933         s = self.find_notify_payload(packet, 16388)
934         self.assertIsNotNone(s)
935         src_sha = self.sa.compute_nat_sha1(
936                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
937         self.assertEqual(s.load, src_sha)
938
939         # NAT_DETECTION_DESTINATION_IP
940         s = self.find_notify_payload(packet, 16389)
941         self.assertIsNotNone(s)
942         dst_sha = self.sa.compute_nat_sha1(
943                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
944         self.assertEqual(s.load, dst_sha)
945
946     def verify_sa_init_request(self, packet):
947         udp = packet[UDP]
948         self.sa.dport = udp.sport
949         ih = packet[ikev2.IKEv2]
950         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
951         self.assertEqual(ih.exch_type, 34)  # SA_INIT
952         self.sa.ispi = ih.init_SPI
953         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
954         self.assertIn('Initiator', ih.flags)
955         self.assertNotIn('Response', ih.flags)
956         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
957         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
958
959         prop = packet[ikev2.IKEv2_payload_Proposal]
960         self.assertEqual(prop.proto, 1)  # proto = ikev2
961         self.assertEqual(prop.proposal, 1)
962         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
963         self.assertEqual(prop.trans[0].transform_id,
964                          self.p.ike_transforms['crypto_alg'])
965         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
966         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
967         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
968         self.assertEqual(prop.trans[2].transform_id,
969                          self.p.ike_transforms['dh_group'])
970
971         self.verify_nat_detection(packet)
972         self.sa.set_ike_props(
973                     crypto='AES-GCM-16ICV', crypto_key_len=32,
974                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
975         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
976                               integ='SHA2-256-128')
977         self.sa.generate_dh_data()
978         self.sa.complete_dh_data()
979         self.sa.calc_keys()
980
981     def update_esp_transforms(self, trans, sa):
982         while trans:
983             if trans.transform_type == 1:  # ecryption
984                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
985             elif trans.transform_type == 3:  # integrity
986                 sa.esp_integ = INTEG_IDS[trans.transform_id]
987             trans = trans.payload
988
989     def verify_sa_auth_req(self, packet):
990         udp = packet[UDP]
991         self.sa.dport = udp.sport
992         ih = self.get_ike_header(packet)
993         self.assertEqual(ih.resp_SPI, self.sa.rspi)
994         self.assertEqual(ih.init_SPI, self.sa.ispi)
995         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
996         self.assertIn('Initiator', ih.flags)
997         self.assertNotIn('Response', ih.flags)
998
999         udp = packet[UDP]
1000         self.verify_udp(udp)
1001         self.assertEqual(ih.id, self.sa.msg_id + 1)
1002         self.sa.msg_id += 1
1003         plain = self.sa.hmac_and_decrypt(ih)
1004         idi = ikev2.IKEv2_payload_IDi(plain)
1005         idr = ikev2.IKEv2_payload_IDr(idi.payload)
1006         self.assertEqual(idi.load, self.sa.i_id)
1007         self.assertEqual(idr.load, self.sa.r_id)
1008         prop = idi[ikev2.IKEv2_payload_Proposal]
1009         c = self.sa.child_sas[0]
1010         c.ispi = prop.SPI
1011         self.update_esp_transforms(
1012                 prop[ikev2.IKEv2_payload_Transform], self.sa)
1013
1014     def send_init_response(self):
1015         tr_attr = self.sa.ike_crypto_attr()
1016         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1017                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1018                  key_length=tr_attr[0]) /
1019                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1020                  transform_id=self.sa.ike_integ) /
1021                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1022                  transform_id=self.sa.ike_prf_alg.name) /
1023                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1024                  transform_id=self.sa.ike_dh))
1025         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1026                  trans_nb=4, trans=trans))
1027
1028         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1029         if self.sa.natt:
1030             dst_address = b'\x0a\x0a\x0a\x0a'
1031         else:
1032             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1033         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1034         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1035
1036         self.sa.init_resp_packet = (
1037             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1038                         exch_type='IKE_SA_INIT', flags='Response') /
1039             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1040             ikev2.IKEv2_payload_KE(next_payload='Nonce',
1041                                    group=self.sa.ike_dh,
1042                                    load=self.sa.my_dh_pub_key) /
1043             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1044                                       next_payload='Notify') /
1045             ikev2.IKEv2_payload_Notify(
1046                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1047                     next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1048                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1049
1050         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1051                                      self.sa.sport, self.sa.dport,
1052                                      False, self.ip6)
1053         self.pg_send(self.pg0, ike_msg)
1054         capture = self.pg0.get_capture(1)
1055         self.verify_sa_auth_req(capture[0])
1056
1057     def initiate_sa_init(self):
1058         self.pg0.enable_capture()
1059         self.pg_start()
1060         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1061
1062         capture = self.pg0.get_capture(1)
1063         self.verify_sa_init_request(capture[0])
1064         self.send_init_response()
1065
1066     def send_auth_response(self):
1067         tr_attr = self.sa.esp_crypto_attr()
1068         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1069                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1070                  key_length=tr_attr[0]) /
1071                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1072                  transform_id=self.sa.esp_integ) /
1073                  ikev2.IKEv2_payload_Transform(
1074                  transform_type='Extended Sequence Number',
1075                  transform_id='No ESN') /
1076                  ikev2.IKEv2_payload_Transform(
1077                  transform_type='Extended Sequence Number',
1078                  transform_id='ESN'))
1079
1080         c = self.sa.child_sas[0]
1081         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1082                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1083
1084         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1085         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1086                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1087                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1088                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1089                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1090                  auth_type=AuthMethod.value(self.sa.auth_method),
1091                  load=self.sa.auth_data) /
1092                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1093                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1094                  number_of_TSs=len(tsi),
1095                  traffic_selector=tsi) /
1096                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1097                  number_of_TSs=len(tsr),
1098                  traffic_selector=tsr) /
1099                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1100
1101         header = ikev2.IKEv2(
1102                 init_SPI=self.sa.ispi,
1103                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1104                 flags='Response', exch_type='IKE_AUTH')
1105
1106         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1107         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1108                                     self.sa.dport, self.sa.natt, self.ip6)
1109         self.pg_send(self.pg0, packet)
1110
1111     def test_initiator(self):
1112         self.initiate_sa_init()
1113         self.sa.auth_init()
1114         self.sa.calc_child_keys()
1115         self.send_auth_response()
1116         self.verify_ike_sas()
1117
1118
1119 class TemplateResponder(IkePeer):
1120     """ responder test template """
1121
1122     def initiate_del_sa_from_responder(self):
1123         self.pg0.enable_capture()
1124         self.pg_start()
1125         self.vapi.ikev2_initiate_del_ike_sa(
1126                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1127         capture = self.pg0.get_capture(1)
1128         ih = self.get_ike_header(capture[0])
1129         self.assertNotIn('Response', ih.flags)
1130         self.assertNotIn('Initiator', ih.flags)
1131         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1132         plain = self.sa.hmac_and_decrypt(ih)
1133         d = ikev2.IKEv2_payload_Delete(plain)
1134         self.assertEqual(d.proto, 1)  # proto=IKEv2
1135         self.assertEqual(ih.init_SPI, self.sa.ispi)
1136         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1137         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1138                              flags='Initiator+Response',
1139                              exch_type='INFORMATIONAL',
1140                              id=ih.id, next_payload='Encrypted')
1141         resp = self.encrypt_ike_msg(header, b'', None)
1142         self.send_and_assert_no_replies(self.pg0, resp)
1143
1144     def verify_del_sa(self, packet):
1145         ih = self.get_ike_header(packet)
1146         self.assertEqual(ih.id, self.sa.msg_id)
1147         self.assertEqual(ih.exch_type, 37)  # exchange informational
1148         self.assertIn('Response', ih.flags)
1149         self.assertNotIn('Initiator', ih.flags)
1150         self.assertEqual(ih.next_payload, 46)  # Encrypted
1151         self.assertEqual(ih.init_SPI, self.sa.ispi)
1152         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1153         plain = self.sa.hmac_and_decrypt(ih)
1154         self.assertEqual(plain, b'')
1155
1156     def initiate_del_sa_from_initiator(self):
1157         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1158                              flags='Initiator', exch_type='INFORMATIONAL',
1159                              id=self.sa.new_msg_id())
1160         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1161         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1162         packet = self.create_packet(self.pg0, ike_msg,
1163                                     self.sa.sport, self.sa.dport,
1164                                     self.sa.natt, self.ip6)
1165         self.pg0.add_stream(packet)
1166         self.pg0.enable_capture()
1167         self.pg_start()
1168         capture = self.pg0.get_capture(1)
1169         self.verify_del_sa(capture[0])
1170
1171     def send_sa_init_req(self):
1172         tr_attr = self.sa.ike_crypto_attr()
1173         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1174                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1175                  key_length=tr_attr[0]) /
1176                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1177                  transform_id=self.sa.ike_integ) /
1178                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1179                  transform_id=self.sa.ike_prf_alg.name) /
1180                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1181                  transform_id=self.sa.ike_dh))
1182
1183         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1184                  trans_nb=4, trans=trans))
1185
1186         next_payload = None if self.ip6 else 'Notify'
1187
1188         self.sa.init_req_packet = (
1189                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1190                             flags='Initiator', exch_type='IKE_SA_INIT') /
1191                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1192                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1193                                        group=self.sa.ike_dh,
1194                                        load=self.sa.my_dh_pub_key) /
1195                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1196                                           load=self.sa.i_nonce))
1197
1198         if not self.ip6:
1199             if self.sa.i_natt:
1200                 src_address = b'\x0a\x0a\x0a\x01'
1201             else:
1202                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1203
1204             if self.sa.r_natt:
1205                 dst_address = b'\x0a\x0a\x0a\x0a'
1206             else:
1207                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1208
1209             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1210             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1211             nat_src_detection = ikev2.IKEv2_payload_Notify(
1212                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1213                     next_payload='Notify')
1214             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1215                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1216             self.sa.init_req_packet = (self.sa.init_req_packet /
1217                                        nat_src_detection /
1218                                        nat_dst_detection)
1219
1220         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1221                                      self.sa.sport, self.sa.dport,
1222                                      self.sa.natt, self.ip6)
1223         self.pg0.add_stream(ike_msg)
1224         self.pg0.enable_capture()
1225         self.pg_start()
1226         capture = self.pg0.get_capture(1)
1227         self.verify_sa_init(capture[0])
1228
1229     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1230         tr_attr = self.sa.esp_crypto_attr()
1231         last_payload = last_payload or 'Notify'
1232         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1233                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1234                  key_length=tr_attr[0]) /
1235                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1236                  transform_id=self.sa.esp_integ) /
1237                  ikev2.IKEv2_payload_Transform(
1238                  transform_type='Extended Sequence Number',
1239                  transform_id='No ESN') /
1240                  ikev2.IKEv2_payload_Transform(
1241                  transform_type='Extended Sequence Number',
1242                  transform_id='ESN'))
1243
1244         c = self.sa.child_sas[0]
1245         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1246                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1247
1248         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1249         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1250                  auth_type=AuthMethod.value(self.sa.auth_method),
1251                  load=self.sa.auth_data) /
1252                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1253                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1254                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1255                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1256                  number_of_TSs=len(tsr), traffic_selector=tsr))
1257
1258         if is_rekey:
1259             first_payload = 'Nonce'
1260             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1261                      next_payload='SA') / plain /
1262                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1263                      proto='ESP', SPI=c.ispi))
1264         else:
1265             first_payload = 'IDi'
1266             ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1267                    IDtype=self.sa.id_type, load=self.sa.i_id) /
1268                    ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1269                    IDtype=self.sa.id_type, load=self.sa.r_id))
1270             plain = ids / plain
1271         return plain, first_payload
1272
1273     def send_sa_auth(self):
1274         plain, first_payload = self.generate_auth_payload(
1275                     last_payload='Notify')
1276         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1277         header = ikev2.IKEv2(
1278                 init_SPI=self.sa.ispi,
1279                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1280                 flags='Initiator', exch_type='IKE_AUTH')
1281
1282         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1283         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1284                                     self.sa.dport, self.sa.natt, self.ip6)
1285         self.pg0.add_stream(packet)
1286         self.pg0.enable_capture()
1287         self.pg_start()
1288         capture = self.pg0.get_capture(1)
1289         self.verify_sa_auth_resp(capture[0])
1290
1291     def verify_sa_init(self, packet):
1292         ih = self.get_ike_header(packet)
1293
1294         self.assertEqual(ih.id, self.sa.msg_id)
1295         self.assertEqual(ih.exch_type, 34)
1296         self.assertIn('Response', ih.flags)
1297         self.assertEqual(ih.init_SPI, self.sa.ispi)
1298         self.assertNotEqual(ih.resp_SPI, 0)
1299         self.sa.rspi = ih.resp_SPI
1300         try:
1301             sa = ih[ikev2.IKEv2_payload_SA]
1302             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1303             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1304         except IndexError as e:
1305             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1306             self.logger.error(ih.show())
1307             raise
1308         self.sa.complete_dh_data()
1309         self.sa.calc_keys()
1310         self.sa.auth_init()
1311
1312     def verify_sa_auth_resp(self, packet):
1313         ike = self.get_ike_header(packet)
1314         udp = packet[UDP]
1315         self.verify_udp(udp)
1316         self.assertEqual(ike.id, self.sa.msg_id)
1317         plain = self.sa.hmac_and_decrypt(ike)
1318         idr = ikev2.IKEv2_payload_IDr(plain)
1319         prop = idr[ikev2.IKEv2_payload_Proposal]
1320         self.assertEqual(prop.SPIsize, 4)
1321         self.sa.child_sas[0].rspi = prop.SPI
1322         self.sa.calc_child_keys()
1323
1324     IKE_NODE_SUFFIX = 'ip4'
1325
1326     def verify_counters(self):
1327         self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
1328         self.assert_counter(1, 'exchange_sa_req', self.IKE_NODE_SUFFIX)
1329         self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1330
1331     def test_responder(self):
1332         self.send_sa_init_req()
1333         self.send_sa_auth()
1334         self.verify_ipsec_sas()
1335         self.verify_ike_sas()
1336         self.verify_counters()
1337
1338
1339 class Ikev2Params(object):
1340     def config_params(self, params={}):
1341         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1342         ei = VppEnum.vl_api_ipsec_integ_alg_t
1343         self.vpp_enums = {
1344                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1345                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1346                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1347                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1348                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1349                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1350
1351                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1352                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1353                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1354                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1355
1356         dpd_disabled = True if 'dpd_disabled' not in params else\
1357             params['dpd_disabled']
1358         if dpd_disabled:
1359             self.vapi.cli('ikev2 dpd disable')
1360         self.del_sa_from_responder = False if 'del_sa_from_responder'\
1361             not in params else params['del_sa_from_responder']
1362         i_natt = False if 'i_natt' not in params else params['i_natt']
1363         r_natt = False if 'r_natt' not in params else params['r_natt']
1364         self.p = Profile(self, 'pr1')
1365         self.ip6 = False if 'ip6' not in params else params['ip6']
1366
1367         if 'auth' in params and params['auth'] == 'rsa-sig':
1368             auth_method = 'rsa-sig'
1369             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1370             self.vapi.ikev2_set_local_key(
1371                     key_file=work_dir + params['server-key'])
1372
1373             client_file = work_dir + params['client-cert']
1374             server_pem = open(work_dir + params['server-cert']).read()
1375             client_priv = open(work_dir + params['client-key']).read()
1376             client_priv = load_pem_private_key(str.encode(client_priv), None,
1377                                                default_backend())
1378             self.peer_cert = x509.load_pem_x509_certificate(
1379                     str.encode(server_pem),
1380                     default_backend())
1381             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1382             auth_data = None
1383         else:
1384             auth_data = b'$3cr3tpa$$w0rd'
1385             self.p.add_auth(method='shared-key', data=auth_data)
1386             auth_method = 'shared-key'
1387             client_priv = None
1388
1389         is_init = True if 'is_initiator' not in params else\
1390             params['is_initiator']
1391
1392         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1393         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1394         if is_init:
1395             self.p.add_local_id(**idr)
1396             self.p.add_remote_id(**idi)
1397         else:
1398             self.p.add_local_id(**idi)
1399             self.p.add_remote_id(**idr)
1400
1401         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1402             'loc_ts' not in params else params['loc_ts']
1403         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1404             'rem_ts' not in params else params['rem_ts']
1405         self.p.add_local_ts(**loc_ts)
1406         self.p.add_remote_ts(**rem_ts)
1407         if 'responder' in params:
1408             self.p.add_responder(params['responder'])
1409         if 'ike_transforms' in params:
1410             self.p.add_ike_transforms(params['ike_transforms'])
1411         if 'esp_transforms' in params:
1412             self.p.add_esp_transforms(params['esp_transforms'])
1413
1414         udp_encap = False if 'udp_encap' not in params else\
1415             params['udp_encap']
1416         if udp_encap:
1417             self.p.set_udp_encap(True)
1418
1419         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1420                           is_initiator=is_init,
1421                           id_type=self.p.local_id['id_type'],
1422                           i_natt=i_natt, r_natt=r_natt,
1423                           priv_key=client_priv, auth_method=auth_method,
1424                           auth_data=auth_data, udp_encap=udp_encap,
1425                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1426         if is_init:
1427             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1428                 params['ike-crypto']
1429             ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1430                 params['ike-integ']
1431             ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1432                 params['ike-dh']
1433
1434             esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1435                 params['esp-crypto']
1436             esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1437                 params['esp-integ']
1438
1439             self.sa.set_ike_props(
1440                     crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1441                     integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1442             self.sa.set_esp_props(
1443                     crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1444                     integ=esp_integ)
1445
1446
1447 class TestApi(VppTestCase):
1448     """ Test IKEV2 API """
1449     @classmethod
1450     def setUpClass(cls):
1451         super(TestApi, cls).setUpClass()
1452
1453     @classmethod
1454     def tearDownClass(cls):
1455         super(TestApi, cls).tearDownClass()
1456
1457     def tearDown(self):
1458         super(TestApi, self).tearDown()
1459         self.p1.remove_vpp_config()
1460         self.p2.remove_vpp_config()
1461         r = self.vapi.ikev2_profile_dump()
1462         self.assertEqual(len(r), 0)
1463
1464     def configure_profile(self, cfg):
1465         p = Profile(self, cfg['name'])
1466         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1467         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1468         p.add_local_ts(**cfg['loc_ts'])
1469         p.add_remote_ts(**cfg['rem_ts'])
1470         p.add_responder(cfg['responder'])
1471         p.add_ike_transforms(cfg['ike_ts'])
1472         p.add_esp_transforms(cfg['esp_ts'])
1473         p.add_auth(**cfg['auth'])
1474         p.set_udp_encap(cfg['udp_encap'])
1475         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1476         if 'lifetime_data' in cfg:
1477             p.set_lifetime_data(cfg['lifetime_data'])
1478         if 'tun_itf' in cfg:
1479             p.set_tunnel_interface(cfg['tun_itf'])
1480         if 'natt_disabled' in cfg and cfg['natt_disabled']:
1481             p.disable_natt()
1482         p.add_vpp_config()
1483         return p
1484
1485     def test_profile_api(self):
1486         """ test profile dump API """
1487         loc_ts4 = {
1488                     'proto': 8,
1489                     'start_port': 1,
1490                     'end_port': 19,
1491                     'start_addr': '3.3.3.2',
1492                     'end_addr': '3.3.3.3',
1493                 }
1494         rem_ts4 = {
1495                     'proto': 9,
1496                     'start_port': 10,
1497                     'end_port': 119,
1498                     'start_addr': '4.5.76.80',
1499                     'end_addr': '2.3.4.6',
1500                 }
1501
1502         loc_ts6 = {
1503                     'proto': 8,
1504                     'start_port': 1,
1505                     'end_port': 19,
1506                     'start_addr': 'ab::1',
1507                     'end_addr': 'ab::4',
1508                 }
1509         rem_ts6 = {
1510                     'proto': 9,
1511                     'start_port': 10,
1512                     'end_port': 119,
1513                     'start_addr': 'cd::12',
1514                     'end_addr': 'cd::13',
1515                 }
1516
1517         conf = {
1518             'p1': {
1519                 'name': 'p1',
1520                 'natt_disabled': True,
1521                 'loc_id': ('fqdn', b'vpp.home'),
1522                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1523                 'loc_ts': loc_ts4,
1524                 'rem_ts': rem_ts4,
1525                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1526                 'ike_ts': {
1527                         'crypto_alg': 20,
1528                         'crypto_key_size': 32,
1529                         'integ_alg': 1,
1530                         'dh_group': 1},
1531                 'esp_ts': {
1532                         'crypto_alg': 13,
1533                         'crypto_key_size': 24,
1534                         'integ_alg': 2},
1535                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1536                 'udp_encap': True,
1537                 'ipsec_over_udp_port': 4501,
1538                 'lifetime_data': {
1539                     'lifetime': 123,
1540                     'lifetime_maxdata': 20192,
1541                     'lifetime_jitter': 9,
1542                     'handover': 132},
1543             },
1544             'p2': {
1545                 'name': 'p2',
1546                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1547                 'rem_id': ('ip6-addr', b'abcd::1'),
1548                 'loc_ts': loc_ts6,
1549                 'rem_ts': rem_ts6,
1550                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1551                 'ike_ts': {
1552                         'crypto_alg': 12,
1553                         'crypto_key_size': 16,
1554                         'integ_alg': 3,
1555                         'dh_group': 3},
1556                 'esp_ts': {
1557                         'crypto_alg': 9,
1558                         'crypto_key_size': 24,
1559                         'integ_alg': 4},
1560                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1561                 'udp_encap': False,
1562                 'ipsec_over_udp_port': 4600,
1563                 'tun_itf': 0}
1564         }
1565         self.p1 = self.configure_profile(conf['p1'])
1566         self.p2 = self.configure_profile(conf['p2'])
1567
1568         r = self.vapi.ikev2_profile_dump()
1569         self.assertEqual(len(r), 2)
1570         self.verify_profile(r[0].profile, conf['p1'])
1571         self.verify_profile(r[1].profile, conf['p2'])
1572
1573     def verify_id(self, api_id, cfg_id):
1574         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1575         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1576
1577     def verify_ts(self, api_ts, cfg_ts):
1578         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1579         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1580         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1581         self.assertEqual(api_ts.start_addr,
1582                          ip_address(text_type(cfg_ts['start_addr'])))
1583         self.assertEqual(api_ts.end_addr,
1584                          ip_address(text_type(cfg_ts['end_addr'])))
1585
1586     def verify_responder(self, api_r, cfg_r):
1587         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1588         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1589
1590     def verify_transforms(self, api_ts, cfg_ts):
1591         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1592         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1593         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1594
1595     def verify_ike_transforms(self, api_ts, cfg_ts):
1596         self.verify_transforms(api_ts, cfg_ts)
1597         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1598
1599     def verify_esp_transforms(self, api_ts, cfg_ts):
1600         self.verify_transforms(api_ts, cfg_ts)
1601
1602     def verify_auth(self, api_auth, cfg_auth):
1603         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1604         self.assertEqual(api_auth.data, cfg_auth['data'])
1605         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1606
1607     def verify_lifetime_data(self, p, ld):
1608         self.assertEqual(p.lifetime, ld['lifetime'])
1609         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1610         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1611         self.assertEqual(p.handover, ld['handover'])
1612
1613     def verify_profile(self, ap, cp):
1614         self.assertEqual(ap.name, cp['name'])
1615         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1616         self.verify_id(ap.loc_id, cp['loc_id'])
1617         self.verify_id(ap.rem_id, cp['rem_id'])
1618         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1619         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1620         self.verify_responder(ap.responder, cp['responder'])
1621         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1622         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1623         self.verify_auth(ap.auth, cp['auth'])
1624         natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1625         self.assertTrue(natt_dis == ap.natt_disabled)
1626
1627         if 'lifetime_data' in cp:
1628             self.verify_lifetime_data(ap, cp['lifetime_data'])
1629         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1630         if 'tun_itf' in cp:
1631             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1632         else:
1633             self.assertEqual(ap.tun_itf, 0xffffffff)
1634
1635
1636 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1637     """ test responder - responder behind NAT """
1638
1639     IKE_NODE_SUFFIX = 'ip4-natt'
1640
1641     def config_tc(self):
1642         self.config_params({'r_natt': True})
1643
1644
1645 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1646     """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1647
1648     def config_tc(self):
1649         self.config_params({
1650             'i_natt': True,
1651             'is_initiator': False,  # seen from test case perspective
1652                                     # thus vpp is initiator
1653             'responder': {'sw_if_index': self.pg0.sw_if_index,
1654                            'addr': self.pg0.remote_ip4},
1655             'ike-crypto': ('AES-GCM-16ICV', 32),
1656             'ike-integ': 'NULL',
1657             'ike-dh': '3072MODPgr',
1658             'ike_transforms': {
1659                 'crypto_alg': 20,  # "aes-gcm-16"
1660                 'crypto_key_size': 256,
1661                 'dh_group': 15,  # "modp-3072"
1662             },
1663             'esp_transforms': {
1664                 'crypto_alg': 12,  # "aes-cbc"
1665                 'crypto_key_size': 256,
1666                 # "hmac-sha2-256-128"
1667                 'integ_alg': 12}})
1668
1669
1670 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1671     """ test ikev2 initiator - pre shared key auth """
1672
1673     def config_tc(self):
1674         self.config_params({
1675             'is_initiator': False,  # seen from test case perspective
1676                                     # thus vpp is initiator
1677             'responder': {'sw_if_index': self.pg0.sw_if_index,
1678                            'addr': self.pg0.remote_ip4},
1679             'ike-crypto': ('AES-GCM-16ICV', 32),
1680             'ike-integ': 'NULL',
1681             'ike-dh': '3072MODPgr',
1682             'ike_transforms': {
1683                 'crypto_alg': 20,  # "aes-gcm-16"
1684                 'crypto_key_size': 256,
1685                 'dh_group': 15,  # "modp-3072"
1686             },
1687             'esp_transforms': {
1688                 'crypto_alg': 12,  # "aes-cbc"
1689                 'crypto_key_size': 256,
1690                 # "hmac-sha2-256-128"
1691                 'integ_alg': 12}})
1692
1693
1694 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1695     """ test initiator - request window size (1) """
1696
1697     def rekey_respond(self, req, update_child_sa_data):
1698         ih = self.get_ike_header(req)
1699         plain = self.sa.hmac_and_decrypt(ih)
1700         sa = ikev2.IKEv2_payload_SA(plain)
1701         if update_child_sa_data:
1702             prop = sa[ikev2.IKEv2_payload_Proposal]
1703             self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1704             self.sa.r_nonce = self.sa.i_nonce
1705             self.sa.child_sas[0].ispi = prop.SPI
1706             self.sa.child_sas[0].rspi = prop.SPI
1707             self.sa.calc_child_keys()
1708
1709         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1710                              flags='Response', exch_type=36,
1711                              id=ih.id, next_payload='Encrypted')
1712         resp = self.encrypt_ike_msg(header, sa, 'SA')
1713         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1714                                     self.sa.dport, self.sa.natt, self.ip6)
1715         self.send_and_assert_no_replies(self.pg0, packet)
1716
1717     def test_initiator(self):
1718         super(TestInitiatorRequestWindowSize, self).test_initiator()
1719         self.pg0.enable_capture()
1720         self.pg_start()
1721         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1722         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1723         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1724         capture = self.pg0.get_capture(2)
1725
1726         # reply in reverse order
1727         self.rekey_respond(capture[1], True)
1728         self.rekey_respond(capture[0], False)
1729
1730         # verify that only the second request was accepted
1731         self.verify_ike_sas()
1732         self.verify_ipsec_sas(is_rekey=True)
1733
1734
1735 class TestInitiatorRekey(TestInitiatorPsk):
1736     """ test ikev2 initiator - rekey """
1737
1738     def rekey_from_initiator(self):
1739         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1740         self.pg0.enable_capture()
1741         self.pg_start()
1742         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1743         capture = self.pg0.get_capture(1)
1744         ih = self.get_ike_header(capture[0])
1745         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1746         self.assertNotIn('Response', ih.flags)
1747         self.assertIn('Initiator', ih.flags)
1748         plain = self.sa.hmac_and_decrypt(ih)
1749         sa = ikev2.IKEv2_payload_SA(plain)
1750         prop = sa[ikev2.IKEv2_payload_Proposal]
1751         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1752         self.sa.r_nonce = self.sa.i_nonce
1753         # update new responder SPI
1754         self.sa.child_sas[0].ispi = prop.SPI
1755         self.sa.child_sas[0].rspi = prop.SPI
1756         self.sa.calc_child_keys()
1757         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1758                              flags='Response', exch_type=36,
1759                              id=ih.id, next_payload='Encrypted')
1760         resp = self.encrypt_ike_msg(header, sa, 'SA')
1761         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1762                                     self.sa.dport, self.sa.natt, self.ip6)
1763         self.send_and_assert_no_replies(self.pg0, packet)
1764
1765     def test_initiator(self):
1766         super(TestInitiatorRekey, self).test_initiator()
1767         self.rekey_from_initiator()
1768         self.verify_ike_sas()
1769         self.verify_ipsec_sas(is_rekey=True)
1770
1771
1772 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1773     """ test ikev2 initiator - delete IKE SA from responder """
1774
1775     def config_tc(self):
1776         self.config_params({
1777             'del_sa_from_responder': True,
1778             'is_initiator': False,  # seen from test case perspective
1779                                     # thus vpp is initiator
1780             'responder': {'sw_if_index': self.pg0.sw_if_index,
1781                            'addr': self.pg0.remote_ip4},
1782             'ike-crypto': ('AES-GCM-16ICV', 32),
1783             'ike-integ': 'NULL',
1784             'ike-dh': '3072MODPgr',
1785             'ike_transforms': {
1786                 'crypto_alg': 20,  # "aes-gcm-16"
1787                 'crypto_key_size': 256,
1788                 'dh_group': 15,  # "modp-3072"
1789             },
1790             'esp_transforms': {
1791                 'crypto_alg': 12,  # "aes-cbc"
1792                 'crypto_key_size': 256,
1793                 # "hmac-sha2-256-128"
1794                 'integ_alg': 12}})
1795
1796
1797 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1798     """ test ikev2 responder - initiator behind NAT """
1799
1800     IKE_NODE_SUFFIX = 'ip4-natt'
1801
1802     def config_tc(self):
1803         self.config_params(
1804                 {'i_natt': True})
1805
1806
1807 class TestResponderPsk(TemplateResponder, Ikev2Params):
1808     """ test ikev2 responder - pre shared key auth """
1809     def config_tc(self):
1810         self.config_params()
1811
1812
1813 class TestResponderDpd(TestResponderPsk):
1814     """
1815     Dead peer detection test
1816     """
1817     def config_tc(self):
1818         self.config_params({'dpd_disabled': False})
1819
1820     def tearDown(self):
1821         pass
1822
1823     def test_responder(self):
1824         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1825         super(TestResponderDpd, self).test_responder()
1826         self.pg0.enable_capture()
1827         self.pg_start()
1828         # capture empty request but don't reply
1829         capture = self.pg0.get_capture(expected_count=1, timeout=5)
1830         ih = self.get_ike_header(capture[0])
1831         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1832         plain = self.sa.hmac_and_decrypt(ih)
1833         self.assertEqual(plain, b'')
1834         # wait for SA expiration
1835         time.sleep(3)
1836         ike_sas = self.vapi.ikev2_sa_dump()
1837         self.assertEqual(len(ike_sas), 0)
1838         ipsec_sas = self.vapi.ipsec_sa_dump()
1839         self.assertEqual(len(ipsec_sas), 0)
1840
1841
1842 class TestResponderRekey(TestResponderPsk):
1843     """ test ikev2 responder - rekey """
1844
1845     def rekey_from_initiator(self):
1846         packet = self.create_rekey_request()
1847         self.pg0.add_stream(packet)
1848         self.pg0.enable_capture()
1849         self.pg_start()
1850         capture = self.pg0.get_capture(1)
1851         ih = self.get_ike_header(capture[0])
1852         plain = self.sa.hmac_and_decrypt(ih)
1853         sa = ikev2.IKEv2_payload_SA(plain)
1854         prop = sa[ikev2.IKEv2_payload_Proposal]
1855         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1856         # update new responder SPI
1857         self.sa.child_sas[0].rspi = prop.SPI
1858
1859     def test_responder(self):
1860         super(TestResponderRekey, self).test_responder()
1861         self.rekey_from_initiator()
1862         self.sa.calc_child_keys()
1863         self.verify_ike_sas()
1864         self.verify_ipsec_sas(is_rekey=True)
1865
1866
1867 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1868     """ test ikev2 responder - cert based auth """
1869     def config_tc(self):
1870         self.config_params({
1871             'udp_encap': True,
1872             'auth': 'rsa-sig',
1873             'server-key': 'server-key.pem',
1874             'client-key': 'client-key.pem',
1875             'client-cert': 'client-cert.pem',
1876             'server-cert': 'server-cert.pem'})
1877
1878
1879 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1880         (TemplateResponder, Ikev2Params):
1881     """
1882     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1883     """
1884     def config_tc(self):
1885         self.config_params({
1886             'ike-crypto': ('AES-CBC', 16),
1887             'ike-integ': 'SHA2-256-128',
1888             'esp-crypto': ('AES-CBC', 24),
1889             'esp-integ': 'SHA2-384-192',
1890             'ike-dh': '2048MODPgr'})
1891
1892
1893 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1894         (TemplateResponder, Ikev2Params):
1895
1896     """
1897     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1898     """
1899     def config_tc(self):
1900         self.config_params({
1901             'ike-crypto': ('AES-CBC', 32),
1902             'ike-integ': 'SHA2-256-128',
1903             'esp-crypto': ('AES-GCM-16ICV', 32),
1904             'esp-integ': 'NULL',
1905             'ike-dh': '3072MODPgr'})
1906
1907
1908 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1909     """
1910     IKE:AES_GCM_16_256
1911     """
1912
1913     IKE_NODE_SUFFIX = 'ip6'
1914
1915     def config_tc(self):
1916         self.config_params({
1917             'del_sa_from_responder': True,
1918             'ip6': True,
1919             'natt': True,
1920             'ike-crypto': ('AES-GCM-16ICV', 32),
1921             'ike-integ': 'NULL',
1922             'ike-dh': '2048MODPgr',
1923             'loc_ts': {'start_addr': 'ab:cd::0',
1924                        'end_addr': 'ab:cd::10'},
1925             'rem_ts': {'start_addr': '11::0',
1926                        'end_addr': '11::100'}})
1927
1928
1929 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1930     """
1931     Test for keep alive messages
1932     """
1933
1934     def send_empty_req_from_responder(self):
1935         packet = self.create_empty_request()
1936         self.pg0.add_stream(packet)
1937         self.pg0.enable_capture()
1938         self.pg_start()
1939         capture = self.pg0.get_capture(1)
1940         ih = self.get_ike_header(capture[0])
1941         self.assertEqual(ih.id, self.sa.msg_id)
1942         plain = self.sa.hmac_and_decrypt(ih)
1943         self.assertEqual(plain, b'')
1944         self.assert_counter(1, 'keepalive', 'ip4')
1945
1946     def test_initiator(self):
1947         super(TestInitiatorKeepaliveMsg, self).test_initiator()
1948         self.send_empty_req_from_responder()
1949
1950
1951 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1952     """ malformed packet test """
1953
1954     def tearDown(self):
1955         pass
1956
1957     def config_tc(self):
1958         self.config_params()
1959
1960     def create_ike_init_msg(self, length=None, payload=None):
1961         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1962                           flags='Initiator', exch_type='IKE_SA_INIT')
1963         if payload is not None:
1964             msg /= payload
1965         return self.create_packet(self.pg0, msg, self.sa.sport,
1966                                   self.sa.dport)
1967
1968     def verify_bad_packet_length(self):
1969         ike_msg = self.create_ike_init_msg(length=0xdead)
1970         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1971         self.assert_counter(self.pkt_count, 'bad_length')
1972
1973     def verify_bad_sa_payload_length(self):
1974         p = ikev2.IKEv2_payload_SA(length=0xdead)
1975         ike_msg = self.create_ike_init_msg(payload=p)
1976         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1977         self.assert_counter(self.pkt_count, 'malformed_packet')
1978
1979     def test_responder(self):
1980         self.pkt_count = 254
1981         self.verify_bad_packet_length()
1982         self.verify_bad_sa_payload_length()
1983
1984
1985 if __name__ == '__main__':
1986     unittest.main(testRunner=VppTestRunner)