ikev2: do not require optional IDr on IKE AUTH
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import tag_fixme_vpp_workers
22 from framework import VppTestCase, VppTestRunner
23 from vpp_ikev2 import Profile, IDType, AuthMethod
24 from vpp_papi import VppEnum
25
# Python 2/3 compatibility: ``unicode`` only exists on Python 2, so fall
# back to ``str`` when the name is not defined.
try:
    text_type = unicode
except NameError:
    text_type = str

# Constant data fed to the PRF together with the shared secret when
# computing shared-key authentication data (see IKEv2SA.auth_init).
KEY_PAD = b"Key Pad for IKEv2"
# Number of trailing key bytes that CryptoAlgo splits off and uses as the
# nonce salt in the AEAD code path.
SALT_SIZE = 4
# Length in bytes of the GCM integrity check value (tag).
GCM_ICV_SIZE = 16
# Length in bytes of the explicit IV that prefixes GCM ciphertext.
GCM_IV_SIZE = 8
35
36
# MODP Diffie-Hellman groups, defined in rfc3526
# tuple structure is (p, g, key_len): the prime modulus, the generator and
# the byte length used when serialising the shared secret
# (see IKEv2SA.compute_secret), e.g. 256 bytes for the 2048-bit group
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
71
72
class CryptoAlgo(object):
    """Symmetric cipher helper used to encrypt/decrypt IKE payloads.

    :param name: algorithm name, e.g. 'AES-CBC' or 'AES-GCM-16ICV'
    :param cipher: cipher class taking the key as its only argument
        (``None`` for the NULL algorithm)
    :param mode: mode class taking the IV/nonce (``None`` for NULL)
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        # bs / iv_len are only defined when a real cipher is configured
        if self.cipher is not None:
            # convert the cipher block size to bytes
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt *data* with a freshly generated random IV.

        :param data: plaintext, already padded by the caller
        :param key: cipher key; in the AEAD case the last SALT_SIZE bytes
            are the nonce salt and are not part of the cipher key
        :param aad: additional authenticated data; when given, the AEAD
            code path is used
        :returns: ``iv + ciphertext``, followed by the ICV in the AEAD case
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                               default_backend()).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt *data*, which starts with the IV used for encryption.

        The configured ``self.cipher`` is used (rather than a hard-coded
        AES) so that decryption always mirrors :meth:`encrypt`.

        :param data: ``iv + ciphertext`` (without the ICV)
        :param key: cipher key, laid out as for :meth:`encrypt`
        :param aad: additional authenticated data; when given, the AEAD
            code path is used
        :param icv: authentication tag, required in the AEAD case
        :returns: the decrypted bytes, padding still attached
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + data[:GCM_IV_SIZE]
            ct = data[GCM_IV_SIZE:]
            key = key[:-SALT_SIZE]
            decryptor = Cipher(self.cipher(key),
                               self.mode(nonce, icv, len(icv)),
                               default_backend()).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad *data* up to the next multiple of the block size.

        At least one byte is always appended: zero bytes followed by a
        final byte holding the number of padding bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
125
126
class AuthAlgo(object):
    """Description of an integrity or PRF algorithm.

    :param name: algorithm name
    :param mac: MAC constructor (``None`` for NULL)
    :param mod: hash class handed to the MAC (``None`` for NULL)
    :param key_len: key length in bytes
    :param trunc_len: length of the truncated output; when omitted (or
        zero) the key length is used instead
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
134
135
# Encryption algorithms understood by the test, keyed by name.
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}

# Integrity algorithms, keyed by name; positional arguments are
# (name, mac, hash, key_len, trunc_len) with lengths in bytes.
# The HMAC-SHA2 variants must use the hash matching their key length
# (rfc4868): SHA-384 for the 48 byte key and SHA-512 for the 64 byte key.
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}

# Pseudo-random functions, keyed by name.
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}

# Numeric encryption transform id -> CRYPTO_ALGOS key
# (presumably the IANA IKEv2 transform ids - confirm against the registry)
CRYPTO_IDS = {
    12: 'AES-CBC',
    20: 'AES-GCM-16ICV',
}

