ikev2: fix nat traversal
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 from scapy.layers.ipsec import ESP
16 from scapy.layers.inet import IP, UDP, Ether
17 from scapy.layers.inet6 import IPv6
18 from scapy.packet import raw, Raw
19 from scapy.utils import long_converter
20 from framework import VppTestCase, VppTestRunner
21 from vpp_ikev2 import Profile, IDType, AuthMethod
22 from vpp_papi import VppEnum
23
# Python 2/3 compatibility: use `unicode` where that builtin exists,
# otherwise fall back to `str`.
try:
    unicode
except NameError:
    text_type = str
else:
    text_type = unicode

# constant input of the inner prf() call used for shared-key authentication
KEY_PAD = b"Key Pad for IKEv2"
# number of trailing key bytes used as the AES-GCM salt
SALT_SIZE = 4
# AES-GCM integrity check value (tag) length in bytes
GCM_ICV_SIZE = 16
# AES-GCM explicit IV length in bytes
GCM_IV_SIZE = 8
33
34
# MODP Diffie-Hellman groups, defined in rfc3526.
# Each prime is written as whitespace separated hex words and converted
# to an integer by scapy's long_converter().
_MODP_2048_PRIME = long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF""")

_MODP_3072_PRIME = long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF""")

# group name -> (p, g, key_len); key_len is the byte length used when the
# shared secret is serialised (256 bytes for 2048 bit, 384 for 3072 bit)
DH = {
    '2048MODPgr': (_MODP_2048_PRIME, 2, 256),
    '3072MODPgr': (_MODP_3072_PRIME, 2, 384),
}
69
70
class CryptoAlgo(object):
    """Encryption transform used by the test-side IKEv2 implementation.

    Wraps a `cryptography` cipher class and mode class and provides
    encrypt/decrypt/pad helpers.

    :param name: transform name, e.g. 'AES-CBC' or 'AES-GCM-16ICV'
    :param cipher: cipher class (called with the key), or None for 'NULL'
    :param mode: mode class (called with IV/nonce), or None for 'NULL'

    When ``cipher`` is None neither ``bs`` nor ``iv_len`` is set, so the
    encrypt/decrypt/pad helpers must not be used on such an instance.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is expressed in bits, bs in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` under a freshly generated random IV.

        :param data: plaintext; the caller is responsible for padding
        :param key: cipher key; in AEAD mode the last SALT_SIZE bytes are
            the salt and only the remainder is used as the cipher key
        :param aad: additional authenticated data; passing a value selects
            the AEAD path
        :returns: IV | ciphertext, followed by the tag in AEAD mode
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()

        # AEAD: nonce is salt | IV
        salt = key[-SALT_SIZE:]
        nonce = salt + iv
        encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                           default_backend()).encryptor()
        encryptor.authenticate_additional_data(aad)
        data = encryptor.update(data) + encryptor.finalize()
        data += encryptor.tag[:GCM_ICV_SIZE]
        return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` as laid out by :meth:`encrypt` (IV first).

        :param data: IV | ciphertext (without the tag in AEAD mode)
        :param key: cipher key, with trailing salt in AEAD mode
        :param aad: additional authenticated data; passing a value selects
            the AEAD path
        :param icv: authentication tag, required in AEAD mode
        :returns: plaintext, still including any padding
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            # use the configured cipher, exactly as encrypt() does
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()

        salt = key[-SALT_SIZE:]
        nonce = salt + data[:GCM_IV_SIZE]
        ct = data[GCM_IV_SIZE:]
        key = key[:-SALT_SIZE]
        decryptor = Cipher(self.cipher(key),
                           self.mode(nonce, icv, len(icv)),
                           default_backend()).decryptor()
        decryptor.authenticate_additional_data(aad)
        return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` up to the next multiple of the block size.

        At least one byte is always appended: zero bytes followed by a
        final byte holding the number of zero bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
123
124
class AuthAlgo(object):
    """Description of an integrity or PRF transform.

    :param name: transform name
    :param mac: MAC class (e.g. hmac.HMAC) or None
    :param mod: hash class handed to the MAC, or None
    :param key_len: key length in bytes
    :param trunc_len: truncated output length in bytes; any falsy value
        (None or 0) falls back to ``key_len``
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name = name
        self.mac = mac
        self.mod = mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
132
133
# supported encryption transforms, keyed by transform name
CRYPTO_ALGOS = {
    algo.name: algo for algo in (
        CryptoAlgo('NULL', cipher=None, mode=None),
        CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
        CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES, mode=modes.GCM),
    )
}
140
# Supported integrity transforms, keyed by transform name.
# Positional AuthAlgo arguments are (name, mac, hash, key_len, trunc_len),
# lengths in bytes.  Each HMAC-SHA2 variant is paired with the hash whose
# digest size equals its key length (SHA-384 for 48 bytes, SHA-512 for 64).
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}
148
# supported pseudo-random functions, keyed by transform name
PRF_ALGOS = {
    algo.name: algo for algo in (
        AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
        AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC, hashes.SHA256, 32),
    )
}
154
# numeric encryption transform id -> key into CRYPTO_ALGOS
CRYPTO_IDS = dict((
    (12, 'AES-CBC'),
    (20, 'AES-GCM-16ICV'),
))

