tests: py2 cleanup - remove subclassing of object
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import VppTestCase, VppTestRunner
22 from vpp_ikev2 import Profile, IDType, AuthMethod
23 from vpp_papi import VppEnum
24
25 try:
26     text_type = unicode
27 except NameError:
28     text_type = str
29
30 KEY_PAD = b"Key Pad for IKEv2"
31 SALT_SIZE = 4
32 GCM_ICV_SIZE = 16
33 GCM_IV_SIZE = 8
34
35
36 # defined in rfc3526
37 # tuple structure is (p, g, key_len)
38 DH = {
39     '2048MODPgr': (long_converter("""
40     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
41     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
42     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
43     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
44     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
45     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
46     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
47     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
48     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
49     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
50     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
51
52     '3072MODPgr': (long_converter("""
53     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
54     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
55     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
56     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
57     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
58     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
59     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
60     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
61     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
62     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
63     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
64     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
65     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
66     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
67     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
68     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
69 }
70
71
72 class CryptoAlgo(object):
73     def __init__(self, name, cipher, mode):
74         self.name = name
75         self.cipher = cipher
76         self.mode = mode
77         if self.cipher is not None:
78             self.bs = self.cipher.block_size // 8
79
80             if self.name == 'AES-GCM-16ICV':
81                 self.iv_len = GCM_IV_SIZE
82             else:
83                 self.iv_len = self.bs
84
85     def encrypt(self, data, key, aad=None):
86         iv = os.urandom(self.iv_len)
87         if aad is None:
88             encryptor = Cipher(self.cipher(key), self.mode(iv),
89                                default_backend()).encryptor()
90             return iv + encryptor.update(data) + encryptor.finalize()
91         else:
92             salt = key[-SALT_SIZE:]
93             nonce = salt + iv
94             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
95                                default_backend()).encryptor()
96             encryptor.authenticate_additional_data(aad)
97             data = encryptor.update(data) + encryptor.finalize()
98             data += encryptor.tag[:GCM_ICV_SIZE]
99             return iv + data
100
101     def decrypt(self, data, key, aad=None, icv=None):
102         if aad is None:
103             iv = data[:self.iv_len]
104             ct = data[self.iv_len:]
105             decryptor = Cipher(algorithms.AES(key),
106                                self.mode(iv),
107                                default_backend()).decryptor()
108             return decryptor.update(ct) + decryptor.finalize()
109         else:
110             salt = key[-SALT_SIZE:]
111             nonce = salt + data[:GCM_IV_SIZE]
112             ct = data[GCM_IV_SIZE:]
113             key = key[:-SALT_SIZE]
114             decryptor = Cipher(algorithms.AES(key),
115                                self.mode(nonce, icv, len(icv)),
116                                default_backend()).decryptor()
117             decryptor.authenticate_additional_data(aad)
118             return decryptor.update(ct) + decryptor.finalize()
119
120     def pad(self, data):
121         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122         data = data + b'\x00' * (pad_len - 1)
123         return data + bytes([pad_len - 1])
124
125
126 class AuthAlgo(object):
127     def __init__(self, name, mac, mod, key_len, trunc_len=None):
128         self.name = name
129         self.mac = mac
130         self.mod = mod
131         self.key_len = key_len
132         self.trunc_len = trunc_len or key_len
133
134
135 CRYPTO_ALGOS = {
136     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
139                                 mode=modes.GCM),
140 }
141
142 AUTH_ALGOS = {
143     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
148 }
149
150 PRF_ALGOS = {
151     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
153                                   hashes.SHA256, 32),
154 }
155
156 CRYPTO_IDS = {
157     12: 'AES-CBC',
158     20: 'AES-GCM-16ICV',
159 }
160
161 INTEG_IDS = {
162     2: 'HMAC-SHA1-96',
163     12: 'SHA2-256-128',
164     13: 'SHA2-384-192',
165     14: 'SHA2-512-256',
166 }
167
168
169 class IKEv2ChildSA(object):
170     def __init__(self, local_ts, remote_ts, is_initiator):
171         spi = os.urandom(4)
172         if is_initiator:
173             self.ispi = spi
174             self.rspi = None
175         else:
176             self.rspi = spi
177             self.ispi = None
178         self.local_ts = local_ts
179         self.remote_ts = remote_ts
180
181
182 class IKEv2SA(object):
183     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
184                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
185                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
186                  auth_method='shared-key', priv_key=None, natt=False,
187                  udp_encap=False):
188         self.udp_encap = udp_encap
189         self.natt = natt
190         if natt:
191             self.sport = 4500
192             self.dport = 4500
193         else:
194             self.sport = 500
195             self.dport = 500
196         self.msg_id = 0
197         self.dh_params = None
198         self.test = test
199         self.priv_key = priv_key
200         self.is_initiator = is_initiator
201         nonce = nonce or os.urandom(32)
202         self.auth_data = auth_data
203         self.i_id = i_id
204         self.r_id = r_id
205         if isinstance(id_type, str):
206             self.id_type = IDType.value(id_type)
207         else:
208             self.id_type = id_type
209         self.auth_method = auth_method
210         if self.is_initiator:
211             self.rspi = 8 * b'\x00'
212             self.ispi = spi
213             self.i_nonce = nonce
214         else:
215             self.rspi = spi
216             self.ispi = 8 * b'\x00'
217             self.r_nonce = nonce
218         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
219                           self.is_initiator)]
220
221     def new_msg_id(self):
222         self.msg_id += 1
223         return self.msg_id
224
225     @property
226     def my_dh_pub_key(self):
227         if self.is_initiator:
228             return self.i_dh_data
229         return self.r_dh_data
230
231     @property
232     def peer_dh_pub_key(self):
233         if self.is_initiator:
234             return self.r_dh_data
235         return self.i_dh_data
236
237     def compute_secret(self):
238         priv = self.dh_private_key
239         peer = self.peer_dh_pub_key
240         p, g, l = self.ike_group
241         return pow(int.from_bytes(peer, 'big'),
242                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
243
244     def generate_dh_data(self):
245         # generate DH keys
246         if self.ike_dh not in DH:
247             raise NotImplementedError('%s not in DH group' % self.ike_dh)
248
249         if self.dh_params is None:
250             dhg = DH[self.ike_dh]
251             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
252             self.dh_params = pn.parameters(default_backend())
253
254         priv = self.dh_params.generate_private_key()
255         pub = priv.public_key()
256         x = priv.private_numbers().x
257         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
258         y = pub.public_numbers().y
259
260         if self.is_initiator:
261             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
262         else:
263             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
264
265     def complete_dh_data(self):
266         self.dh_shared_secret = self.compute_secret()
267
268     def calc_child_keys(self):
269         prf = self.ike_prf_alg.mod()
270         s = self.i_nonce + self.r_nonce
271         c = self.child_sas[0]
272
273         encr_key_len = self.esp_crypto_key_len
274         integ_key_len = self.esp_integ_alg.key_len
275         salt_len = 0 if integ_key_len else 4
276
277         l = (integ_key_len * 2 +
278              encr_key_len * 2 +
279              salt_len * 2)
280         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
281
282         pos = 0
283         c.sk_ei = keymat[pos:pos+encr_key_len]
284         pos += encr_key_len
285
286         if integ_key_len:
287             c.sk_ai = keymat[pos:pos+integ_key_len]
288             pos += integ_key_len
289         else:
290             c.salt_ei = keymat[pos:pos+salt_len]
291             pos += salt_len
292
293         c.sk_er = keymat[pos:pos+encr_key_len]
294         pos += encr_key_len
295
296         if integ_key_len:
297             c.sk_ar = keymat[pos:pos+integ_key_len]
298             pos += integ_key_len
299         else:
300             c.salt_er = keymat[pos:pos+salt_len]
301             pos += salt_len
302
303     def calc_prfplus(self, prf, key, seed, length):
304         r = b''
305         t = None
306         x = 1
307         while len(r) < length and x < 255:
308             if t is not None:
309                 s = t
310             else:
311                 s = b''
312             s = s + seed + bytes([x])
313             t = self.calc_prf(prf, key, s)
314             r = r + t
315             x = x + 1
316
317         if x == 255:
318             return None
319         return r
320
321     def calc_prf(self, prf, key, data):
322         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
323         h.update(data)
324         return h.finalize()
325
326     def calc_keys(self):
327         prf = self.ike_prf_alg.mod()
328         # SKEYSEED = prf(Ni | Nr, g^ir)
329         s = self.i_nonce + self.r_nonce
330         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
331
332         # calculate S = Ni | Nr | SPIi SPIr
333         s = s + self.ispi + self.rspi
334
335         prf_key_trunc = self.ike_prf_alg.trunc_len
336         encr_key_len = self.ike_crypto_key_len
337         tr_prf_key_len = self.ike_prf_alg.key_len
338         integ_key_len = self.ike_integ_alg.key_len
339         if integ_key_len == 0:
340             salt_size = 4
341         else:
342             salt_size = 0
343
344         l = (prf_key_trunc +
345              integ_key_len * 2 +
346              encr_key_len * 2 +
347              tr_prf_key_len * 2 +
348              salt_size * 2)
349         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
350
351         pos = 0
352         self.sk_d = keymat[:pos+prf_key_trunc]
353         pos += prf_key_trunc
354
355         self.sk_ai = keymat[pos:pos+integ_key_len]
356         pos += integ_key_len
357         self.sk_ar = keymat[pos:pos+integ_key_len]
358         pos += integ_key_len
359
360         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
361         pos += encr_key_len + salt_size
362         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
363         pos += encr_key_len + salt_size
364
365         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
366         pos += tr_prf_key_len
367         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
368
369     def generate_authmsg(self, prf, packet):
370         if self.is_initiator:
371             id = self.i_id
372             nonce = self.r_nonce
373             key = self.sk_pi
374         else:
375             id = self.r_id
376             nonce = self.i_nonce
377             key = self.sk_pr
378         data = bytes([self.id_type, 0, 0, 0]) + id
379         id_hash = self.calc_prf(prf, key, data)
380         return packet + nonce + id_hash
381
382     def auth_init(self):
383         prf = self.ike_prf_alg.mod()
384         if self.is_initiator:
385             packet = self.init_req_packet
386         else:
387             packet = self.init_resp_packet
388         authmsg = self.generate_authmsg(prf, raw(packet))
389         if self.auth_method == 'shared-key':
390             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
391             self.auth_data = self.calc_prf(prf, psk, authmsg)
392         elif self.auth_method == 'rsa-sig':
393             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
394                                                 hashes.SHA1())
395         else:
396             raise TypeError('unknown auth method type!')
397
398     def encrypt(self, data, aad=None):
399         data = self.ike_crypto_alg.pad(data)
400         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
401
402     @property
403     def peer_authkey(self):
404         if self.is_initiator:
405             return self.sk_ar
406         return self.sk_ai
407
408     @property
409     def my_authkey(self):
410         if self.is_initiator:
411             return self.sk_ai
412         return self.sk_ar
413
414     @property
415     def my_cryptokey(self):
416         if self.is_initiator:
417             return self.sk_ei
418         return self.sk_er
419
420     @property
421     def peer_cryptokey(self):
422         if self.is_initiator:
423             return self.sk_er
424         return self.sk_ei
425
426     def concat(self, alg, key_len):
427         return alg + '-' + str(key_len * 8)
428
429     @property
430     def vpp_ike_cypto_alg(self):
431         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
432
433     @property
434     def vpp_esp_cypto_alg(self):
435         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
436
437     def verify_hmac(self, ikemsg):
438         integ_trunc = self.ike_integ_alg.trunc_len
439         exp_hmac = ikemsg[-integ_trunc:]
440         data = ikemsg[:-integ_trunc]
441         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
442                                           self.peer_authkey, data)
443         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
444
445     def compute_hmac(self, integ, key, data):
446         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
447         h.update(data)
448         return h.finalize()
449
450     def decrypt(self, data, aad=None, icv=None):
451         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
452
453     def hmac_and_decrypt(self, ike):
454         ep = ike[ikev2.IKEv2_payload_Encrypted]
455         if self.ike_crypto == 'AES-GCM-16ICV':
456             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
457             ct = ep.load[:-GCM_ICV_SIZE]
458             tag = ep.load[-GCM_ICV_SIZE:]
459             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
460         else:
461             self.verify_hmac(raw(ike))
462             integ_trunc = self.ike_integ_alg.trunc_len
463
464             # remove ICV and decrypt payload
465             ct = ep.load[:-integ_trunc]
466             plain = self.decrypt(ct)
467         # remove padding
468         pad_len = plain[-1]
469         return plain[:-pad_len - 1]
470
471     def build_ts_addr(self, ts, version):
472         return {'starting_address_v' + version: ts['start_addr'],
473                 'ending_address_v' + version: ts['end_addr']}
474
475     def generate_ts(self, is_ip4):
476         c = self.child_sas[0]
477         ts_data = {'IP_protocol_ID': 0,
478                    'start_port': 0,
479                    'end_port': 0xffff}
480         if is_ip4:
481             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
482             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
483             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
484             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
485         else:
486             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
487             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
488             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
489             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
490
491         if self.is_initiator:
492             return ([ts1], [ts2])
493         return ([ts2], [ts1])
494
495     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
496         if crypto not in CRYPTO_ALGOS:
497             raise TypeError('unsupported encryption algo %r' % crypto)
498         self.ike_crypto = crypto
499         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
500         self.ike_crypto_key_len = crypto_key_len
501
502         if integ not in AUTH_ALGOS:
503             raise TypeError('unsupported auth algo %r' % integ)
504         self.ike_integ = None if integ == 'NULL' else integ
505         self.ike_integ_alg = AUTH_ALGOS[integ]
506
507         if prf not in PRF_ALGOS:
508             raise TypeError('unsupported prf algo %r' % prf)
509         self.ike_prf = prf
510         self.ike_prf_alg = PRF_ALGOS[prf]
511         self.ike_dh = dh
512         self.ike_group = DH[self.ike_dh]
513
514     def set_esp_props(self, crypto, crypto_key_len, integ):
515         self.esp_crypto_key_len = crypto_key_len
516         if crypto not in CRYPTO_ALGOS:
517             raise TypeError('unsupported encryption algo %r' % crypto)
518         self.esp_crypto = crypto
519         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
520
521         if integ not in AUTH_ALGOS:
522             raise TypeError('unsupported auth algo %r' % integ)
523         self.esp_integ = None if integ == 'NULL' else integ
524         self.esp_integ_alg = AUTH_ALGOS[integ]
525
526     def crypto_attr(self, key_len):
527         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
528             return (0x800e << 16 | key_len << 3, 12)
529         else:
530             raise Exception('unsupported attribute type')
531
532     def ike_crypto_attr(self):
533         return self.crypto_attr(self.ike_crypto_key_len)
534
535     def esp_crypto_attr(self):
536         return self.crypto_attr(self.esp_crypto_key_len)
537
538     def compute_nat_sha1(self, ip, port, rspi=None):
539         if rspi is None:
540             rspi = self.rspi
541         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
542         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
543         digest.update(data)
544         return digest.finalize()
545
546
547 class IkePeer(VppTestCase):
548     """ common class for initiator and responder """
549
550     @classmethod
551     def setUpClass(cls):
552         import scapy.contrib.ikev2 as _ikev2
553         globals()['ikev2'] = _ikev2
554         super(IkePeer, cls).setUpClass()
555         cls.create_pg_interfaces(range(2))
556         for i in cls.pg_interfaces:
557             i.admin_up()
558             i.config_ip4()
559             i.resolve_arp()
560             i.config_ip6()
561             i.resolve_ndp()
562
563     @classmethod
564     def tearDownClass(cls):
565         super(IkePeer, cls).tearDownClass()
566
567     def tearDown(self):
568         super(IkePeer, self).tearDown()
569         if self.del_sa_from_responder:
570             self.initiate_del_sa_from_responder()
571         else:
572             self.initiate_del_sa_from_initiator()
573         r = self.vapi.ikev2_sa_dump()
574         self.assertEqual(len(r), 0)
575         sas = self.vapi.ipsec_sa_dump()
576         self.assertEqual(len(sas), 0)
577         self.p.remove_vpp_config()
578         self.assertIsNone(self.p.query_vpp_config())
579
580     def setUp(self):
581         super(IkePeer, self).setUp()
582         self.config_tc()
583         self.p.add_vpp_config()
584         self.assertIsNotNone(self.p.query_vpp_config())
585         if self.sa.is_initiator:
586             self.sa.generate_dh_data()
587         self.vapi.cli('ikev2 set logging level 4')
588         self.vapi.cli('event-lo clear')
589
590     def create_rekey_request(self):
591         sa, first_payload = self.generate_auth_payload(is_rekey=True)
592         header = ikev2.IKEv2(
593                 init_SPI=self.sa.ispi,
594                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
595                 flags='Initiator', exch_type='CREATE_CHILD_SA')
596
597         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
598         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
599                                   self.sa.dport, self.sa.natt, self.ip6)
600
601     def create_empty_request(self):
602         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
603                              id=self.sa.new_msg_id(), flags='Initiator',
604                              exch_type='INFORMATIONAL',
605                              next_payload='Encrypted')
606
607         msg = self.encrypt_ike_msg(header, b'', None)
608         return self.create_packet(self.pg0, msg, self.sa.sport,
609                                   self.sa.dport, self.sa.natt, self.ip6)
610
611     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
612                       use_ip6=False):
613         if use_ip6:
614             src_ip = src_if.remote_ip6
615             dst_ip = src_if.local_ip6
616             ip_layer = IPv6
617         else:
618             src_ip = src_if.remote_ip4
619             dst_ip = src_if.local_ip4
620             ip_layer = IP
621         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
622                ip_layer(src=src_ip, dst=dst_ip) /
623                UDP(sport=sport, dport=dport))
624         if natt:
625             # insert non ESP marker
626             res = res / Raw(b'\x00' * 4)
627         return res / msg
628
629     def verify_udp(self, udp):
630         self.assertEqual(udp.sport, self.sa.sport)
631         self.assertEqual(udp.dport, self.sa.dport)
632
633     def get_ike_header(self, packet):
634         try:
635             ih = packet[ikev2.IKEv2]
636             ih = self.verify_and_remove_non_esp_marker(ih)
637         except IndexError as e:
638             # this is a workaround for getting IKEv2 layer as both ikev2 and
639             # ipsec register for port 4500
640             esp = packet[ESP]
641             ih = self.verify_and_remove_non_esp_marker(esp)
642         self.assertEqual(ih.version, 0x20)
643         self.assertNotIn('Version', ih.flags)
644         return ih
645
646     def verify_and_remove_non_esp_marker(self, packet):
647         if self.sa.natt:
648             # if we are in nat traversal mode check for non esp marker
649             # and remove it
650             data = raw(packet)
651             self.assertEqual(data[:4], b'\x00' * 4)
652             return ikev2.IKEv2(data[4:])
653         else:
654             return packet
655
656     def encrypt_ike_msg(self, header, plain, first_payload):
657         if self.sa.ike_crypto == 'AES-GCM-16ICV':
658             data = self.sa.ike_crypto_alg.pad(raw(plain))
659             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
660                 len(ikev2.IKEv2_payload_Encrypted())
661             tlen = plen + len(ikev2.IKEv2())
662
663             # prepare aad data
664             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
665                                                  length=plen)
666             header.length = tlen
667             res = header / sk_p
668             encr = self.sa.encrypt(raw(plain), raw(res))
669             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
670                                                  length=plen, load=encr)
671             res = header / sk_p
672         else:
673             encr = self.sa.encrypt(raw(plain))
674             trunc_len = self.sa.ike_integ_alg.trunc_len
675             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
676             tlen = plen + len(ikev2.IKEv2())
677
678             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
679                                                  length=plen, load=encr)
680             header.length = tlen
681             res = header / sk_p
682
683             integ_data = raw(res)
684             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
685                                              self.sa.my_authkey, integ_data)
686             res = res / Raw(hmac_data[:trunc_len])
687         assert(len(res) == tlen)
688         return res
689
690     def verify_udp_encap(self, ipsec_sa):
691         e = VppEnum.vl_api_ipsec_sad_flags_t
692         if self.sa.udp_encap or self.sa.natt:
693             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
694         else:
695             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
696
697     def verify_ipsec_sas(self, is_rekey=False):
698         sas = self.vapi.ipsec_sa_dump()
699         if is_rekey:
700             # after rekey there is a short period of time in which old
701             # inbound SA is still present
702             sa_count = 3
703         else:
704             sa_count = 2
705         self.assertEqual(len(sas), sa_count)
706         if self.sa.is_initiator:
707             if is_rekey:
708                 sa0 = sas[0].entry
709                 sa1 = sas[2].entry
710             else:
711                 sa0 = sas[0].entry
712                 sa1 = sas[1].entry
713         else:
714             if is_rekey:
715                 sa0 = sas[2].entry
716                 sa1 = sas[0].entry
717             else:
718                 sa1 = sas[0].entry
719                 sa0 = sas[1].entry
720
721         c = self.sa.child_sas[0]
722
723         self.verify_udp_encap(sa0)
724         self.verify_udp_encap(sa1)
725         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
726         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
727         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
728
729         if self.sa.esp_integ is None:
730             vpp_integ_alg = 0
731         else:
732             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
733         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
734         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
735
736         # verify crypto keys
737         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
738         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
739         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
740         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
741
742         # verify integ keys
743         if vpp_integ_alg:
744             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
745             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
746             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
747             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
748         else:
749             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
750             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
751
752     def verify_keymat(self, api_keys, keys, name):
753         km = getattr(keys, name)
754         api_km = getattr(api_keys, name)
755         api_km_len = getattr(api_keys, name + '_len')
756         self.assertEqual(len(km), api_km_len)
757         self.assertEqual(km, api_km[:api_km_len])
758
759     def verify_id(self, api_id, exp_id):
760         self.assertEqual(api_id.type, IDType.value(exp_id.type))
761         self.assertEqual(api_id.data_len, exp_id.data_len)
762         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
763
764     def verify_ike_sas(self):
765         r = self.vapi.ikev2_sa_dump()
766         self.assertEqual(len(r), 1)
767         sa = r[0].sa
768         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
769         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
770         if self.ip6:
771             if self.sa.is_initiator:
772                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
773                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
774             else:
775                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
776                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
777         else:
778             if self.sa.is_initiator:
779                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
780                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
781             else:
782                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
783                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
784         self.verify_keymat(sa.keys, self.sa, 'sk_d')
785         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
786         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
787         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
788         self.verify_keymat(sa.keys, self.sa, 'sk_er')
789         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
790         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
791
792         self.assertEqual(sa.i_id.type, self.sa.id_type)
793         self.assertEqual(sa.r_id.type, self.sa.id_type)
794         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
795         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
796         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
797         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
798
799         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
800         self.assertEqual(len(r), 1)
801         csa = r[0].child_sa
802         self.assertEqual(csa.sa_index, sa.sa_index)
803         c = self.sa.child_sas[0]
804         if hasattr(c, 'sk_ai'):
805             self.verify_keymat(csa.keys, c, 'sk_ai')
806             self.verify_keymat(csa.keys, c, 'sk_ar')
807         self.verify_keymat(csa.keys, c, 'sk_ei')
808         self.verify_keymat(csa.keys, c, 'sk_er')
809         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
810         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
811
812         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
813         tsi = tsi[0]
814         tsr = tsr[0]
815         r = self.vapi.ikev2_traffic_selector_dump(
816                 is_initiator=True, sa_index=sa.sa_index,
817                 child_sa_index=csa.child_sa_index)
818         self.assertEqual(len(r), 1)
819         ts = r[0].ts
820         self.verify_ts(r[0].ts, tsi[0], True)
821
822         r = self.vapi.ikev2_traffic_selector_dump(
823                 is_initiator=False, sa_index=sa.sa_index,
824                 child_sa_index=csa.child_sa_index)
825         self.assertEqual(len(r), 1)
826         self.verify_ts(r[0].ts, tsr[0], False)
827
828         n = self.vapi.ikev2_nonce_get(is_initiator=True,
829                                       sa_index=sa.sa_index)
830         self.verify_nonce(n, self.sa.i_nonce)
831         n = self.vapi.ikev2_nonce_get(is_initiator=False,
832                                       sa_index=sa.sa_index)
833         self.verify_nonce(n, self.sa.r_nonce)
834
835     def verify_nonce(self, api_nonce, nonce):
836         self.assertEqual(api_nonce.data_len, len(nonce))
837         self.assertEqual(api_nonce.nonce, nonce)
838
839     def verify_ts(self, api_ts, ts, is_initiator):
840         if is_initiator:
841             self.assertTrue(api_ts.is_local)
842         else:
843             self.assertFalse(api_ts.is_local)
844
845         if self.p.ts_is_ip4:
846             self.assertEqual(api_ts.start_addr,
847                              IPv4Address(ts.starting_address_v4))
848             self.assertEqual(api_ts.end_addr,
849                              IPv4Address(ts.ending_address_v4))
850         else:
851             self.assertEqual(api_ts.start_addr,
852                              IPv6Address(ts.starting_address_v6))
853             self.assertEqual(api_ts.end_addr,
854                              IPv6Address(ts.ending_address_v6))
855         self.assertEqual(api_ts.start_port, ts.start_port)
856         self.assertEqual(api_ts.end_port, ts.end_port)
857         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
858
859
860 class TemplateInitiator(IkePeer):
861     """ initiator test template """
862
863     def initiate_del_sa_from_initiator(self):
864         ispi = int.from_bytes(self.sa.ispi, 'little')
865         self.pg0.enable_capture()
866         self.pg_start()
867         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
868         capture = self.pg0.get_capture(1)
869         ih = self.get_ike_header(capture[0])
870         self.assertNotIn('Response', ih.flags)
871         self.assertIn('Initiator', ih.flags)
872         self.assertEqual(ih.init_SPI, self.sa.ispi)
873         self.assertEqual(ih.resp_SPI, self.sa.rspi)
874         plain = self.sa.hmac_and_decrypt(ih)
875         d = ikev2.IKEv2_payload_Delete(plain)
876         self.assertEqual(d.proto, 1)  # proto=IKEv2
877         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
878                              flags='Response', exch_type='INFORMATIONAL',
879                              id=ih.id, next_payload='Encrypted')
880         resp = self.encrypt_ike_msg(header, b'', None)
881         self.send_and_assert_no_replies(self.pg0, resp)
882
883     def verify_del_sa(self, packet):
884         ih = self.get_ike_header(packet)
885         self.assertEqual(ih.id, self.sa.msg_id)
886         self.assertEqual(ih.exch_type, 37)  # exchange informational
887         self.assertIn('Response', ih.flags)
888         self.assertIn('Initiator', ih.flags)
889         plain = self.sa.hmac_and_decrypt(ih)
890         self.assertEqual(plain, b'')
891
892     def initiate_del_sa_from_responder(self):
893         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
894                              exch_type='INFORMATIONAL',
895                              id=self.sa.new_msg_id())
896         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
897         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
898         packet = self.create_packet(self.pg0, ike_msg,
899                                     self.sa.sport, self.sa.dport,
900                                     self.sa.natt, self.ip6)
901         self.pg0.add_stream(packet)
902         self.pg0.enable_capture()
903         self.pg_start()
904         capture = self.pg0.get_capture(1)
905         self.verify_del_sa(capture[0])
906
907     @staticmethod
908     def find_notify_payload(packet, notify_type):
909         n = packet[ikev2.IKEv2_payload_Notify]
910         while n is not None:
911             if n.type == notify_type:
912                 return n
913             n = n.payload
914         return None
915
916     def verify_nat_detection(self, packet):
917         if self.ip6:
918             iph = packet[IPv6]
919         else:
920             iph = packet[IP]
921         udp = packet[UDP]
922
923         # NAT_DETECTION_SOURCE_IP
924         s = self.find_notify_payload(packet, 16388)
925         self.assertIsNotNone(s)
926         src_sha = self.sa.compute_nat_sha1(
927                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
928         self.assertEqual(s.load, src_sha)
929
930         # NAT_DETECTION_DESTINATION_IP
931         s = self.find_notify_payload(packet, 16389)
932         self.assertIsNotNone(s)
933         dst_sha = self.sa.compute_nat_sha1(
934                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
935         self.assertEqual(s.load, dst_sha)
936
937     def verify_sa_init_request(self, packet):
938         udp = packet[UDP]
939         self.sa.dport = udp.sport
940         ih = packet[ikev2.IKEv2]
941         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
942         self.assertEqual(ih.exch_type, 34)  # SA_INIT
943         self.sa.ispi = ih.init_SPI
944         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
945         self.assertIn('Initiator', ih.flags)
946         self.assertNotIn('Response', ih.flags)
947         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
948         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
949
950         prop = packet[ikev2.IKEv2_payload_Proposal]
951         self.assertEqual(prop.proto, 1)  # proto = ikev2
952         self.assertEqual(prop.proposal, 1)
953         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
954         self.assertEqual(prop.trans[0].transform_id,
955                          self.p.ike_transforms['crypto_alg'])
956         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
957         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
958         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
959         self.assertEqual(prop.trans[2].transform_id,
960                          self.p.ike_transforms['dh_group'])
961
962         self.verify_nat_detection(packet)
963         self.sa.set_ike_props(
964                     crypto='AES-GCM-16ICV', crypto_key_len=32,
965                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
966         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
967                               integ='SHA2-256-128')
968         self.sa.generate_dh_data()
969         self.sa.complete_dh_data()
970         self.sa.calc_keys()
971
972     def update_esp_transforms(self, trans, sa):
973         while trans:
974             if trans.transform_type == 1:  # ecryption
975                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
976             elif trans.transform_type == 3:  # integrity
977                 sa.esp_integ = INTEG_IDS[trans.transform_id]
978             trans = trans.payload
979
980     def verify_sa_auth_req(self, packet):
981         udp = packet[UDP]
982         self.sa.dport = udp.sport
983         ih = self.get_ike_header(packet)
984         self.assertEqual(ih.resp_SPI, self.sa.rspi)
985         self.assertEqual(ih.init_SPI, self.sa.ispi)
986         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
987         self.assertIn('Initiator', ih.flags)
988         self.assertNotIn('Response', ih.flags)
989
990         udp = packet[UDP]
991         self.verify_udp(udp)
992         self.assertEqual(ih.id, self.sa.msg_id + 1)
993         self.sa.msg_id += 1
994         plain = self.sa.hmac_and_decrypt(ih)
995         idi = ikev2.IKEv2_payload_IDi(plain)
996         idr = ikev2.IKEv2_payload_IDr(idi.payload)
997         self.assertEqual(idi.load, self.sa.i_id)
998         self.assertEqual(idr.load, self.sa.r_id)
999         prop = idi[ikev2.IKEv2_payload_Proposal]
1000         c = self.sa.child_sas[0]
1001         c.ispi = prop.SPI
1002         self.update_esp_transforms(
1003                 prop[ikev2.IKEv2_payload_Transform], self.sa)
1004
1005     def send_init_response(self):
1006         tr_attr = self.sa.ike_crypto_attr()
1007         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1008                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1009                  key_length=tr_attr[0]) /
1010                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1011                  transform_id=self.sa.ike_integ) /
1012                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1013                  transform_id=self.sa.ike_prf_alg.name) /
1014                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1015                  transform_id=self.sa.ike_dh))
1016         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1017                  trans_nb=4, trans=trans))
1018
1019         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1020         if self.sa.natt:
1021             dst_address = b'\x0a\x0a\x0a\x0a'
1022         else:
1023             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1024         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1025         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1026
1027         self.sa.init_resp_packet = (
1028             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1029                         exch_type='IKE_SA_INIT', flags='Response') /
1030             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1031             ikev2.IKEv2_payload_KE(next_payload='Nonce',
1032                                    group=self.sa.ike_dh,
1033                                    load=self.sa.my_dh_pub_key) /
1034             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1035                                       next_payload='Notify') /
1036             ikev2.IKEv2_payload_Notify(
1037                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1038                     next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1039                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1040
1041         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1042                                      self.sa.sport, self.sa.dport,
1043                                      False, self.ip6)
1044         self.pg_send(self.pg0, ike_msg)
1045         capture = self.pg0.get_capture(1)
1046         self.verify_sa_auth_req(capture[0])
1047
1048     def initiate_sa_init(self):
1049         self.pg0.enable_capture()
1050         self.pg_start()
1051         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1052
1053         capture = self.pg0.get_capture(1)
1054         self.verify_sa_init_request(capture[0])
1055         self.send_init_response()
1056
1057     def send_auth_response(self):
1058         tr_attr = self.sa.esp_crypto_attr()
1059         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1060                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1061                  key_length=tr_attr[0]) /
1062                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1063                  transform_id=self.sa.esp_integ) /
1064                  ikev2.IKEv2_payload_Transform(
1065                  transform_type='Extended Sequence Number',
1066                  transform_id='No ESN') /
1067                  ikev2.IKEv2_payload_Transform(
1068                  transform_type='Extended Sequence Number',
1069                  transform_id='ESN'))
1070
1071         c = self.sa.child_sas[0]
1072         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1073                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1074
1075         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1076         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1077                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1078                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1079                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1080                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1081                  auth_type=AuthMethod.value(self.sa.auth_method),
1082                  load=self.sa.auth_data) /
1083                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1084                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1085                  number_of_TSs=len(tsi),
1086                  traffic_selector=tsi) /
1087                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1088                  number_of_TSs=len(tsr),
1089                  traffic_selector=tsr) /
1090                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1091
1092         header = ikev2.IKEv2(
1093                 init_SPI=self.sa.ispi,
1094                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1095                 flags='Response', exch_type='IKE_AUTH')
1096
1097         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1098         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1099                                     self.sa.dport, self.sa.natt, self.ip6)
1100         self.pg_send(self.pg0, packet)
1101
1102     def test_initiator(self):
1103         self.initiate_sa_init()
1104         self.sa.auth_init()
1105         self.sa.calc_child_keys()
1106         self.send_auth_response()
1107         self.verify_ike_sas()
1108
1109
1110 class TemplateResponder(IkePeer):
1111     """ responder test template """
1112
1113     def initiate_del_sa_from_responder(self):
1114         self.pg0.enable_capture()
1115         self.pg_start()
1116         self.vapi.ikev2_initiate_del_ike_sa(
1117                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1118         capture = self.pg0.get_capture(1)
1119         ih = self.get_ike_header(capture[0])
1120         self.assertNotIn('Response', ih.flags)
1121         self.assertNotIn('Initiator', ih.flags)
1122         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1123         plain = self.sa.hmac_and_decrypt(ih)
1124         d = ikev2.IKEv2_payload_Delete(plain)
1125         self.assertEqual(d.proto, 1)  # proto=IKEv2
1126         self.assertEqual(ih.init_SPI, self.sa.ispi)
1127         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1128         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1129                              flags='Initiator+Response',
1130                              exch_type='INFORMATIONAL',
1131                              id=ih.id, next_payload='Encrypted')
1132         resp = self.encrypt_ike_msg(header, b'', None)
1133         self.send_and_assert_no_replies(self.pg0, resp)
1134
1135     def verify_del_sa(self, packet):
1136         ih = self.get_ike_header(packet)
1137         self.assertEqual(ih.id, self.sa.msg_id)
1138         self.assertEqual(ih.exch_type, 37)  # exchange informational
1139         self.assertIn('Response', ih.flags)
1140         self.assertNotIn('Initiator', ih.flags)
1141         self.assertEqual(ih.next_payload, 46)  # Encrypted
1142         self.assertEqual(ih.init_SPI, self.sa.ispi)
1143         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1144         plain = self.sa.hmac_and_decrypt(ih)
1145         self.assertEqual(plain, b'')
1146
1147     def initiate_del_sa_from_initiator(self):
1148         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1149                              flags='Initiator', exch_type='INFORMATIONAL',
1150                              id=self.sa.new_msg_id())
1151         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1152         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1153         packet = self.create_packet(self.pg0, ike_msg,
1154                                     self.sa.sport, self.sa.dport,
1155                                     self.sa.natt, self.ip6)
1156         self.pg0.add_stream(packet)
1157         self.pg0.enable_capture()
1158         self.pg_start()
1159         capture = self.pg0.get_capture(1)
1160         self.verify_del_sa(capture[0])
1161
1162     def send_sa_init_req(self, behind_nat=False):
1163         tr_attr = self.sa.ike_crypto_attr()
1164         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1165                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1166                  key_length=tr_attr[0]) /
1167                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1168                  transform_id=self.sa.ike_integ) /
1169                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1170                  transform_id=self.sa.ike_prf_alg.name) /
1171                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1172                  transform_id=self.sa.ike_dh))
1173
1174         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1175                  trans_nb=4, trans=trans))
1176
1177         self.sa.init_req_packet = (
1178                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1179                             flags='Initiator', exch_type='IKE_SA_INIT') /
1180                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1181                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1182                                        group=self.sa.ike_dh,
1183                                        load=self.sa.my_dh_pub_key) /
1184                 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1185                                           load=self.sa.i_nonce))
1186
1187         if behind_nat:
1188             src_address = b'\x0a\x0a\x0a\x01'
1189         else:
1190             src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1191
1192         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1193         dst_nat = self.sa.compute_nat_sha1(
1194                 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1195                 self.sa.dport)
1196         nat_src_detection = ikev2.IKEv2_payload_Notify(
1197                 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1198                 next_payload='Notify')
1199         nat_dst_detection = ikev2.IKEv2_payload_Notify(
1200                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1201         self.sa.init_req_packet = (self.sa.init_req_packet /
1202                                    nat_src_detection /
1203                                    nat_dst_detection)
1204
1205         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1206                                      self.sa.sport, self.sa.dport,
1207                                      self.sa.natt, self.ip6)
1208         self.pg0.add_stream(ike_msg)
1209         self.pg0.enable_capture()
1210         self.pg_start()
1211         capture = self.pg0.get_capture(1)
1212         self.verify_sa_init(capture[0])
1213
1214     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1215         tr_attr = self.sa.esp_crypto_attr()
1216         last_payload = last_payload or 'Notify'
1217         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1218                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1219                  key_length=tr_attr[0]) /
1220                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1221                  transform_id=self.sa.esp_integ) /
1222                  ikev2.IKEv2_payload_Transform(
1223                  transform_type='Extended Sequence Number',
1224                  transform_id='No ESN') /
1225                  ikev2.IKEv2_payload_Transform(
1226                  transform_type='Extended Sequence Number',
1227                  transform_id='ESN'))
1228
1229         c = self.sa.child_sas[0]
1230         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1231                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1232
1233         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1234         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1235                  auth_type=AuthMethod.value(self.sa.auth_method),
1236                  load=self.sa.auth_data) /
1237                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1238                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1239                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1240                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1241                  number_of_TSs=len(tsr), traffic_selector=tsr))
1242
1243         if is_rekey:
1244             first_payload = 'Nonce'
1245             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1246                      next_payload='SA') / plain /
1247                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1248                      proto='ESP', SPI=c.ispi))
1249         else:
1250             first_payload = 'IDi'
1251             ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1252                    IDtype=self.sa.id_type, load=self.sa.i_id) /
1253                    ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1254                    IDtype=self.sa.id_type, load=self.sa.r_id))
1255             plain = ids / plain
1256         return plain, first_payload
1257
1258     def send_sa_auth(self):
1259         plain, first_payload = self.generate_auth_payload(
1260                     last_payload='Notify')
1261         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1262         header = ikev2.IKEv2(
1263                 init_SPI=self.sa.ispi,
1264                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1265                 flags='Initiator', exch_type='IKE_AUTH')
1266
1267         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1268         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1269                                     self.sa.dport, self.sa.natt, self.ip6)
1270         self.pg0.add_stream(packet)
1271         self.pg0.enable_capture()
1272         self.pg_start()
1273         capture = self.pg0.get_capture(1)
1274         self.verify_sa_auth_resp(capture[0])
1275
1276     def verify_sa_init(self, packet):
1277         ih = self.get_ike_header(packet)
1278
1279         self.assertEqual(ih.id, self.sa.msg_id)
1280         self.assertEqual(ih.exch_type, 34)
1281         self.assertIn('Response', ih.flags)
1282         self.assertEqual(ih.init_SPI, self.sa.ispi)
1283         self.assertNotEqual(ih.resp_SPI, 0)
1284         self.sa.rspi = ih.resp_SPI
1285         try:
1286             sa = ih[ikev2.IKEv2_payload_SA]
1287             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1288             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1289         except IndexError as e:
1290             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1291             self.logger.error(ih.show())
1292             raise
1293         self.sa.complete_dh_data()
1294         self.sa.calc_keys()
1295         self.sa.auth_init()
1296
1297     def verify_sa_auth_resp(self, packet):
1298         ike = self.get_ike_header(packet)
1299         udp = packet[UDP]
1300         self.verify_udp(udp)
1301         self.assertEqual(ike.id, self.sa.msg_id)
1302         plain = self.sa.hmac_and_decrypt(ike)
1303         idr = ikev2.IKEv2_payload_IDr(plain)
1304         prop = idr[ikev2.IKEv2_payload_Proposal]
1305         self.assertEqual(prop.SPIsize, 4)
1306         self.sa.child_sas[0].rspi = prop.SPI
1307         self.sa.calc_child_keys()
1308
1309     def test_responder(self):
1310         self.send_sa_init_req(self.sa.natt)
1311         self.send_sa_auth()
1312         self.verify_ipsec_sas()
1313         self.verify_ike_sas()
1314
1315
1316 class Ikev2Params(object):
1317     def config_params(self, params={}):
1318         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1319         ei = VppEnum.vl_api_ipsec_integ_alg_t
1320         self.vpp_enums = {
1321                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1322                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1323                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1324                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1325                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1326                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1327
1328                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1329                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1330                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1331                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1332
1333         dpd_disabled = True if 'dpd_disabled' not in params else\
1334             params['dpd_disabled']
1335         if dpd_disabled:
1336             self.vapi.cli('ikev2 dpd disable')
1337         self.del_sa_from_responder = False if 'del_sa_from_responder'\
1338             not in params else params['del_sa_from_responder']
1339         is_natt = 'natt' in params and params['natt'] or False
1340         self.p = Profile(self, 'pr1')
1341         self.ip6 = False if 'ip6' not in params else params['ip6']
1342
1343         if 'auth' in params and params['auth'] == 'rsa-sig':
1344             auth_method = 'rsa-sig'
1345             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1346             self.vapi.ikev2_set_local_key(
1347                     key_file=work_dir + params['server-key'])
1348
1349             client_file = work_dir + params['client-cert']
1350             server_pem = open(work_dir + params['server-cert']).read()
1351             client_priv = open(work_dir + params['client-key']).read()
1352             client_priv = load_pem_private_key(str.encode(client_priv), None,
1353                                                default_backend())
1354             self.peer_cert = x509.load_pem_x509_certificate(
1355                     str.encode(server_pem),
1356                     default_backend())
1357             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1358             auth_data = None
1359         else:
1360             auth_data = b'$3cr3tpa$$w0rd'
1361             self.p.add_auth(method='shared-key', data=auth_data)
1362             auth_method = 'shared-key'
1363             client_priv = None
1364
1365         is_init = True if 'is_initiator' not in params else\
1366             params['is_initiator']
1367
1368         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1369         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1370         if is_init:
1371             self.p.add_local_id(**idr)
1372             self.p.add_remote_id(**idi)
1373         else:
1374             self.p.add_local_id(**idi)
1375             self.p.add_remote_id(**idr)
1376
1377         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1378             'loc_ts' not in params else params['loc_ts']
1379         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1380             'rem_ts' not in params else params['rem_ts']
1381         self.p.add_local_ts(**loc_ts)
1382         self.p.add_remote_ts(**rem_ts)
1383         if 'responder' in params:
1384             self.p.add_responder(params['responder'])
1385         if 'ike_transforms' in params:
1386             self.p.add_ike_transforms(params['ike_transforms'])
1387         if 'esp_transforms' in params:
1388             self.p.add_esp_transforms(params['esp_transforms'])
1389
1390         udp_encap = False if 'udp_encap' not in params else\
1391             params['udp_encap']
1392         if udp_encap:
1393             self.p.set_udp_encap(True)
1394
1395         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1396                           is_initiator=is_init,
1397                           id_type=self.p.local_id['id_type'], natt=is_natt,
1398                           priv_key=client_priv, auth_method=auth_method,
1399                           auth_data=auth_data, udp_encap=udp_encap,
1400                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1401         if is_init:
1402             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1403                 params['ike-crypto']
1404             ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1405                 params['ike-integ']
1406             ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1407                 params['ike-dh']
1408
1409             esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1410                 params['esp-crypto']
1411             esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1412                 params['esp-integ']
1413
1414             self.sa.set_ike_props(
1415                     crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1416                     integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1417             self.sa.set_esp_props(
1418                     crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1419                     integ=esp_integ)
1420
1421
1422 class TestApi(VppTestCase):
1423     """ Test IKEV2 API """
1424     @classmethod
1425     def setUpClass(cls):
1426         super(TestApi, cls).setUpClass()
1427
1428     @classmethod
1429     def tearDownClass(cls):
1430         super(TestApi, cls).tearDownClass()
1431
1432     def tearDown(self):
1433         super(TestApi, self).tearDown()
1434         self.p1.remove_vpp_config()
1435         self.p2.remove_vpp_config()
1436         r = self.vapi.ikev2_profile_dump()
1437         self.assertEqual(len(r), 0)
1438
1439     def configure_profile(self, cfg):
1440         p = Profile(self, cfg['name'])
1441         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1442         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1443         p.add_local_ts(**cfg['loc_ts'])
1444         p.add_remote_ts(**cfg['rem_ts'])
1445         p.add_responder(cfg['responder'])
1446         p.add_ike_transforms(cfg['ike_ts'])
1447         p.add_esp_transforms(cfg['esp_ts'])
1448         p.add_auth(**cfg['auth'])
1449         p.set_udp_encap(cfg['udp_encap'])
1450         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1451         if 'lifetime_data' in cfg:
1452             p.set_lifetime_data(cfg['lifetime_data'])
1453         if 'tun_itf' in cfg:
1454             p.set_tunnel_interface(cfg['tun_itf'])
1455         if 'natt_disabled' in cfg and cfg['natt_disabled']:
1456             p.disable_natt()
1457         p.add_vpp_config()
1458         return p
1459
1460     def test_profile_api(self):
1461         """ test profile dump API """
1462         loc_ts4 = {
1463                     'proto': 8,
1464                     'start_port': 1,
1465                     'end_port': 19,
1466                     'start_addr': '3.3.3.2',
1467                     'end_addr': '3.3.3.3',
1468                 }
1469         rem_ts4 = {
1470                     'proto': 9,
1471                     'start_port': 10,
1472                     'end_port': 119,
1473                     'start_addr': '4.5.76.80',
1474                     'end_addr': '2.3.4.6',
1475                 }
1476
1477         loc_ts6 = {
1478                     'proto': 8,
1479                     'start_port': 1,
1480                     'end_port': 19,
1481                     'start_addr': 'ab::1',
1482                     'end_addr': 'ab::4',
1483                 }
1484         rem_ts6 = {
1485                     'proto': 9,
1486                     'start_port': 10,
1487                     'end_port': 119,
1488                     'start_addr': 'cd::12',
1489                     'end_addr': 'cd::13',
1490                 }
1491
1492         conf = {
1493             'p1': {
1494                 'name': 'p1',
1495                 'natt_disabled': True,
1496                 'loc_id': ('fqdn', b'vpp.home'),
1497                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1498                 'loc_ts': loc_ts4,
1499                 'rem_ts': rem_ts4,
1500                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1501                 'ike_ts': {
1502                         'crypto_alg': 20,
1503                         'crypto_key_size': 32,
1504                         'integ_alg': 1,
1505                         'dh_group': 1},
1506                 'esp_ts': {
1507                         'crypto_alg': 13,
1508                         'crypto_key_size': 24,
1509                         'integ_alg': 2},
1510                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1511                 'udp_encap': True,
1512                 'ipsec_over_udp_port': 4501,
1513                 'lifetime_data': {
1514                     'lifetime': 123,
1515                     'lifetime_maxdata': 20192,
1516                     'lifetime_jitter': 9,
1517                     'handover': 132},
1518             },
1519             'p2': {
1520                 'name': 'p2',
1521                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1522                 'rem_id': ('ip6-addr', b'abcd::1'),
1523                 'loc_ts': loc_ts6,
1524                 'rem_ts': rem_ts6,
1525                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1526                 'ike_ts': {
1527                         'crypto_alg': 12,
1528                         'crypto_key_size': 16,
1529                         'integ_alg': 3,
1530                         'dh_group': 3},
1531                 'esp_ts': {
1532                         'crypto_alg': 9,
1533                         'crypto_key_size': 24,
1534                         'integ_alg': 4},
1535                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1536                 'udp_encap': False,
1537                 'ipsec_over_udp_port': 4600,
1538                 'tun_itf': 0}
1539         }
1540         self.p1 = self.configure_profile(conf['p1'])
1541         self.p2 = self.configure_profile(conf['p2'])
1542
1543         r = self.vapi.ikev2_profile_dump()
1544         self.assertEqual(len(r), 2)
1545         self.verify_profile(r[0].profile, conf['p1'])
1546         self.verify_profile(r[1].profile, conf['p2'])
1547
1548     def verify_id(self, api_id, cfg_id):
1549         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1550         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1551
1552     def verify_ts(self, api_ts, cfg_ts):
1553         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1554         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1555         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1556         self.assertEqual(api_ts.start_addr,
1557                          ip_address(text_type(cfg_ts['start_addr'])))
1558         self.assertEqual(api_ts.end_addr,
1559                          ip_address(text_type(cfg_ts['end_addr'])))
1560
1561     def verify_responder(self, api_r, cfg_r):
1562         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1563         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1564
1565     def verify_transforms(self, api_ts, cfg_ts):
1566         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1567         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1568         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1569
1570     def verify_ike_transforms(self, api_ts, cfg_ts):
1571         self.verify_transforms(api_ts, cfg_ts)
1572         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1573
1574     def verify_esp_transforms(self, api_ts, cfg_ts):
1575         self.verify_transforms(api_ts, cfg_ts)
1576
1577     def verify_auth(self, api_auth, cfg_auth):
1578         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1579         self.assertEqual(api_auth.data, cfg_auth['data'])
1580         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1581
1582     def verify_lifetime_data(self, p, ld):
1583         self.assertEqual(p.lifetime, ld['lifetime'])
1584         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1585         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1586         self.assertEqual(p.handover, ld['handover'])
1587
1588     def verify_profile(self, ap, cp):
1589         self.assertEqual(ap.name, cp['name'])
1590         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1591         self.verify_id(ap.loc_id, cp['loc_id'])
1592         self.verify_id(ap.rem_id, cp['rem_id'])
1593         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1594         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1595         self.verify_responder(ap.responder, cp['responder'])
1596         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1597         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1598         self.verify_auth(ap.auth, cp['auth'])
1599         natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1600         self.assertTrue(natt_dis == ap.natt_disabled)
1601
1602         if 'lifetime_data' in cp:
1603             self.verify_lifetime_data(ap, cp['lifetime_data'])
1604         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1605         if 'tun_itf' in cp:
1606             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1607         else:
1608             self.assertEqual(ap.tun_itf, 0xffffffff)
1609
1610
1611 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1612     """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1613
1614     def config_tc(self):
1615         self.config_params({
1616             'natt': True,
1617             'is_initiator': False,  # seen from test case perspective
1618                                     # thus vpp is initiator
1619             'responder': {'sw_if_index': self.pg0.sw_if_index,
1620                            'addr': self.pg0.remote_ip4},
1621             'ike-crypto': ('AES-GCM-16ICV', 32),
1622             'ike-integ': 'NULL',
1623             'ike-dh': '3072MODPgr',
1624             'ike_transforms': {
1625                 'crypto_alg': 20,  # "aes-gcm-16"
1626                 'crypto_key_size': 256,
1627                 'dh_group': 15,  # "modp-3072"
1628             },
1629             'esp_transforms': {
1630                 'crypto_alg': 12,  # "aes-cbc"
1631                 'crypto_key_size': 256,
1632                 # "hmac-sha2-256-128"
1633                 'integ_alg': 12}})
1634
1635
1636 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1637     """ test ikev2 initiator - pre shared key auth """
1638
1639     def config_tc(self):
1640         self.config_params({
1641             'is_initiator': False,  # seen from test case perspective
1642                                     # thus vpp is initiator
1643             'responder': {'sw_if_index': self.pg0.sw_if_index,
1644                            'addr': self.pg0.remote_ip4},
1645             'ike-crypto': ('AES-GCM-16ICV', 32),
1646             'ike-integ': 'NULL',
1647             'ike-dh': '3072MODPgr',
1648             'ike_transforms': {
1649                 'crypto_alg': 20,  # "aes-gcm-16"
1650                 'crypto_key_size': 256,
1651                 'dh_group': 15,  # "modp-3072"
1652             },
1653             'esp_transforms': {
1654                 'crypto_alg': 12,  # "aes-cbc"
1655                 'crypto_key_size': 256,
1656                 # "hmac-sha2-256-128"
1657                 'integ_alg': 12}})
1658
1659
1660 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1661     """ test initiator - request window size (1) """
1662
1663     def rekey_respond(self, req, update_child_sa_data):
1664         ih = self.get_ike_header(req)
1665         plain = self.sa.hmac_and_decrypt(ih)
1666         sa = ikev2.IKEv2_payload_SA(plain)
1667         if update_child_sa_data:
1668             prop = sa[ikev2.IKEv2_payload_Proposal]
1669             self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1670             self.sa.r_nonce = self.sa.i_nonce
1671             self.sa.child_sas[0].ispi = prop.SPI
1672             self.sa.child_sas[0].rspi = prop.SPI
1673             self.sa.calc_child_keys()
1674
1675         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1676                              flags='Response', exch_type=36,
1677                              id=ih.id, next_payload='Encrypted')
1678         resp = self.encrypt_ike_msg(header, sa, 'SA')
1679         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1680                                     self.sa.dport, self.sa.natt, self.ip6)
1681         self.send_and_assert_no_replies(self.pg0, packet)
1682
1683     def test_initiator(self):
1684         super(TestInitiatorRequestWindowSize, self).test_initiator()
1685         self.pg0.enable_capture()
1686         self.pg_start()
1687         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1688         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1689         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1690         capture = self.pg0.get_capture(2)
1691
1692         # reply in reverse order
1693         self.rekey_respond(capture[1], True)
1694         self.rekey_respond(capture[0], False)
1695
1696         # verify that only the second request was accepted
1697         self.verify_ike_sas()
1698         self.verify_ipsec_sas(is_rekey=True)
1699
1700
1701 class TestInitiatorRekey(TestInitiatorPsk):
1702     """ test ikev2 initiator - rekey """
1703
1704     def rekey_from_initiator(self):
1705         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1706         self.pg0.enable_capture()
1707         self.pg_start()
1708         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1709         capture = self.pg0.get_capture(1)
1710         ih = self.get_ike_header(capture[0])
1711         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1712         self.assertNotIn('Response', ih.flags)
1713         self.assertIn('Initiator', ih.flags)
1714         plain = self.sa.hmac_and_decrypt(ih)
1715         sa = ikev2.IKEv2_payload_SA(plain)
1716         prop = sa[ikev2.IKEv2_payload_Proposal]
1717         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1718         self.sa.r_nonce = self.sa.i_nonce
1719         # update new responder SPI
1720         self.sa.child_sas[0].ispi = prop.SPI
1721         self.sa.child_sas[0].rspi = prop.SPI
1722         self.sa.calc_child_keys()
1723         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1724                              flags='Response', exch_type=36,
1725                              id=ih.id, next_payload='Encrypted')
1726         resp = self.encrypt_ike_msg(header, sa, 'SA')
1727         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1728                                     self.sa.dport, self.sa.natt, self.ip6)
1729         self.send_and_assert_no_replies(self.pg0, packet)
1730
1731     def test_initiator(self):
1732         super(TestInitiatorRekey, self).test_initiator()
1733         self.rekey_from_initiator()
1734         self.verify_ike_sas()
1735         self.verify_ipsec_sas(is_rekey=True)
1736
1737
1738 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1739     """ test ikev2 initiator - delete IKE SA from responder """
1740
1741     def config_tc(self):
1742         self.config_params({
1743             'del_sa_from_responder': True,
1744             'is_initiator': False,  # seen from test case perspective
1745                                     # thus vpp is initiator
1746             'responder': {'sw_if_index': self.pg0.sw_if_index,
1747                            'addr': self.pg0.remote_ip4},
1748             'ike-crypto': ('AES-GCM-16ICV', 32),
1749             'ike-integ': 'NULL',
1750             'ike-dh': '3072MODPgr',
1751             'ike_transforms': {
1752                 'crypto_alg': 20,  # "aes-gcm-16"
1753                 'crypto_key_size': 256,
1754                 'dh_group': 15,  # "modp-3072"
1755             },
1756             'esp_transforms': {
1757                 'crypto_alg': 12,  # "aes-cbc"
1758                 'crypto_key_size': 256,
1759                 # "hmac-sha2-256-128"
1760                 'integ_alg': 12}})
1761
1762
1763 class TestResponderNATT(TemplateResponder, Ikev2Params):
1764     """ test ikev2 responder - nat traversal """
1765     def config_tc(self):
1766         self.config_params(
1767                 {'natt': True})
1768
1769
1770 class TestResponderPsk(TemplateResponder, Ikev2Params):
1771     """ test ikev2 responder - pre shared key auth """
1772     def config_tc(self):
1773         self.config_params()
1774
1775
1776 class TestResponderDpd(TestResponderPsk):
1777     """
1778     Dead peer detection test
1779     """
1780     def config_tc(self):
1781         self.config_params({'dpd_disabled': False})
1782
1783     def tearDown(self):
1784         pass
1785
1786     def test_responder(self):
1787         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1788         super(TestResponderDpd, self).test_responder()
1789         self.pg0.enable_capture()
1790         self.pg_start()
1791         # capture empty request but don't reply
1792         capture = self.pg0.get_capture(expected_count=1, timeout=5)
1793         ih = self.get_ike_header(capture[0])
1794         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1795         plain = self.sa.hmac_and_decrypt(ih)
1796         self.assertEqual(plain, b'')
1797         # wait for SA expiration
1798         time.sleep(3)
1799         ike_sas = self.vapi.ikev2_sa_dump()
1800         self.assertEqual(len(ike_sas), 0)
1801         ipsec_sas = self.vapi.ipsec_sa_dump()
1802         self.assertEqual(len(ipsec_sas), 0)
1803
1804
1805 class TestResponderRekey(TestResponderPsk):
1806     """ test ikev2 responder - rekey """
1807
1808     def rekey_from_initiator(self):
1809         packet = self.create_rekey_request()
1810         self.pg0.add_stream(packet)
1811         self.pg0.enable_capture()
1812         self.pg_start()
1813         capture = self.pg0.get_capture(1)
1814         ih = self.get_ike_header(capture[0])
1815         plain = self.sa.hmac_and_decrypt(ih)
1816         sa = ikev2.IKEv2_payload_SA(plain)
1817         prop = sa[ikev2.IKEv2_payload_Proposal]
1818         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1819         # update new responder SPI
1820         self.sa.child_sas[0].rspi = prop.SPI
1821
1822     def test_responder(self):
1823         super(TestResponderRekey, self).test_responder()
1824         self.rekey_from_initiator()
1825         self.sa.calc_child_keys()
1826         self.verify_ike_sas()
1827         self.verify_ipsec_sas(is_rekey=True)
1828
1829
1830 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1831     """ test ikev2 responder - cert based auth """
1832     def config_tc(self):
1833         self.config_params({
1834             'udp_encap': True,
1835             'auth': 'rsa-sig',
1836             'server-key': 'server-key.pem',
1837             'client-key': 'client-key.pem',
1838             'client-cert': 'client-cert.pem',
1839             'server-cert': 'server-cert.pem'})
1840
1841
1842 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1843         (TemplateResponder, Ikev2Params):
1844     """
1845     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1846     """
1847     def config_tc(self):
1848         self.config_params({
1849             'ike-crypto': ('AES-CBC', 16),
1850             'ike-integ': 'SHA2-256-128',
1851             'esp-crypto': ('AES-CBC', 24),
1852             'esp-integ': 'SHA2-384-192',
1853             'ike-dh': '2048MODPgr'})
1854
1855
1856 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1857         (TemplateResponder, Ikev2Params):
1858     """
1859     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1860     """
1861     def config_tc(self):
1862         self.config_params({
1863             'ike-crypto': ('AES-CBC', 32),
1864             'ike-integ': 'SHA2-256-128',
1865             'esp-crypto': ('AES-GCM-16ICV', 32),
1866             'esp-integ': 'NULL',
1867             'ike-dh': '3072MODPgr'})
1868
1869
1870 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1871     """
1872     IKE:AES_GCM_16_256
1873     """
1874     def config_tc(self):
1875         self.config_params({
1876             'del_sa_from_responder': True,
1877             'ip6': True,
1878             'natt': True,
1879             'ike-crypto': ('AES-GCM-16ICV', 32),
1880             'ike-integ': 'NULL',
1881             'ike-dh': '2048MODPgr',
1882             'loc_ts': {'start_addr': 'ab:cd::0',
1883                        'end_addr': 'ab:cd::10'},
1884             'rem_ts': {'start_addr': '11::0',
1885                        'end_addr': '11::100'}})
1886
1887
1888 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1889     """
1890     Test for keep alive messages
1891     """
1892
1893     def send_empty_req_from_responder(self):
1894         packet = self.create_empty_request()
1895         self.pg0.add_stream(packet)
1896         self.pg0.enable_capture()
1897         self.pg_start()
1898         capture = self.pg0.get_capture(1)
1899         ih = self.get_ike_header(capture[0])
1900         self.assertEqual(ih.id, self.sa.msg_id)
1901         plain = self.sa.hmac_and_decrypt(ih)
1902         self.assertEqual(plain, b'')
1903
1904     def test_initiator(self):
1905         super(TestInitiatorKeepaliveMsg, self).test_initiator()
1906         self.send_empty_req_from_responder()
1907
1908
1909 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1910     """ malformed packet test """
1911
1912     def tearDown(self):
1913         pass
1914
1915     def config_tc(self):
1916         self.config_params()
1917
1918     def assert_counter(self, count, name, version='ip4'):
1919         node_name = '/err/ikev2-%s/' % version + name
1920         self.assertEqual(count, self.statistics.get_err_counter(node_name))
1921
1922     def create_ike_init_msg(self, length=None, payload=None):
1923         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1924                           flags='Initiator', exch_type='IKE_SA_INIT')
1925         if payload is not None:
1926             msg /= payload
1927         return self.create_packet(self.pg0, msg, self.sa.sport,
1928                                   self.sa.dport)
1929
1930     def verify_bad_packet_length(self):
1931         ike_msg = self.create_ike_init_msg(length=0xdead)
1932         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1933         self.assert_counter(self.pkt_count, 'Bad packet length')
1934
1935     def verify_bad_sa_payload_length(self):
1936         p = ikev2.IKEv2_payload_SA(length=0xdead)
1937         ike_msg = self.create_ike_init_msg(payload=p)
1938         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1939         self.assert_counter(self.pkt_count, 'Malformed packet')
1940
1941     def test_responder(self):
1942         self.pkt_count = 254
1943         self.verify_bad_packet_length()
1944         self.verify_bad_sa_payload_length()
1945
1946
1947 if __name__ == '__main__':
1948     unittest.main(testRunner=VppTestRunner)