# Numeric integrity transform id -> AUTH_ALGOS key
INTEG_IDS = {
    2: 'HMAC-SHA1-96',
    12: 'SHA2-256-128',
    13: 'SHA2-384-192',
    14: 'SHA2-512-256',
}
168
169
class IKEv2ChildSA(object):
    """Child (IPsec) SA state kept by the test.

    A random 4 byte SPI is generated for the local side; the SPI of the
    other side stays ``None`` until it is filled in by the caller.
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        own_spi = os.urandom(4)
        # only the SPI of the role we play is known at creation time
        self.ispi = own_spi if is_initiator else None
        self.rspi = None if is_initiator else own_spi
        self.local_ts, self.remote_ts = local_ts, remote_ts
180         self.remote_ts = remote_ts
181
182
class IKEv2SA(object):
    """Test-side state of one IKEv2 SA.

    Keeps the SPIs, nonces, identities and selected algorithms, and
    derives the key material from them for the role (initiator or
    responder) played by the test.
    """

    def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
                 auth_method='shared-key', priv_key=None, i_natt=False,
                 r_natt=False, udp_encap=False):
        """Store the SA parameters and create the first child SA.

        *spi* and *nonce* belong to the side this object plays; the SPI of
        the other side starts as eight zero bytes.  A random 32 byte nonce
        is generated when none is given.
        """
        self.udp_encap = udp_encap
        self.i_natt = i_natt
        self.r_natt = r_natt
        # both ports move to 4500 as soon as either side is behind NAT
        port = 4500 if (i_natt or r_natt) else 500
        self.sport = port
        self.dport = port
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        own_nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        # id_type may be given by name or already as its numeric value
        self.id_type = (IDType.value(id_type) if isinstance(id_type, str)
                        else id_type)
        self.auth_method = auth_method
        zero_spi = 8 * b'\x00'
        if self.is_initiator:
            self.rspi, self.ispi = zero_spi, spi
            self.i_nonce = own_nonce
        else:
            self.rspi, self.ispi = spi, zero_spi
            self.r_nonce = own_nonce
        first_child = IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)
        self.child_sas = [first_child]

    def new_msg_id(self):
        """Advance the message id counter and return the new value."""
        self.msg_id = self.msg_id + 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """DH public value of the side played by this object."""
        return self.i_dh_data if self.is_initiator else self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """DH public value of the other side."""
        return self.r_dh_data if self.is_initiator else self.i_dh_data

    @property
    def natt(self):
        """Truthy when either side uses NAT traversal."""
        if self.i_natt:
            return self.i_natt
        return self.r_natt

    def compute_secret(self):
        """Return the DH shared secret as big-endian bytes.

        Computes ``peer_public ** private mod p`` and serialises it to the
        byte length stored in the DH group tuple.
        """
        private = self.dh_private_key
        peer_public = self.peer_dh_pub_key
        modulus, _generator, secret_len = self.ike_group
        base = int.from_bytes(peer_public, 'big')
        exponent = int.from_bytes(private, 'big')
        return pow(base, exponent, modulus).to_bytes(secret_len, 'big')

    def generate_dh_data(self):
        """Generate a DH key pair for the configured group.

        The private value goes to ``dh_private_key``; the public value to
        ``i_dh_data`` or ``r_dh_data`` depending on the role.

        :raises NotImplementedError: if the group is not in ``DH``
        """
        if self.ike_dh not in DH:
            raise NotImplementedError('%s not in DH group' % self.ike_dh)

        # the parameter object is built once and then reused
        if self.dh_params is None:
            group = DH[self.ike_dh]
            numbers = dh.DHParameterNumbers(group[0], group[1])
            self.dh_params = numbers.parameters(default_backend())

        private = self.dh_params.generate_private_key()
        public = private.public_key()
        x = private.private_numbers().x
        self.dh_private_key = x.to_bytes(private.key_size // 8, 'big')
        y = public.public_numbers().y
        public_bytes = y.to_bytes(public.key_size // 8, 'big')

        if self.is_initiator:
            self.i_dh_data = public_bytes
        else:
            self.r_dh_data = public_bytes

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self):
        """Derive the keys of the first child SA from ``sk_d``.

        The key material is consumed in order: encryption key, then the
        integrity key (or a 4 byte salt when there is no integrity
        algorithm) - first for the initiator, then for the responder.
        """
        prf = self.ike_prf_alg.mod()
        seed = self.i_nonce + self.r_nonce
        child = self.child_sas[0]

        encr_len = self.esp_crypto_key_len
        integ_len = self.esp_integ_alg.key_len
        salt_len = 4 if not integ_len else 0

        total = 2 * integ_len + 2 * encr_len + 2 * salt_len
        keymat = self.calc_prfplus(prf, self.sk_d, seed, total)

        offset = 0
        for side in ('i', 'r'):
            setattr(child, 'sk_e' + side, keymat[offset:offset+encr_len])
            offset += encr_len
            if integ_len:
                setattr(child, 'sk_a' + side,
                        keymat[offset:offset+integ_len])
                offset += integ_len
            else:
                setattr(child, 'salt_e' + side,
                        keymat[offset:offset+salt_len])
                offset += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Produce at least *length* bytes of key material.

        Each round feeds the previous PRF output (empty for the first
        round), *seed* and a one byte round counter into the PRF and
        appends the output to the result.

        :returns: the concatenated outputs (not truncated to *length*),
            or ``None`` if the round counter reached 255
        """
        result = b''
        previous = b''
        counter = 1
        while len(result) < length and counter < 255:
            previous = self.calc_prf(prf, key,
                                     previous + seed + bytes([counter]))
            result += previous
            counter += 1

        if counter == 255:
            return None
        return result

    def calc_prf(self, prf, key, data):
        """Return the IKE PRF of *data* keyed with *key*."""
        mac = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        mac.update(data)
        return mac.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the IKE SA keys.

        Sets ``skeyseed``, then slices the generated key material, in
        this order, into ``sk_d``, ``sk_ai``, ``sk_ar``, ``sk_ei``,
        ``sk_er``, ``sk_pi`` and ``sk_pr``.
        """
        prf = self.ike_prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        nonces = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, nonces, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        seed = nonces + self.ispi + self.rspi

        d_len = self.ike_prf_alg.trunc_len
        encr_len = self.ike_crypto_key_len
        p_len = self.ike_prf_alg.key_len
        integ_len = self.ike_integ_alg.key_len
        # without an integrity algorithm each encryption key carries
        # 4 extra bytes of salt
        salt_size = 4 if integ_len == 0 else 0

        total = (d_len + 2 * integ_len + 2 * encr_len + 2 * p_len +
                 2 * salt_size)
        keymat = self.calc_prfplus(prf, self.skeyseed, seed, total)

        layout = (
            ('sk_d', d_len),
            ('sk_ai', integ_len),
            ('sk_ar', integ_len),
            ('sk_ei', encr_len + salt_size),
            ('sk_er', encr_len + salt_size),
            ('sk_pi', p_len),
            ('sk_pr', p_len),
        )
        offset = 0
        for attr, size in layout:
            setattr(self, attr, keymat[offset:offset+size])
            offset += size

    def generate_authmsg(self, prf, packet):
        """Build the octets to be authenticated for our side.

        :returns: *packet* followed by the peer nonce and the PRF of our
            identity (id type, three zero bytes, id data)
        """
        if self.is_initiator:
            ident, peer_nonce, prf_key = self.i_id, self.r_nonce, self.sk_pi
        else:
            ident, peer_nonce, prf_key = self.r_id, self.i_nonce, self.sk_pr
        id_data = bytes([self.id_type, 0, 0, 0]) + ident
        id_hash = self.calc_prf(prf, prf_key, id_data)
        return packet + peer_nonce + id_hash

    def auth_init(self):
        """Replace ``auth_data`` with the computed authentication value.

        For 'shared-key' the stored secret is first run through the PRF
        with KEY_PAD; for 'rsa-sig' the message is signed with the
        private key.

        :raises TypeError: for any other authentication method
        """
        prf = self.ike_prf_alg.mod()
        packet = (self.init_req_packet if self.is_initiator
                  else self.init_resp_packet)
        authmsg = self.generate_authmsg(prf, raw(packet))
        method = self.auth_method
        if method == 'shared-key':
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif method == 'rsa-sig':
            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
                                                hashes.SHA1())
        else:
            raise TypeError('unknown auth method type!')

    def encrypt(self, data, aad=None):
        """Pad *data* and encrypt it with our IKE encryption key."""
        padded = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(padded, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key used by the other side."""
        return self.sk_ar if self.is_initiator else self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key used by our side."""
        return self.sk_ai if self.is_initiator else self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key used by our side."""
        return self.sk_ei if self.is_initiator else self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key used by the other side."""
        return self.sk_er if self.is_initiator else self.sk_ei

    def concat(self, alg, key_len):
        """Return ``<alg>-<key length in bits>`` for a byte *key_len*."""
        return '-'.join((alg, str(key_len * 8)))

    @property
    def vpp_ike_cypto_alg(self):
        """IKE encryption algorithm name including the key size."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP encryption algorithm name including the key size."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of *ikemsg* matches our HMAC."""
        icv_len = self.ike_integ_alg.trunc_len
        received = ikemsg[-icv_len:]
        covered = ikemsg[:-icv_len]
        computed = self.compute_hmac(self.ike_integ_alg.mod(),
                                     self.peer_authkey, covered)
        self.test.assertEqual(computed[:icv_len], received)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated IKE integrity MAC of *data*."""
        mac = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        mac.update(data)
        return mac.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt *data* with the peer's IKE encryption key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Authenticate and decrypt the Encrypted payload of *ike*.

        :returns: the plaintext with the padding removed
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto != 'AES-GCM-16ICV':
            self.verify_hmac(raw(ike))
            icv_len = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            plain = self.decrypt(ep.load[:-icv_len])
        else:
            # the leading bytes of the message (default IKEv2 header plus
            # default Encrypted payload length) serve as AAD
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
        # remove padding: the last byte holds the pad length
        pad_len = plain[-1]
        return plain[:-pad_len - 1]

    def build_ts_addr(self, ts, version):
        """Return the address fields of a traffic selector for *version*
        ('4' or '6') taken from the *ts* dict."""
        start_key = 'starting_address_v' + version
        end_key = 'ending_address_v' + version
        return {start_key: ts['start_addr'], end_key: ts['end_addr']}

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        :returns: ``(tsi, tsr)`` - two single element lists; the local
            selector is the initiator one when we are the initiator
        """
        child = self.child_sas[0]
        fields = {'IP_protocol_ID': 0,
                  'start_port': 0,
                  'end_port': 0xffff}
        if is_ip4:
            version, selector = '4', ikev2.IPv4TrafficSelector
        else:
            version, selector = '6', ikev2.IPv6TrafficSelector

        fields.update(self.build_ts_addr(child.local_ts, version))
        local_sel = selector(**fields)
        fields.update(self.build_ts_addr(child.remote_ts, version))
        remote_sel = selector(**fields)

        if self.is_initiator:
            return ([local_sel], [remote_sel])
        return ([remote_sel], [local_sel])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the IKE SA algorithms by name.

        :raises TypeError: if an algorithm name is not supported
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        # 'NULL' integrity is recorded as None
        self.ike_integ = integ if integ != 'NULL' else None
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError('unsupported prf algo %r' % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the ESP (child SA) algorithms by name.

        :raises TypeError: if an algorithm name is not supported
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        # 'NULL' integrity is recorded as None
        self.esp_integ = integ if integ != 'NULL' else None
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len):
        """Return the ``(attribute, length)`` tuple for a byte *key_len*.

        The attribute carries the key length in bits in its low part.

        :raises Exception: if the IKE encryption algorithm is unsupported
        """
        if self.ike_crypto not in ['AES-CBC', 'AES-GCM-16ICV']:
            raise Exception('unsupported attribute type')
        return (0x800e << 16 | key_len << 3, 12)

    def ike_crypto_attr(self):
        """Key length attribute for the IKE encryption key."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """Key length attribute for the ESP encryption key."""
        return self.crypto_attr(self.esp_crypto_key_len)

    def compute_nat_sha1(self, ip, port, rspi=None):
        """Return SHA1 over ``ispi | rspi | ip | port``.

        :param ip: address as bytes
        :param port: port number, hashed as 2 big-endian bytes
        :param rspi: responder SPI; defaults to the one stored on the SA
        """
        responder_spi = self.rspi if rspi is None else rspi
        payload = self.ispi + responder_spi + ip + (port).to_bytes(2, 'big')
        sha1 = hashes.Hash(hashes.SHA1(), backend=default_backend())
        sha1.update(payload)
        return sha1.finalize()
551
552
553 class IkePeer(VppTestCase):
554     """ common class for initiator and responder """
555
556     @classmethod
557     def setUpClass(cls):
558         import scapy.contrib.ikev2 as _ikev2
559         globals()['ikev2'] = _ikev2
560         super(IkePeer, cls).setUpClass()
561         cls.create_pg_interfaces(range(2))
562         for i in cls.pg_interfaces:
563             i.admin_up()
564             i.config_ip4()
565             i.resolve_arp()
566             i.config_ip6()
567             i.resolve_ndp()
568
    @classmethod
    def tearDownClass(cls):
        """Class level teardown; only delegates to the parent class."""
        super(IkePeer, cls).tearDownClass()
572
573     def tearDown(self):
574         super(IkePeer, self).tearDown()
575         if self.del_sa_from_responder:
576             self.initiate_del_sa_from_responder()
577         else:
578             self.initiate_del_sa_from_initiator()
579         r = self.vapi.ikev2_sa_dump()
580         self.assertEqual(len(r), 0)
581         sas = self.vapi.ipsec_sa_dump()
582         self.assertEqual(len(sas), 0)
583         self.p.remove_vpp_config()
584         self.assertIsNone(self.p.query_vpp_config())
585
586     def setUp(self):
587         super(IkePeer, self).setUp()
588         self.config_tc()
589         self.p.add_vpp_config()
590         self.assertIsNotNone(self.p.query_vpp_config())
591         if self.sa.is_initiator:
592             self.sa.generate_dh_data()
593         self.vapi.cli('ikev2 set logging level 4')
594         self.vapi.cli('event-lo clear')
595
596     def assert_counter(self, count, name, version='ip4'):
597         node_name = '/err/ikev2-%s/' % version + name
598         self.assertEqual(count, self.statistics.get_err_counter(node_name))
599
600     def create_rekey_request(self):
601         sa, first_payload = self.generate_auth_payload(is_rekey=True)
602         header = ikev2.IKEv2(
603                 init_SPI=self.sa.ispi,
604                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
605                 flags='Initiator', exch_type='CREATE_CHILD_SA')
606
607         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
608         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
609                                   self.sa.dport, self.sa.natt, self.ip6)
610
611     def create_empty_request(self):
612         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
613                              id=self.sa.new_msg_id(), flags='Initiator',
614                              exch_type='INFORMATIONAL',
615                              next_payload='Encrypted')
616
617         msg = self.encrypt_ike_msg(header, b'', None)
618         return self.create_packet(self.pg0, msg, self.sa.sport,
619                                   self.sa.dport, self.sa.natt, self.ip6)
620
621     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
622                       use_ip6=False):
623         if use_ip6:
624             src_ip = src_if.remote_ip6
625             dst_ip = src_if.local_ip6
626             ip_layer = IPv6
627         else:
628             src_ip = src_if.remote_ip4
629             dst_ip = src_if.local_ip4
630             ip_layer = IP
631         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
632                ip_layer(src=src_ip, dst=dst_ip) /
633                UDP(sport=sport, dport=dport))
634         if natt:
635             # insert non ESP marker
636             res = res / Raw(b'\x00' * 4)
637         return res / msg
638
639     def verify_udp(self, udp):
640         self.assertEqual(udp.sport, self.sa.sport)
641         self.assertEqual(udp.dport, self.sa.dport)
642
643     def get_ike_header(self, packet):
644         try:
645             ih = packet[ikev2.IKEv2]
646             ih = self.verify_and_remove_non_esp_marker(ih)
647         except IndexError as e:
648             # this is a workaround for getting IKEv2 layer as both ikev2 and
649             # ipsec register for port 4500
650             esp = packet[ESP]
651             ih = self.verify_and_remove_non_esp_marker(esp)
652         self.assertEqual(ih.version, 0x20)
653         self.assertNotIn('Version', ih.flags)
654         return ih
655
656     def verify_and_remove_non_esp_marker(self, packet):
657         if self.sa.natt:
658             # if we are in nat traversal mode check for non esp marker
659             # and remove it
660             data = raw(packet)
661             self.assertEqual(data[:4], b'\x00' * 4)
662             return ikev2.IKEv2(data[4:])
663         else:
664             return packet
665
    def encrypt_ike_msg(self, header, plain, first_payload):
        """Encrypt *plain* into an Encrypted payload appended to *header*.

        ``header.length`` is updated in place to the total message length.

        :param header: IKEv2 header of the message
        :param plain: payload(s) to encrypt (anything ``raw()`` accepts)
        :param first_payload: value for the Encrypted payload's
            ``next_payload`` field
        :returns: the complete IKE message
        """
        if self.sa.ike_crypto == 'AES-GCM-16ICV':
            # padded copy is only used to work out the final length;
            # sa.encrypt() pads the plaintext itself
            data = self.sa.ike_crypto_alg.pad(raw(plain))
            plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
                len(ikev2.IKEv2_payload_Encrypted())
            tlen = plen + len(ikev2.IKEv2())

            # prepare aad data
            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen)
            header.length = tlen
            res = header / sk_p
            # header + empty Encrypted payload (final lengths) act as AAD
            encr = self.sa.encrypt(raw(plain), raw(res))
            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen, load=encr)
            res = header / sk_p
        else:
            encr = self.sa.encrypt(raw(plain))
            trunc_len = self.sa.ike_integ_alg.trunc_len
            # payload length includes the ICV that is appended below
            plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
            tlen = plen + len(ikev2.IKEv2())

            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen, load=encr)
            header.length = tlen
            res = header / sk_p

            # the HMAC covers the whole message built so far
            integ_data = raw(res)
            hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
                                             self.sa.my_authkey, integ_data)
            res = res / Raw(hmac_data[:trunc_len])
        # sanity check: built message matches the advertised length
        assert(len(res) == tlen)
        return res
699
700     def verify_udp_encap(self, ipsec_sa):
701         e = VppEnum.vl_api_ipsec_sad_flags_t
702         if self.sa.udp_encap or self.sa.natt:
703             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
704         else:
705             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
706
707     def verify_ipsec_sas(self, is_rekey=False):
708         sas = self.vapi.ipsec_sa_dump()
709         if is_rekey:
710             # after rekey there is a short period of time in which old
711             # inbound SA is still present
712             sa_count = 3
713         else:
714             sa_count = 2
715         self.assertEqual(len(sas), sa_count)
716         if self.sa.is_initiator:
717             if is_rekey:
718                 sa0 = sas[0].entry
719                 sa1 = sas[2].entry
720             else:
721                 sa0 = sas[0].entry
722                 sa1 = sas[1].entry
723         else:
724             if is_rekey:
725                 sa0 = sas[2].entry
726                 sa1 = sas[0].entry
727             else:
728                 sa1 = sas[0].entry
729                 sa0 = sas[1].entry
730
731         c = self.sa.child_sas[0]
732
733         self.verify_udp_encap(sa0)
734         self.verify_udp_encap(sa1)
735         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
736         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
737         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
738
739         if self.sa.esp_integ is None:
740             vpp_integ_alg = 0
741         else:
742             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
743         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
744         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
745
746         # verify crypto keys
747         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
748         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
749         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
750         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
751
752         # verify integ keys
753         if vpp_integ_alg:
754             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
755             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
756             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
757             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
758         else:
759             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
760             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
761
762     def verify_keymat(self, api_keys, keys, name):
763         km = getattr(keys, name)
764         api_km = getattr(api_keys, name)
765         api_km_len = getattr(api_keys, name + '_len')
766         self.assertEqual(len(km), api_km_len)
767         self.assertEqual(km, api_km[:api_km_len])
768
769     def verify_id(self, api_id, exp_id):
770         self.assertEqual(api_id.type, IDType.value(exp_id.type))
771         self.assertEqual(api_id.data_len, exp_id.data_len)
772         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
773
774     def verify_ike_sas(self):
775         r = self.vapi.ikev2_sa_dump()
776         self.assertEqual(len(r), 1)
777         sa = r[0].sa
778         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
779         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
780         if self.ip6:
781             if self.sa.is_initiator:
782                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
783                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
784             else:
785                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
786                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
787         else:
788             if self.sa.is_initiator:
789                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
790                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
791             else:
792                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
793                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
794         self.verify_keymat(sa.keys, self.sa, 'sk_d')
795         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
796         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
797         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
798         self.verify_keymat(sa.keys, self.sa, 'sk_er')
799         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
800         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
801
802         self.assertEqual(sa.i_id.type, self.sa.id_type)
803         self.assertEqual(sa.r_id.type, self.sa.id_type)
804         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
805         self.assertEqual(sa.r_id.data_len, len(self.idr))
806         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
807         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.idr)
808
809         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
810         self.assertEqual(len(r), 1)
811         csa = r[0].child_sa
812         self.assertEqual(csa.sa_index, sa.sa_index)
813         c = self.sa.child_sas[0]
814         if hasattr(c, 'sk_ai'):
815             self.verify_keymat(csa.keys, c, 'sk_ai')
816             self.verify_keymat(csa.keys, c, 'sk_ar')
817         self.verify_keymat(csa.keys, c, 'sk_ei')
818         self.verify_keymat(csa.keys, c, 'sk_er')
819         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
820         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
821
822         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
823         tsi = tsi[0]
824         tsr = tsr[0]
825         r = self.vapi.ikev2_traffic_selector_dump(
826                 is_initiator=True, sa_index=sa.sa_index,
827                 child_sa_index=csa.child_sa_index)
828         self.assertEqual(len(r), 1)
829         ts = r[0].ts
830         self.verify_ts(r[0].ts, tsi[0], True)
831
832         r = self.vapi.ikev2_traffic_selector_dump(
833                 is_initiator=False, sa_index=sa.sa_index,
834                 child_sa_index=csa.child_sa_index)
835         self.assertEqual(len(r), 1)
836         self.verify_ts(r[0].ts, tsr[0], False)
837
838         n = self.vapi.ikev2_nonce_get(is_initiator=True,
839                                       sa_index=sa.sa_index)
840         self.verify_nonce(n, self.sa.i_nonce)
841         n = self.vapi.ikev2_nonce_get(is_initiator=False,
842                                       sa_index=sa.sa_index)
843         self.verify_nonce(n, self.sa.r_nonce)
844
845     def verify_nonce(self, api_nonce, nonce):
846         self.assertEqual(api_nonce.data_len, len(nonce))
847         self.assertEqual(api_nonce.nonce, nonce)
848
849     def verify_ts(self, api_ts, ts, is_initiator):
850         if is_initiator:
851             self.assertTrue(api_ts.is_local)
852         else:
853             self.assertFalse(api_ts.is_local)
854
855         if self.p.ts_is_ip4:
856             self.assertEqual(api_ts.start_addr,
857                              IPv4Address(ts.starting_address_v4))
858             self.assertEqual(api_ts.end_addr,
859                              IPv4Address(ts.ending_address_v4))
860         else:
861             self.assertEqual(api_ts.start_addr,
862                              IPv6Address(ts.starting_address_v6))
863             self.assertEqual(api_ts.end_addr,
864                              IPv6Address(ts.ending_address_v6))
865         self.assertEqual(api_ts.start_port, ts.start_port)
866         self.assertEqual(api_ts.end_port, ts.end_port)
867         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
868
869
870 class TemplateInitiator(IkePeer):
871     """ initiator test template """
872
873     def initiate_del_sa_from_initiator(self):
874         ispi = int.from_bytes(self.sa.ispi, 'little')
875         self.pg0.enable_capture()
876         self.pg_start()
877         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
878         capture = self.pg0.get_capture(1)
879         ih = self.get_ike_header(capture[0])
880         self.assertNotIn('Response', ih.flags)
881         self.assertIn('Initiator', ih.flags)
882         self.assertEqual(ih.init_SPI, self.sa.ispi)
883         self.assertEqual(ih.resp_SPI, self.sa.rspi)
884         plain = self.sa.hmac_and_decrypt(ih)
885         d = ikev2.IKEv2_payload_Delete(plain)
886         self.assertEqual(d.proto, 1)  # proto=IKEv2
887         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
888                              flags='Response', exch_type='INFORMATIONAL',
889                              id=ih.id, next_payload='Encrypted')
890         resp = self.encrypt_ike_msg(header, b'', None)
891         self.send_and_assert_no_replies(self.pg0, resp)
892
893     def verify_del_sa(self, packet):
894         ih = self.get_ike_header(packet)
895         self.assertEqual(ih.id, self.sa.msg_id)
896         self.assertEqual(ih.exch_type, 37)  # exchange informational
897         self.assertIn('Response', ih.flags)
898         self.assertIn('Initiator', ih.flags)
899         plain = self.sa.hmac_and_decrypt(ih)
900         self.assertEqual(plain, b'')
901
902     def initiate_del_sa_from_responder(self):
903         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
904                              exch_type='INFORMATIONAL',
905                              id=self.sa.new_msg_id())
906         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
907         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
908         packet = self.create_packet(self.pg0, ike_msg,
909                                     self.sa.sport, self.sa.dport,
910                                     self.sa.natt, self.ip6)
911         self.pg0.add_stream(packet)
912         self.pg0.enable_capture()
913         self.pg_start()
914         capture = self.pg0.get_capture(1)
915         self.verify_del_sa(capture[0])
916
917     @staticmethod
918     def find_notify_payload(packet, notify_type):
919         n = packet[ikev2.IKEv2_payload_Notify]
920         while n is not None:
921             if n.type == notify_type:
922                 return n
923             n = n.payload
924         return None
925
926     def verify_nat_detection(self, packet):
927         if self.ip6:
928             iph = packet[IPv6]
929         else:
930             iph = packet[IP]
931         udp = packet[UDP]
932
933         # NAT_DETECTION_SOURCE_IP
934         s = self.find_notify_payload(packet, 16388)
935         self.assertIsNotNone(s)
936         src_sha = self.sa.compute_nat_sha1(
937                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
938         self.assertEqual(s.load, src_sha)
939
940         # NAT_DETECTION_DESTINATION_IP
941         s = self.find_notify_payload(packet, 16389)
942         self.assertIsNotNone(s)
943         dst_sha = self.sa.compute_nat_sha1(
944                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
945         self.assertEqual(s.load, dst_sha)
946
947     def verify_sa_init_request(self, packet):
948         udp = packet[UDP]
949         self.sa.dport = udp.sport
950         ih = packet[ikev2.IKEv2]
951         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
952         self.assertEqual(ih.exch_type, 34)  # SA_INIT
953         self.sa.ispi = ih.init_SPI
954         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
955         self.assertIn('Initiator', ih.flags)
956         self.assertNotIn('Response', ih.flags)
957         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
958         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
959
960         prop = packet[ikev2.IKEv2_payload_Proposal]
961         self.assertEqual(prop.proto, 1)  # proto = ikev2
962         self.assertEqual(prop.proposal, 1)
963         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
964         self.assertEqual(prop.trans[0].transform_id,
965                          self.p.ike_transforms['crypto_alg'])
966         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
967         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
968         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
969         self.assertEqual(prop.trans[2].transform_id,
970                          self.p.ike_transforms['dh_group'])
971
972         self.verify_nat_detection(packet)
973         self.sa.set_ike_props(
974                     crypto='AES-GCM-16ICV', crypto_key_len=32,
975                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
976         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
977                               integ='SHA2-256-128')
978         self.sa.generate_dh_data()
979         self.sa.complete_dh_data()
980         self.sa.calc_keys()
981
982     def update_esp_transforms(self, trans, sa):
983         while trans:
984             if trans.transform_type == 1:  # ecryption
985                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
986             elif trans.transform_type == 3:  # integrity
987                 sa.esp_integ = INTEG_IDS[trans.transform_id]
988             trans = trans.payload
989
990     def verify_sa_auth_req(self, packet):
991         udp = packet[UDP]
992         self.sa.dport = udp.sport
993         ih = self.get_ike_header(packet)
994         self.assertEqual(ih.resp_SPI, self.sa.rspi)
995         self.assertEqual(ih.init_SPI, self.sa.ispi)
996         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
997         self.assertIn('Initiator', ih.flags)
998         self.assertNotIn('Response', ih.flags)
999
1000         udp = packet[UDP]
1001         self.verify_udp(udp)
1002         self.assertEqual(ih.id, self.sa.msg_id + 1)
1003         self.sa.msg_id += 1
1004         plain = self.sa.hmac_and_decrypt(ih)
1005         idi = ikev2.IKEv2_payload_IDi(plain)
1006         self.assertEqual(idi.load, self.sa.i_id)
1007         if self.no_idr_auth:
1008             self.assertEqual(idi.next_payload, 39)  # AUTH
1009         else:
1010             idr = ikev2.IKEv2_payload_IDr(idi.payload)
1011             self.assertEqual(idr.load, self.sa.r_id)
1012         prop = idi[ikev2.IKEv2_payload_Proposal]
1013         c = self.sa.child_sas[0]
1014         c.ispi = prop.SPI
1015         self.update_esp_transforms(
1016                 prop[ikev2.IKEv2_payload_Transform], self.sa)
1017
1018     def send_init_response(self):
1019         tr_attr = self.sa.ike_crypto_attr()
1020         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1021                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1022                  key_length=tr_attr[0]) /
1023                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1024                  transform_id=self.sa.ike_integ) /
1025                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1026                  transform_id=self.sa.ike_prf_alg.name) /
1027                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1028                  transform_id=self.sa.ike_dh))
1029         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1030                  trans_nb=4, trans=trans))
1031
1032         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1033         if self.sa.natt:
1034             dst_address = b'\x0a\x0a\x0a\x0a'
1035         else:
1036             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1037         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1038         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1039
1040         self.sa.init_resp_packet = (
1041             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1042                         exch_type='IKE_SA_INIT', flags='Response') /
1043             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1044             ikev2.IKEv2_payload_KE(next_payload='Nonce',
1045                                    group=self.sa.ike_dh,
1046                                    load=self.sa.my_dh_pub_key) /
1047             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1048                                       next_payload='Notify') /
1049             ikev2.IKEv2_payload_Notify(
1050                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1051                     next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1052                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1053
1054         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1055                                      self.sa.sport, self.sa.dport,
1056                                      False, self.ip6)
1057         self.pg_send(self.pg0, ike_msg)
1058         capture = self.pg0.get_capture(1)
1059         self.verify_sa_auth_req(capture[0])
1060
1061     def initiate_sa_init(self):
1062         self.pg0.enable_capture()
1063         self.pg_start()
1064         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1065
1066         capture = self.pg0.get_capture(1)
1067         self.verify_sa_init_request(capture[0])
1068         self.send_init_response()
1069
1070     def send_auth_response(self):
1071         tr_attr = self.sa.esp_crypto_attr()
1072         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1073                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1074                  key_length=tr_attr[0]) /
1075                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1076                  transform_id=self.sa.esp_integ) /
1077                  ikev2.IKEv2_payload_Transform(
1078                  transform_type='Extended Sequence Number',
1079                  transform_id='No ESN') /
1080                  ikev2.IKEv2_payload_Transform(
1081                  transform_type='Extended Sequence Number',
1082                  transform_id='ESN'))
1083
1084         c = self.sa.child_sas[0]
1085         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1086                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1087
1088         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1089         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1090                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1091                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1092                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1093                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1094                  auth_type=AuthMethod.value(self.sa.auth_method),
1095                  load=self.sa.auth_data) /
1096                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1097                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1098                  number_of_TSs=len(tsi),
1099                  traffic_selector=tsi) /
1100                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1101                  number_of_TSs=len(tsr),
1102                  traffic_selector=tsr) /
1103                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1104
1105         header = ikev2.IKEv2(
1106                 init_SPI=self.sa.ispi,
1107                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1108                 flags='Response', exch_type='IKE_AUTH')
1109
1110         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1111         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1112                                     self.sa.dport, self.sa.natt, self.ip6)
1113         self.pg_send(self.pg0, packet)
1114
    def test_initiator(self):
        # End-to-end scenario for this template; kept as '#' comments so
        # the reported test description is not altered by a docstring.
        # 1. trigger IKE_SA_INIT on VPP, verify the request and reply to it
        #    (the reply step also verifies the following IKE_AUTH request)
        self.initiate_sa_init()
        # 2. prepare authentication data and child SA keys on the local SA
        self.sa.auth_init()
        self.sa.calc_child_keys()
        # 3. answer the IKE_AUTH request
        self.send_auth_response()
        # 4. compare what the API reports with the local SA
        self.verify_ike_sas()
1121
1122
1123 class TemplateResponder(IkePeer):
1124     """ responder test template """
1125
1126     def initiate_del_sa_from_responder(self):
1127         self.pg0.enable_capture()
1128         self.pg_start()
1129         self.vapi.ikev2_initiate_del_ike_sa(
1130                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1131         capture = self.pg0.get_capture(1)
1132         ih = self.get_ike_header(capture[0])
1133         self.assertNotIn('Response', ih.flags)
1134         self.assertNotIn('Initiator', ih.flags)
1135         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1136         plain = self.sa.hmac_and_decrypt(ih)
1137         d = ikev2.IKEv2_payload_Delete(plain)
1138         self.assertEqual(d.proto, 1)  # proto=IKEv2
1139         self.assertEqual(ih.init_SPI, self.sa.ispi)
1140         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1141         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1142                              flags='Initiator+Response',
1143                              exch_type='INFORMATIONAL',
1144                              id=ih.id, next_payload='Encrypted')
1145         resp = self.encrypt_ike_msg(header, b'', None)
1146         self.send_and_assert_no_replies(self.pg0, resp)
1147
1148     def verify_del_sa(self, packet):
1149         ih = self.get_ike_header(packet)
1150         self.assertEqual(ih.id, self.sa.msg_id)
1151         self.assertEqual(ih.exch_type, 37)  # exchange informational
1152         self.assertIn('Response', ih.flags)
1153         self.assertNotIn('Initiator', ih.flags)
1154         self.assertEqual(ih.next_payload, 46)  # Encrypted
1155         self.assertEqual(ih.init_SPI, self.sa.ispi)
1156         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1157         plain = self.sa.hmac_and_decrypt(ih)
1158         self.assertEqual(plain, b'')
1159
1160     def initiate_del_sa_from_initiator(self):
1161         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1162                              flags='Initiator', exch_type='INFORMATIONAL',
1163                              id=self.sa.new_msg_id())
1164         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1165         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1166         packet = self.create_packet(self.pg0, ike_msg,
1167                                     self.sa.sport, self.sa.dport,
1168                                     self.sa.natt, self.ip6)
1169         self.pg0.add_stream(packet)
1170         self.pg0.enable_capture()
1171         self.pg_start()
1172         capture = self.pg0.get_capture(1)
1173         self.verify_del_sa(capture[0])
1174
1175     def send_sa_init_req(self):
1176         tr_attr = self.sa.ike_crypto_attr()
1177         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1178                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1179                  key_length=tr_attr[0]) /
1180                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1181                  transform_id=self.sa.ike_integ) /
1182                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1183                  transform_id=self.sa.ike_prf_alg.name) /
1184                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1185                  transform_id=self.sa.ike_dh))
1186
1187         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1188                  trans_nb=4, trans=trans))
1189
1190         next_payload = None if self.ip6 else 'Notify'
1191
1192         self.sa.init_req_packet = (
1193                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1194                             flags='Initiator', exch_type='IKE_SA_INIT') /
1195                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1196                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1197                                        group=self.sa.ike_dh,
1198                                        load=self.sa.my_dh_pub_key) /
1199                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1200                                           load=self.sa.i_nonce))
1201
1202         if not self.ip6:
1203             if self.sa.i_natt:
1204                 src_address = b'\x0a\x0a\x0a\x01'
1205             else:
1206                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1207
1208             if self.sa.r_natt:
1209                 dst_address = b'\x0a\x0a\x0a\x0a'
1210             else:
1211                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1212
1213             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1214             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1215             nat_src_detection = ikev2.IKEv2_payload_Notify(
1216                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1217                     next_payload='Notify')
1218             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1219                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1220             self.sa.init_req_packet = (self.sa.init_req_packet /
1221                                        nat_src_detection /
1222                                        nat_dst_detection)
1223
1224         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1225                                      self.sa.sport, self.sa.dport,
1226                                      self.sa.natt, self.ip6)
1227         self.pg0.add_stream(ike_msg)
1228         self.pg0.enable_capture()
1229         self.pg_start()
1230         capture = self.pg0.get_capture(1)
1231         self.verify_sa_init(capture[0])
1232
1233     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1234         tr_attr = self.sa.esp_crypto_attr()
1235         last_payload = last_payload or 'Notify'
1236         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1237                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1238                  key_length=tr_attr[0]) /
1239                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1240                  transform_id=self.sa.esp_integ) /
1241                  ikev2.IKEv2_payload_Transform(
1242                  transform_type='Extended Sequence Number',
1243                  transform_id='No ESN') /
1244                  ikev2.IKEv2_payload_Transform(
1245                  transform_type='Extended Sequence Number',
1246                  transform_id='ESN'))
1247
1248         c = self.sa.child_sas[0]
1249         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1250                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1251
1252         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1253         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1254                  auth_type=AuthMethod.value(self.sa.auth_method),
1255                  load=self.sa.auth_data) /
1256                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1257                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1258                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1259                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1260                  number_of_TSs=len(tsr), traffic_selector=tsr))
1261
1262         if is_rekey:
1263             first_payload = 'Nonce'
1264             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1265                      next_payload='SA') / plain /
1266                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1267                      proto='ESP', SPI=c.ispi))
1268         else:
1269             first_payload = 'IDi'
1270             if self.no_idr_auth:
1271                 ids = ikev2.IKEv2_payload_IDi(next_payload='AUTH',
1272                                               IDtype=self.sa.id_type,
1273                                               load=self.sa.i_id)
1274             else:
1275                 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1276                        IDtype=self.sa.id_type, load=self.sa.i_id) /
1277                        ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1278                        IDtype=self.sa.id_type, load=self.sa.r_id))
1279             plain = ids / plain
1280         return plain, first_payload
1281
1282     def send_sa_auth(self):
1283         plain, first_payload = self.generate_auth_payload(
1284                     last_payload='Notify')
1285         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1286         header = ikev2.IKEv2(
1287                 init_SPI=self.sa.ispi,
1288                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1289                 flags='Initiator', exch_type='IKE_AUTH')
1290
1291         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1292         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1293                                     self.sa.dport, self.sa.natt, self.ip6)
1294         self.pg0.add_stream(packet)
1295         self.pg0.enable_capture()
1296         self.pg_start()
1297         capture = self.pg0.get_capture(1)
1298         self.verify_sa_auth_resp(capture[0])
1299
1300     def verify_sa_init(self, packet):
1301         ih = self.get_ike_header(packet)
1302
1303         self.assertEqual(ih.id, self.sa.msg_id)
1304         self.assertEqual(ih.exch_type, 34)
1305         self.assertIn('Response', ih.flags)
1306         self.assertEqual(ih.init_SPI, self.sa.ispi)
1307         self.assertNotEqual(ih.resp_SPI, 0)
1308         self.sa.rspi = ih.resp_SPI
1309         try:
1310             sa = ih[ikev2.IKEv2_payload_SA]
1311             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1312             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1313         except IndexError as e:
1314             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1315             self.logger.error(ih.show())
1316             raise
1317         self.sa.complete_dh_data()
1318         self.sa.calc_keys()
1319         self.sa.auth_init()
1320
1321     def verify_sa_auth_resp(self, packet):
1322         ike = self.get_ike_header(packet)
1323         udp = packet[UDP]
1324         self.verify_udp(udp)
1325         self.assertEqual(ike.id, self.sa.msg_id)
1326         plain = self.sa.hmac_and_decrypt(ike)
1327         idr = ikev2.IKEv2_payload_IDr(plain)
1328         prop = idr[ikev2.IKEv2_payload_Proposal]
1329         self.assertEqual(prop.SPIsize, 4)
1330         self.sa.child_sas[0].rspi = prop.SPI
1331         self.sa.calc_child_keys()
1332
1333     IKE_NODE_SUFFIX = 'ip4'
1334
1335     def verify_counters(self):
1336         self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
1337         self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
1338         self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1339
1340         r = self.vapi.ikev2_sa_dump()
1341         s = r[0].sa.stats
1342         self.assertEqual(1, s.n_sa_auth_req)
1343         self.assertEqual(1, s.n_sa_init_req)
1344
    def test_responder(self):
        # End-to-end scenario for this template; kept as '#' comments so
        # the reported test description is not altered by a docstring.
        # 1. send IKE_SA_INIT and verify the response
        self.send_sa_init_req()
        # 2. send IKE_AUTH and verify the response
        self.send_sa_auth()
        # 3. check the resulting IPsec and IKE SAs and the counters
        self.verify_ipsec_sas()
        self.verify_ike_sas()
        self.verify_counters()
1351
1352
1353 class Ikev2Params(object):
1354     def config_params(self, params={}):
1355         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1356         ei = VppEnum.vl_api_ipsec_integ_alg_t
1357         self.vpp_enums = {
1358                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1359                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1360                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1361                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1362                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1363                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1364
1365                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1366                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1367                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1368                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1369
1370         dpd_disabled = True if 'dpd_disabled' not in params else\
1371             params['dpd_disabled']
1372         if dpd_disabled:
1373             self.vapi.cli('ikev2 dpd disable')
1374         self.del_sa_from_responder = False if 'del_sa_from_responder'\
1375             not in params else params['del_sa_from_responder']
1376         i_natt = False if 'i_natt' not in params else params['i_natt']
1377         r_natt = False if 'r_natt' not in params else params['r_natt']
1378         self.p = Profile(self, 'pr1')
1379         self.ip6 = False if 'ip6' not in params else params['ip6']
1380
1381         if 'auth' in params and params['auth'] == 'rsa-sig':
1382             auth_method = 'rsa-sig'
1383             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1384             self.vapi.ikev2_set_local_key(
1385                     key_file=work_dir + params['server-key'])
1386
1387             client_file = work_dir + params['client-cert']
1388             server_pem = open(work_dir + params['server-cert']).read()
1389             client_priv = open(work_dir + params['client-key']).read()
1390             client_priv = load_pem_private_key(str.encode(client_priv), None,
1391                                                default_backend())
1392             self.peer_cert = x509.load_pem_x509_certificate(
1393                     str.encode(server_pem),
1394                     default_backend())
1395             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1396             auth_data = None
1397         else:
1398             auth_data = b'$3cr3tpa$$w0rd'
1399             self.p.add_auth(method='shared-key', data=auth_data)
1400             auth_method = 'shared-key'
1401             client_priv = None
1402
1403         is_init = True if 'is_initiator' not in params else\
1404             params['is_initiator']
1405         self.no_idr_auth = params.get('no_idr_in_auth', False)
1406
1407         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1408         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1409         r_id = self.idr = idr['data']
1410         i_id = self.idi = idi['data']
1411         if is_init:
1412             # scapy is initiator, VPP is responder
1413             self.p.add_local_id(**idr)
1414             self.p.add_remote_id(**idi)
1415             if self.no_idr_auth:
1416                 r_id = None
1417         else:
1418             # VPP is initiator, scapy is responder
1419             self.p.add_local_id(**idi)
1420             if not self.no_idr_auth:
1421                 self.p.add_remote_id(**idr)
1422
1423         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1424             'loc_ts' not in params else params['loc_ts']
1425         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1426             'rem_ts' not in params else params['rem_ts']
1427         self.p.add_local_ts(**loc_ts)
1428         self.p.add_remote_ts(**rem_ts)
1429         if 'responder' in params:
1430             self.p.add_responder(params['responder'])
1431         if 'ike_transforms' in params:
1432             self.p.add_ike_transforms(params['ike_transforms'])
1433         if 'esp_transforms' in params:
1434             self.p.add_esp_transforms(params['esp_transforms'])
1435
1436         udp_encap = False if 'udp_encap' not in params else\
1437             params['udp_encap']
1438         if udp_encap:
1439             self.p.set_udp_encap(True)
1440
1441         if 'responder_hostname' in params:
1442             hn = params['responder_hostname']
1443             self.p.add_responder_hostname(hn)
1444
1445             # configure static dns record
1446             self.vapi.dns_name_server_add_del(
1447                 is_ip6=0, is_add=1,
1448                 server_address=IPv4Address(u'8.8.8.8').packed)
1449             self.vapi.dns_enable_disable(enable=1)
1450
1451             cmd = "dns cache add {} {}".format(hn['hostname'],
1452                                                self.pg0.remote_ip4)
1453             self.vapi.cli(cmd)
1454
1455         self.sa = IKEv2SA(self, i_id=i_id, r_id=r_id,
1456                           is_initiator=is_init,
1457                           id_type=self.p.local_id['id_type'],
1458                           i_natt=i_natt, r_natt=r_natt,
1459                           priv_key=client_priv, auth_method=auth_method,
1460                           nonce=params.get('nonce'),
1461                           auth_data=auth_data, udp_encap=udp_encap,
1462                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1463
1464         if is_init:
1465             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1466                 params['ike-crypto']
1467             ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1468                 params['ike-integ']
1469             ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1470                 params['ike-dh']
1471
1472             esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1473                 params['esp-crypto']
1474             esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1475                 params['esp-integ']
1476
1477             self.sa.set_ike_props(
1478                     crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1479                     integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1480             self.sa.set_esp_props(
1481                     crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1482                     integ=esp_integ)
1483
1484
class TestApi(VppTestCase):
    """ Test IKEV2 API """
    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        """Remove the profiles the test created and check none are left."""
        super(TestApi, self).tearDown()
        # A test can fail before both profiles have been stored on the
        # instance; only clean up what exists so that the original failure
        # is not masked by an AttributeError raised from tearDown.
        for attr_name in ('p1', 'p2'):
            profile = getattr(self, attr_name, None)
            if profile is not None:
                profile.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Create a Profile from a config dict, add it to VPP, return it.

        Mandatory keys of ``cfg``: 'name', 'loc_id' and 'rem_id' (pairs of
        (id_type, data)), 'loc_ts', 'rem_ts', 'responder', 'ike_ts',
        'esp_ts', 'auth', 'udp_encap' and 'ipsec_over_udp_port'.
        Optional keys: 'lifetime_data', 'tun_itf' and 'natt_disabled'.
        """
        p = Profile(self, cfg['name'])
        p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
        p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
        p.add_local_ts(**cfg['loc_ts'])
        p.add_remote_ts(**cfg['rem_ts'])
        p.add_responder(cfg['responder'])
        p.add_ike_transforms(cfg['ike_ts'])
        p.add_esp_transforms(cfg['esp_ts'])
        p.add_auth(**cfg['auth'])
        p.set_udp_encap(cfg['udp_encap'])
        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            p.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            p.set_tunnel_interface(cfg['tun_itf'])
        # a missing key and a falsy value both leave NAT-T untouched
        if cfg.get('natt_disabled'):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """ test profile dump API """
        loc_ts4 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': '3.3.3.2',
            'end_addr': '3.3.3.3',
        }
        rem_ts4 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': '4.5.76.80',
            'end_addr': '2.3.4.6',
        }

        loc_ts6 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': 'ab::1',
            'end_addr': 'ab::4',
        }
        rem_ts6 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': 'cd::12',
            'end_addr': 'cd::13',
        }

        conf = {
            'p1': {
                'name': 'p1',
                'natt_disabled': True,
                'loc_id': ('fqdn', b'vpp.home'),
                'rem_id': ('fqdn', b'roadwarrior.example.com'),
                'loc_ts': loc_ts4,
                'rem_ts': rem_ts4,
                'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
                'ike_ts': {
                    'crypto_alg': 20,
                    'crypto_key_size': 32,
                    'integ_alg': 0,
                    'dh_group': 1},
                'esp_ts': {
                    'crypto_alg': 13,
                    'crypto_key_size': 24,
                    'integ_alg': 2},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': True,
                'ipsec_over_udp_port': 4501,
                'lifetime_data': {
                    'lifetime': 123,
                    'lifetime_maxdata': 20192,
                    'lifetime_jitter': 9,
                    'handover': 132},
            },
            'p2': {
                'name': 'p2',
                'loc_id': ('ip4-addr', b'192.168.2.1'),
                'rem_id': ('ip6-addr', b'abcd::1'),
                'loc_ts': loc_ts6,
                'rem_ts': rem_ts6,
                'responder': {'sw_if_index': 4, 'addr': 'def::10'},
                'ike_ts': {
                    'crypto_alg': 12,
                    'crypto_key_size': 16,
                    'integ_alg': 3,
                    'dh_group': 3},
                'esp_ts': {
                    'crypto_alg': 9,
                    'crypto_key_size': 24,
                    'integ_alg': 4},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': False,
                'ipsec_over_udp_port': 4600,
                'tun_itf': 0}
        }
        # stored on the instance so that tearDown can remove them
        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        # the dump is expected to list the profiles in creation order
        self.verify_profile(r[0].profile, conf['p1'])
        self.verify_profile(r[1].profile, conf['p2'])

    def verify_id(self, api_id, cfg_id):
        """Check a dumped identity against a configured (id_type, data)."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        # api_id.data is converted to bytes to match the configured value
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Check a dumped traffic selector against its configuration."""
        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
        self.assertEqual(api_ts.start_addr,
                         ip_address(text_type(cfg_ts['start_addr'])))
        self.assertEqual(api_ts.end_addr,
                         ip_address(text_type(cfg_ts['end_addr'])))

    def verify_responder(self, api_r, cfg_r):
        """Check the dumped responder interface index and address."""
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the fields shared by IKE and ESP transform sets."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check the common transform fields plus the DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check an ESP transform set (common transform fields only)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Check the dumped auth method, data and data length."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, cfg_auth['data'])
        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))

    def verify_lifetime_data(self, p, ld):
        """Check the lifetime related fields of a dumped profile."""
        self.assertEqual(p.lifetime, ld['lifetime'])
        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
        self.assertEqual(p.handover, ld['handover'])

    def verify_profile(self, ap, cp):
        """Compare a dumped profile ``ap`` with its config dict ``cp``."""
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        # when the key is absent the profile is expected to report False;
        # assertEqual reports both values if they differ
        natt_dis = cp.get('natt_disabled', False)
        self.assertEqual(natt_dis, ap.natt_disabled)

        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        if 'tun_itf' in cp:
            self.assertEqual(ap.tun_itf, cp['tun_itf'])
        else:
            # without a configured tunnel interface the dump is expected
            # to carry 0xffffffff
            self.assertEqual(ap.tun_itf, 0xffffffff)