# numeric integrity transform id -> key into AUTH_ALGOS
INTEG_IDS = dict((
    (2, 'HMAC-SHA1-96'),
    (12, 'SHA2-256-128'),
    (13, 'SHA2-384-192'),
    (14, 'SHA2-512-256'),
))
166
167
class IKEv2ChildSA(object):
    """State of one child SA as seen by the test-side peer.

    A random 4 byte SPI is generated for our own side; the SPI of the
    other side starts out as None.

    :param local_ts: local traffic selector description
    :param remote_ts: remote traffic selector description
    :param is_initiator: True when the test-side peer is the initiator
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        own_spi = os.urandom(4)
        self.ispi, self.rspi = ((own_spi, None) if is_initiator
                                else (None, own_spi))
        self.local_ts = local_ts
        self.remote_ts = remote_ts
178         self.remote_ts = remote_ts
179
180
class IKEv2SA(object):
    """Test-side model of one IKEv2 security association.

    Holds SPIs, nonces, Diffie-Hellman data, negotiated algorithms and the
    keys derived from them, plus helpers to protect and verify IKE
    messages.  ``set_ike_props()`` / ``set_esp_props()`` must be called
    before any key derivation or crypto helper is used.

    :param test: test case object, used for assertions
    :param is_initiator: True when the test-side peer is the initiator
    :param i_id: initiator identity (bytes)
    :param r_id: responder identity (bytes)
    :param spi: 8 byte IKE SPI of our own side
    :param id_type: identity type, either a name understood by
        ``IDType.value()`` or an already resolved value
    :param nonce: our nonce; 32 random bytes are generated when falsy
    :param auth_data: pre-shared key for 'shared-key' authentication
    :param local_ts: local traffic selector of the first child SA
    :param remote_ts: remote traffic selector of the first child SA
    :param auth_method: 'shared-key' or 'rsa-sig'
    :param priv_key: private key used for 'rsa-sig'
    :param natt: use NAT traversal (UDP port 4500 instead of 500)
    :param udp_encap: whether UDP encapsulation is expected on IPsec SAs
    """

    def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
                 auth_method='shared-key', priv_key=None, natt=False,
                 udp_encap=False):
        self.udp_encap = udp_encap
        self.natt = natt
        port = 4500 if natt else 500
        self.sport = port
        self.dport = port
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        if isinstance(id_type, str):
            self.id_type = IDType.value(id_type)
        else:
            self.id_type = id_type
        self.auth_method = auth_method
        # only our own SPI and nonce are known at this point; the peer's
        # SPI is all zeros and the peer's nonce attribute is not set yet
        if self.is_initiator:
            self.rspi = 8 * b'\x00'
            self.ispi = spi
            self.i_nonce = nonce
        else:
            self.rspi = spi
            self.ispi = 8 * b'\x00'
            self.r_nonce = nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
                          self.is_initiator)]

    def new_msg_id(self):
        """Increment the message id counter and return the new value."""
        self.msg_id += 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """DH public value of our own side."""
        if self.is_initiator:
            return self.i_dh_data
        return self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """DH public value of the other side."""
        if self.is_initiator:
            return self.r_dh_data
        return self.i_dh_data

    def compute_secret(self):
        """Return the DH shared secret as big-endian bytes.

        Computes peer_pub ** priv mod p and serialises the result with the
        byte length stored in the group tuple.
        """
        priv = self.dh_private_key
        peer = self.peer_dh_pub_key
        prime, _generator, key_len = self.ike_group
        return pow(int.from_bytes(peer, 'big'),
                   int.from_bytes(priv, 'big'),
                   prime).to_bytes(key_len, 'big')

    def generate_dh_data(self):
        """Generate a DH key pair for the configured group.

        Stores the private value in ``dh_private_key`` and the public value
        in ``i_dh_data`` or ``r_dh_data`` depending on our role.

        :raises NotImplementedError: when the group is not listed in DH
        """
        # generate DH keys
        if self.ike_dh not in DH:
            raise NotImplementedError('%s not in DH group' % self.ike_dh)

        if self.dh_params is None:
            dhg = DH[self.ike_dh]
            pn = dh.DHParameterNumbers(dhg[0], dhg[1])
            self.dh_params = pn.parameters(default_backend())

        priv = self.dh_params.generate_private_key()
        pub = priv.public_key()
        x = priv.private_numbers().x
        self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
        y = pub.public_numbers().y

        if self.is_initiator:
            self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
        else:
            self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self):
        """Derive the keys of the first child SA from SK_d and the nonces.

        Key material is consumed in the order: initiator encryption key,
        initiator integrity key (or salt), responder encryption key,
        responder integrity key (or salt).  A 4 byte salt is taken instead
        of an integrity key when the ESP integrity key length is zero.
        """
        prf = self.ike_prf_alg.mod()
        seed = self.i_nonce + self.r_nonce
        c = self.child_sas[0]

        encr_key_len = self.esp_crypto_key_len
        integ_key_len = self.esp_integ_alg.key_len
        salt_len = 0 if integ_key_len else 4

        total_len = (integ_key_len * 2 +
                     encr_key_len * 2 +
                     salt_len * 2)
        keymat = self.calc_prfplus(prf, self.sk_d, seed, total_len)

        pos = 0
        c.sk_ei = keymat[pos:pos+encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ai = keymat[pos:pos+integ_key_len]
            pos += integ_key_len
        else:
            c.salt_ei = keymat[pos:pos+salt_len]
            pos += salt_len

        c.sk_er = keymat[pos:pos+encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ar = keymat[pos:pos+integ_key_len]
            pos += integ_key_len
        else:
            c.salt_er = keymat[pos:pos+salt_len]
            pos += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Expand ``key`` into at least ``length`` bytes of key material.

        Block i is prf(key, previous_block | seed | i) with a one byte
        counter starting at 1; blocks are concatenated.  At most 254 blocks
        are generated.

        :returns: the concatenated blocks (not truncated, so possibly
            longer than ``length``), or None when 254 blocks are not
            enough to reach ``length``
        """
        r = b''
        t = None
        x = 1
        while len(r) < length and x < 255:
            if t is not None:
                s = t
            else:
                s = b''
            s = s + seed + bytes([x])
            t = self.calc_prf(prf, key, s)
            r = r + t
            x = x + 1

        # fail only when the output really is too short; testing the
        # counter alone would also reject a request that is satisfied
        # exactly by the last permitted block
        if len(r) < length:
            return None
        return r

    def calc_prf(self, prf, key, data):
        """Return the negotiated PRF MAC of ``data`` under ``key``."""
        h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        h.update(data)
        return h.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the IKE SA keys.

        Sets ``skeyseed`` and, in this order from the key material,
        ``sk_d``, ``sk_ai``, ``sk_ar``, ``sk_ei``, ``sk_er``, ``sk_pi`` and
        ``sk_pr``.  When the integrity key length is zero, each encryption
        key is extended by a 4 byte salt.
        """
        prf = self.ike_prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        s = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        s = s + self.ispi + self.rspi

        prf_key_trunc = self.ike_prf_alg.trunc_len
        encr_key_len = self.ike_crypto_key_len
        tr_prf_key_len = self.ike_prf_alg.key_len
        integ_key_len = self.ike_integ_alg.key_len
        if integ_key_len == 0:
            salt_size = 4
        else:
            salt_size = 0

        total_len = (prf_key_trunc +
                     integ_key_len * 2 +
                     encr_key_len * 2 +
                     tr_prf_key_len * 2 +
                     salt_size * 2)
        keymat = self.calc_prfplus(prf, self.skeyseed, s, total_len)

        pos = 0
        self.sk_d = keymat[:pos+prf_key_trunc]
        pos += prf_key_trunc

        self.sk_ai = keymat[pos:pos+integ_key_len]
        pos += integ_key_len
        self.sk_ar = keymat[pos:pos+integ_key_len]
        pos += integ_key_len

        self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
        pos += encr_key_len + salt_size
        self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
        pos += encr_key_len + salt_size

        self.sk_pi = keymat[pos:pos+tr_prf_key_len]
        pos += tr_prf_key_len
        self.sk_pr = keymat[pos:pos+tr_prf_key_len]

    def generate_authmsg(self, prf, packet):
        """Build the octets covered by our AUTH payload.

        :param prf: hash instance handed to the PRF
        :param packet: raw bytes of our own IKE_SA_INIT message
        :returns: packet | peer nonce | prf(SK_p, id_type 0 0 0 | our id)
        """
        if self.is_initiator:
            ident = self.i_id
            nonce = self.r_nonce
            key = self.sk_pi
        else:
            ident = self.r_id
            nonce = self.i_nonce
            key = self.sk_pr
        data = bytes([self.id_type, 0, 0, 0]) + ident
        id_hash = self.calc_prf(prf, key, data)
        return packet + nonce + id_hash

    def auth_init(self):
        """Compute our authentication data, replacing ``auth_data``.

        For 'shared-key' the pre-shared key in ``auth_data`` is replaced
        by prf(prf(psk, KEY_PAD), authmsg); for 'rsa-sig' it is set to a
        PKCS#1 v1.5 / SHA-1 signature over authmsg.

        :raises TypeError: for any other authentication method
        """
        prf = self.ike_prf_alg.mod()
        if self.is_initiator:
            packet = self.init_req_packet
        else:
            packet = self.init_resp_packet
        authmsg = self.generate_authmsg(prf, raw(packet))
        if self.auth_method == 'shared-key':
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif self.auth_method == 'rsa-sig':
            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
                                                hashes.SHA1())
        else:
            raise TypeError('unknown auth method type!')

    def encrypt(self, data, aad=None):
        """Pad ``data`` and encrypt it with our own encryption key."""
        data = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key used by the other side."""
        if self.is_initiator:
            return self.sk_ar
        return self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key used by our own side."""
        if self.is_initiator:
            return self.sk_ai
        return self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key used by our own side."""
        if self.is_initiator:
            return self.sk_ei
        return self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key used by the other side."""
        if self.is_initiator:
            return self.sk_er
        return self.sk_ei

    def concat(self, alg, key_len):
        """Return '<alg>-<key length in bits>' for a length in bytes."""
        return alg + '-' + str(key_len * 8)

    @property
    def vpp_ike_cypto_alg(self):
        """IKE cipher name with the key size in bits appended."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP cipher name with the key size in bits appended."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of ``ikemsg`` is correct.

        The ICV is recomputed with the peer's integrity key over everything
        in front of it and compared via the test case's assertEqual.
        """
        integ_trunc = self.ike_integ_alg.trunc_len
        exp_hmac = ikemsg[-integ_trunc:]
        data = ikemsg[:-integ_trunc]
        computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
                                          self.peer_authkey, data)
        self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated integrity MAC of ``data`` under ``key``."""
        h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        h.update(data)
        return h.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt ``data`` with the peer's encryption key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Authenticate and decrypt the Encrypted payload of ``ike``.

        For AES-GCM the IKE header plus the Encrypted payload header are
        used as AAD and the trailing tag is verified during decryption;
        otherwise the HMAC is verified first and the ICV stripped.

        :returns: decrypted payload data with the padding removed
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto == 'AES-GCM-16ICV':
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
        else:
            self.verify_hmac(raw(ike))
            integ_trunc = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            ct = ep.load[:-integ_trunc]
            plain = self.decrypt(ct)
        # remove padding: the last byte counts the pad bytes in front of it
        pad_len = plain[-1]
        return plain[:-pad_len - 1]

    def build_ts_addr(self, ts, version):
        """Map a ts dict to scapy traffic selector address fields.

        :param ts: dict with 'start_addr' and 'end_addr'
        :param version: '4' or '6', appended to the field names
        """
        return {'starting_address_v' + version: ts['start_addr'],
                'ending_address_v' + version: ts['end_addr']}

    def generate_ts(self, is_ip4):
        """Build traffic selectors for the first child SA.

        Both selectors cover any protocol and ports 0-65535.

        :returns: tuple (initiator_selectors, responder_selectors), each a
            one element list; our local selector is the initiator's when
            we are the initiator, the responder's otherwise
        """
        c = self.child_sas[0]
        ts_data = {'IP_protocol_ID': 0,
                   'start_port': 0,
                   'end_port': 0xffff}
        if is_ip4:
            ts_data.update(self.build_ts_addr(c.local_ts, '4'))
            ts1 = ikev2.IPv4TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
            ts2 = ikev2.IPv4TrafficSelector(**ts_data)
        else:
            ts_data.update(self.build_ts_addr(c.local_ts, '6'))
            ts1 = ikev2.IPv6TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
            ts2 = ikev2.IPv6TrafficSelector(**ts_data)

        if self.is_initiator:
            return ([ts1], [ts2])
        return ([ts2], [ts1])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the IKE SA transforms.

        :param crypto: key into CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key into AUTH_ALGOS ('NULL' stores ike_integ=None)
        :param prf: key into PRF_ALGOS
        :param dh: key into DH
        :raises TypeError: for an unknown crypto, integ or prf name
        :raises KeyError: for an unknown DH group name
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.ike_integ = None if integ == 'NULL' else integ
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError('unsupported prf algo %r' % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the ESP (child SA) transforms.

        :param crypto: key into CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key into AUTH_ALGOS ('NULL' stores esp_integ=None)
        :raises TypeError: for an unknown crypto or integ name
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.esp_integ = None if integ == 'NULL' else integ
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len):
        """Return (key length attribute, 12) for a key length in bytes.

        The attribute is 0x800e in the upper 16 bits and the key length in
        bits in the lower 16 bits.  The constant 12 is presumably the
        length of the transform substructure carrying the attribute;
        confirm against the callers.  Note the check is always made
        against the IKE cipher, also when called for the ESP key length.

        :raises Exception: when the IKE cipher is not an AES variant
        """
        if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
            return (0x800e << 16 | key_len << 3, 12)
        else:
            raise Exception('unsupported attribute type')

    def ike_crypto_attr(self):
        """crypto_attr() for the IKE encryption key length."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """crypto_attr() for the ESP encryption key length."""
        return self.crypto_attr(self.esp_crypto_key_len)

    def compute_nat_sha1(self, ip, port, rspi=None):
        """Return SHA-1 over SPIi | SPIr | ip | port (2 bytes, big-endian).

        :param ip: packed IP address bytes
        :param port: UDP port number
        :param rspi: responder SPI to use; defaults to ``self.rspi``
        """
        if rspi is None:
            rspi = self.rspi
        data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
        digest.update(data)
        return digest.finalize()
544
545
546 class IkePeer(VppTestCase):
547     """ common class for initiator and responder """
548
549     @classmethod
550     def setUpClass(cls):
551         import scapy.contrib.ikev2 as _ikev2
552         globals()['ikev2'] = _ikev2
553         super(IkePeer, cls).setUpClass()
554         cls.create_pg_interfaces(range(2))
555         for i in cls.pg_interfaces:
556             i.admin_up()
557             i.config_ip4()
558             i.resolve_arp()
559             i.config_ip6()
560             i.resolve_ndp()
561
562     @classmethod
563     def tearDownClass(cls):
564         super(IkePeer, cls).tearDownClass()
565
566     def tearDown(self):
567         super(IkePeer, self).tearDown()
568         if self.del_sa_from_responder:
569             self.initiate_del_sa_from_responder()
570         else:
571             self.initiate_del_sa_from_initiator()
572         r = self.vapi.ikev2_sa_dump()
573         self.assertEqual(len(r), 0)
574         sas = self.vapi.ipsec_sa_dump()
575         self.assertEqual(len(sas), 0)
576         self.p.remove_vpp_config()
577         self.assertIsNone(self.p.query_vpp_config())
578
579     def setUp(self):
580         super(IkePeer, self).setUp()
581         self.config_tc()
582         self.p.add_vpp_config()
583         self.assertIsNotNone(self.p.query_vpp_config())
584         if self.sa.is_initiator:
585             self.sa.generate_dh_data()
586         self.vapi.cli('ikev2 set logging level 4')
587         self.vapi.cli('event-lo clear')
588
589     def create_rekey_request(self):
590         sa, first_payload = self.generate_auth_payload(is_rekey=True)
591         header = ikev2.IKEv2(
592                 init_SPI=self.sa.ispi,
593                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
594                 flags='Initiator', exch_type='CREATE_CHILD_SA')
595
596         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
597         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
598                                   self.sa.dport, self.sa.natt, self.ip6)
599
600     def create_empty_request(self):
601         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
602                              id=self.sa.new_msg_id(), flags='Initiator',
603                              exch_type='INFORMATIONAL',
604                              next_payload='Encrypted')
605
606         msg = self.encrypt_ike_msg(header, b'', None)
607         return self.create_packet(self.pg0, msg, self.sa.sport,
608                                   self.sa.dport, self.sa.natt, self.ip6)
609
610     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
611                       use_ip6=False):
612         if use_ip6:
613             src_ip = src_if.remote_ip6
614             dst_ip = src_if.local_ip6
615             ip_layer = IPv6
616         else:
617             src_ip = src_if.remote_ip4
618             dst_ip = src_if.local_ip4
619             ip_layer = IP
620         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
621                ip_layer(src=src_ip, dst=dst_ip) /
622                UDP(sport=sport, dport=dport))
623         if natt:
624             # insert non ESP marker
625             res = res / Raw(b'\x00' * 4)
626         return res / msg
627
628     def verify_udp(self, udp):
629         self.assertEqual(udp.sport, self.sa.sport)
630         self.assertEqual(udp.dport, self.sa.dport)
631
632     def get_ike_header(self, packet):
633         try:
634             ih = packet[ikev2.IKEv2]
635             ih = self.verify_and_remove_non_esp_marker(ih)
636         except IndexError as e:
637             # this is a workaround for getting IKEv2 layer as both ikev2 and
638             # ipsec register for port 4500
639             esp = packet[ESP]
640             ih = self.verify_and_remove_non_esp_marker(esp)
641         self.assertEqual(ih.version, 0x20)
642         self.assertNotIn('Version', ih.flags)
643         return ih
644
645     def verify_and_remove_non_esp_marker(self, packet):
646         if self.sa.natt:
647             # if we are in nat traversal mode check for non esp marker
648             # and remove it
649             data = raw(packet)
650             self.assertEqual(data[:4], b'\x00' * 4)
651             return ikev2.IKEv2(data[4:])
652         else:
653             return packet
654
655     def encrypt_ike_msg(self, header, plain, first_payload):
656         if self.sa.ike_crypto == 'AES-GCM-16ICV':
657             data = self.sa.ike_crypto_alg.pad(raw(plain))
658             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
659                 len(ikev2.IKEv2_payload_Encrypted())
660             tlen = plen + len(ikev2.IKEv2())
661
662             # prepare aad data
663             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
664                                                  length=plen)
665             header.length = tlen
666             res = header / sk_p
667             encr = self.sa.encrypt(raw(plain), raw(res))
668             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
669                                                  length=plen, load=encr)
670             res = header / sk_p
671         else:
672             encr = self.sa.encrypt(raw(plain))
673             trunc_len = self.sa.ike_integ_alg.trunc_len
674             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
675             tlen = plen + len(ikev2.IKEv2())
676
677             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
678                                                  length=plen, load=encr)
679             header.length = tlen
680             res = header / sk_p
681
682             integ_data = raw(res)
683             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
684                                              self.sa.my_authkey, integ_data)
685             res = res / Raw(hmac_data[:trunc_len])
686         assert(len(res) == tlen)
687         return res
688
689     def verify_udp_encap(self, ipsec_sa):
690         e = VppEnum.vl_api_ipsec_sad_flags_t
691         if self.sa.udp_encap or self.sa.natt:
692             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
693         else:
694             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
695
696     def verify_ipsec_sas(self, is_rekey=False):
697         sas = self.vapi.ipsec_sa_dump()
698         if is_rekey:
699             # after rekey there is a short period of time in which old
700             # inbound SA is still present
701             sa_count = 3
702         else:
703             sa_count = 2
704         self.assertEqual(len(sas), sa_count)
705         if self.sa.is_initiator:
706             if is_rekey:
707                 sa0 = sas[0].entry
708                 sa1 = sas[2].entry
709             else:
710                 sa0 = sas[0].entry
711                 sa1 = sas[1].entry
712         else:
713             if is_rekey:
714                 sa0 = sas[2].entry
715                 sa1 = sas[0].entry
716             else:
717                 sa1 = sas[0].entry
718                 sa0 = sas[1].entry
719
720         c = self.sa.child_sas[0]
721
722         self.verify_udp_encap(sa0)
723         self.verify_udp_encap(sa1)
724         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
725         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
726         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
727
728         if self.sa.esp_integ is None:
729             vpp_integ_alg = 0
730         else:
731             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
732         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
733         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
734
735         # verify crypto keys
736         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
737         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
738         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
739         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
740
741         # verify integ keys
742         if vpp_integ_alg:
743             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
744             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
745             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
746             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
747         else:
748             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
749             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
750
751     def verify_keymat(self, api_keys, keys, name):
752         km = getattr(keys, name)
753         api_km = getattr(api_keys, name)
754         api_km_len = getattr(api_keys, name + '_len')
755         self.assertEqual(len(km), api_km_len)
756         self.assertEqual(km, api_km[:api_km_len])
757
758     def verify_id(self, api_id, exp_id):
759         self.assertEqual(api_id.type, IDType.value(exp_id.type))
760         self.assertEqual(api_id.data_len, exp_id.data_len)
761         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
762
763     def verify_ike_sas(self):
764         r = self.vapi.ikev2_sa_dump()
765         self.assertEqual(len(r), 1)
766         sa = r[0].sa
767         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
768         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
769         if self.ip6:
770             if self.sa.is_initiator:
771                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
772                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
773             else:
774                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
775                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
776         else:
777             if self.sa.is_initiator:
778                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
779                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
780             else:
781                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
782                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
783         self.verify_keymat(sa.keys, self.sa, 'sk_d')
784         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
785         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
786         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
787         self.verify_keymat(sa.keys, self.sa, 'sk_er')
788         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
789         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
790
791         self.assertEqual(sa.i_id.type, self.sa.id_type)
792         self.assertEqual(sa.r_id.type, self.sa.id_type)
793         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
794         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
795         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
796         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
797
798         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
799         self.assertEqual(len(r), 1)
800         csa = r[0].child_sa
801         self.assertEqual(csa.sa_index, sa.sa_index)
802         c = self.sa.child_sas[0]
803         if hasattr(c, 'sk_ai'):
804             self.verify_keymat(csa.keys, c, 'sk_ai')
805             self.verify_keymat(csa.keys, c, 'sk_ar')
806         self.verify_keymat(csa.keys, c, 'sk_ei')
807         self.verify_keymat(csa.keys, c, 'sk_er')
808         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
809         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
810
811         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
812         tsi = tsi[0]
813         tsr = tsr[0]
814         r = self.vapi.ikev2_traffic_selector_dump(
815                 is_initiator=True, sa_index=sa.sa_index,
816                 child_sa_index=csa.child_sa_index)
817         self.assertEqual(len(r), 1)
818         ts = r[0].ts
819         self.verify_ts(r[0].ts, tsi[0], True)
820
821         r = self.vapi.ikev2_traffic_selector_dump(
822                 is_initiator=False, sa_index=sa.sa_index,
823                 child_sa_index=csa.child_sa_index)
824         self.assertEqual(len(r), 1)
825         self.verify_ts(r[0].ts, tsr[0], False)
826
827         n = self.vapi.ikev2_nonce_get(is_initiator=True,
828                                       sa_index=sa.sa_index)
829         self.verify_nonce(n, self.sa.i_nonce)
830         n = self.vapi.ikev2_nonce_get(is_initiator=False,
831                                       sa_index=sa.sa_index)
832         self.verify_nonce(n, self.sa.r_nonce)
833
834     def verify_nonce(self, api_nonce, nonce):
835         self.assertEqual(api_nonce.data_len, len(nonce))
836         self.assertEqual(api_nonce.nonce, nonce)
837
838     def verify_ts(self, api_ts, ts, is_initiator):
839         if is_initiator:
840             self.assertTrue(api_ts.is_local)
841         else:
842             self.assertFalse(api_ts.is_local)
843
844         if self.p.ts_is_ip4:
845             self.assertEqual(api_ts.start_addr,
846                              IPv4Address(ts.starting_address_v4))
847             self.assertEqual(api_ts.end_addr,
848                              IPv4Address(ts.ending_address_v4))
849         else:
850             self.assertEqual(api_ts.start_addr,
851                              IPv6Address(ts.starting_address_v6))
852             self.assertEqual(api_ts.end_addr,
853                              IPv6Address(ts.ending_address_v6))
854         self.assertEqual(api_ts.start_port, ts.start_port)
855         self.assertEqual(api_ts.end_port, ts.end_port)
856         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
857
858
859 class TemplateInitiator(IkePeer):
860     """ initiator test template """
861
862     def initiate_del_sa_from_initiator(self):
863         ispi = int.from_bytes(self.sa.ispi, 'little')
864         self.pg0.enable_capture()
865         self.pg_start()
866         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
867         capture = self.pg0.get_capture(1)
868         ih = self.get_ike_header(capture[0])
869         self.assertNotIn('Response', ih.flags)
870         self.assertIn('Initiator', ih.flags)
871         self.assertEqual(ih.init_SPI, self.sa.ispi)
872         self.assertEqual(ih.resp_SPI, self.sa.rspi)
873         plain = self.sa.hmac_and_decrypt(ih)
874         d = ikev2.IKEv2_payload_Delete(plain)
875         self.assertEqual(d.proto, 1)  # proto=IKEv2
876         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
877                              flags='Response', exch_type='INFORMATIONAL',
878                              id=ih.id, next_payload='Encrypted')
879         resp = self.encrypt_ike_msg(header, b'', None)
880         self.send_and_assert_no_replies(self.pg0, resp)
881
882     def verify_del_sa(self, packet):
883         ih = self.get_ike_header(packet)
884         self.assertEqual(ih.id, self.sa.msg_id)
885         self.assertEqual(ih.exch_type, 37)  # exchange informational
886         self.assertIn('Response', ih.flags)
887         self.assertIn('Initiator', ih.flags)
888         plain = self.sa.hmac_and_decrypt(ih)
889         self.assertEqual(plain, b'')
890
891     def initiate_del_sa_from_responder(self):
892         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
893                              exch_type='INFORMATIONAL',
894                              id=self.sa.new_msg_id())
895         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
896         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
897         packet = self.create_packet(self.pg0, ike_msg,
898                                     self.sa.sport, self.sa.dport,
899                                     self.sa.natt, self.ip6)
900         self.pg0.add_stream(packet)
901         self.pg0.enable_capture()
902         self.pg_start()
903         capture = self.pg0.get_capture(1)
904         self.verify_del_sa(capture[0])
905
906     @staticmethod
907     def find_notify_payload(packet, notify_type):
908         n = packet[ikev2.IKEv2_payload_Notify]
909         while n is not None:
910             if n.type == notify_type:
911                 return n
912             n = n.payload
913         return None
914
915     def verify_nat_detection(self, packet):
916         if self.ip6:
917             iph = packet[IPv6]
918         else:
919             iph = packet[IP]
920         udp = packet[UDP]
921
922         # NAT_DETECTION_SOURCE_IP
923         s = self.find_notify_payload(packet, 16388)
924         self.assertIsNotNone(s)
925         src_sha = self.sa.compute_nat_sha1(
926                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
927         self.assertEqual(s.load, src_sha)
928
929         # NAT_DETECTION_DESTINATION_IP
930         s = self.find_notify_payload(packet, 16389)
931         self.assertIsNotNone(s)
932         dst_sha = self.sa.compute_nat_sha1(
933                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
934         self.assertEqual(s.load, dst_sha)
935
936     def verify_sa_init_request(self, packet):
937         udp = packet[UDP]
938         self.sa.dport = udp.sport
939         ih = packet[ikev2.IKEv2]
940         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
941         self.assertEqual(ih.exch_type, 34)  # SA_INIT
942         self.sa.ispi = ih.init_SPI
943         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
944         self.assertIn('Initiator', ih.flags)
945         self.assertNotIn('Response', ih.flags)
946         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
947         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
948
949         prop = packet[ikev2.IKEv2_payload_Proposal]
950         self.assertEqual(prop.proto, 1)  # proto = ikev2
951         self.assertEqual(prop.proposal, 1)
952         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
953         self.assertEqual(prop.trans[0].transform_id,
954                          self.p.ike_transforms['crypto_alg'])
955         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
956         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
957         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
958         self.assertEqual(prop.trans[2].transform_id,
959                          self.p.ike_transforms['dh_group'])
960
961         self.verify_nat_detection(packet)
962         self.sa.set_ike_props(
963                     crypto='AES-GCM-16ICV', crypto_key_len=32,
964                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
965         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
966                               integ='SHA2-256-128')
967         self.sa.generate_dh_data()
968         self.sa.complete_dh_data()
969         self.sa.calc_keys()
970
971     def update_esp_transforms(self, trans, sa):
972         while trans:
973             if trans.transform_type == 1:  # ecryption
974                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
975             elif trans.transform_type == 3:  # integrity
976                 sa.esp_integ = INTEG_IDS[trans.transform_id]
977             trans = trans.payload
978
979     def verify_sa_auth_req(self, packet):
980         udp = packet[UDP]
981         self.sa.dport = udp.sport
982         ih = self.get_ike_header(packet)
983         self.assertEqual(ih.resp_SPI, self.sa.rspi)
984         self.assertEqual(ih.init_SPI, self.sa.ispi)
985         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
986         self.assertIn('Initiator', ih.flags)
987         self.assertNotIn('Response', ih.flags)
988
989         udp = packet[UDP]
990         self.verify_udp(udp)
991         self.assertEqual(ih.id, self.sa.msg_id + 1)
992         self.sa.msg_id += 1
993         plain = self.sa.hmac_and_decrypt(ih)
994         idi = ikev2.IKEv2_payload_IDi(plain)
995         idr = ikev2.IKEv2_payload_IDr(idi.payload)
996         self.assertEqual(idi.load, self.sa.i_id)
997         self.assertEqual(idr.load, self.sa.r_id)
998         prop = idi[ikev2.IKEv2_payload_Proposal]
999         c = self.sa.child_sas[0]
1000         c.ispi = prop.SPI
1001         self.update_esp_transforms(
1002                 prop[ikev2.IKEv2_payload_Transform], self.sa)
1003
1004     def send_init_response(self):
1005         tr_attr = self.sa.ike_crypto_attr()
1006         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1007                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1008                  key_length=tr_attr[0]) /
1009                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1010                  transform_id=self.sa.ike_integ) /
1011                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1012                  transform_id=self.sa.ike_prf_alg.name) /
1013                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1014                  transform_id=self.sa.ike_dh))
1015         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1016                  trans_nb=4, trans=trans))
1017
1018         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1019         if self.sa.natt:
1020             dst_address = b'\x0a\x0a\x0a\x0a'
1021         else:
1022             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1023         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1024         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1025
1026         self.sa.init_resp_packet = (
1027             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1028                         exch_type='IKE_SA_INIT', flags='Response') /
1029             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1030             ikev2.IKEv2_payload_KE(next_payload='Nonce',
1031                                    group=self.sa.ike_dh,
1032                                    load=self.sa.my_dh_pub_key) /
1033             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1034                                       next_payload='Notify') /
1035             ikev2.IKEv2_payload_Notify(
1036                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1037                     next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1038                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1039
1040         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1041                                      self.sa.sport, self.sa.dport,
1042                                      False, self.ip6)
1043         self.pg_send(self.pg0, ike_msg)
1044         capture = self.pg0.get_capture(1)
1045         self.verify_sa_auth_req(capture[0])
1046
1047     def initiate_sa_init(self):
1048         self.pg0.enable_capture()
1049         self.pg_start()
1050         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1051
1052         capture = self.pg0.get_capture(1)
1053         self.verify_sa_init_request(capture[0])
1054         self.send_init_response()
1055
1056     def send_auth_response(self):
1057         tr_attr = self.sa.esp_crypto_attr()
1058         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1059                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1060                  key_length=tr_attr[0]) /
1061                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1062                  transform_id=self.sa.esp_integ) /
1063                  ikev2.IKEv2_payload_Transform(
1064                  transform_type='Extended Sequence Number',
1065                  transform_id='No ESN') /
1066                  ikev2.IKEv2_payload_Transform(
1067                  transform_type='Extended Sequence Number',
1068                  transform_id='ESN'))
1069
1070         c = self.sa.child_sas[0]
1071         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1072                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1073
1074         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1075         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1076                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1077                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1078                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1079                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1080                  auth_type=AuthMethod.value(self.sa.auth_method),
1081                  load=self.sa.auth_data) /
1082                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1083                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1084                  number_of_TSs=len(tsi),
1085                  traffic_selector=tsi) /
1086                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1087                  number_of_TSs=len(tsr),
1088                  traffic_selector=tsr) /
1089                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1090
1091         header = ikev2.IKEv2(
1092                 init_SPI=self.sa.ispi,
1093                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1094                 flags='Response', exch_type='IKE_AUTH')
1095
1096         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1097         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1098                                     self.sa.dport, self.sa.natt, self.ip6)
1099         self.pg_send(self.pg0, packet)
1100
    def test_initiator(self):
        # SA_INIT exchange started through the API; initiate_sa_init also
        # sends our response and checks the IKE_AUTH request that follows
        self.initiate_sa_init()
        # presumably prepares the AUTH data and the child SA keys that the
        # IKE_AUTH response needs - confirm in IKEv2SA
        self.sa.auth_init()
        self.sa.calc_child_keys()
        self.send_auth_response()
        # finally check the SA state reported by the API
        self.verify_ike_sas()
1107
1108
1109 class TemplateResponder(IkePeer):
1110     """ responder test template """
1111
1112     def initiate_del_sa_from_responder(self):
1113         self.pg0.enable_capture()
1114         self.pg_start()
1115         self.vapi.ikev2_initiate_del_ike_sa(
1116                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1117         capture = self.pg0.get_capture(1)
1118         ih = self.get_ike_header(capture[0])
1119         self.assertNotIn('Response', ih.flags)
1120         self.assertNotIn('Initiator', ih.flags)
1121         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1122         plain = self.sa.hmac_and_decrypt(ih)
1123         d = ikev2.IKEv2_payload_Delete(plain)
1124         self.assertEqual(d.proto, 1)  # proto=IKEv2
1125         self.assertEqual(ih.init_SPI, self.sa.ispi)
1126         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1127         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1128                              flags='Initiator+Response',
1129                              exch_type='INFORMATIONAL',
1130                              id=ih.id, next_payload='Encrypted')
1131         resp = self.encrypt_ike_msg(header, b'', None)
1132         self.send_and_assert_no_replies(self.pg0, resp)
1133
1134     def verify_del_sa(self, packet):
1135         ih = self.get_ike_header(packet)
1136         self.assertEqual(ih.id, self.sa.msg_id)
1137         self.assertEqual(ih.exch_type, 37)  # exchange informational
1138         self.assertIn('Response', ih.flags)
1139         self.assertNotIn('Initiator', ih.flags)
1140         self.assertEqual(ih.next_payload, 46)  # Encrypted
1141         self.assertEqual(ih.init_SPI, self.sa.ispi)
1142         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1143         plain = self.sa.hmac_and_decrypt(ih)
1144         self.assertEqual(plain, b'')
1145
1146     def initiate_del_sa_from_initiator(self):
1147         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1148                              flags='Initiator', exch_type='INFORMATIONAL',
1149                              id=self.sa.new_msg_id())
1150         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1151         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1152         packet = self.create_packet(self.pg0, ike_msg,
1153                                     self.sa.sport, self.sa.dport,
1154                                     self.sa.natt, self.ip6)
1155         self.pg0.add_stream(packet)
1156         self.pg0.enable_capture()
1157         self.pg_start()
1158         capture = self.pg0.get_capture(1)
1159         self.verify_del_sa(capture[0])
1160
1161     def send_sa_init_req(self, behind_nat=False):
1162         tr_attr = self.sa.ike_crypto_attr()
1163         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1164                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1165                  key_length=tr_attr[0]) /
1166                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1167                  transform_id=self.sa.ike_integ) /
1168                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1169                  transform_id=self.sa.ike_prf_alg.name) /
1170                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1171                  transform_id=self.sa.ike_dh))
1172
1173         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1174                  trans_nb=4, trans=trans))
1175
1176         self.sa.init_req_packet = (
1177                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1178                             flags='Initiator', exch_type='IKE_SA_INIT') /
1179                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1180                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1181                                        group=self.sa.ike_dh,
1182                                        load=self.sa.my_dh_pub_key) /
1183                 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1184                                           load=self.sa.i_nonce))
1185
1186         if behind_nat:
1187             src_address = b'\x0a\x0a\x0a\x01'
1188         else:
1189             src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1190
1191         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1192         dst_nat = self.sa.compute_nat_sha1(
1193                 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1194                 self.sa.dport)
1195         nat_src_detection = ikev2.IKEv2_payload_Notify(
1196                 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1197                 next_payload='Notify')
1198         nat_dst_detection = ikev2.IKEv2_payload_Notify(
1199                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1200         self.sa.init_req_packet = (self.sa.init_req_packet /
1201                                    nat_src_detection /
1202                                    nat_dst_detection)
1203
1204         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1205                                      self.sa.sport, self.sa.dport,
1206                                      self.sa.natt, self.ip6)
1207         self.pg0.add_stream(ike_msg)
1208         self.pg0.enable_capture()
1209         self.pg_start()
1210         capture = self.pg0.get_capture(1)
1211         self.verify_sa_init(capture[0])
1212
1213     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1214         tr_attr = self.sa.esp_crypto_attr()
1215         last_payload = last_payload or 'Notify'
1216         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1217                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1218                  key_length=tr_attr[0]) /
1219                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1220                  transform_id=self.sa.esp_integ) /
1221                  ikev2.IKEv2_payload_Transform(
1222                  transform_type='Extended Sequence Number',
1223                  transform_id='No ESN') /
1224                  ikev2.IKEv2_payload_Transform(
1225                  transform_type='Extended Sequence Number',
1226                  transform_id='ESN'))
1227
1228         c = self.sa.child_sas[0]
1229         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1230                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1231
1232         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1233         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1234                  auth_type=AuthMethod.value(self.sa.auth_method),
1235                  load=self.sa.auth_data) /
1236                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1237                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1238                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1239                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1240                  number_of_TSs=len(tsr), traffic_selector=tsr))
1241
1242         if is_rekey:
1243             first_payload = 'Nonce'
1244             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1245                      next_payload='SA') / plain /
1246                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1247                      proto='ESP', SPI=c.ispi))
1248         else:
1249             first_payload = 'IDi'
1250             ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1251                    IDtype=self.sa.id_type, load=self.sa.i_id) /
1252                    ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1253                    IDtype=self.sa.id_type, load=self.sa.r_id))
1254             plain = ids / plain
1255         return plain, first_payload
1256
1257     def send_sa_auth(self):
1258         plain, first_payload = self.generate_auth_payload(
1259                     last_payload='Notify')
1260         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1261         header = ikev2.IKEv2(
1262                 init_SPI=self.sa.ispi,
1263                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1264                 flags='Initiator', exch_type='IKE_AUTH')
1265
1266         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1267         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1268                                     self.sa.dport, self.sa.natt, self.ip6)
1269         self.pg0.add_stream(packet)
1270         self.pg0.enable_capture()
1271         self.pg_start()
1272         capture = self.pg0.get_capture(1)
1273         self.verify_sa_auth_resp(capture[0])
1274
1275     def verify_sa_init(self, packet):
1276         ih = self.get_ike_header(packet)
1277
1278         self.assertEqual(ih.id, self.sa.msg_id)
1279         self.assertEqual(ih.exch_type, 34)
1280         self.assertIn('Response', ih.flags)
1281         self.assertEqual(ih.init_SPI, self.sa.ispi)
1282         self.assertNotEqual(ih.resp_SPI, 0)
1283         self.sa.rspi = ih.resp_SPI
1284         try:
1285             sa = ih[ikev2.IKEv2_payload_SA]
1286             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1287             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1288         except IndexError as e:
1289             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1290             self.logger.error(ih.show())
1291             raise
1292         self.sa.complete_dh_data()
1293         self.sa.calc_keys()
1294         self.sa.auth_init()
1295
1296     def verify_sa_auth_resp(self, packet):
1297         ike = self.get_ike_header(packet)
1298         udp = packet[UDP]
1299         self.verify_udp(udp)
1300         self.assertEqual(ike.id, self.sa.msg_id)
1301         plain = self.sa.hmac_and_decrypt(ike)
1302         idr = ikev2.IKEv2_payload_IDr(plain)
1303         prop = idr[ikev2.IKEv2_payload_Proposal]
1304         self.assertEqual(prop.SPIsize, 4)
1305         self.sa.child_sas[0].rspi = prop.SPI
1306         self.sa.calc_child_keys()
1307
    def test_responder(self):
        # IKE_SA_INIT exchange; the NAT detection source digest depends on
        # whether NAT traversal is enabled for this SA
        self.send_sa_init_req(self.sa.natt)
        # IKE_AUTH exchange, the response is verified inside send_sa_auth
        self.send_sa_auth()
        # finally check the resulting IPsec and IKE SA state
        self.verify_ipsec_sas()
        self.verify_ike_sas()
1313
1314
class Ikev2Params(object):
    """ set-up of the IKEv2 profile and of the SA mirrored by the test """

    def config_params(self, params=None):
        """ create the profile 'pr1' and the matching IKEv2SA object

        Relies on ``self.vapi`` being provided by the class this one is
        combined with.  Sets ``self.p``, ``self.sa``, ``self.ip6``,
        ``self.vpp_enums`` and ``self.del_sa_from_responder``; for RSA
        signature authentication ``self.peer_cert`` is set as well.

        :param params: optional dict overriding the defaults; recognised
            keys are 'dpd_disabled', 'del_sa_from_responder', 'natt',
            'ip6', 'auth' (with 'server-key', 'server-cert', 'client-key'
            and 'client-cert' when it is 'rsa-sig'), 'is_initiator',
            'loc_ts', 'rem_ts', 'responder', 'ike_transforms',
            'esp_transforms', 'udp_encap' and, when 'is_initiator' is
            true, 'ike-crypto', 'ike-integ', 'ike-dh', 'esp-crypto' and
            'esp-integ'
        """
        # a None default avoids sharing one mutable dict between calls
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        # dead peer detection is switched off unless explicitly kept
        if params.get('dpd_disabled', True):
            self.vapi.cli('ikev2 dpd disable')
        self.del_sa_from_responder = params.get(
                'del_sa_from_responder', False)
        is_natt = params.get('natt') or False
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # context managers close the files right after reading
            with open(work_dir + params['server-cert']) as cert_file:
                server_pem = cert_file.read()
            with open(work_dir + params['client-key']) as key_file:
                client_priv = key_file.read()
            client_priv = load_pem_private_key(str.encode(client_priv), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        is_init = params.get('is_initiator', True)

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        # the profile's local identity is the one the SA object does not use
        if is_init:
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
        else:
            self.p.add_local_id(**idi)
            self.p.add_remote_id(**idr)

        loc_ts = params.get(
                'loc_ts',
                {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
                'rem_ts',
                {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        udp_encap = params.get('udp_encap', False)
        if udp_encap:
            self.p.set_udp_encap(True)

        # the SA sees the profile's traffic selectors from the other side
        self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'], natt=is_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data, udp_encap=udp_encap,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
        if is_init:
            ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
            ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
            ike_dh = params.get('ike-dh', '2048MODPgr')

            esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
            esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

            self.sa.set_ike_props(
                    crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                    integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
            self.sa.set_esp_props(
                    crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                    integ=esp_integ)
1419
1420
class TestApi(VppTestCase):
    """ Test IKEV2 API """

    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        """Remove the profiles created by the test and check none remain."""
        super(TestApi, self).tearDown()
        # p1/p2 only exist once test_profile_api got far enough to create
        # them; skipping the missing ones keeps an AttributeError raised
        # here from masking the original test failure.
        for attr in ('p1', 'p2'):
            profile = getattr(self, attr, None)
            if profile is not None:
                profile.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Build a Profile from a config dict, add it to VPP and return it.

        :param cfg: dict with the mandatory keys 'name', 'loc_id',
            'rem_id', 'loc_ts', 'rem_ts', 'responder', 'ike_ts', 'esp_ts',
            'auth', 'udp_encap' and 'ipsec_over_udp_port'; the keys
            'lifetime_data', 'tun_itf' and 'natt_disabled' are optional.
        :returns: the Profile object after add_vpp_config() was called.
        """
        p = Profile(self, cfg['name'])
        # ids are stored as (id_type, data) pairs
        loc_id_type, loc_id_data = cfg['loc_id'][0], cfg['loc_id'][1]
        rem_id_type, rem_id_data = cfg['rem_id'][0], cfg['rem_id'][1]
        p.add_local_id(id_type=loc_id_type, data=loc_id_data)
        p.add_remote_id(id_type=rem_id_type, data=rem_id_data)
        p.add_local_ts(**cfg['loc_ts'])
        p.add_remote_ts(**cfg['rem_ts'])
        p.add_responder(cfg['responder'])
        p.add_ike_transforms(cfg['ike_ts'])
        p.add_esp_transforms(cfg['esp_ts'])
        p.add_auth(**cfg['auth'])
        p.set_udp_encap(cfg['udp_encap'])
        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            p.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            p.set_tunnel_interface(cfg['tun_itf'])
        # NAT-T is only disabled when the key is present and truthy
        if cfg.get('natt_disabled'):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """ test profile dump API """
        loc_ts4 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': '3.3.3.2',
            'end_addr': '3.3.3.3',
        }
        rem_ts4 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': '4.5.76.80',
            'end_addr': '2.3.4.6',
        }

        loc_ts6 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': 'ab::1',
            'end_addr': 'ab::4',
        }
        rem_ts6 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': 'cd::12',
            'end_addr': 'cd::13',
        }

        conf = {
            # IPv4 profile: NAT-T disabled, explicit lifetime data,
            # no tunnel interface
            'p1': {
                'name': 'p1',
                'natt_disabled': True,
                'loc_id': ('fqdn', b'vpp.home'),
                'rem_id': ('fqdn', b'roadwarrior.example.com'),
                'loc_ts': loc_ts4,
                'rem_ts': rem_ts4,
                'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
                'ike_ts': {
                    'crypto_alg': 20,
                    'crypto_key_size': 32,
                    'integ_alg': 1,
                    'dh_group': 1},
                'esp_ts': {
                    'crypto_alg': 13,
                    'crypto_key_size': 24,
                    'integ_alg': 2},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': True,
                'ipsec_over_udp_port': 4501,
                'lifetime_data': {
                    'lifetime': 123,
                    'lifetime_maxdata': 20192,
                    'lifetime_jitter': 9,
                    'handover': 132},
            },
            # IPv6 profile: tunnel interface set, NAT-T and lifetime data
            # left at their defaults
            'p2': {
                'name': 'p2',
                'loc_id': ('ip4-addr', b'192.168.2.1'),
                'rem_id': ('ip6-addr', b'abcd::1'),
                'loc_ts': loc_ts6,
                'rem_ts': rem_ts6,
                'responder': {'sw_if_index': 4, 'addr': 'def::10'},
                'ike_ts': {
                    'crypto_alg': 12,
                    'crypto_key_size': 16,
                    'integ_alg': 3,
                    'dh_group': 3},
                'esp_ts': {
                    'crypto_alg': 9,
                    'crypto_key_size': 24,
                    'integ_alg': 4},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': False,
                'ipsec_over_udp_port': 4600,
                'tun_itf': 0}
        }
        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        # NOTE(review): relies on the dump returning profiles in creation
        # order - confirm against the API implementation.
        self.verify_profile(r[0].profile, conf['p1'])
        self.verify_profile(r[1].profile, conf['p2'])

    def verify_id(self, api_id, cfg_id):
        """Compare a dumped id with the configured (id_type, data) pair."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        # the configured data is bytes, so api_id.data is encoded as ASCII
        # before the comparison
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Compare a dumped traffic selector with its configuration dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
        self.assertEqual(api_ts.start_addr,
                         ip_address(text_type(cfg_ts['start_addr'])))
        self.assertEqual(api_ts.end_addr,
                         ip_address(text_type(cfg_ts['end_addr'])))

    def verify_responder(self, api_r, cfg_r):
        """Compare the dumped responder with the configured one."""
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the transform fields shared by IKE and ESP transforms."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check the common transform fields plus the DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check the ESP transforms (common transform fields only)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Compare the dumped auth method, data and data length."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, cfg_auth['data'])
        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))

    def verify_lifetime_data(self, p, ld):
        """Compare the dumped lifetime fields with the 'lifetime_data' dict."""
        self.assertEqual(p.lifetime, ld['lifetime'])
        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
        self.assertEqual(p.handover, ld['handover'])

    def verify_profile(self, ap, cp):
        """Verify a dumped profile against the dict used to configure it.

        :param ap: profile object taken from the ikev2_profile_dump reply.
        :param cp: configuration dict as accepted by configure_profile().
        """
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        # a profile without the key is expected to have NAT-T enabled
        self.assertEqual(ap.natt_disabled, cp.get('natt_disabled', False))

        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        if 'tun_itf' in cp:
            self.assertEqual(ap.tun_itf, cp['tun_itf'])
        else:
            # value expected in the dump when no tunnel interface was set
            self.assertEqual(ap.tun_itf, 0xffffffff)
1608
1609
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - NAT traversal (initiator behind NAT) """

    def config_tc(self):
        """Set up the parameters: NAT-T on, VPP acting as initiator."""
        # the peer VPP has to contact is the remote end of pg0
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        params = {
            'natt': True,
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        }
        self.config_params(params)
