ikev2: add option to disable NAT traversal
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from socket import inet_pton
3 from cryptography import x509
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes, hmac
6 from cryptography.hazmat.primitives.asymmetric import dh, padding
7 from cryptography.hazmat.primitives.serialization import load_pem_private_key
8 from cryptography.hazmat.primitives.ciphers import (
9     Cipher,
10     algorithms,
11     modes,
12 )
13 from ipaddress import IPv4Address, IPv6Address, ip_address
14 from scapy.layers.ipsec import ESP
15 from scapy.layers.inet import IP, UDP, Ether
16 from scapy.layers.inet6 import IPv6
17 from scapy.packet import raw, Raw
18 from scapy.utils import long_converter
19 from framework import VppTestCase, VppTestRunner
20 from vpp_ikev2 import Profile, IDType, AuthMethod
21 from vpp_papi import VppEnum
22
23 try:
24     text_type = unicode
25 except NameError:
26     text_type = str
27
28 KEY_PAD = b"Key Pad for IKEv2"
29 SALT_SIZE = 4
30 GCM_ICV_SIZE = 16
31 GCM_IV_SIZE = 8
32
33
34 # defined in rfc3526
35 # tuple structure is (p, g, key_len)
36 DH = {
37     '2048MODPgr': (long_converter("""
38     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
39     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
40     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
41     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
42     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
43     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
44     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
45     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
46     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
47     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
48     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
49
50     '3072MODPgr': (long_converter("""
51     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
62     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
63     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
64     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
65     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
66     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
67 }
68
69
70 class CryptoAlgo(object):
71     def __init__(self, name, cipher, mode):
72         self.name = name
73         self.cipher = cipher
74         self.mode = mode
75         if self.cipher is not None:
76             self.bs = self.cipher.block_size // 8
77
78             if self.name == 'AES-GCM-16ICV':
79                 self.iv_len = GCM_IV_SIZE
80             else:
81                 self.iv_len = self.bs
82
83     def encrypt(self, data, key, aad=None):
84         iv = os.urandom(self.iv_len)
85         if aad is None:
86             encryptor = Cipher(self.cipher(key), self.mode(iv),
87                                default_backend()).encryptor()
88             return iv + encryptor.update(data) + encryptor.finalize()
89         else:
90             salt = key[-SALT_SIZE:]
91             nonce = salt + iv
92             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
93                                default_backend()).encryptor()
94             encryptor.authenticate_additional_data(aad)
95             data = encryptor.update(data) + encryptor.finalize()
96             data += encryptor.tag[:GCM_ICV_SIZE]
97             return iv + data
98
99     def decrypt(self, data, key, aad=None, icv=None):
100         if aad is None:
101             iv = data[:self.iv_len]
102             ct = data[self.iv_len:]
103             decryptor = Cipher(algorithms.AES(key),
104                                self.mode(iv),
105                                default_backend()).decryptor()
106             return decryptor.update(ct) + decryptor.finalize()
107         else:
108             salt = key[-SALT_SIZE:]
109             nonce = salt + data[:GCM_IV_SIZE]
110             ct = data[GCM_IV_SIZE:]
111             key = key[:-SALT_SIZE]
112             decryptor = Cipher(algorithms.AES(key),
113                                self.mode(nonce, icv, len(icv)),
114                                default_backend()).decryptor()
115             decryptor.authenticate_additional_data(aad)
116             return decryptor.update(ct) + decryptor.finalize()
117
118     def pad(self, data):
119         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
120         data = data + b'\x00' * (pad_len - 1)
121         return data + bytes([pad_len - 1])
122
123
124 class AuthAlgo(object):
125     def __init__(self, name, mac, mod, key_len, trunc_len=None):
126         self.name = name
127         self.mac = mac
128         self.mod = mod
129         self.key_len = key_len
130         self.trunc_len = trunc_len or key_len
131
132
133 CRYPTO_ALGOS = {
134     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
135     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
136     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
137                                 mode=modes.GCM),
138 }
139
140 AUTH_ALGOS = {
141     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
142     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
143     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
144     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
145     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
146 }
147
148 PRF_ALGOS = {
149     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
150     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
151                                   hashes.SHA256, 32),
152 }
153
154 CRYPTO_IDS = {
155     12: 'AES-CBC',
156     20: 'AES-GCM-16ICV',
157 }
158
159 INTEG_IDS = {
160     2: 'HMAC-SHA1-96',
161     12: 'SHA2-256-128',
162     13: 'SHA2-384-192',
163     14: 'SHA2-512-256',
164 }
165
166
167 class IKEv2ChildSA(object):
168     def __init__(self, local_ts, remote_ts, is_initiator):
169         spi = os.urandom(4)
170         if is_initiator:
171             self.ispi = spi
172             self.rspi = None
173         else:
174             self.rspi = spi
175             self.ispi = None
176         self.local_ts = local_ts
177         self.remote_ts = remote_ts
178
179
180 class IKEv2SA(object):
181     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
182                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
183                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
184                  auth_method='shared-key', priv_key=None, natt=False):
185         self.natt = natt
186         if natt:
187             self.sport = 4500
188             self.dport = 4500
189         else:
190             self.sport = 500
191             self.dport = 500
192         self.msg_id = 0
193         self.dh_params = None
194         self.test = test
195         self.priv_key = priv_key
196         self.is_initiator = is_initiator
197         nonce = nonce or os.urandom(32)
198         self.auth_data = auth_data
199         self.i_id = i_id
200         self.r_id = r_id
201         if isinstance(id_type, str):
202             self.id_type = IDType.value(id_type)
203         else:
204             self.id_type = id_type
205         self.auth_method = auth_method
206         if self.is_initiator:
207             self.rspi = 8 * b'\x00'
208             self.ispi = spi
209             self.i_nonce = nonce
210         else:
211             self.rspi = spi
212             self.ispi = 8 * b'\x00'
213             self.r_nonce = nonce
214         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
215                           self.is_initiator)]
216
217     def new_msg_id(self):
218         self.msg_id += 1
219         return self.msg_id
220
221     @property
222     def my_dh_pub_key(self):
223         if self.is_initiator:
224             return self.i_dh_data
225         return self.r_dh_data
226
227     @property
228     def peer_dh_pub_key(self):
229         if self.is_initiator:
230             return self.r_dh_data
231         return self.i_dh_data
232
233     def compute_secret(self):
234         priv = self.dh_private_key
235         peer = self.peer_dh_pub_key
236         p, g, l = self.ike_group
237         return pow(int.from_bytes(peer, 'big'),
238                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
239
240     def generate_dh_data(self):
241         # generate DH keys
242         if self.ike_dh not in DH:
243             raise NotImplementedError('%s not in DH group' % self.ike_dh)
244
245         if self.dh_params is None:
246             dhg = DH[self.ike_dh]
247             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
248             self.dh_params = pn.parameters(default_backend())
249
250         priv = self.dh_params.generate_private_key()
251         pub = priv.public_key()
252         x = priv.private_numbers().x
253         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
254         y = pub.public_numbers().y
255
256         if self.is_initiator:
257             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
258         else:
259             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
260
261     def complete_dh_data(self):
262         self.dh_shared_secret = self.compute_secret()
263
264     def calc_child_keys(self):
265         prf = self.ike_prf_alg.mod()
266         s = self.i_nonce + self.r_nonce
267         c = self.child_sas[0]
268
269         encr_key_len = self.esp_crypto_key_len
270         integ_key_len = self.esp_integ_alg.key_len
271         salt_len = 0 if integ_key_len else 4
272
273         l = (integ_key_len * 2 +
274              encr_key_len * 2 +
275              salt_len * 2)
276         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
277
278         pos = 0
279         c.sk_ei = keymat[pos:pos+encr_key_len]
280         pos += encr_key_len
281
282         if integ_key_len:
283             c.sk_ai = keymat[pos:pos+integ_key_len]
284             pos += integ_key_len
285         else:
286             c.salt_ei = keymat[pos:pos+salt_len]
287             pos += salt_len
288
289         c.sk_er = keymat[pos:pos+encr_key_len]
290         pos += encr_key_len
291
292         if integ_key_len:
293             c.sk_ar = keymat[pos:pos+integ_key_len]
294             pos += integ_key_len
295         else:
296             c.salt_er = keymat[pos:pos+salt_len]
297             pos += salt_len
298
299     def calc_prfplus(self, prf, key, seed, length):
300         r = b''
301         t = None
302         x = 1
303         while len(r) < length and x < 255:
304             if t is not None:
305                 s = t
306             else:
307                 s = b''
308             s = s + seed + bytes([x])
309             t = self.calc_prf(prf, key, s)
310             r = r + t
311             x = x + 1
312
313         if x == 255:
314             return None
315         return r
316
317     def calc_prf(self, prf, key, data):
318         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
319         h.update(data)
320         return h.finalize()
321
322     def calc_keys(self):
323         prf = self.ike_prf_alg.mod()
324         # SKEYSEED = prf(Ni | Nr, g^ir)
325         s = self.i_nonce + self.r_nonce
326         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
327
328         # calculate S = Ni | Nr | SPIi SPIr
329         s = s + self.ispi + self.rspi
330
331         prf_key_trunc = self.ike_prf_alg.trunc_len
332         encr_key_len = self.ike_crypto_key_len
333         tr_prf_key_len = self.ike_prf_alg.key_len
334         integ_key_len = self.ike_integ_alg.key_len
335         if integ_key_len == 0:
336             salt_size = 4
337         else:
338             salt_size = 0
339
340         l = (prf_key_trunc +
341              integ_key_len * 2 +
342              encr_key_len * 2 +
343              tr_prf_key_len * 2 +
344              salt_size * 2)
345         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
346
347         pos = 0
348         self.sk_d = keymat[:pos+prf_key_trunc]
349         pos += prf_key_trunc
350
351         self.sk_ai = keymat[pos:pos+integ_key_len]
352         pos += integ_key_len
353         self.sk_ar = keymat[pos:pos+integ_key_len]
354         pos += integ_key_len
355
356         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
357         pos += encr_key_len + salt_size
358         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
359         pos += encr_key_len + salt_size
360
361         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
362         pos += tr_prf_key_len
363         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
364
365     def generate_authmsg(self, prf, packet):
366         if self.is_initiator:
367             id = self.i_id
368             nonce = self.r_nonce
369             key = self.sk_pi
370         else:
371             id = self.r_id
372             nonce = self.i_nonce
373             key = self.sk_pr
374         data = bytes([self.id_type, 0, 0, 0]) + id
375         id_hash = self.calc_prf(prf, key, data)
376         return packet + nonce + id_hash
377
378     def auth_init(self):
379         prf = self.ike_prf_alg.mod()
380         if self.is_initiator:
381             packet = self.init_req_packet
382         else:
383             packet = self.init_resp_packet
384         authmsg = self.generate_authmsg(prf, raw(packet))
385         if self.auth_method == 'shared-key':
386             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
387             self.auth_data = self.calc_prf(prf, psk, authmsg)
388         elif self.auth_method == 'rsa-sig':
389             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
390                                                 hashes.SHA1())
391         else:
392             raise TypeError('unknown auth method type!')
393
394     def encrypt(self, data, aad=None):
395         data = self.ike_crypto_alg.pad(data)
396         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
397
398     @property
399     def peer_authkey(self):
400         if self.is_initiator:
401             return self.sk_ar
402         return self.sk_ai
403
404     @property
405     def my_authkey(self):
406         if self.is_initiator:
407             return self.sk_ai
408         return self.sk_ar
409
410     @property
411     def my_cryptokey(self):
412         if self.is_initiator:
413             return self.sk_ei
414         return self.sk_er
415
416     @property
417     def peer_cryptokey(self):
418         if self.is_initiator:
419             return self.sk_er
420         return self.sk_ei
421
422     def concat(self, alg, key_len):
423         return alg + '-' + str(key_len * 8)
424
425     @property
426     def vpp_ike_cypto_alg(self):
427         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
428
429     @property
430     def vpp_esp_cypto_alg(self):
431         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
432
433     def verify_hmac(self, ikemsg):
434         integ_trunc = self.ike_integ_alg.trunc_len
435         exp_hmac = ikemsg[-integ_trunc:]
436         data = ikemsg[:-integ_trunc]
437         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
438                                           self.peer_authkey, data)
439         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
440
441     def compute_hmac(self, integ, key, data):
442         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
443         h.update(data)
444         return h.finalize()
445
446     def decrypt(self, data, aad=None, icv=None):
447         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
448
449     def hmac_and_decrypt(self, ike):
450         ep = ike[ikev2.IKEv2_payload_Encrypted]
451         if self.ike_crypto == 'AES-GCM-16ICV':
452             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
453             ct = ep.load[:-GCM_ICV_SIZE]
454             tag = ep.load[-GCM_ICV_SIZE:]
455             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
456         else:
457             self.verify_hmac(raw(ike))
458             integ_trunc = self.ike_integ_alg.trunc_len
459
460             # remove ICV and decrypt payload
461             ct = ep.load[:-integ_trunc]
462             plain = self.decrypt(ct)
463         # remove padding
464         pad_len = plain[-1]
465         return plain[:-pad_len - 1]
466
467     def build_ts_addr(self, ts, version):
468         return {'starting_address_v' + version: ts['start_addr'],
469                 'ending_address_v' + version: ts['end_addr']}
470
471     def generate_ts(self, is_ip4):
472         c = self.child_sas[0]
473         ts_data = {'IP_protocol_ID': 0,
474                    'start_port': 0,
475                    'end_port': 0xffff}
476         if is_ip4:
477             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
478             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
479             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
480             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
481         else:
482             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
483             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
484             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
485             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
486
487         if self.is_initiator:
488             return ([ts1], [ts2])
489         return ([ts2], [ts1])
490
491     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
492         if crypto not in CRYPTO_ALGOS:
493             raise TypeError('unsupported encryption algo %r' % crypto)
494         self.ike_crypto = crypto
495         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
496         self.ike_crypto_key_len = crypto_key_len
497
498         if integ not in AUTH_ALGOS:
499             raise TypeError('unsupported auth algo %r' % integ)
500         self.ike_integ = None if integ == 'NULL' else integ
501         self.ike_integ_alg = AUTH_ALGOS[integ]
502
503         if prf not in PRF_ALGOS:
504             raise TypeError('unsupported prf algo %r' % prf)
505         self.ike_prf = prf
506         self.ike_prf_alg = PRF_ALGOS[prf]
507         self.ike_dh = dh
508         self.ike_group = DH[self.ike_dh]
509
510     def set_esp_props(self, crypto, crypto_key_len, integ):
511         self.esp_crypto_key_len = crypto_key_len
512         if crypto not in CRYPTO_ALGOS:
513             raise TypeError('unsupported encryption algo %r' % crypto)
514         self.esp_crypto = crypto
515         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
516
517         if integ not in AUTH_ALGOS:
518             raise TypeError('unsupported auth algo %r' % integ)
519         self.esp_integ = None if integ == 'NULL' else integ
520         self.esp_integ_alg = AUTH_ALGOS[integ]
521
522     def crypto_attr(self, key_len):
523         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
524             return (0x800e << 16 | key_len << 3, 12)
525         else:
526             raise Exception('unsupported attribute type')
527
528     def ike_crypto_attr(self):
529         return self.crypto_attr(self.ike_crypto_key_len)
530
531     def esp_crypto_attr(self):
532         return self.crypto_attr(self.esp_crypto_key_len)
533
534     def compute_nat_sha1(self, ip, port, rspi=None):
535         if rspi is None:
536             rspi = self.rspi
537         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
538         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
539         digest.update(data)
540         return digest.finalize()
541
542
543 class IkePeer(VppTestCase):
544     """ common class for initiator and responder """
545
546     @classmethod
547     def setUpClass(cls):
548         import scapy.contrib.ikev2 as _ikev2
549         globals()['ikev2'] = _ikev2
550         super(IkePeer, cls).setUpClass()
551         cls.create_pg_interfaces(range(2))
552         for i in cls.pg_interfaces:
553             i.admin_up()
554             i.config_ip4()
555             i.resolve_arp()
556             i.config_ip6()
557             i.resolve_ndp()
558
559     @classmethod
560     def tearDownClass(cls):
561         super(IkePeer, cls).tearDownClass()
562
563     def tearDown(self):
564         super(IkePeer, self).tearDown()
565         if self.del_sa_from_responder:
566             self.initiate_del_sa_from_responder()
567         else:
568             self.initiate_del_sa_from_initiator()
569         r = self.vapi.ikev2_sa_dump()
570         self.assertEqual(len(r), 0)
571         sas = self.vapi.ipsec_sa_dump()
572         self.assertEqual(len(sas), 0)
573         self.p.remove_vpp_config()
574         self.assertIsNone(self.p.query_vpp_config())
575
576     def setUp(self):
577         super(IkePeer, self).setUp()
578         self.config_tc()
579         self.p.add_vpp_config()
580         self.assertIsNotNone(self.p.query_vpp_config())
581         if self.sa.is_initiator:
582             self.sa.generate_dh_data()
583         self.vapi.cli('ikev2 set logging level 4')
584         self.vapi.cli('event-lo clear')
585         self.vapi.cli('ikev2 dpd disable')
586
587     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
588                       use_ip6=False):
589         if use_ip6:
590             src_ip = src_if.remote_ip6
591             dst_ip = src_if.local_ip6
592             ip_layer = IPv6
593         else:
594             src_ip = src_if.remote_ip4
595             dst_ip = src_if.local_ip4
596             ip_layer = IP
597         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
598                ip_layer(src=src_ip, dst=dst_ip) /
599                UDP(sport=sport, dport=dport))
600         if natt:
601             # insert non ESP marker
602             res = res / Raw(b'\x00' * 4)
603         return res / msg
604
605     def verify_udp(self, udp):
606         self.assertEqual(udp.sport, self.sa.sport)
607         self.assertEqual(udp.dport, self.sa.dport)
608
609     def get_ike_header(self, packet):
610         try:
611             ih = packet[ikev2.IKEv2]
612         except IndexError as e:
613             # this is a workaround for getting IKEv2 layer as both ikev2 and
614             # ipsec register for port 4500
615             esp = packet[ESP]
616             ih = self.verify_and_remove_non_esp_marker(esp)
617         self.assertEqual(ih.version, 0x20)
618         self.assertNotIn('Version', ih.flags)
619         return ih
620
621     def verify_and_remove_non_esp_marker(self, packet):
622         if self.sa.natt:
623             # if we are in nat traversal mode check for non esp marker
624             # and remove it
625             data = raw(packet)
626             self.assertEqual(data[:4], b'\x00' * 4)
627             return ikev2.IKEv2(data[4:])
628         else:
629             return packet
630
631     def encrypt_ike_msg(self, header, plain, first_payload):
632         if self.sa.ike_crypto == 'AES-GCM-16ICV':
633             data = self.sa.ike_crypto_alg.pad(raw(plain))
634             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
635                 len(ikev2.IKEv2_payload_Encrypted())
636             tlen = plen + len(ikev2.IKEv2())
637
638             # prepare aad data
639             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
640                                                  length=plen)
641             header.length = tlen
642             res = header / sk_p
643             encr = self.sa.encrypt(raw(plain), raw(res))
644             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
645                                                  length=plen, load=encr)
646             res = header / sk_p
647         else:
648             encr = self.sa.encrypt(raw(plain))
649             trunc_len = self.sa.ike_integ_alg.trunc_len
650             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
651             tlen = plen + len(ikev2.IKEv2())
652
653             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
654                                                  length=plen, load=encr)
655             header.length = tlen
656             res = header / sk_p
657
658             integ_data = raw(res)
659             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
660                                              self.sa.my_authkey, integ_data)
661             res = res / Raw(hmac_data[:trunc_len])
662         assert(len(res) == tlen)
663         return res
664
665     def verify_ipsec_sas(self, is_rekey=False):
666         sas = self.vapi.ipsec_sa_dump()
667         if is_rekey:
668             # after rekey there is a short period of time in which old
669             # inbound SA is still present
670             sa_count = 3
671         else:
672             sa_count = 2
673         self.assertEqual(len(sas), sa_count)
674         e = VppEnum.vl_api_ipsec_sad_flags_t
675         if self.sa.is_initiator:
676             if is_rekey:
677                 sa0 = sas[0].entry
678                 sa1 = sas[2].entry
679             else:
680                 sa0 = sas[0].entry
681                 sa1 = sas[1].entry
682         else:
683             if is_rekey:
684                 sa0 = sas[2].entry
685                 sa1 = sas[0].entry
686             else:
687                 sa1 = sas[0].entry
688                 sa0 = sas[1].entry
689
690         c = self.sa.child_sas[0]
691
692         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
693         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
694         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
695
696         if self.sa.esp_integ is None:
697             vpp_integ_alg = 0
698         else:
699             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
700         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
701         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
702
703         # verify crypto keys
704         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
705         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
706         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
707         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
708
709         # verify integ keys
710         if vpp_integ_alg:
711             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
712             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
713             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
714             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
715         else:
716             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
717             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
718
719     def verify_keymat(self, api_keys, keys, name):
720         km = getattr(keys, name)
721         api_km = getattr(api_keys, name)
722         api_km_len = getattr(api_keys, name + '_len')
723         self.assertEqual(len(km), api_km_len)
724         self.assertEqual(km, api_km[:api_km_len])
725
726     def verify_id(self, api_id, exp_id):
727         self.assertEqual(api_id.type, IDType.value(exp_id.type))
728         self.assertEqual(api_id.data_len, exp_id.data_len)
729         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
730
731     def verify_ike_sas(self):
732         r = self.vapi.ikev2_sa_dump()
733         self.assertEqual(len(r), 1)
734         sa = r[0].sa
735         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
736         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
737         if self.ip6:
738             if self.sa.is_initiator:
739                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
740                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
741             else:
742                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
743                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
744         else:
745             if self.sa.is_initiator:
746                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
747                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
748             else:
749                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
750                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
751         self.verify_keymat(sa.keys, self.sa, 'sk_d')
752         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
753         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
754         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
755         self.verify_keymat(sa.keys, self.sa, 'sk_er')
756         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
757         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
758
759         self.assertEqual(sa.i_id.type, self.sa.id_type)
760         self.assertEqual(sa.r_id.type, self.sa.id_type)
761         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
762         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
763         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
764         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
765
766         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
767         self.assertEqual(len(r), 1)
768         csa = r[0].child_sa
769         self.assertEqual(csa.sa_index, sa.sa_index)
770         c = self.sa.child_sas[0]
771         if hasattr(c, 'sk_ai'):
772             self.verify_keymat(csa.keys, c, 'sk_ai')
773             self.verify_keymat(csa.keys, c, 'sk_ar')
774         self.verify_keymat(csa.keys, c, 'sk_ei')
775         self.verify_keymat(csa.keys, c, 'sk_er')
776         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
777         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
778
779         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
780         tsi = tsi[0]
781         tsr = tsr[0]
782         r = self.vapi.ikev2_traffic_selector_dump(
783                 is_initiator=True, sa_index=sa.sa_index,
784                 child_sa_index=csa.child_sa_index)
785         self.assertEqual(len(r), 1)
786         ts = r[0].ts
787         self.verify_ts(r[0].ts, tsi[0], True)
788
789         r = self.vapi.ikev2_traffic_selector_dump(
790                 is_initiator=False, sa_index=sa.sa_index,
791                 child_sa_index=csa.child_sa_index)
792         self.assertEqual(len(r), 1)
793         self.verify_ts(r[0].ts, tsr[0], False)
794
795         n = self.vapi.ikev2_nonce_get(is_initiator=True,
796                                       sa_index=sa.sa_index)
797         self.verify_nonce(n, self.sa.i_nonce)
798         n = self.vapi.ikev2_nonce_get(is_initiator=False,
799                                       sa_index=sa.sa_index)
800         self.verify_nonce(n, self.sa.r_nonce)
801
802     def verify_nonce(self, api_nonce, nonce):
803         self.assertEqual(api_nonce.data_len, len(nonce))
804         self.assertEqual(api_nonce.nonce, nonce)
805
806     def verify_ts(self, api_ts, ts, is_initiator):
807         if is_initiator:
808             self.assertTrue(api_ts.is_local)
809         else:
810             self.assertFalse(api_ts.is_local)
811
812         if self.p.ts_is_ip4:
813             self.assertEqual(api_ts.start_addr,
814                              IPv4Address(ts.starting_address_v4))
815             self.assertEqual(api_ts.end_addr,
816                              IPv4Address(ts.ending_address_v4))
817         else:
818             self.assertEqual(api_ts.start_addr,
819                              IPv6Address(ts.starting_address_v6))
820             self.assertEqual(api_ts.end_addr,
821                              IPv6Address(ts.ending_address_v6))
822         self.assertEqual(api_ts.start_port, ts.start_port)
823         self.assertEqual(api_ts.end_port, ts.end_port)
824         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
825
826
827 class TemplateInitiator(IkePeer):
828     """ initiator test template """
829
830     def initiate_del_sa_from_initiator(self):
831         ispi = int.from_bytes(self.sa.ispi, 'little')
832         self.pg0.enable_capture()
833         self.pg_start()
834         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
835         capture = self.pg0.get_capture(1)
836         ih = self.get_ike_header(capture[0])
837         self.assertNotIn('Response', ih.flags)
838         self.assertIn('Initiator', ih.flags)
839         self.assertEqual(ih.init_SPI, self.sa.ispi)
840         self.assertEqual(ih.resp_SPI, self.sa.rspi)
841         plain = self.sa.hmac_and_decrypt(ih)
842         d = ikev2.IKEv2_payload_Delete(plain)
843         self.assertEqual(d.proto, 1)  # proto=IKEv2
844         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
845                              flags='Response', exch_type='INFORMATIONAL',
846                              id=ih.id, next_payload='Encrypted')
847         resp = self.encrypt_ike_msg(header, b'', None)
848         self.send_and_assert_no_replies(self.pg0, resp)
849
850     def verify_del_sa(self, packet):
851         ih = self.get_ike_header(packet)
852         self.assertEqual(ih.id, self.sa.msg_id)
853         self.assertEqual(ih.exch_type, 37)  # exchange informational
854         self.assertIn('Response', ih.flags)
855         self.assertIn('Initiator', ih.flags)
856         plain = self.sa.hmac_and_decrypt(ih)
857         self.assertEqual(plain, b'')
858
859     def initiate_del_sa_from_responder(self):
860         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
861                              exch_type='INFORMATIONAL',
862                              id=self.sa.new_msg_id())
863         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
864         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
865         packet = self.create_packet(self.pg0, ike_msg,
866                                     self.sa.sport, self.sa.dport,
867                                     self.sa.natt, self.ip6)
868         self.pg0.add_stream(packet)
869         self.pg0.enable_capture()
870         self.pg_start()
871         capture = self.pg0.get_capture(1)
872         self.verify_del_sa(capture[0])
873
874     @staticmethod
875     def find_notify_payload(packet, notify_type):
876         n = packet[ikev2.IKEv2_payload_Notify]
877         while n is not None:
878             if n.type == notify_type:
879                 return n
880             n = n.payload
881         return None
882
883     def verify_nat_detection(self, packet):
884         if self.ip6:
885             iph = packet[IPv6]
886         else:
887             iph = packet[IP]
888         udp = packet[UDP]
889
890         # NAT_DETECTION_SOURCE_IP
891         s = self.find_notify_payload(packet, 16388)
892         self.assertIsNotNone(s)
893         src_sha = self.sa.compute_nat_sha1(
894                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
895         self.assertEqual(s.load, src_sha)
896
897         # NAT_DETECTION_DESTINATION_IP
898         s = self.find_notify_payload(packet, 16389)
899         self.assertIsNotNone(s)
900         dst_sha = self.sa.compute_nat_sha1(
901                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
902         self.assertEqual(s.load, dst_sha)
903
904     def verify_sa_init_request(self, packet):
905         ih = packet[ikev2.IKEv2]
906         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
907         self.assertEqual(ih.exch_type, 34)  # SA_INIT
908         self.sa.ispi = ih.init_SPI
909         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
910         self.assertIn('Initiator', ih.flags)
911         self.assertNotIn('Response', ih.flags)
912         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
913         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
914
915         prop = packet[ikev2.IKEv2_payload_Proposal]
916         self.assertEqual(prop.proto, 1)  # proto = ikev2
917         self.assertEqual(prop.proposal, 1)
918         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
919         self.assertEqual(prop.trans[0].transform_id,
920                          self.p.ike_transforms['crypto_alg'])
921         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
922         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
923         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
924         self.assertEqual(prop.trans[2].transform_id,
925                          self.p.ike_transforms['dh_group'])
926
927         self.verify_nat_detection(packet)
928         self.sa.set_ike_props(
929                     crypto='AES-GCM-16ICV', crypto_key_len=32,
930                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
931         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
932                               integ='SHA2-256-128')
933         self.sa.generate_dh_data()
934         self.sa.complete_dh_data()
935         self.sa.calc_keys()
936
937     def update_esp_transforms(self, trans, sa):
938         while trans:
939             if trans.transform_type == 1:  # ecryption
940                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
941             elif trans.transform_type == 3:  # integrity
942                 sa.esp_integ = INTEG_IDS[trans.transform_id]
943             trans = trans.payload
944
945     def verify_sa_auth_req(self, packet):
946         ih = self.get_ike_header(packet)
947         self.assertEqual(ih.resp_SPI, self.sa.rspi)
948         self.assertEqual(ih.init_SPI, self.sa.ispi)
949         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
950         self.assertIn('Initiator', ih.flags)
951         self.assertNotIn('Response', ih.flags)
952
953         udp = packet[UDP]
954         self.verify_udp(udp)
955         self.assertEqual(ih.id, self.sa.msg_id + 1)
956         self.sa.msg_id += 1
957         plain = self.sa.hmac_and_decrypt(ih)
958         idi = ikev2.IKEv2_payload_IDi(plain)
959         idr = ikev2.IKEv2_payload_IDr(idi.payload)
960         self.assertEqual(idi.load, self.sa.i_id)
961         self.assertEqual(idr.load, self.sa.r_id)
962         prop = idi[ikev2.IKEv2_payload_Proposal]
963         c = self.sa.child_sas[0]
964         c.ispi = prop.SPI
965         self.update_esp_transforms(
966                 prop[ikev2.IKEv2_payload_Transform], self.sa)
967
968     def send_init_response(self):
969         tr_attr = self.sa.ike_crypto_attr()
970         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
971                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
972                  key_length=tr_attr[0]) /
973                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
974                  transform_id=self.sa.ike_integ) /
975                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
976                  transform_id=self.sa.ike_prf_alg.name) /
977                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
978                  transform_id=self.sa.ike_dh))
979         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
980                  trans_nb=4, trans=trans))
981         self.sa.init_resp_packet = (
982             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
983                         exch_type='IKE_SA_INIT', flags='Response') /
984             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
985             ikev2.IKEv2_payload_KE(next_payload='Nonce',
986                                    group=self.sa.ike_dh,
987                                    load=self.sa.my_dh_pub_key) /
988             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
989
990         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
991                                      self.sa.sport, self.sa.dport,
992                                      self.sa.natt, self.ip6)
993         self.pg_send(self.pg0, ike_msg)
994         capture = self.pg0.get_capture(1)
995         self.verify_sa_auth_req(capture[0])
996
997     def initiate_sa_init(self):
998         self.pg0.enable_capture()
999         self.pg_start()
1000         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1001
1002         capture = self.pg0.get_capture(1)
1003         self.verify_sa_init_request(capture[0])
1004         self.send_init_response()
1005
1006     def send_auth_response(self):
1007         tr_attr = self.sa.esp_crypto_attr()
1008         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1009                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1010                  key_length=tr_attr[0]) /
1011                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1012                  transform_id=self.sa.esp_integ) /
1013                  ikev2.IKEv2_payload_Transform(
1014                  transform_type='Extended Sequence Number',
1015                  transform_id='No ESN') /
1016                  ikev2.IKEv2_payload_Transform(
1017                  transform_type='Extended Sequence Number',
1018                  transform_id='ESN'))
1019
1020         c = self.sa.child_sas[0]
1021         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1022                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1023
1024         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1025         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1026                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1027                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1028                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1029                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1030                  auth_type=AuthMethod.value(self.sa.auth_method),
1031                  load=self.sa.auth_data) /
1032                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1033                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1034                  number_of_TSs=len(tsi),
1035                  traffic_selector=tsi) /
1036                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1037                  number_of_TSs=len(tsr),
1038                  traffic_selector=tsr) /
1039                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1040
1041         header = ikev2.IKEv2(
1042                 init_SPI=self.sa.ispi,
1043                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1044                 flags='Response', exch_type='IKE_AUTH')
1045
1046         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1047         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1048                                     self.sa.dport, self.sa.natt, self.ip6)
1049         self.pg_send(self.pg0, packet)
1050
1051     def test_initiator(self):
1052         self.initiate_sa_init()
1053         self.sa.auth_init()
1054         self.sa.calc_child_keys()
1055         self.send_auth_response()
1056         self.verify_ike_sas()
1057
1058
1059 class TemplateResponder(IkePeer):
1060     """ responder test template """
1061
1062     def initiate_del_sa_from_responder(self):
1063         self.pg0.enable_capture()
1064         self.pg_start()
1065         self.vapi.ikev2_initiate_del_ike_sa(
1066                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1067         capture = self.pg0.get_capture(1)
1068         ih = self.get_ike_header(capture[0])
1069         self.assertNotIn('Response', ih.flags)
1070         self.assertNotIn('Initiator', ih.flags)
1071         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1072         plain = self.sa.hmac_and_decrypt(ih)
1073         d = ikev2.IKEv2_payload_Delete(plain)
1074         self.assertEqual(d.proto, 1)  # proto=IKEv2
1075         self.assertEqual(ih.init_SPI, self.sa.ispi)
1076         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1077         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1078                              flags='Initiator+Response',
1079                              exch_type='INFORMATIONAL',
1080                              id=ih.id, next_payload='Encrypted')
1081         resp = self.encrypt_ike_msg(header, b'', None)
1082         self.send_and_assert_no_replies(self.pg0, resp)
1083
1084     def verify_del_sa(self, packet):
1085         ih = self.get_ike_header(packet)
1086         self.assertEqual(ih.id, self.sa.msg_id)
1087         self.assertEqual(ih.exch_type, 37)  # exchange informational
1088         self.assertIn('Response', ih.flags)
1089         self.assertNotIn('Initiator', ih.flags)
1090         self.assertEqual(ih.next_payload, 46)  # Encrypted
1091         self.assertEqual(ih.init_SPI, self.sa.ispi)
1092         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1093         plain = self.sa.hmac_and_decrypt(ih)
1094         self.assertEqual(plain, b'')
1095
1096     def initiate_del_sa_from_initiator(self):
1097         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1098                              flags='Initiator', exch_type='INFORMATIONAL',
1099                              id=self.sa.new_msg_id())
1100         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1101         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1102         packet = self.create_packet(self.pg0, ike_msg,
1103                                     self.sa.sport, self.sa.dport,
1104                                     self.sa.natt, self.ip6)
1105         self.pg0.add_stream(packet)
1106         self.pg0.enable_capture()
1107         self.pg_start()
1108         capture = self.pg0.get_capture(1)
1109         self.verify_del_sa(capture[0])
1110
1111     def send_sa_init_req(self, behind_nat=False):
1112         tr_attr = self.sa.ike_crypto_attr()
1113         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1114                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1115                  key_length=tr_attr[0]) /
1116                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1117                  transform_id=self.sa.ike_integ) /
1118                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1119                  transform_id=self.sa.ike_prf_alg.name) /
1120                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1121                  transform_id=self.sa.ike_dh))
1122
1123         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1124                  trans_nb=4, trans=trans))
1125
1126         self.sa.init_req_packet = (
1127                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1128                             flags='Initiator', exch_type='IKE_SA_INIT') /
1129                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1130                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1131                                        group=self.sa.ike_dh,
1132                                        load=self.sa.my_dh_pub_key) /
1133                 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1134                                           load=self.sa.i_nonce))
1135
1136         if behind_nat:
1137             src_address = b'\x0a\x0a\x0a\x01'
1138         else:
1139             src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1140
1141         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1142         dst_nat = self.sa.compute_nat_sha1(
1143                 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1144                 self.sa.dport)
1145         nat_src_detection = ikev2.IKEv2_payload_Notify(
1146                 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1147                 next_payload='Notify')
1148         nat_dst_detection = ikev2.IKEv2_payload_Notify(
1149                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1150         self.sa.init_req_packet = (self.sa.init_req_packet /
1151                                    nat_src_detection /
1152                                    nat_dst_detection)
1153
1154         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1155                                      self.sa.sport, self.sa.dport,
1156                                      self.sa.natt, self.ip6)
1157         self.pg0.add_stream(ike_msg)
1158         self.pg0.enable_capture()
1159         self.pg_start()
1160         capture = self.pg0.get_capture(1)
1161         self.verify_sa_init(capture[0])
1162
1163     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1164         tr_attr = self.sa.esp_crypto_attr()
1165         last_payload = last_payload or 'Notify'
1166         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1167                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1168                  key_length=tr_attr[0]) /
1169                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1170                  transform_id=self.sa.esp_integ) /
1171                  ikev2.IKEv2_payload_Transform(
1172                  transform_type='Extended Sequence Number',
1173                  transform_id='No ESN') /
1174                  ikev2.IKEv2_payload_Transform(
1175                  transform_type='Extended Sequence Number',
1176                  transform_id='ESN'))
1177
1178         c = self.sa.child_sas[0]
1179         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1180                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1181
1182         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1183         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1184                  auth_type=AuthMethod.value(self.sa.auth_method),
1185                  load=self.sa.auth_data) /
1186                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1187                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1188                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1189                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1190                  number_of_TSs=len(tsr), traffic_selector=tsr))
1191
1192         if is_rekey:
1193             first_payload = 'Nonce'
1194             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1195                      next_payload='SA') / plain /
1196                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1197                      proto='ESP', SPI=c.ispi))
1198         else:
1199             first_payload = 'IDi'
1200             ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1201                    IDtype=self.sa.id_type, load=self.sa.i_id) /
1202                    ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1203                    IDtype=self.sa.id_type, load=self.sa.r_id))
1204             plain = ids / plain
1205         return plain, first_payload
1206
1207     def send_sa_auth(self):
1208         plain, first_payload = self.generate_auth_payload(
1209                     last_payload='Notify')
1210         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1211         header = ikev2.IKEv2(
1212                 init_SPI=self.sa.ispi,
1213                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1214                 flags='Initiator', exch_type='IKE_AUTH')
1215
1216         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1217         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1218                                     self.sa.dport, self.sa.natt, self.ip6)
1219         self.pg0.add_stream(packet)
1220         self.pg0.enable_capture()
1221         self.pg_start()
1222         capture = self.pg0.get_capture(1)
1223         self.verify_sa_auth_resp(capture[0])
1224
1225     def verify_sa_init(self, packet):
1226         ih = self.get_ike_header(packet)
1227
1228         self.assertEqual(ih.id, self.sa.msg_id)
1229         self.assertEqual(ih.exch_type, 34)
1230         self.assertIn('Response', ih.flags)
1231         self.assertEqual(ih.init_SPI, self.sa.ispi)
1232         self.assertNotEqual(ih.resp_SPI, 0)
1233         self.sa.rspi = ih.resp_SPI
1234         try:
1235             sa = ih[ikev2.IKEv2_payload_SA]
1236             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1237             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1238         except IndexError as e:
1239             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1240             self.logger.error(ih.show())
1241             raise
1242         self.sa.complete_dh_data()
1243         self.sa.calc_keys()
1244         self.sa.auth_init()
1245
1246     def verify_sa_auth_resp(self, packet):
1247         ike = self.get_ike_header(packet)
1248         udp = packet[UDP]
1249         self.verify_udp(udp)
1250         self.assertEqual(ike.id, self.sa.msg_id)
1251         plain = self.sa.hmac_and_decrypt(ike)
1252         idr = ikev2.IKEv2_payload_IDr(plain)
1253         prop = idr[ikev2.IKEv2_payload_Proposal]
1254         self.assertEqual(prop.SPIsize, 4)
1255         self.sa.child_sas[0].rspi = prop.SPI
1256         self.sa.calc_child_keys()
1257
1258     def test_responder(self):
1259         self.send_sa_init_req(self.sa.natt)
1260         self.send_sa_auth()
1261         self.verify_ipsec_sas()
1262         self.verify_ike_sas()
1263
1264
1265 class Ikev2Params(object):
1266     def config_params(self, params={}):
1267         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1268         ei = VppEnum.vl_api_ipsec_integ_alg_t
1269         self.vpp_enums = {
1270                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1271                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1272                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1273                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1274                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1275                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1276
1277                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1278                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1279                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1280                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1281
1282         self.del_sa_from_responder = False if 'del_sa_from_responder'\
1283             not in params else params['del_sa_from_responder']
1284         is_natt = 'natt' in params and params['natt'] or False
1285         self.p = Profile(self, 'pr1')
1286         self.ip6 = False if 'ip6' not in params else params['ip6']
1287
1288         if 'auth' in params and params['auth'] == 'rsa-sig':
1289             auth_method = 'rsa-sig'
1290             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1291             self.vapi.ikev2_set_local_key(
1292                     key_file=work_dir + params['server-key'])
1293
1294             client_file = work_dir + params['client-cert']
1295             server_pem = open(work_dir + params['server-cert']).read()
1296             client_priv = open(work_dir + params['client-key']).read()
1297             client_priv = load_pem_private_key(str.encode(client_priv), None,
1298                                                default_backend())
1299             self.peer_cert = x509.load_pem_x509_certificate(
1300                     str.encode(server_pem),
1301                     default_backend())
1302             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1303             auth_data = None
1304         else:
1305             auth_data = b'$3cr3tpa$$w0rd'
1306             self.p.add_auth(method='shared-key', data=auth_data)
1307             auth_method = 'shared-key'
1308             client_priv = None
1309
1310         is_init = True if 'is_initiator' not in params else\
1311             params['is_initiator']
1312
1313         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1314         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1315         if is_init:
1316             self.p.add_local_id(**idr)
1317             self.p.add_remote_id(**idi)
1318         else:
1319             self.p.add_local_id(**idi)
1320             self.p.add_remote_id(**idr)
1321
1322         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1323             'loc_ts' not in params else params['loc_ts']
1324         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1325             'rem_ts' not in params else params['rem_ts']
1326         self.p.add_local_ts(**loc_ts)
1327         self.p.add_remote_ts(**rem_ts)
1328         if 'responder' in params:
1329             self.p.add_responder(params['responder'])
1330         if 'ike_transforms' in params:
1331             self.p.add_ike_transforms(params['ike_transforms'])
1332         if 'esp_transforms' in params:
1333             self.p.add_esp_transforms(params['esp_transforms'])
1334
1335         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1336                           is_initiator=is_init,
1337                           id_type=self.p.local_id['id_type'], natt=is_natt,
1338                           priv_key=client_priv, auth_method=auth_method,
1339                           auth_data=auth_data,
1340                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1341
1342         if is_init:
1343             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1344                 params['ike-crypto']
1345             ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1346                 params['ike-integ']
1347             ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1348                 params['ike-dh']
1349
1350             esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1351                 params['esp-crypto']
1352             esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1353                 params['esp-integ']
1354
1355             self.sa.set_ike_props(
1356                     crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1357                     integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1358             self.sa.set_esp_props(
1359                     crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1360                     integ=esp_integ)
1361
1362
1363 class TestApi(VppTestCase):
1364     """ Test IKEV2 API """
1365     @classmethod
1366     def setUpClass(cls):
1367         super(TestApi, cls).setUpClass()
1368
1369     @classmethod
1370     def tearDownClass(cls):
1371         super(TestApi, cls).tearDownClass()
1372
1373     def tearDown(self):
1374         super(TestApi, self).tearDown()
1375         self.p1.remove_vpp_config()
1376         self.p2.remove_vpp_config()
1377         r = self.vapi.ikev2_profile_dump()
1378         self.assertEqual(len(r), 0)
1379
1380     def configure_profile(self, cfg):
1381         p = Profile(self, cfg['name'])
1382         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1383         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1384         p.add_local_ts(**cfg['loc_ts'])
1385         p.add_remote_ts(**cfg['rem_ts'])
1386         p.add_responder(cfg['responder'])
1387         p.add_ike_transforms(cfg['ike_ts'])
1388         p.add_esp_transforms(cfg['esp_ts'])
1389         p.add_auth(**cfg['auth'])
1390         p.set_udp_encap(cfg['udp_encap'])
1391         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1392         if 'lifetime_data' in cfg:
1393             p.set_lifetime_data(cfg['lifetime_data'])
1394         if 'tun_itf' in cfg:
1395             p.set_tunnel_interface(cfg['tun_itf'])
1396         if 'natt_disabled' in cfg and cfg['natt_disabled']:
1397             p.disable_natt()
1398         p.add_vpp_config()
1399         return p
1400
1401     def test_profile_api(self):
1402         """ test profile dump API """
1403         loc_ts4 = {
1404                     'proto': 8,
1405                     'start_port': 1,
1406                     'end_port': 19,
1407                     'start_addr': '3.3.3.2',
1408                     'end_addr': '3.3.3.3',
1409                 }
1410         rem_ts4 = {
1411                     'proto': 9,
1412                     'start_port': 10,
1413                     'end_port': 119,
1414                     'start_addr': '4.5.76.80',
1415                     'end_addr': '2.3.4.6',
1416                 }
1417
1418         loc_ts6 = {
1419                     'proto': 8,
1420                     'start_port': 1,
1421                     'end_port': 19,
1422                     'start_addr': 'ab::1',
1423                     'end_addr': 'ab::4',
1424                 }
1425         rem_ts6 = {
1426                     'proto': 9,
1427                     'start_port': 10,
1428                     'end_port': 119,
1429                     'start_addr': 'cd::12',
1430                     'end_addr': 'cd::13',
1431                 }
1432
1433         conf = {
1434             'p1': {
1435                 'name': 'p1',
1436                 'natt_disabled': True,
1437                 'loc_id': ('fqdn', b'vpp.home'),
1438                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1439                 'loc_ts': loc_ts4,
1440                 'rem_ts': rem_ts4,
1441                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1442                 'ike_ts': {
1443                         'crypto_alg': 20,
1444                         'crypto_key_size': 32,
1445                         'integ_alg': 1,
1446                         'dh_group': 1},
1447                 'esp_ts': {
1448                         'crypto_alg': 13,
1449                         'crypto_key_size': 24,
1450                         'integ_alg': 2},
1451                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1452                 'udp_encap': True,
1453                 'ipsec_over_udp_port': 4501,
1454                 'lifetime_data': {
1455                     'lifetime': 123,
1456                     'lifetime_maxdata': 20192,
1457                     'lifetime_jitter': 9,
1458                     'handover': 132},
1459             },
1460             'p2': {
1461                 'name': 'p2',
1462                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1463                 'rem_id': ('ip6-addr', b'abcd::1'),
1464                 'loc_ts': loc_ts6,
1465                 'rem_ts': rem_ts6,
1466                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1467                 'ike_ts': {
1468                         'crypto_alg': 12,
1469                         'crypto_key_size': 16,
1470                         'integ_alg': 3,
1471                         'dh_group': 3},
1472                 'esp_ts': {
1473                         'crypto_alg': 9,
1474                         'crypto_key_size': 24,
1475                         'integ_alg': 4},
1476                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1477                 'udp_encap': False,
1478                 'ipsec_over_udp_port': 4600,
1479                 'tun_itf': 0}
1480         }
1481         self.p1 = self.configure_profile(conf['p1'])
1482         self.p2 = self.configure_profile(conf['p2'])
1483
1484         r = self.vapi.ikev2_profile_dump()
1485         self.assertEqual(len(r), 2)
1486         self.verify_profile(r[0].profile, conf['p1'])
1487         self.verify_profile(r[1].profile, conf['p2'])
1488
1489     def verify_id(self, api_id, cfg_id):
1490         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1491         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1492
1493     def verify_ts(self, api_ts, cfg_ts):
1494         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1495         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1496         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1497         self.assertEqual(api_ts.start_addr,
1498                          ip_address(text_type(cfg_ts['start_addr'])))
1499         self.assertEqual(api_ts.end_addr,
1500                          ip_address(text_type(cfg_ts['end_addr'])))
1501
1502     def verify_responder(self, api_r, cfg_r):
1503         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1504         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1505
1506     def verify_transforms(self, api_ts, cfg_ts):
1507         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1508         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1509         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1510
1511     def verify_ike_transforms(self, api_ts, cfg_ts):
1512         self.verify_transforms(api_ts, cfg_ts)
1513         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1514
1515     def verify_esp_transforms(self, api_ts, cfg_ts):
1516         self.verify_transforms(api_ts, cfg_ts)
1517
1518     def verify_auth(self, api_auth, cfg_auth):
1519         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1520         self.assertEqual(api_auth.data, cfg_auth['data'])
1521         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1522
1523     def verify_lifetime_data(self, p, ld):
1524         self.assertEqual(p.lifetime, ld['lifetime'])
1525         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1526         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1527         self.assertEqual(p.handover, ld['handover'])
1528
1529     def verify_profile(self, ap, cp):
1530         self.assertEqual(ap.name, cp['name'])
1531         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1532         self.verify_id(ap.loc_id, cp['loc_id'])
1533         self.verify_id(ap.rem_id, cp['rem_id'])
1534         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1535         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1536         self.verify_responder(ap.responder, cp['responder'])
1537         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1538         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1539         self.verify_auth(ap.auth, cp['auth'])
1540         natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1541         self.assertTrue(natt_dis == ap.natt_disabled)
1542
1543         if 'lifetime_data' in cp:
1544             self.verify_lifetime_data(ap, cp['lifetime_data'])
1545         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1546         if 'tun_itf' in cp:
1547             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1548         else:
1549             self.assertEqual(ap.tun_itf, 0xffffffff)
1550
1551
1552 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1553     """ test ikev2 initiator - pre shared key auth """
1554
1555     def config_tc(self):
1556         self.config_params({
1557             'is_initiator': False,  # seen from test case perspective
1558                                     # thus vpp is initiator
1559             'responder': {'sw_if_index': self.pg0.sw_if_index,
1560                            'addr': self.pg0.remote_ip4},
1561             'ike-crypto': ('AES-GCM-16ICV', 32),
1562             'ike-integ': 'NULL',
1563             'ike-dh': '3072MODPgr',
1564             'ike_transforms': {
1565                 'crypto_alg': 20,  # "aes-gcm-16"
1566                 'crypto_key_size': 256,
1567                 'dh_group': 15,  # "modp-3072"
1568             },
1569             'esp_transforms': {
1570                 'crypto_alg': 12,  # "aes-cbc"
1571                 'crypto_key_size': 256,
1572                 # "hmac-sha2-256-128"
1573                 'integ_alg': 12}})
1574
1575
1576 class TestInitiatorRekey(TestInitiatorPsk):
1577     """ test ikev2 initiator - rekey """
1578
1579     def rekey_from_initiator(self):
1580         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1581         self.pg0.enable_capture()
1582         self.pg_start()
1583         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1584         capture = self.pg0.get_capture(1)
1585         ih = self.get_ike_header(capture[0])
1586         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1587         self.assertNotIn('Response', ih.flags)
1588         self.assertIn('Initiator', ih.flags)
1589         plain = self.sa.hmac_and_decrypt(ih)
1590         sa = ikev2.IKEv2_payload_SA(plain)
1591         prop = sa[ikev2.IKEv2_payload_Proposal]
1592         nonce = sa[ikev2.IKEv2_payload_Nonce]
1593         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1594         self.sa.r_nonce = self.sa.i_nonce
1595         # update new responder SPI
1596         self.sa.child_sas[0].ispi = prop.SPI
1597         self.sa.child_sas[0].rspi = prop.SPI
1598         self.sa.calc_child_keys()
1599         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1600                              flags='Response', exch_type=36,
1601                              id=ih.id, next_payload='Encrypted')
1602         resp = self.encrypt_ike_msg(header, sa, 'SA')
1603         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1604                                     self.sa.dport, self.sa.natt, self.ip6)
1605         self.send_and_assert_no_replies(self.pg0, packet)
1606
1607     def test_initiator(self):
1608         super(TestInitiatorRekey, self).test_initiator()
1609         self.rekey_from_initiator()
1610         self.verify_ike_sas()
1611         self.verify_ipsec_sas(is_rekey=True)
1612
1613
1614 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1615     """ test ikev2 initiator - delete IKE SA from responder """
1616
1617     def config_tc(self):
1618         self.config_params({
1619             'del_sa_from_responder': True,
1620             'is_initiator': False,  # seen from test case perspective
1621                                     # thus vpp is initiator
1622             'responder': {'sw_if_index': self.pg0.sw_if_index,
1623                            'addr': self.pg0.remote_ip4},
1624             'ike-crypto': ('AES-GCM-16ICV', 32),
1625             'ike-integ': 'NULL',
1626             'ike-dh': '3072MODPgr',
1627             'ike_transforms': {
1628                 'crypto_alg': 20,  # "aes-gcm-16"
1629                 'crypto_key_size': 256,
1630                 'dh_group': 15,  # "modp-3072"
1631             },
1632             'esp_transforms': {
1633                 'crypto_alg': 12,  # "aes-cbc"
1634                 'crypto_key_size': 256,
1635                 # "hmac-sha2-256-128"
1636                 'integ_alg': 12}})
1637
1638
1639 class TestResponderNATT(TemplateResponder, Ikev2Params):
1640     """ test ikev2 responder - nat traversal """
1641     def config_tc(self):
1642         self.config_params(
1643                 {'natt': True})
1644
1645
1646 class TestResponderPsk(TemplateResponder, Ikev2Params):
1647     """ test ikev2 responder - pre shared key auth """
1648     def config_tc(self):
1649         self.config_params()
1650
1651
1652 class TestResponderRekey(TestResponderPsk):
1653     """ test ikev2 responder - rekey """
1654
1655     def rekey_from_initiator(self):
1656         sa, first_payload = self.generate_auth_payload(is_rekey=True)
1657         header = ikev2.IKEv2(
1658                 init_SPI=self.sa.ispi,
1659                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1660                 flags='Initiator', exch_type='CREATE_CHILD_SA')
1661
1662         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
1663         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1664                                     self.sa.dport, self.sa.natt, self.ip6)
1665         self.pg0.add_stream(packet)
1666         self.pg0.enable_capture()
1667         self.pg_start()
1668         capture = self.pg0.get_capture(1)
1669         ih = self.get_ike_header(capture[0])
1670         plain = self.sa.hmac_and_decrypt(ih)
1671         sa = ikev2.IKEv2_payload_SA(plain)
1672         prop = sa[ikev2.IKEv2_payload_Proposal]
1673         nonce = sa[ikev2.IKEv2_payload_Nonce]
1674         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1675         # update new responder SPI
1676         self.sa.child_sas[0].rspi = prop.SPI
1677
1678     def test_responder(self):
1679         super(TestResponderRekey, self).test_responder()
1680         self.rekey_from_initiator()
1681         self.sa.calc_child_keys()
1682         self.verify_ike_sas()
1683         self.verify_ipsec_sas(is_rekey=True)
1684
1685
1686 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1687     """ test ikev2 responder - cert based auth """
1688     def config_tc(self):
1689         self.config_params({
1690             'auth': 'rsa-sig',
1691             'server-key': 'server-key.pem',
1692             'client-key': 'client-key.pem',
1693             'client-cert': 'client-cert.pem',
1694             'server-cert': 'server-cert.pem'})
1695
1696
1697 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1698         (TemplateResponder, Ikev2Params):
1699     """
1700     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1701     """
1702     def config_tc(self):
1703         self.config_params({
1704             'ike-crypto': ('AES-CBC', 16),
1705             'ike-integ': 'SHA2-256-128',
1706             'esp-crypto': ('AES-CBC', 24),
1707             'esp-integ': 'SHA2-384-192',
1708             'ike-dh': '2048MODPgr'})
1709
1710
1711 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1712         (TemplateResponder, Ikev2Params):
1713     """
1714     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1715     """
1716     def config_tc(self):
1717         self.config_params({
1718             'ike-crypto': ('AES-CBC', 32),
1719             'ike-integ': 'SHA2-256-128',
1720             'esp-crypto': ('AES-GCM-16ICV', 32),
1721             'esp-integ': 'NULL',
1722             'ike-dh': '3072MODPgr'})
1723
1724
1725 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1726     """
1727     IKE:AES_GCM_16_256
1728     """
1729     def config_tc(self):
1730         self.config_params({
1731             'del_sa_from_responder': True,
1732             'ip6': True,
1733             'natt': True,
1734             'ike-crypto': ('AES-GCM-16ICV', 32),
1735             'ike-integ': 'NULL',
1736             'ike-dh': '2048MODPgr',
1737             'loc_ts': {'start_addr': 'ab:cd::0',
1738                        'end_addr': 'ab:cd::10'},
1739             'rem_ts': {'start_addr': '11::0',
1740                        'end_addr': '11::100'}})
1741
1742
1743 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1744     """ malformed packet test """
1745
1746     def tearDown(self):
1747         pass
1748
1749     def config_tc(self):
1750         self.config_params()
1751
1752     def assert_counter(self, count, name, version='ip4'):
1753         node_name = '/err/ikev2-%s/' % version + name
1754         self.assertEqual(count, self.statistics.get_err_counter(node_name))
1755
1756     def create_ike_init_msg(self, length=None, payload=None):
1757         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1758                           flags='Initiator', exch_type='IKE_SA_INIT')
1759         if payload is not None:
1760             msg /= payload
1761         return self.create_packet(self.pg0, msg, self.sa.sport,
1762                                   self.sa.dport)
1763
1764     def verify_bad_packet_length(self):
1765         ike_msg = self.create_ike_init_msg(length=0xdead)
1766         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1767         self.assert_counter(self.pkt_count, 'Bad packet length')
1768
1769     def verify_bad_sa_payload_length(self):
1770         p = ikev2.IKEv2_payload_SA(length=0xdead)
1771         ike_msg = self.create_ike_init_msg(payload=p)
1772         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1773         self.assert_counter(self.pkt_count, 'Malformed packet')
1774
1775     def test_responder(self):
1776         self.pkt_count = 254
1777         self.verify_bad_packet_length()
1778         self.verify_bad_sa_payload_length()
1779
1780
1781 if __name__ == '__main__':
1782     unittest.main(testRunner=VppTestRunner)