1672
1673
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """ test responder - responder behind NAT """

    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        """Configure the test case with the 'r_natt' parameter set."""
        params = {'r_natt': True}
        self.config_params(params)
1682
1683
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - NAT traversal (initiator behind NAT) """

    def config_tc(self):
        """Configure VPP as initiator with the 'i_natt' parameter set."""
        # the responder is the remote end of pg0
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        params = {
            'i_natt': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        }
        self.config_params(params)
1708
1709
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        """Configure VPP as initiator; the responder is given by hostname."""
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        responder_hostname = {'hostname': 'vpp.responder.org',
                              'sw_if_index': self.pg0.sw_if_index}
        params = {
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
            'responder_hostname': responder_hostname,
        }
        self.config_params(params)
1733
1734
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Answer a captured rekey request by echoing its SA payload.

        :param req: captured packet carrying the rekey request
        :param update_child_sa_data: when true, take the nonce and SPI from
            the request into the local SA state and recompute child keys
        """
        ike_hdr = self.get_ike_header(req)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            # the request's SPI is used for both directions
            child = self.sa.child_sas[0]
            child.ispi = child.rspi = proposal.SPI
            self.sa.calc_child_keys()

        response_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
            flags='Response', exch_type=36,
            id=ike_hdr.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(response_hdr, sa_payload, 'SA')
        reply = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # run the base scenario, then issue two rekey requests back to back
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        requests = self.pg0.get_capture(2)

        # reply in reverse order
        for request, update in ((requests[1], True), (requests[0], False)):
            self.rekey_respond(request, update)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1775