1633
1634
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        """Set up the parameters with VPP acting as the initiator."""
        # the peer VPP has to contact is the remote end of pg0
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        params = {
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        }
        self.config_params(params)
1657
1658
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Reply to a captured rekey request by echoing back its SA payload.

        :param req: captured packet carrying the rekey request.
        :param update_child_sa_data: when true, the nonces and child SA
            SPIs tracked by the test are refreshed from the request and the
            child keys are recalculated before the reply is sent.
        """
        ike_hdr = self.get_ike_header(req)
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            # the nonce and SPI of the request are used for both directions
            self.sa.i_nonce = nonce
            self.sa.r_nonce = nonce
            child_sa = self.sa.child_sas[0]
            child_sa.ispi = proposal.SPI
            child_sa.rspi = proposal.SPI
            self.sa.calc_child_keys()

        # exchange type 36 is checked as CHILD_SA elsewhere in this file
        reply_hdr = ikev2.IKEv2(init_SPI=self.sa.ispi,
                                resp_SPI=self.sa.rspi,
                                flags='Response',
                                exch_type=36,
                                id=ike_hdr.id,
                                next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # run the inherited exchange first, then trigger two rekey requests
        # for the same child SA
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        requests = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(requests[1], True)
        self.rekey_respond(requests[0], False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1697         self.verify_ipsec_sas(is_rekey=True)
1698
1699
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Make VPP start a child SA rekey and answer its request.

        The request is captured on pg0, its header is checked, the test's
        SA state is refreshed from the request and a response echoing the
        received SA payload is sent back.
        """
        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        request = self.pg0.get_capture(1)[0]

        ike_hdr = self.get_ike_header(request)
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', ike_hdr.flags)
        self.assertIn('Initiator', ike_hdr.flags)

        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # the nonce of the request is used for both directions
        self.sa.i_nonce = nonce
        self.sa.r_nonce = nonce
        # update new responder SPI
        child_sa = self.sa.child_sas[0]
        child_sa.ispi = proposal.SPI
        child_sa.rspi = proposal.SPI
        self.sa.calc_child_keys()

        reply_hdr = ikev2.IKEv2(init_SPI=self.sa.ispi,
                                resp_SPI=self.sa.rspi,
                                flags='Response',
                                exch_type=36,
                                id=ike_hdr.id,
                                next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # inherited exchange first, then the rekey and the SA checks
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1735
1736
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        """Set up the parameters: VPP initiates, responder deletes the SA."""
        # the peer VPP has to contact is the remote end of pg0
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        params = {
            'del_sa_from_responder': True,
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        }
        self.config_params(params)
1760
1761
class TestResponderNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - nat traversal """

    def config_tc(self):
        """Request NAT traversal; no other parameter is passed."""
        params = {'natt': True}
        self.config_params(params)
1767
1768
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """

    def config_tc(self):
        """Call config_params() without passing any parameters."""
        self.config_params()
1773
1774
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """

    def config_tc(self):
        """Configure the test with 'dpd_disabled' explicitly set to False."""
        params = {'dpd_disabled': False}
        self.config_params(params)

    def tearDown(self):
        """Deliberately do nothing instead of running the inherited tearDown.

        NOTE(review): presumably because the SAs are already gone when
        the test finishes - confirm against the base class tearDown.
        """

    def test_responder(self):
        # set the liveness period/retries, then run the inherited exchange
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # capture empty request but don't reply
        request = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(request)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
        # wait for SA expiration
        time.sleep(3)
        # neither IKE SAs nor IPsec SAs may be left afterwards
        self.assertEqual(len(self.vapi.ikev2_sa_dump()), 0)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 0)
1802
1803
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Send a rekey request to VPP and record the data of its reply.

        The responder nonce and the responder SPI of the first child SA
        are taken from the decrypted reply.
        """
        self.pg0.add_stream(self.create_rekey_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        # inherited exchange first, then rekey; the child keys are
        # recalculated before the SAs are verified
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1827
1828
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """

    def config_tc(self):
        """Select 'rsa-sig' auth with UDP encap and the key/cert files."""
        params = {
            'udp_encap': True,
            'auth': 'rsa-sig',
            # key and certificate file names for both sides
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        self.config_params(params)
1839
1840
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
        TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """

    def config_tc(self):
        """Select the IKE and ESP algorithms named in the class docstring."""
        params = {
            # crypto entries are (algorithm, key length) pairs
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
            'ike-dh': '2048MODPgr',
        }
        self.config_params(params)
1853
1854
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
        TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """

    def config_tc(self):
        """Select AES-CBC/SHA2-256 for IKE and AES-GCM for ESP."""
        params = {
            # NOTE(review): class name and docstring say AES_CBC_128 but
            # the key length passed here is 32 - confirm which is intended.
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
            'ike-dh': '3072MODPgr',
        }
        self.config_params(params)
1867
1868
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    def config_tc(self):
        """Use AES-GCM for IKE over IPv6 with NAT-T and IPv6 selectors."""
        local_ts = {'start_addr': 'ab:cd::0',
                    'end_addr': 'ab:cd::10'}
        remote_ts = {'start_addr': '11::0',
                     'end_addr': '11::100'}
        params = {
            'del_sa_from_responder': True,
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_ts,
            'rem_ts': remote_ts,
        }
        self.config_params(params)
1885
1886
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request to VPP and check the reply it produces.

        The reply must carry the message id stored in self.sa.msg_id and
        decrypt to an empty payload.
        """
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')

    def test_initiator(self):
        # inherited exchange first, then the empty request
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
1906
1907
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        """Deliberately do nothing instead of running the inherited tearDown.

        NOTE(review): presumably because this test never establishes an
        SA that would need cleaning up - confirm against the base class.
        """

    def config_tc(self):
        """Call config_params() without passing any parameters."""
        self.config_params()

    def assert_counter(self, count, name, version='ip4'):
        """Assert that an ikev2 error counter has the expected value.

        :param count: expected counter value.
        :param name: counter name, appended to '/err/ikev2-<version>/'.
        :param version: node name suffix, 'ip4' by default.
        """
        prefix = '/err/ikev2-%s/' % version
        counter_path = prefix + name
        self.assertEqual(count, self.statistics.get_err_counter(counter_path))

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet for pg0.

        :param length: value for the IKEv2 header length field; the None
            default is passed through unchanged.
        :param payload: optional payload stacked on top of the header.
        :returns: the packet created by create_packet().
        """
        msg = ikev2.IKEv2(length=length,
                          init_SPI='\x11' * 8,
                          flags='Initiator',
                          exch_type='IKE_SA_INIT')
        if payload is not None:
            msg /= payload
        return self.create_packet(self.pg0, msg, self.sa.sport,
                                  self.sa.dport)

    def verify_bad_packet_length(self):
        """Send headers with a bogus length and check the error counter."""
        bad_msg = self.create_ike_init_msg(length=0xdead)
        packets = bad_msg * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, packets)
        self.assert_counter(self.pkt_count, 'Bad packet length')

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with a bogus length and check the counter."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        bad_msg = self.create_ike_init_msg(payload=bad_sa)
        packets = bad_msg * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, packets)
        self.assert_counter(self.pkt_count, 'Malformed packet')

    def test_responder(self):
        # number of copies of each malformed packet that is sent; the
        # error counters are expected to match it
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
1944
1945
if __name__ == '__main__':
    # unittest is not among the imports at the top of this file, so it is
    # imported here to keep a direct run of the script from failing with
    # NameError.
    import unittest
    unittest.main(testRunner=VppTestRunner)