ikev2: add tests for DPD
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 from scapy.layers.ipsec import ESP
16 from scapy.layers.inet import IP, UDP, Ether
17 from scapy.layers.inet6 import IPv6
18 from scapy.packet import raw, Raw
19 from scapy.utils import long_converter
20 from framework import VppTestCase, VppTestRunner
21 from vpp_ikev2 import Profile, IDType, AuthMethod
22 from vpp_papi import VppEnum
23
24 try:
25     text_type = unicode
26 except NameError:
27     text_type = str
28
29 KEY_PAD = b"Key Pad for IKEv2"
30 SALT_SIZE = 4
31 GCM_ICV_SIZE = 16
32 GCM_IV_SIZE = 8
33
34
35 # defined in rfc3526
36 # tuple structure is (p, g, key_len)
37 DH = {
38     '2048MODPgr': (long_converter("""
39     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
40     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
41     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
42     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
43     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
44     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
45     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
46     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
47     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
48     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
49     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
50
51     '3072MODPgr': (long_converter("""
52     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
53     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
54     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
55     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
56     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
57     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
58     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
59     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
60     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
61     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
62     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
63     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
64     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
65     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
66     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
67     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
68 }
69
70
71 class CryptoAlgo(object):
72     def __init__(self, name, cipher, mode):
73         self.name = name
74         self.cipher = cipher
75         self.mode = mode
76         if self.cipher is not None:
77             self.bs = self.cipher.block_size // 8
78
79             if self.name == 'AES-GCM-16ICV':
80                 self.iv_len = GCM_IV_SIZE
81             else:
82                 self.iv_len = self.bs
83
84     def encrypt(self, data, key, aad=None):
85         iv = os.urandom(self.iv_len)
86         if aad is None:
87             encryptor = Cipher(self.cipher(key), self.mode(iv),
88                                default_backend()).encryptor()
89             return iv + encryptor.update(data) + encryptor.finalize()
90         else:
91             salt = key[-SALT_SIZE:]
92             nonce = salt + iv
93             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
94                                default_backend()).encryptor()
95             encryptor.authenticate_additional_data(aad)
96             data = encryptor.update(data) + encryptor.finalize()
97             data += encryptor.tag[:GCM_ICV_SIZE]
98             return iv + data
99
100     def decrypt(self, data, key, aad=None, icv=None):
101         if aad is None:
102             iv = data[:self.iv_len]
103             ct = data[self.iv_len:]
104             decryptor = Cipher(algorithms.AES(key),
105                                self.mode(iv),
106                                default_backend()).decryptor()
107             return decryptor.update(ct) + decryptor.finalize()
108         else:
109             salt = key[-SALT_SIZE:]
110             nonce = salt + data[:GCM_IV_SIZE]
111             ct = data[GCM_IV_SIZE:]
112             key = key[:-SALT_SIZE]
113             decryptor = Cipher(algorithms.AES(key),
114                                self.mode(nonce, icv, len(icv)),
115                                default_backend()).decryptor()
116             decryptor.authenticate_additional_data(aad)
117             return decryptor.update(ct) + decryptor.finalize()
118
119     def pad(self, data):
120         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
121         data = data + b'\x00' * (pad_len - 1)
122         return data + bytes([pad_len - 1])
123
124
125 class AuthAlgo(object):
126     def __init__(self, name, mac, mod, key_len, trunc_len=None):
127         self.name = name
128         self.mac = mac
129         self.mod = mod
130         self.key_len = key_len
131         self.trunc_len = trunc_len or key_len
132
133
134 CRYPTO_ALGOS = {
135     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
136     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
137     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
138                                 mode=modes.GCM),
139 }
140
141 AUTH_ALGOS = {
142     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
143     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
144     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
145     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
146     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
147 }
148
149 PRF_ALGOS = {
150     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
151     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
152                                   hashes.SHA256, 32),
153 }
154
155 CRYPTO_IDS = {
156     12: 'AES-CBC',
157     20: 'AES-GCM-16ICV',
158 }
159
160 INTEG_IDS = {
161     2: 'HMAC-SHA1-96',
162     12: 'SHA2-256-128',
163     13: 'SHA2-384-192',
164     14: 'SHA2-512-256',
165 }
166
167
168 class IKEv2ChildSA(object):
169     def __init__(self, local_ts, remote_ts, is_initiator):
170         spi = os.urandom(4)
171         if is_initiator:
172             self.ispi = spi
173             self.rspi = None
174         else:
175             self.rspi = spi
176             self.ispi = None
177         self.local_ts = local_ts
178         self.remote_ts = remote_ts
179
180
181 class IKEv2SA(object):
182     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
183                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
184                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
185                  auth_method='shared-key', priv_key=None, natt=False,
186                  udp_encap=False):
187         self.udp_encap = udp_encap
188         self.natt = natt
189         if natt:
190             self.sport = 4500
191             self.dport = 4500
192         else:
193             self.sport = 500
194             self.dport = 500
195         self.msg_id = 0
196         self.dh_params = None
197         self.test = test
198         self.priv_key = priv_key
199         self.is_initiator = is_initiator
200         nonce = nonce or os.urandom(32)
201         self.auth_data = auth_data
202         self.i_id = i_id
203         self.r_id = r_id
204         if isinstance(id_type, str):
205             self.id_type = IDType.value(id_type)
206         else:
207             self.id_type = id_type
208         self.auth_method = auth_method
209         if self.is_initiator:
210             self.rspi = 8 * b'\x00'
211             self.ispi = spi
212             self.i_nonce = nonce
213         else:
214             self.rspi = spi
215             self.ispi = 8 * b'\x00'
216             self.r_nonce = nonce
217         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
218                           self.is_initiator)]
219
220     def new_msg_id(self):
221         self.msg_id += 1
222         return self.msg_id
223
224     @property
225     def my_dh_pub_key(self):
226         if self.is_initiator:
227             return self.i_dh_data
228         return self.r_dh_data
229
230     @property
231     def peer_dh_pub_key(self):
232         if self.is_initiator:
233             return self.r_dh_data
234         return self.i_dh_data
235
236     def compute_secret(self):
237         priv = self.dh_private_key
238         peer = self.peer_dh_pub_key
239         p, g, l = self.ike_group
240         return pow(int.from_bytes(peer, 'big'),
241                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
242
243     def generate_dh_data(self):
244         # generate DH keys
245         if self.ike_dh not in DH:
246             raise NotImplementedError('%s not in DH group' % self.ike_dh)
247
248         if self.dh_params is None:
249             dhg = DH[self.ike_dh]
250             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
251             self.dh_params = pn.parameters(default_backend())
252
253         priv = self.dh_params.generate_private_key()
254         pub = priv.public_key()
255         x = priv.private_numbers().x
256         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
257         y = pub.public_numbers().y
258
259         if self.is_initiator:
260             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
261         else:
262             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
263
264     def complete_dh_data(self):
265         self.dh_shared_secret = self.compute_secret()
266
267     def calc_child_keys(self):
268         prf = self.ike_prf_alg.mod()
269         s = self.i_nonce + self.r_nonce
270         c = self.child_sas[0]
271
272         encr_key_len = self.esp_crypto_key_len
273         integ_key_len = self.esp_integ_alg.key_len
274         salt_len = 0 if integ_key_len else 4
275
276         l = (integ_key_len * 2 +
277              encr_key_len * 2 +
278              salt_len * 2)
279         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
280
281         pos = 0
282         c.sk_ei = keymat[pos:pos+encr_key_len]
283         pos += encr_key_len
284
285         if integ_key_len:
286             c.sk_ai = keymat[pos:pos+integ_key_len]
287             pos += integ_key_len
288         else:
289             c.salt_ei = keymat[pos:pos+salt_len]
290             pos += salt_len
291
292         c.sk_er = keymat[pos:pos+encr_key_len]
293         pos += encr_key_len
294
295         if integ_key_len:
296             c.sk_ar = keymat[pos:pos+integ_key_len]
297             pos += integ_key_len
298         else:
299             c.salt_er = keymat[pos:pos+salt_len]
300             pos += salt_len
301
302     def calc_prfplus(self, prf, key, seed, length):
303         r = b''
304         t = None
305         x = 1
306         while len(r) < length and x < 255:
307             if t is not None:
308                 s = t
309             else:
310                 s = b''
311             s = s + seed + bytes([x])
312             t = self.calc_prf(prf, key, s)
313             r = r + t
314             x = x + 1
315
316         if x == 255:
317             return None
318         return r
319
320     def calc_prf(self, prf, key, data):
321         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
322         h.update(data)
323         return h.finalize()
324
325     def calc_keys(self):
326         prf = self.ike_prf_alg.mod()
327         # SKEYSEED = prf(Ni | Nr, g^ir)
328         s = self.i_nonce + self.r_nonce
329         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
330
331         # calculate S = Ni | Nr | SPIi SPIr
332         s = s + self.ispi + self.rspi
333
334         prf_key_trunc = self.ike_prf_alg.trunc_len
335         encr_key_len = self.ike_crypto_key_len
336         tr_prf_key_len = self.ike_prf_alg.key_len
337         integ_key_len = self.ike_integ_alg.key_len
338         if integ_key_len == 0:
339             salt_size = 4
340         else:
341             salt_size = 0
342
343         l = (prf_key_trunc +
344              integ_key_len * 2 +
345              encr_key_len * 2 +
346              tr_prf_key_len * 2 +
347              salt_size * 2)
348         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
349
350         pos = 0
351         self.sk_d = keymat[:pos+prf_key_trunc]
352         pos += prf_key_trunc
353
354         self.sk_ai = keymat[pos:pos+integ_key_len]
355         pos += integ_key_len
356         self.sk_ar = keymat[pos:pos+integ_key_len]
357         pos += integ_key_len
358
359         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
360         pos += encr_key_len + salt_size
361         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
362         pos += encr_key_len + salt_size
363
364         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
365         pos += tr_prf_key_len
366         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
367
368     def generate_authmsg(self, prf, packet):
369         if self.is_initiator:
370             id = self.i_id
371             nonce = self.r_nonce
372             key = self.sk_pi
373         else:
374             id = self.r_id
375             nonce = self.i_nonce
376             key = self.sk_pr
377         data = bytes([self.id_type, 0, 0, 0]) + id
378         id_hash = self.calc_prf(prf, key, data)
379         return packet + nonce + id_hash
380
381     def auth_init(self):
382         prf = self.ike_prf_alg.mod()
383         if self.is_initiator:
384             packet = self.init_req_packet
385         else:
386             packet = self.init_resp_packet
387         authmsg = self.generate_authmsg(prf, raw(packet))
388         if self.auth_method == 'shared-key':
389             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
390             self.auth_data = self.calc_prf(prf, psk, authmsg)
391         elif self.auth_method == 'rsa-sig':
392             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
393                                                 hashes.SHA1())
394         else:
395             raise TypeError('unknown auth method type!')
396
397     def encrypt(self, data, aad=None):
398         data = self.ike_crypto_alg.pad(data)
399         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
400
401     @property
402     def peer_authkey(self):
403         if self.is_initiator:
404             return self.sk_ar
405         return self.sk_ai
406
407     @property
408     def my_authkey(self):
409         if self.is_initiator:
410             return self.sk_ai
411         return self.sk_ar
412
413     @property
414     def my_cryptokey(self):
415         if self.is_initiator:
416             return self.sk_ei
417         return self.sk_er
418
419     @property
420     def peer_cryptokey(self):
421         if self.is_initiator:
422             return self.sk_er
423         return self.sk_ei
424
425     def concat(self, alg, key_len):
426         return alg + '-' + str(key_len * 8)
427
428     @property
429     def vpp_ike_cypto_alg(self):
430         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
431
432     @property
433     def vpp_esp_cypto_alg(self):
434         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
435
436     def verify_hmac(self, ikemsg):
437         integ_trunc = self.ike_integ_alg.trunc_len
438         exp_hmac = ikemsg[-integ_trunc:]
439         data = ikemsg[:-integ_trunc]
440         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
441                                           self.peer_authkey, data)
442         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
443
444     def compute_hmac(self, integ, key, data):
445         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
446         h.update(data)
447         return h.finalize()
448
449     def decrypt(self, data, aad=None, icv=None):
450         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
451
452     def hmac_and_decrypt(self, ike):
453         ep = ike[ikev2.IKEv2_payload_Encrypted]
454         if self.ike_crypto == 'AES-GCM-16ICV':
455             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
456             ct = ep.load[:-GCM_ICV_SIZE]
457             tag = ep.load[-GCM_ICV_SIZE:]
458             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
459         else:
460             self.verify_hmac(raw(ike))
461             integ_trunc = self.ike_integ_alg.trunc_len
462
463             # remove ICV and decrypt payload
464             ct = ep.load[:-integ_trunc]
465             plain = self.decrypt(ct)
466         # remove padding
467         pad_len = plain[-1]
468         return plain[:-pad_len - 1]
469
470     def build_ts_addr(self, ts, version):
471         return {'starting_address_v' + version: ts['start_addr'],
472                 'ending_address_v' + version: ts['end_addr']}
473
474     def generate_ts(self, is_ip4):
475         c = self.child_sas[0]
476         ts_data = {'IP_protocol_ID': 0,
477                    'start_port': 0,
478                    'end_port': 0xffff}
479         if is_ip4:
480             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
481             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
482             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
483             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
484         else:
485             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
486             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
487             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
488             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
489
490         if self.is_initiator:
491             return ([ts1], [ts2])
492         return ([ts2], [ts1])
493
494     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
495         if crypto not in CRYPTO_ALGOS:
496             raise TypeError('unsupported encryption algo %r' % crypto)
497         self.ike_crypto = crypto
498         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
499         self.ike_crypto_key_len = crypto_key_len
500
501         if integ not in AUTH_ALGOS:
502             raise TypeError('unsupported auth algo %r' % integ)
503         self.ike_integ = None if integ == 'NULL' else integ
504         self.ike_integ_alg = AUTH_ALGOS[integ]
505
506         if prf not in PRF_ALGOS:
507             raise TypeError('unsupported prf algo %r' % prf)
508         self.ike_prf = prf
509         self.ike_prf_alg = PRF_ALGOS[prf]
510         self.ike_dh = dh
511         self.ike_group = DH[self.ike_dh]
512
513     def set_esp_props(self, crypto, crypto_key_len, integ):
514         self.esp_crypto_key_len = crypto_key_len
515         if crypto not in CRYPTO_ALGOS:
516             raise TypeError('unsupported encryption algo %r' % crypto)
517         self.esp_crypto = crypto
518         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
519
520         if integ not in AUTH_ALGOS:
521             raise TypeError('unsupported auth algo %r' % integ)
522         self.esp_integ = None if integ == 'NULL' else integ
523         self.esp_integ_alg = AUTH_ALGOS[integ]
524
525     def crypto_attr(self, key_len):
526         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
527             return (0x800e << 16 | key_len << 3, 12)
528         else:
529             raise Exception('unsupported attribute type')
530
531     def ike_crypto_attr(self):
532         return self.crypto_attr(self.ike_crypto_key_len)
533
534     def esp_crypto_attr(self):
535         return self.crypto_attr(self.esp_crypto_key_len)
536
537     def compute_nat_sha1(self, ip, port, rspi=None):
538         if rspi is None:
539             rspi = self.rspi
540         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
541         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
542         digest.update(data)
543         return digest.finalize()
544
545
546 class IkePeer(VppTestCase):
547     """ common class for initiator and responder """
548
549     @classmethod
550     def setUpClass(cls):
551         import scapy.contrib.ikev2 as _ikev2
552         globals()['ikev2'] = _ikev2
553         super(IkePeer, cls).setUpClass()
554         cls.create_pg_interfaces(range(2))
555         for i in cls.pg_interfaces:
556             i.admin_up()
557             i.config_ip4()
558             i.resolve_arp()
559             i.config_ip6()
560             i.resolve_ndp()
561
562     @classmethod
563     def tearDownClass(cls):
564         super(IkePeer, cls).tearDownClass()
565
566     def tearDown(self):
567         super(IkePeer, self).tearDown()
568         if self.del_sa_from_responder:
569             self.initiate_del_sa_from_responder()
570         else:
571             self.initiate_del_sa_from_initiator()
572         r = self.vapi.ikev2_sa_dump()
573         self.assertEqual(len(r), 0)
574         sas = self.vapi.ipsec_sa_dump()
575         self.assertEqual(len(sas), 0)
576         self.p.remove_vpp_config()
577         self.assertIsNone(self.p.query_vpp_config())
578
579     def setUp(self):
580         super(IkePeer, self).setUp()
581         self.config_tc()
582         self.p.add_vpp_config()
583         self.assertIsNotNone(self.p.query_vpp_config())
584         if self.sa.is_initiator:
585             self.sa.generate_dh_data()
586         self.vapi.cli('ikev2 set logging level 4')
587         self.vapi.cli('event-lo clear')
588
589     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
590                       use_ip6=False):
591         if use_ip6:
592             src_ip = src_if.remote_ip6
593             dst_ip = src_if.local_ip6
594             ip_layer = IPv6
595         else:
596             src_ip = src_if.remote_ip4
597             dst_ip = src_if.local_ip4
598             ip_layer = IP
599         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
600                ip_layer(src=src_ip, dst=dst_ip) /
601                UDP(sport=sport, dport=dport))
602         if natt:
603             # insert non ESP marker
604             res = res / Raw(b'\x00' * 4)
605         return res / msg
606
607     def verify_udp(self, udp):
608         self.assertEqual(udp.sport, self.sa.sport)
609         self.assertEqual(udp.dport, self.sa.dport)
610
611     def get_ike_header(self, packet):
612         try:
613             ih = packet[ikev2.IKEv2]
614         except IndexError as e:
615             # this is a workaround for getting IKEv2 layer as both ikev2 and
616             # ipsec register for port 4500
617             esp = packet[ESP]
618             ih = self.verify_and_remove_non_esp_marker(esp)
619         self.assertEqual(ih.version, 0x20)
620         self.assertNotIn('Version', ih.flags)
621         return ih
622
623     def verify_and_remove_non_esp_marker(self, packet):
624         if self.sa.natt:
625             # if we are in nat traversal mode check for non esp marker
626             # and remove it
627             data = raw(packet)
628             self.assertEqual(data[:4], b'\x00' * 4)
629             return ikev2.IKEv2(data[4:])
630         else:
631             return packet
632
633     def encrypt_ike_msg(self, header, plain, first_payload):
634         if self.sa.ike_crypto == 'AES-GCM-16ICV':
635             data = self.sa.ike_crypto_alg.pad(raw(plain))
636             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
637                 len(ikev2.IKEv2_payload_Encrypted())
638             tlen = plen + len(ikev2.IKEv2())
639
640             # prepare aad data
641             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
642                                                  length=plen)
643             header.length = tlen
644             res = header / sk_p
645             encr = self.sa.encrypt(raw(plain), raw(res))
646             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
647                                                  length=plen, load=encr)
648             res = header / sk_p
649         else:
650             encr = self.sa.encrypt(raw(plain))
651             trunc_len = self.sa.ike_integ_alg.trunc_len
652             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
653             tlen = plen + len(ikev2.IKEv2())
654
655             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
656                                                  length=plen, load=encr)
657             header.length = tlen
658             res = header / sk_p
659
660             integ_data = raw(res)
661             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
662                                              self.sa.my_authkey, integ_data)
663             res = res / Raw(hmac_data[:trunc_len])
664         assert(len(res) == tlen)
665         return res
666
667     def verify_udp_encap(self, ipsec_sa):
668         e = VppEnum.vl_api_ipsec_sad_flags_t
669         if self.sa.udp_encap or self.sa.natt:
670             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
671         else:
672             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
673
674     def verify_ipsec_sas(self, is_rekey=False):
675         sas = self.vapi.ipsec_sa_dump()
676         if is_rekey:
677             # after rekey there is a short period of time in which old
678             # inbound SA is still present
679             sa_count = 3
680         else:
681             sa_count = 2
682         self.assertEqual(len(sas), sa_count)
683         if self.sa.is_initiator:
684             if is_rekey:
685                 sa0 = sas[0].entry
686                 sa1 = sas[2].entry
687             else:
688                 sa0 = sas[0].entry
689                 sa1 = sas[1].entry
690         else:
691             if is_rekey:
692                 sa0 = sas[2].entry
693                 sa1 = sas[0].entry
694             else:
695                 sa1 = sas[0].entry
696                 sa0 = sas[1].entry
697
698         c = self.sa.child_sas[0]
699
700         self.verify_udp_encap(sa0)
701         self.verify_udp_encap(sa1)
702         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
703         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
704         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
705
706         if self.sa.esp_integ is None:
707             vpp_integ_alg = 0
708         else:
709             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
710         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
711         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
712
713         # verify crypto keys
714         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
715         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
716         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
717         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
718
719         # verify integ keys
720         if vpp_integ_alg:
721             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
722             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
723             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
724             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
725         else:
726             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
727             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
728
729     def verify_keymat(self, api_keys, keys, name):
730         km = getattr(keys, name)
731         api_km = getattr(api_keys, name)
732         api_km_len = getattr(api_keys, name + '_len')
733         self.assertEqual(len(km), api_km_len)
734         self.assertEqual(km, api_km[:api_km_len])
735
736     def verify_id(self, api_id, exp_id):
737         self.assertEqual(api_id.type, IDType.value(exp_id.type))
738         self.assertEqual(api_id.data_len, exp_id.data_len)
739         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
740
741     def verify_ike_sas(self):
742         r = self.vapi.ikev2_sa_dump()
743         self.assertEqual(len(r), 1)
744         sa = r[0].sa
745         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
746         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
747         if self.ip6:
748             if self.sa.is_initiator:
749                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
750                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
751             else:
752                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
753                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
754         else:
755             if self.sa.is_initiator:
756                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
757                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
758             else:
759                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
760                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
761         self.verify_keymat(sa.keys, self.sa, 'sk_d')
762         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
763         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
764         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
765         self.verify_keymat(sa.keys, self.sa, 'sk_er')
766         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
767         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
768
769         self.assertEqual(sa.i_id.type, self.sa.id_type)
770         self.assertEqual(sa.r_id.type, self.sa.id_type)
771         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
772         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
773         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
774         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
775
776         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
777         self.assertEqual(len(r), 1)
778         csa = r[0].child_sa
779         self.assertEqual(csa.sa_index, sa.sa_index)
780         c = self.sa.child_sas[0]
781         if hasattr(c, 'sk_ai'):
782             self.verify_keymat(csa.keys, c, 'sk_ai')
783             self.verify_keymat(csa.keys, c, 'sk_ar')
784         self.verify_keymat(csa.keys, c, 'sk_ei')
785         self.verify_keymat(csa.keys, c, 'sk_er')
786         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
787         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
788
789         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
790         tsi = tsi[0]
791         tsr = tsr[0]
792         r = self.vapi.ikev2_traffic_selector_dump(
793                 is_initiator=True, sa_index=sa.sa_index,
794                 child_sa_index=csa.child_sa_index)
795         self.assertEqual(len(r), 1)
796         ts = r[0].ts
797         self.verify_ts(r[0].ts, tsi[0], True)
798
799         r = self.vapi.ikev2_traffic_selector_dump(
800                 is_initiator=False, sa_index=sa.sa_index,
801                 child_sa_index=csa.child_sa_index)
802         self.assertEqual(len(r), 1)
803         self.verify_ts(r[0].ts, tsr[0], False)
804
805         n = self.vapi.ikev2_nonce_get(is_initiator=True,
806                                       sa_index=sa.sa_index)
807         self.verify_nonce(n, self.sa.i_nonce)
808         n = self.vapi.ikev2_nonce_get(is_initiator=False,
809                                       sa_index=sa.sa_index)
810         self.verify_nonce(n, self.sa.r_nonce)
811
812     def verify_nonce(self, api_nonce, nonce):
813         self.assertEqual(api_nonce.data_len, len(nonce))
814         self.assertEqual(api_nonce.nonce, nonce)
815
816     def verify_ts(self, api_ts, ts, is_initiator):
817         if is_initiator:
818             self.assertTrue(api_ts.is_local)
819         else:
820             self.assertFalse(api_ts.is_local)
821
822         if self.p.ts_is_ip4:
823             self.assertEqual(api_ts.start_addr,
824                              IPv4Address(ts.starting_address_v4))
825             self.assertEqual(api_ts.end_addr,
826                              IPv4Address(ts.ending_address_v4))
827         else:
828             self.assertEqual(api_ts.start_addr,
829                              IPv6Address(ts.starting_address_v6))
830             self.assertEqual(api_ts.end_addr,
831                              IPv6Address(ts.ending_address_v6))
832         self.assertEqual(api_ts.start_port, ts.start_port)
833         self.assertEqual(api_ts.end_port, ts.end_port)
834         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
835
836
837 class TemplateInitiator(IkePeer):
838     """ initiator test template """
839
840     def initiate_del_sa_from_initiator(self):
841         ispi = int.from_bytes(self.sa.ispi, 'little')
842         self.pg0.enable_capture()
843         self.pg_start()
844         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
845         capture = self.pg0.get_capture(1)
846         ih = self.get_ike_header(capture[0])
847         self.assertNotIn('Response', ih.flags)
848         self.assertIn('Initiator', ih.flags)
849         self.assertEqual(ih.init_SPI, self.sa.ispi)
850         self.assertEqual(ih.resp_SPI, self.sa.rspi)
851         plain = self.sa.hmac_and_decrypt(ih)
852         d = ikev2.IKEv2_payload_Delete(plain)
853         self.assertEqual(d.proto, 1)  # proto=IKEv2
854         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
855                              flags='Response', exch_type='INFORMATIONAL',
856                              id=ih.id, next_payload='Encrypted')
857         resp = self.encrypt_ike_msg(header, b'', None)
858         self.send_and_assert_no_replies(self.pg0, resp)
859
860     def verify_del_sa(self, packet):
861         ih = self.get_ike_header(packet)
862         self.assertEqual(ih.id, self.sa.msg_id)
863         self.assertEqual(ih.exch_type, 37)  # exchange informational
864         self.assertIn('Response', ih.flags)
865         self.assertIn('Initiator', ih.flags)
866         plain = self.sa.hmac_and_decrypt(ih)
867         self.assertEqual(plain, b'')
868
869     def initiate_del_sa_from_responder(self):
870         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
871                              exch_type='INFORMATIONAL',
872                              id=self.sa.new_msg_id())
873         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
874         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
875         packet = self.create_packet(self.pg0, ike_msg,
876                                     self.sa.sport, self.sa.dport,
877                                     self.sa.natt, self.ip6)
878         self.pg0.add_stream(packet)
879         self.pg0.enable_capture()
880         self.pg_start()
881         capture = self.pg0.get_capture(1)
882         self.verify_del_sa(capture[0])
883
884     @staticmethod
885     def find_notify_payload(packet, notify_type):
886         n = packet[ikev2.IKEv2_payload_Notify]
887         while n is not None:
888             if n.type == notify_type:
889                 return n
890             n = n.payload
891         return None
892
893     def verify_nat_detection(self, packet):
894         if self.ip6:
895             iph = packet[IPv6]
896         else:
897             iph = packet[IP]
898         udp = packet[UDP]
899
900         # NAT_DETECTION_SOURCE_IP
901         s = self.find_notify_payload(packet, 16388)
902         self.assertIsNotNone(s)
903         src_sha = self.sa.compute_nat_sha1(
904                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
905         self.assertEqual(s.load, src_sha)
906
907         # NAT_DETECTION_DESTINATION_IP
908         s = self.find_notify_payload(packet, 16389)
909         self.assertIsNotNone(s)
910         dst_sha = self.sa.compute_nat_sha1(
911                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
912         self.assertEqual(s.load, dst_sha)
913
914     def verify_sa_init_request(self, packet):
915         ih = packet[ikev2.IKEv2]
916         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
917         self.assertEqual(ih.exch_type, 34)  # SA_INIT
918         self.sa.ispi = ih.init_SPI
919         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
920         self.assertIn('Initiator', ih.flags)
921         self.assertNotIn('Response', ih.flags)
922         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
923         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
924
925         prop = packet[ikev2.IKEv2_payload_Proposal]
926         self.assertEqual(prop.proto, 1)  # proto = ikev2
927         self.assertEqual(prop.proposal, 1)
928         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
929         self.assertEqual(prop.trans[0].transform_id,
930                          self.p.ike_transforms['crypto_alg'])
931         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
932         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
933         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
934         self.assertEqual(prop.trans[2].transform_id,
935                          self.p.ike_transforms['dh_group'])
936
937         self.verify_nat_detection(packet)
938         self.sa.set_ike_props(
939                     crypto='AES-GCM-16ICV', crypto_key_len=32,
940                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
941         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
942                               integ='SHA2-256-128')
943         self.sa.generate_dh_data()
944         self.sa.complete_dh_data()
945         self.sa.calc_keys()
946
947     def update_esp_transforms(self, trans, sa):
948         while trans:
949             if trans.transform_type == 1:  # ecryption
950                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
951             elif trans.transform_type == 3:  # integrity
952                 sa.esp_integ = INTEG_IDS[trans.transform_id]
953             trans = trans.payload
954
955     def verify_sa_auth_req(self, packet):
956         ih = self.get_ike_header(packet)
957         self.assertEqual(ih.resp_SPI, self.sa.rspi)
958         self.assertEqual(ih.init_SPI, self.sa.ispi)
959         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
960         self.assertIn('Initiator', ih.flags)
961         self.assertNotIn('Response', ih.flags)
962
963         udp = packet[UDP]
964         self.verify_udp(udp)
965         self.assertEqual(ih.id, self.sa.msg_id + 1)
966         self.sa.msg_id += 1
967         plain = self.sa.hmac_and_decrypt(ih)
968         idi = ikev2.IKEv2_payload_IDi(plain)
969         idr = ikev2.IKEv2_payload_IDr(idi.payload)
970         self.assertEqual(idi.load, self.sa.i_id)
971         self.assertEqual(idr.load, self.sa.r_id)
972         prop = idi[ikev2.IKEv2_payload_Proposal]
973         c = self.sa.child_sas[0]
974         c.ispi = prop.SPI
975         self.update_esp_transforms(
976                 prop[ikev2.IKEv2_payload_Transform], self.sa)
977
978     def send_init_response(self):
979         tr_attr = self.sa.ike_crypto_attr()
980         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
981                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
982                  key_length=tr_attr[0]) /
983                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
984                  transform_id=self.sa.ike_integ) /
985                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
986                  transform_id=self.sa.ike_prf_alg.name) /
987                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
988                  transform_id=self.sa.ike_dh))
989         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
990                  trans_nb=4, trans=trans))
991         self.sa.init_resp_packet = (
992             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
993                         exch_type='IKE_SA_INIT', flags='Response') /
994             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
995             ikev2.IKEv2_payload_KE(next_payload='Nonce',
996                                    group=self.sa.ike_dh,
997                                    load=self.sa.my_dh_pub_key) /
998             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
999
1000         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1001                                      self.sa.sport, self.sa.dport,
1002                                      self.sa.natt, self.ip6)
1003         self.pg_send(self.pg0, ike_msg)
1004         capture = self.pg0.get_capture(1)
1005         self.verify_sa_auth_req(capture[0])
1006
1007     def initiate_sa_init(self):
1008         self.pg0.enable_capture()
1009         self.pg_start()
1010         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1011
1012         capture = self.pg0.get_capture(1)
1013         self.verify_sa_init_request(capture[0])
1014         self.send_init_response()
1015
1016     def send_auth_response(self):
1017         tr_attr = self.sa.esp_crypto_attr()
1018         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1019                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1020                  key_length=tr_attr[0]) /
1021                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1022                  transform_id=self.sa.esp_integ) /
1023                  ikev2.IKEv2_payload_Transform(
1024                  transform_type='Extended Sequence Number',
1025                  transform_id='No ESN') /
1026                  ikev2.IKEv2_payload_Transform(
1027                  transform_type='Extended Sequence Number',
1028                  transform_id='ESN'))
1029
1030         c = self.sa.child_sas[0]
1031         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1032                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1033
1034         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1035         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1036                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1037                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1038                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1039                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1040                  auth_type=AuthMethod.value(self.sa.auth_method),
1041                  load=self.sa.auth_data) /
1042                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1043                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1044                  number_of_TSs=len(tsi),
1045                  traffic_selector=tsi) /
1046                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1047                  number_of_TSs=len(tsr),
1048                  traffic_selector=tsr) /
1049                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1050
1051         header = ikev2.IKEv2(
1052                 init_SPI=self.sa.ispi,
1053                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1054                 flags='Response', exch_type='IKE_AUTH')
1055
1056         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1057         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1058                                     self.sa.dport, self.sa.natt, self.ip6)
1059         self.pg_send(self.pg0, packet)
1060
1061     def test_initiator(self):
1062         self.initiate_sa_init()
1063         self.sa.auth_init()
1064         self.sa.calc_child_keys()
1065         self.send_auth_response()
1066         self.verify_ike_sas()
1067
1068
1069 class TemplateResponder(IkePeer):
1070     """ responder test template """
1071
1072     def initiate_del_sa_from_responder(self):
1073         self.pg0.enable_capture()
1074         self.pg_start()
1075         self.vapi.ikev2_initiate_del_ike_sa(
1076                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1077         capture = self.pg0.get_capture(1)
1078         ih = self.get_ike_header(capture[0])
1079         self.assertNotIn('Response', ih.flags)
1080         self.assertNotIn('Initiator', ih.flags)
1081         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1082         plain = self.sa.hmac_and_decrypt(ih)
1083         d = ikev2.IKEv2_payload_Delete(plain)
1084         self.assertEqual(d.proto, 1)  # proto=IKEv2
1085         self.assertEqual(ih.init_SPI, self.sa.ispi)
1086         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1087         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1088                              flags='Initiator+Response',
1089                              exch_type='INFORMATIONAL',
1090                              id=ih.id, next_payload='Encrypted')
1091         resp = self.encrypt_ike_msg(header, b'', None)
1092         self.send_and_assert_no_replies(self.pg0, resp)
1093
1094     def verify_del_sa(self, packet):
1095         ih = self.get_ike_header(packet)
1096         self.assertEqual(ih.id, self.sa.msg_id)
1097         self.assertEqual(ih.exch_type, 37)  # exchange informational
1098         self.assertIn('Response', ih.flags)
1099         self.assertNotIn('Initiator', ih.flags)
1100         self.assertEqual(ih.next_payload, 46)  # Encrypted
1101         self.assertEqual(ih.init_SPI, self.sa.ispi)
1102         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1103         plain = self.sa.hmac_and_decrypt(ih)
1104         self.assertEqual(plain, b'')
1105
1106     def initiate_del_sa_from_initiator(self):
1107         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1108                              flags='Initiator', exch_type='INFORMATIONAL',
1109                              id=self.sa.new_msg_id())
1110         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1111         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1112         packet = self.create_packet(self.pg0, ike_msg,
1113                                     self.sa.sport, self.sa.dport,
1114                                     self.sa.natt, self.ip6)
1115         self.pg0.add_stream(packet)
1116         self.pg0.enable_capture()
1117         self.pg_start()
1118         capture = self.pg0.get_capture(1)
1119         self.verify_del_sa(capture[0])
1120
1121     def send_sa_init_req(self, behind_nat=False):
1122         tr_attr = self.sa.ike_crypto_attr()
1123         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1124                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1125                  key_length=tr_attr[0]) /
1126                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1127                  transform_id=self.sa.ike_integ) /
1128                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1129                  transform_id=self.sa.ike_prf_alg.name) /
1130                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1131                  transform_id=self.sa.ike_dh))
1132
1133         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1134                  trans_nb=4, trans=trans))
1135
1136         self.sa.init_req_packet = (
1137                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1138                             flags='Initiator', exch_type='IKE_SA_INIT') /
1139                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1140                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1141                                        group=self.sa.ike_dh,
1142                                        load=self.sa.my_dh_pub_key) /
1143                 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1144                                           load=self.sa.i_nonce))
1145
1146         if behind_nat:
1147             src_address = b'\x0a\x0a\x0a\x01'
1148         else:
1149             src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1150
1151         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1152         dst_nat = self.sa.compute_nat_sha1(
1153                 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1154                 self.sa.dport)
1155         nat_src_detection = ikev2.IKEv2_payload_Notify(
1156                 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1157                 next_payload='Notify')
1158         nat_dst_detection = ikev2.IKEv2_payload_Notify(
1159                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1160         self.sa.init_req_packet = (self.sa.init_req_packet /
1161                                    nat_src_detection /
1162                                    nat_dst_detection)
1163
1164         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1165                                      self.sa.sport, self.sa.dport,
1166                                      self.sa.natt, self.ip6)
1167         self.pg0.add_stream(ike_msg)
1168         self.pg0.enable_capture()
1169         self.pg_start()
1170         capture = self.pg0.get_capture(1)
1171         self.verify_sa_init(capture[0])
1172
1173     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1174         tr_attr = self.sa.esp_crypto_attr()
1175         last_payload = last_payload or 'Notify'
1176         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1177                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1178                  key_length=tr_attr[0]) /
1179                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1180                  transform_id=self.sa.esp_integ) /
1181                  ikev2.IKEv2_payload_Transform(
1182                  transform_type='Extended Sequence Number',
1183                  transform_id='No ESN') /
1184                  ikev2.IKEv2_payload_Transform(
1185                  transform_type='Extended Sequence Number',
1186                  transform_id='ESN'))
1187
1188         c = self.sa.child_sas[0]
1189         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1190                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1191
1192         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1193         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1194                  auth_type=AuthMethod.value(self.sa.auth_method),
1195                  load=self.sa.auth_data) /
1196                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1197                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1198                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1199                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1200                  number_of_TSs=len(tsr), traffic_selector=tsr))
1201
1202         if is_rekey:
1203             first_payload = 'Nonce'
1204             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1205                      next_payload='SA') / plain /
1206                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1207                      proto='ESP', SPI=c.ispi))
1208         else:
1209             first_payload = 'IDi'
1210             ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1211                    IDtype=self.sa.id_type, load=self.sa.i_id) /
1212                    ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1213                    IDtype=self.sa.id_type, load=self.sa.r_id))
1214             plain = ids / plain
1215         return plain, first_payload
1216
1217     def send_sa_auth(self):
1218         plain, first_payload = self.generate_auth_payload(
1219                     last_payload='Notify')
1220         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1221         header = ikev2.IKEv2(
1222                 init_SPI=self.sa.ispi,
1223                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1224                 flags='Initiator', exch_type='IKE_AUTH')
1225
1226         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1227         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1228                                     self.sa.dport, self.sa.natt, self.ip6)
1229         self.pg0.add_stream(packet)
1230         self.pg0.enable_capture()
1231         self.pg_start()
1232         capture = self.pg0.get_capture(1)
1233         self.verify_sa_auth_resp(capture[0])
1234
1235     def verify_sa_init(self, packet):
1236         ih = self.get_ike_header(packet)
1237
1238         self.assertEqual(ih.id, self.sa.msg_id)
1239         self.assertEqual(ih.exch_type, 34)
1240         self.assertIn('Response', ih.flags)
1241         self.assertEqual(ih.init_SPI, self.sa.ispi)
1242         self.assertNotEqual(ih.resp_SPI, 0)
1243         self.sa.rspi = ih.resp_SPI
1244         try:
1245             sa = ih[ikev2.IKEv2_payload_SA]
1246             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1247             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1248         except IndexError as e:
1249             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1250             self.logger.error(ih.show())
1251             raise
1252         self.sa.complete_dh_data()
1253         self.sa.calc_keys()
1254         self.sa.auth_init()
1255
1256     def verify_sa_auth_resp(self, packet):
1257         ike = self.get_ike_header(packet)
1258         udp = packet[UDP]
1259         self.verify_udp(udp)
1260         self.assertEqual(ike.id, self.sa.msg_id)
1261         plain = self.sa.hmac_and_decrypt(ike)
1262         idr = ikev2.IKEv2_payload_IDr(plain)
1263         prop = idr[ikev2.IKEv2_payload_Proposal]
1264         self.assertEqual(prop.SPIsize, 4)
1265         self.sa.child_sas[0].rspi = prop.SPI
1266         self.sa.calc_child_keys()
1267
1268     def test_responder(self):
1269         self.send_sa_init_req(self.sa.natt)
1270         self.send_sa_auth()
1271         self.verify_ipsec_sas()
1272         self.verify_ike_sas()
1273
1274
1275 class Ikev2Params(object):
1276     def config_params(self, params={}):
1277         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1278         ei = VppEnum.vl_api_ipsec_integ_alg_t
1279         self.vpp_enums = {
1280                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1281                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1282                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1283                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1284                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1285                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1286
1287                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1288                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1289                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1290                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1291
1292         dpd_disabled = True if 'dpd_disabled' not in params else\
1293             params['dpd_disabled']
1294         if dpd_disabled:
1295             self.vapi.cli('ikev2 dpd disable')
1296         self.del_sa_from_responder = False if 'del_sa_from_responder'\
1297             not in params else params['del_sa_from_responder']
1298         is_natt = 'natt' in params and params['natt'] or False
1299         self.p = Profile(self, 'pr1')
1300         self.ip6 = False if 'ip6' not in params else params['ip6']
1301
1302         if 'auth' in params and params['auth'] == 'rsa-sig':
1303             auth_method = 'rsa-sig'
1304             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1305             self.vapi.ikev2_set_local_key(
1306                     key_file=work_dir + params['server-key'])
1307
1308             client_file = work_dir + params['client-cert']
1309             server_pem = open(work_dir + params['server-cert']).read()
1310             client_priv = open(work_dir + params['client-key']).read()
1311             client_priv = load_pem_private_key(str.encode(client_priv), None,
1312                                                default_backend())
1313             self.peer_cert = x509.load_pem_x509_certificate(
1314                     str.encode(server_pem),
1315                     default_backend())
1316             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1317             auth_data = None
1318         else:
1319             auth_data = b'$3cr3tpa$$w0rd'
1320             self.p.add_auth(method='shared-key', data=auth_data)
1321             auth_method = 'shared-key'
1322             client_priv = None
1323
1324         is_init = True if 'is_initiator' not in params else\
1325             params['is_initiator']
1326
1327         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1328         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1329         if is_init:
1330             self.p.add_local_id(**idr)
1331             self.p.add_remote_id(**idi)
1332         else:
1333             self.p.add_local_id(**idi)
1334             self.p.add_remote_id(**idr)
1335
1336         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1337             'loc_ts' not in params else params['loc_ts']
1338         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1339             'rem_ts' not in params else params['rem_ts']
1340         self.p.add_local_ts(**loc_ts)
1341         self.p.add_remote_ts(**rem_ts)
1342         if 'responder' in params:
1343             self.p.add_responder(params['responder'])
1344         if 'ike_transforms' in params:
1345             self.p.add_ike_transforms(params['ike_transforms'])
1346         if 'esp_transforms' in params:
1347             self.p.add_esp_transforms(params['esp_transforms'])
1348
1349         udp_encap = False if 'udp_encap' not in params else\
1350             params['udp_encap']
1351         if udp_encap:
1352             self.p.set_udp_encap(True)
1353
1354         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1355                           is_initiator=is_init,
1356                           id_type=self.p.local_id['id_type'], natt=is_natt,
1357                           priv_key=client_priv, auth_method=auth_method,
1358                           auth_data=auth_data, udp_encap=udp_encap,
1359                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1360         if is_init:
1361             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1362                 params['ike-crypto']
1363             ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1364                 params['ike-integ']
1365             ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1366                 params['ike-dh']
1367
1368             esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1369                 params['esp-crypto']
1370             esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1371                 params['esp-integ']
1372
1373             self.sa.set_ike_props(
1374                     crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1375                     integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1376             self.sa.set_esp_props(
1377                     crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1378                     integ=esp_integ)
1379
1380
1381 class TestApi(VppTestCase):
1382     """ Test IKEV2 API """
1383     @classmethod
1384     def setUpClass(cls):
1385         super(TestApi, cls).setUpClass()
1386
1387     @classmethod
1388     def tearDownClass(cls):
1389         super(TestApi, cls).tearDownClass()
1390
1391     def tearDown(self):
1392         super(TestApi, self).tearDown()
1393         self.p1.remove_vpp_config()
1394         self.p2.remove_vpp_config()
1395         r = self.vapi.ikev2_profile_dump()
1396         self.assertEqual(len(r), 0)
1397
1398     def configure_profile(self, cfg):
1399         p = Profile(self, cfg['name'])
1400         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1401         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1402         p.add_local_ts(**cfg['loc_ts'])
1403         p.add_remote_ts(**cfg['rem_ts'])
1404         p.add_responder(cfg['responder'])
1405         p.add_ike_transforms(cfg['ike_ts'])
1406         p.add_esp_transforms(cfg['esp_ts'])
1407         p.add_auth(**cfg['auth'])
1408         p.set_udp_encap(cfg['udp_encap'])
1409         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1410         if 'lifetime_data' in cfg:
1411             p.set_lifetime_data(cfg['lifetime_data'])
1412         if 'tun_itf' in cfg:
1413             p.set_tunnel_interface(cfg['tun_itf'])
1414         if 'natt_disabled' in cfg and cfg['natt_disabled']:
1415             p.disable_natt()
1416         p.add_vpp_config()
1417         return p
1418
1419     def test_profile_api(self):
1420         """ test profile dump API """
1421         loc_ts4 = {
1422                     'proto': 8,
1423                     'start_port': 1,
1424                     'end_port': 19,
1425                     'start_addr': '3.3.3.2',
1426                     'end_addr': '3.3.3.3',
1427                 }
1428         rem_ts4 = {
1429                     'proto': 9,
1430                     'start_port': 10,
1431                     'end_port': 119,
1432                     'start_addr': '4.5.76.80',
1433                     'end_addr': '2.3.4.6',
1434                 }
1435
1436         loc_ts6 = {
1437                     'proto': 8,
1438                     'start_port': 1,
1439                     'end_port': 19,
1440                     'start_addr': 'ab::1',
1441                     'end_addr': 'ab::4',
1442                 }
1443         rem_ts6 = {
1444                     'proto': 9,
1445                     'start_port': 10,
1446                     'end_port': 119,
1447                     'start_addr': 'cd::12',
1448                     'end_addr': 'cd::13',
1449                 }
1450
1451         conf = {
1452             'p1': {
1453                 'name': 'p1',
1454                 'natt_disabled': True,
1455                 'loc_id': ('fqdn', b'vpp.home'),
1456                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1457                 'loc_ts': loc_ts4,
1458                 'rem_ts': rem_ts4,
1459                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1460                 'ike_ts': {
1461                         'crypto_alg': 20,
1462                         'crypto_key_size': 32,
1463                         'integ_alg': 1,
1464                         'dh_group': 1},
1465                 'esp_ts': {
1466                         'crypto_alg': 13,
1467                         'crypto_key_size': 24,
1468                         'integ_alg': 2},
1469                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1470                 'udp_encap': True,
1471                 'ipsec_over_udp_port': 4501,
1472                 'lifetime_data': {
1473                     'lifetime': 123,
1474                     'lifetime_maxdata': 20192,
1475                     'lifetime_jitter': 9,
1476                     'handover': 132},
1477             },
1478             'p2': {
1479                 'name': 'p2',
1480                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1481                 'rem_id': ('ip6-addr', b'abcd::1'),
1482                 'loc_ts': loc_ts6,
1483                 'rem_ts': rem_ts6,
1484                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1485                 'ike_ts': {
1486                         'crypto_alg': 12,
1487                         'crypto_key_size': 16,
1488                         'integ_alg': 3,
1489                         'dh_group': 3},
1490                 'esp_ts': {
1491                         'crypto_alg': 9,
1492                         'crypto_key_size': 24,
1493                         'integ_alg': 4},
1494                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1495                 'udp_encap': False,
1496                 'ipsec_over_udp_port': 4600,
1497                 'tun_itf': 0}
1498         }
1499         self.p1 = self.configure_profile(conf['p1'])
1500         self.p2 = self.configure_profile(conf['p2'])
1501
1502         r = self.vapi.ikev2_profile_dump()
1503         self.assertEqual(len(r), 2)
1504         self.verify_profile(r[0].profile, conf['p1'])
1505         self.verify_profile(r[1].profile, conf['p2'])
1506
1507     def verify_id(self, api_id, cfg_id):
1508         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1509         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1510
1511     def verify_ts(self, api_ts, cfg_ts):
1512         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1513         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1514         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1515         self.assertEqual(api_ts.start_addr,
1516                          ip_address(text_type(cfg_ts['start_addr'])))
1517         self.assertEqual(api_ts.end_addr,
1518                          ip_address(text_type(cfg_ts['end_addr'])))
1519
1520     def verify_responder(self, api_r, cfg_r):
1521         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1522         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1523
1524     def verify_transforms(self, api_ts, cfg_ts):
1525         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1526         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1527         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1528
1529     def verify_ike_transforms(self, api_ts, cfg_ts):
1530         self.verify_transforms(api_ts, cfg_ts)
1531         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1532
1533     def verify_esp_transforms(self, api_ts, cfg_ts):
1534         self.verify_transforms(api_ts, cfg_ts)
1535
1536     def verify_auth(self, api_auth, cfg_auth):
1537         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1538         self.assertEqual(api_auth.data, cfg_auth['data'])
1539         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1540
1541     def verify_lifetime_data(self, p, ld):
1542         self.assertEqual(p.lifetime, ld['lifetime'])
1543         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1544         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1545         self.assertEqual(p.handover, ld['handover'])
1546
1547     def verify_profile(self, ap, cp):
1548         self.assertEqual(ap.name, cp['name'])
1549         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1550         self.verify_id(ap.loc_id, cp['loc_id'])
1551         self.verify_id(ap.rem_id, cp['rem_id'])
1552         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1553         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1554         self.verify_responder(ap.responder, cp['responder'])
1555         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1556         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1557         self.verify_auth(ap.auth, cp['auth'])
1558         natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1559         self.assertTrue(natt_dis == ap.natt_disabled)
1560
1561         if 'lifetime_data' in cp:
1562             self.verify_lifetime_data(ap, cp['lifetime_data'])
1563         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1564         if 'tun_itf' in cp:
1565             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1566         else:
1567             self.assertEqual(ap.tun_itf, 0xffffffff)
1568
1569
1570 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1571     """ test ikev2 initiator - pre shared key auth """
1572
1573     def config_tc(self):
1574         self.config_params({
1575             'is_initiator': False,  # seen from test case perspective
1576                                     # thus vpp is initiator
1577             'responder': {'sw_if_index': self.pg0.sw_if_index,
1578                            'addr': self.pg0.remote_ip4},
1579             'ike-crypto': ('AES-GCM-16ICV', 32),
1580             'ike-integ': 'NULL',
1581             'ike-dh': '3072MODPgr',
1582             'ike_transforms': {
1583                 'crypto_alg': 20,  # "aes-gcm-16"
1584                 'crypto_key_size': 256,
1585                 'dh_group': 15,  # "modp-3072"
1586             },
1587             'esp_transforms': {
1588                 'crypto_alg': 12,  # "aes-cbc"
1589                 'crypto_key_size': 256,
1590                 # "hmac-sha2-256-128"
1591                 'integ_alg': 12}})
1592
1593
1594 class TestInitiatorRekey(TestInitiatorPsk):
1595     """ test ikev2 initiator - rekey """
1596
1597     def rekey_from_initiator(self):
1598         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1599         self.pg0.enable_capture()
1600         self.pg_start()
1601         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1602         capture = self.pg0.get_capture(1)
1603         ih = self.get_ike_header(capture[0])
1604         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1605         self.assertNotIn('Response', ih.flags)
1606         self.assertIn('Initiator', ih.flags)
1607         plain = self.sa.hmac_and_decrypt(ih)
1608         sa = ikev2.IKEv2_payload_SA(plain)
1609         prop = sa[ikev2.IKEv2_payload_Proposal]
1610         nonce = sa[ikev2.IKEv2_payload_Nonce]
1611         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1612         self.sa.r_nonce = self.sa.i_nonce
1613         # update new responder SPI
1614         self.sa.child_sas[0].ispi = prop.SPI
1615         self.sa.child_sas[0].rspi = prop.SPI
1616         self.sa.calc_child_keys()
1617         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1618                              flags='Response', exch_type=36,
1619                              id=ih.id, next_payload='Encrypted')
1620         resp = self.encrypt_ike_msg(header, sa, 'SA')
1621         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1622                                     self.sa.dport, self.sa.natt, self.ip6)
1623         self.send_and_assert_no_replies(self.pg0, packet)
1624
1625     def test_initiator(self):
1626         super(TestInitiatorRekey, self).test_initiator()
1627         self.rekey_from_initiator()
1628         self.verify_ike_sas()
1629         self.verify_ipsec_sas(is_rekey=True)
1630
1631
1632 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1633     """ test ikev2 initiator - delete IKE SA from responder """
1634
1635     def config_tc(self):
1636         self.config_params({
1637             'del_sa_from_responder': True,
1638             'is_initiator': False,  # seen from test case perspective
1639                                     # thus vpp is initiator
1640             'responder': {'sw_if_index': self.pg0.sw_if_index,
1641                            'addr': self.pg0.remote_ip4},
1642             'ike-crypto': ('AES-GCM-16ICV', 32),
1643             'ike-integ': 'NULL',
1644             'ike-dh': '3072MODPgr',
1645             'ike_transforms': {
1646                 'crypto_alg': 20,  # "aes-gcm-16"
1647                 'crypto_key_size': 256,
1648                 'dh_group': 15,  # "modp-3072"
1649             },
1650             'esp_transforms': {
1651                 'crypto_alg': 12,  # "aes-cbc"
1652                 'crypto_key_size': 256,
1653                 # "hmac-sha2-256-128"
1654                 'integ_alg': 12}})
1655
1656
1657 class TestResponderNATT(TemplateResponder, Ikev2Params):
1658     """ test ikev2 responder - nat traversal """
1659     def config_tc(self):
1660         self.config_params(
1661                 {'natt': True})
1662
1663
1664 class TestResponderPsk(TemplateResponder, Ikev2Params):
1665     """ test ikev2 responder - pre shared key auth """
1666     def config_tc(self):
1667         self.config_params()
1668
1669
1670 class TestResponderDpd(TestResponderPsk):
1671     """
1672     Dead peer detection test
1673     """
1674     def config_tc(self):
1675         self.config_params({'dpd_disabled': False})
1676
1677     def tearDown(self):
1678         pass
1679
1680     def test_responder(self):
1681         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1682         super(TestResponderDpd, self).test_responder()
1683         self.pg0.enable_capture()
1684         self.pg_start()
1685         # capture empty request but don't reply
1686         capture = self.pg0.get_capture(expected_count=1, timeout=5)
1687         ih = self.get_ike_header(capture[0])
1688         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1689         plain = self.sa.hmac_and_decrypt(ih)
1690         self.assertEqual(plain, b'')
1691         # wait for SA expiration
1692         time.sleep(3)
1693         ike_sas = self.vapi.ikev2_sa_dump()
1694         self.assertEqual(len(ike_sas), 0)
1695         ipsec_sas = self.vapi.ipsec_sa_dump()
1696         self.assertEqual(len(ipsec_sas), 0)
1697
1698
1699 class TestResponderRekey(TestResponderPsk):
1700     """ test ikev2 responder - rekey """
1701
1702     def rekey_from_initiator(self):
1703         sa, first_payload = self.generate_auth_payload(is_rekey=True)
1704         header = ikev2.IKEv2(
1705                 init_SPI=self.sa.ispi,
1706                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1707                 flags='Initiator', exch_type='CREATE_CHILD_SA')
1708
1709         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
1710         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1711                                     self.sa.dport, self.sa.natt, self.ip6)
1712         self.pg0.add_stream(packet)
1713         self.pg0.enable_capture()
1714         self.pg_start()
1715         capture = self.pg0.get_capture(1)
1716         ih = self.get_ike_header(capture[0])
1717         plain = self.sa.hmac_and_decrypt(ih)
1718         sa = ikev2.IKEv2_payload_SA(plain)
1719         prop = sa[ikev2.IKEv2_payload_Proposal]
1720         nonce = sa[ikev2.IKEv2_payload_Nonce]
1721         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1722         # update new responder SPI
1723         self.sa.child_sas[0].rspi = prop.SPI
1724
1725     def test_responder(self):
1726         super(TestResponderRekey, self).test_responder()
1727         self.rekey_from_initiator()
1728         self.sa.calc_child_keys()
1729         self.verify_ike_sas()
1730         self.verify_ipsec_sas(is_rekey=True)
1731
1732
1733 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1734     """ test ikev2 responder - cert based auth """
1735     def config_tc(self):
1736         self.config_params({
1737             'udp_encap': True,
1738             'auth': 'rsa-sig',
1739             'server-key': 'server-key.pem',
1740             'client-key': 'client-key.pem',
1741             'client-cert': 'client-cert.pem',
1742             'server-cert': 'server-cert.pem'})
1743
1744
1745 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1746         (TemplateResponder, Ikev2Params):
1747     """
1748     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1749     """
1750     def config_tc(self):
1751         self.config_params({
1752             'ike-crypto': ('AES-CBC', 16),
1753             'ike-integ': 'SHA2-256-128',
1754             'esp-crypto': ('AES-CBC', 24),
1755             'esp-integ': 'SHA2-384-192',
1756             'ike-dh': '2048MODPgr'})
1757
1758
1759 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1760         (TemplateResponder, Ikev2Params):
1761     """
1762     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1763     """
1764     def config_tc(self):
1765         self.config_params({
1766             'ike-crypto': ('AES-CBC', 32),
1767             'ike-integ': 'SHA2-256-128',
1768             'esp-crypto': ('AES-GCM-16ICV', 32),
1769             'esp-integ': 'NULL',
1770             'ike-dh': '3072MODPgr'})
1771
1772
1773 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1774     """
1775     IKE:AES_GCM_16_256
1776     """
1777     def config_tc(self):
1778         self.config_params({
1779             'del_sa_from_responder': True,
1780             'ip6': True,
1781             'natt': True,
1782             'ike-crypto': ('AES-GCM-16ICV', 32),
1783             'ike-integ': 'NULL',
1784             'ike-dh': '2048MODPgr',
1785             'loc_ts': {'start_addr': 'ab:cd::0',
1786                        'end_addr': 'ab:cd::10'},
1787             'rem_ts': {'start_addr': '11::0',
1788                        'end_addr': '11::100'}})
1789
1790
1791 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1792     """
1793     Test for keep alive messages
1794     """
1795
1796     def send_empty_req_from_responder(self):
1797         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1798                              id=self.sa.new_msg_id(), flags='Initiator',
1799                              exch_type='INFORMATIONAL',
1800                              next_payload='Encrypted')
1801
1802         msg = self.encrypt_ike_msg(header, b'', None)
1803         packet = self.create_packet(self.pg0, msg, self.sa.sport,
1804                                     self.sa.dport, self.sa.natt, self.ip6)
1805         self.pg0.add_stream(packet)
1806         self.pg0.enable_capture()
1807         self.pg_start()
1808         capture = self.pg0.get_capture(1)
1809         ih = self.get_ike_header(capture[0])
1810         self.assertEqual(ih.id, self.sa.msg_id)
1811         plain = self.sa.hmac_and_decrypt(ih)
1812         self.assertEqual(plain, b'')
1813
1814     def test_initiator(self):
1815         super(TestInitiatorKeepaliveMsg, self).test_initiator()
1816         self.send_empty_req_from_responder()
1817
1818
1819 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1820     """ malformed packet test """
1821
1822     def tearDown(self):
1823         pass
1824
1825     def config_tc(self):
1826         self.config_params()
1827
1828     def assert_counter(self, count, name, version='ip4'):
1829         node_name = '/err/ikev2-%s/' % version + name
1830         self.assertEqual(count, self.statistics.get_err_counter(node_name))
1831
1832     def create_ike_init_msg(self, length=None, payload=None):
1833         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1834                           flags='Initiator', exch_type='IKE_SA_INIT')
1835         if payload is not None:
1836             msg /= payload
1837         return self.create_packet(self.pg0, msg, self.sa.sport,
1838                                   self.sa.dport)
1839
1840     def verify_bad_packet_length(self):
1841         ike_msg = self.create_ike_init_msg(length=0xdead)
1842         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1843         self.assert_counter(self.pkt_count, 'Bad packet length')
1844
1845     def verify_bad_sa_payload_length(self):
1846         p = ikev2.IKEv2_payload_SA(length=0xdead)
1847         ike_msg = self.create_ike_init_msg(payload=p)
1848         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1849         self.assert_counter(self.pkt_count, 'Malformed packet')
1850
1851     def test_responder(self):
1852         self.pkt_count = 254
1853         self.verify_bad_packet_length()
1854         self.verify_bad_sa_payload_length()
1855
1856
1857 if __name__ == '__main__':
1858     unittest.main(testRunner=VppTestRunner)