1776
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Trigger a child SA rekey via the API and answer the request."""
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        request = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(request)
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', ike_hdr.flags)
        self.assertIn('Initiator', ike_hdr.flags)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        child = self.sa.child_sas[0]
        child.ispi = child.rspi = proposal.SPI
        self.sa.calc_child_keys()
        # the response echoes the SA payload of the request
        response_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
            flags='Response', exch_type=36,
            id=ike_hdr.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(response_hdr, sa_payload, 'SA')
        reply = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # base scenario first, then rekey and re-verify the SAs
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1813
1814
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        """Configure VPP as initiator with 'del_sa_from_responder' set."""
        # the responder is the remote end of pg0
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        params = {
            'del_sa_from_responder': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
            'no_idr_in_auth': True,
        }
        self.config_params(params)
1840
1841
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - initiator behind NAT """

    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        """Configure the test case with the 'i_natt' parameter set."""
        params = {'i_natt': True}
        self.config_params(params)
1851
1852
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        """Configure the test case with the default parameters only."""
        self.config_params()
1858
1859
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """
    def config_tc(self):
        """Configure the test case with 'dpd_disabled' set to False."""
        params = {'dpd_disabled': False}
        self.config_params(params)

    def tearDown(self):
        # Intentionally a no-op: the inherited tearDown is not run.
        # NOTE(review): presumably because the SAs are already gone once
        # the test has finished - confirm against the template's tearDown.
        pass

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # capture empty request but don't reply
        probes = self.pg0.get_capture(expected_count=1, timeout=5)
        ike_hdr = self.get_ike_header(probes[0])
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
        # wait for SA expiration
        time.sleep(3)
        # neither IKE SAs nor IPsec SAs may be left afterwards
        self.assertEqual(len(self.vapi.ikev2_sa_dump()), 0)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 0)
1888
1889
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Send a rekey request and record the nonce and SPI of the reply."""
        self.pg0.add_stream(self.create_rekey_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        # base scenario first, then rekey and verify SAs and counters
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
        self.assert_counter(1, 'rekey_req', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
1917
1918
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """ test ikev2 responder - non-default table id """

    @classmethod
    def setUpClass(cls):
        """Create pg0, bind it to IP table 1, then bring it up and address it.
        """
        # publish scapy's ikev2 contrib module under the module-level name
        # 'ikev2', which the test code in this file refers to
        import scapy.contrib.ikev2 as _ikev2
        globals()['ikev2'] = _ikev2
        # super(IkePeer, cls) starts the lookup *after* IkePeer in the MRO,
        # so IkePeer.setUpClass itself is not run.
        # NOTE(review): presumably because the interface setup is done here
        # instead (with the table binding) - confirm against IkePeer.
        super(IkePeer, cls).setUpClass()
        cls.create_pg_interfaces(range(1))
        # the interface is moved to table 1 before any address is configured
        cls.vapi.cli("ip table add 1")
        cls.vapi.cli("set interface ip table pg0 1")
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def config_tc(self):
        """Configure the test case with 'dpd_disabled' set to False."""
        self.config_params({'dpd_disabled': False})

    def test_responder(self):
        # run the base responder scenario with liveness checks configured
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderVrf, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # one request is expected on pg0 within the timeout; it must be an
        # INFORMATIONAL exchange whose decrypted content is empty
        capture = self.pg0.get_capture(expected_count=1, timeout=5)
        ih = self.get_ike_header(capture[0])
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')
1950
1951
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        """Configure 'rsa-sig' auth with key/cert files and UDP encap."""
        params = {
            'udp_encap': True,
            'auth': 'rsa-sig',
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        self.config_params(params)
1963
1964
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        """Configure the algorithms, a random nonce and no IDr in auth."""
        # a fresh 256 byte random nonce for every run
        nonce = os.urandom(256)
        params = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
            'ike-dh': '2048MODPgr',
            'nonce': nonce,
            'no_idr_in_auth': True,
        }
        self.config_params(params)
1980
1981
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        """Configure the IKE and ESP algorithms used by this test case."""
        # NOTE(review): the class name and docstring say AES_CBC_128 while
        # 'ike-crypto' asks for a key length of 32 (presumably bytes) -
        # confirm which one is intended.
        params = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
            'ike-dh': '3072MODPgr',
        }
        self.config_params(params)
1996
1997
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    IKE_NODE_SUFFIX = 'ip6'

    def config_tc(self):
        """Configure an IPv6 run with IPv6 traffic selectors."""
        local_ts = {'start_addr': 'ab:cd::0',
                    'end_addr': 'ab:cd::10'}
        remote_ts = {'start_addr': '11::0',
                     'end_addr': '11::100'}
        # NOTE(review): other test cases in this file use 'i_natt' and
        # 'r_natt'; confirm that 'natt' is a key config_params reads.
        params = {
            'del_sa_from_responder': True,
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_ts,
            'rem_ts': remote_ts,
        }
        self.config_params(params)
2018
2019
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request and check the reply and the counters."""
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        # the reply carries the SA's message id and has empty content
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
        self.assert_counter(1, 'keepalive', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        # base scenario first, then the keepalive exchange
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
2043
2044
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # Intentionally a no-op: the inherited tearDown is not run.
        # NOTE(review): presumably because no SA is established by this
        # test - confirm against the template's tearDown.
        pass

    def config_tc(self):
        """Configure the test case with the default parameters only."""
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet on pg0.

        :param length: value for the IKE header length field
        :param payload: optional payload appended to the IKE header
        :returns: the result of create_packet() for the message
        """
        ike = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                          flags='Initiator', exch_type='IKE_SA_INIT')
        if payload is not None:
            ike /= payload
        sport, dport = self.sa.sport, self.sa.dport
        return self.create_packet(self.pg0, ike, sport, dport)

    def verify_bad_packet_length(self):
        """Send headers with a bogus length and check 'bad_length'."""
        count = self.pkt_count
        bad_pkt = self.create_ike_init_msg(length=0xdead)
        self.send_and_assert_no_replies(self.pg0, bad_pkt * count)
        self.assert_counter(self.pkt_count, 'bad_length')

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with a bogus length, check 'malformed_packet'."""
        count = self.pkt_count
        bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        bad_pkt = self.create_ike_init_msg(payload=bad_sa)
        self.send_and_assert_no_replies(self.pg0, bad_pkt * count)
        self.assert_counter(self.pkt_count, 'malformed_packet')

    def test_responder(self):
        # number of malformed packets sent by each of the checks below
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
2077
2078
# Allow running this file directly, using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)