ikev2: support responder hostname
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import tag_fixme_vpp_workers
22 from framework import VppTestCase, VppTestRunner
23 from vpp_ikev2 import Profile, IDType, AuthMethod
24 from vpp_papi import VppEnum
25
# Python 2/3 compatibility shim: the name ``unicode`` only exists on
# Python 2, so fall back to ``str`` when looking it up raises NameError.
try:
    text_type = unicode
except NameError:
    text_type = str

# Key-pad string fed to the PRF together with the shared secret when
# deriving shared-key AUTH data (see IKEv2SA.auth_init).
KEY_PAD = b"Key Pad for IKEv2"
# Number of trailing key bytes that CryptoAlgo treats as the AEAD salt.
SALT_SIZE = 4
# Number of tag (ICV) bytes appended to / stripped from AES-GCM ciphertext.
GCM_ICV_SIZE = 16
# Number of explicit IV bytes carried in front of AES-GCM ciphertext.
GCM_IV_SIZE = 8
35
36
# MODP Diffie-Hellman groups as defined in RFC 3526.
# Each value is a tuple (p, g, key_len):
#   p       - the group prime as an integer
#   g       - the generator
#   key_len - length in bytes used when serialising the shared secret
#             (see IKEv2SA.compute_secret)
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
71
72
class CryptoAlgo(object):
    """Symmetric cipher helper used to protect IKEv2 payloads in the tests.

    :param name: algorithm name, e.g. 'AES-CBC' or 'AES-GCM-16ICV'
    :param cipher: cipher class taking the key as its only argument and
        exposing ``block_size`` (in bits); None for the 'NULL' algorithm
    :param mode: mode class; called with the IV (or, for AEAD use, with
        the nonce and optionally the tag); None for the 'NULL' algorithm

    When *cipher* is None neither ``bs`` nor ``iv_len`` is set, so the
    encrypt/decrypt/pad helpers must not be used on such an instance.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block size in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt *data* and return ``iv + ciphertext`` (plus tag for AEAD).

        :param data: plaintext bytes (already padded by the caller)
        :param key: cipher key; in AEAD mode the last SALT_SIZE bytes are
            the salt and are stripped before keying the cipher
        :param aad: additional authenticated data; a non-None value
            selects the AEAD code path
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            # AEAD nonce is the salt followed by the explicit IV
            nonce = salt + iv
            encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                               default_backend()).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt *data* (``iv + ciphertext``) and return the plaintext.

        :param data: explicit IV followed by the ciphertext
        :param key: cipher key; in AEAD mode the last SALT_SIZE bytes are
            the salt
        :param aad: additional authenticated data; a non-None value
            selects the AEAD code path
        :param icv: authentication tag, required when *aad* is given
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            # use the configured cipher, mirroring encrypt()
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + data[:GCM_IV_SIZE]
            ct = data[GCM_IV_SIZE:]
            key = key[:-SALT_SIZE]
            decryptor = Cipher(self.cipher(key),
                               self.mode(nonce, icv, len(icv)),
                               default_backend()).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad *data* up to the next multiple of the block size.

        Padding is always added (a full block when *data* is already
        aligned): zero bytes followed by one byte holding the number of
        zero bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
125
126
class AuthAlgo(object):
    """Description of an integrity / PRF algorithm.

    :param name: algorithm name
    :param mac: MAC constructor (None for 'NULL')
    :param mod: hash constructor handed to *mac* (None for 'NULL')
    :param key_len: key length in bytes
    :param trunc_len: truncated output length; a falsy value (None or 0)
        means "same as key_len"
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        if trunc_len:
            self.trunc_len = trunc_len
        else:
            self.trunc_len = key_len
134
135
# Encryption algorithms known to the test harness, keyed by the name that
# set_ike_props() / set_esp_props() accept.  'NULL' carries no cipher.
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}
142
# Integrity algorithms keyed by name.  Positional arguments to AuthAlgo are
# (name, mac, hash, key_len, trunc_len) with lengths in bytes.  Each SHA2
# variant must be paired with its own hash function: HMAC-SHA2-384-192 is
# built on SHA-384 and HMAC-SHA2-512-256 on SHA-512.
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}
150
# Pseudo-random functions keyed by name.  No trunc_len is given for
# PRF_HMAC_SHA2_256, so AuthAlgo makes it equal to key_len (32).
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}
156
# Numeric transform id -> encryption algorithm name (a key of CRYPTO_ALGOS).
# presumably the IANA IKEv2 "Transform Type 1" ids -- verify against IANA
CRYPTO_IDS = dict((
    (12, 'AES-CBC'),
    (20, 'AES-GCM-16ICV'),
))

# Numeric transform id -> integrity algorithm name (a key of AUTH_ALGOS).
# presumably the IANA IKEv2 "Transform Type 3" ids -- verify against IANA
INTEG_IDS = dict((
    (2, 'HMAC-SHA1-96'),
    (12, 'SHA2-256-128'),
    (13, 'SHA2-384-192'),
    (14, 'SHA2-512-256'),
))
168
169
class IKEv2ChildSA(object):
    """Child SA state tracked by the test side.

    A random 4-byte SPI is generated for our own direction (``ispi`` when
    acting as initiator, ``rspi`` otherwise); the other SPI starts as None.

    :param local_ts: local traffic selector
    :param remote_ts: remote traffic selector
    :param is_initiator: True when the test plays the initiator role
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        own_spi = os.urandom(4)
        self.ispi, self.rspi = ((own_spi, None) if is_initiator
                                else (None, own_spi))
        self.local_ts = local_ts
        self.remote_ts = remote_ts
181
182
class IKEv2SA(object):
    """Test-side model of an IKEv2 security association.

    Stores SPIs, nonces, identities, negotiated algorithms, Diffie-Hellman
    material and the derived keys, and implements the key derivation and
    payload protection helpers the test cases need.
    """

    def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
                 auth_method='shared-key', priv_key=None, i_natt=False,
                 r_natt=False, udp_encap=False):
        """
        :param test: test case object used for assertions (verify_hmac)
        :param is_initiator: True when the test plays the initiator role
        :param i_id: initiator identity bytes
        :param r_id: responder identity bytes
        :param spi: our own 8-byte SPI; the peer's SPI starts as zeros
        :param id_type: identity type, either a name understood by
            IDType.value() or an already resolved value
        :param nonce: our nonce; 32 random bytes when falsy
        :param auth_data: shared secret (for 'shared-key'); replaced by the
            computed AUTH value in auth_init()
        :param local_ts: local traffic selector for the child SA
        :param remote_ts: remote traffic selector for the child SA
        :param auth_method: 'shared-key' or 'rsa-sig'
        :param priv_key: private key used for 'rsa-sig'
        :param i_natt: initiator is behind NAT
        :param r_natt: responder is behind NAT
        :param udp_encap: UDP encapsulation requested
        """
        self.udp_encap = udp_encap
        self.i_natt = i_natt
        self.r_natt = r_natt
        # with NAT traversal both ends talk on port 4500, otherwise 500
        if i_natt or r_natt:
            self.sport = 4500
            self.dport = 4500
        else:
            self.sport = 500
            self.dport = 500
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        if isinstance(id_type, str):
            self.id_type = IDType.value(id_type)
        else:
            self.id_type = id_type
        self.auth_method = auth_method
        if self.is_initiator:
            self.rspi = 8 * b'\x00'
            self.ispi = spi
            self.i_nonce = nonce
        else:
            self.rspi = spi
            self.ispi = 8 * b'\x00'
            self.r_nonce = nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
                          self.is_initiator)]

    def new_msg_id(self):
        """Increment the message id counter and return the new value."""
        self.msg_id += 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """DH public value of our side."""
        if self.is_initiator:
            return self.i_dh_data
        return self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """DH public value of the peer."""
        if self.is_initiator:
            return self.r_dh_data
        return self.i_dh_data

    @property
    def natt(self):
        """True when either side is behind NAT."""
        return self.i_natt or self.r_natt

    def compute_secret(self):
        """Return the DH shared secret as ``key_len`` big-endian bytes."""
        priv = self.dh_private_key
        peer = self.peer_dh_pub_key
        # the generator is not needed for the shared secret computation
        p, _g, key_len = self.ike_group
        return pow(int.from_bytes(peer, 'big'),
                   int.from_bytes(priv, 'big'), p).to_bytes(key_len, 'big')

    def generate_dh_data(self):
        """Generate our DH key pair for the configured group.

        Stores the private exponent in ``dh_private_key`` and the public
        value in ``i_dh_data`` or ``r_dh_data`` depending on our role.

        :raises NotImplementedError: when ``ike_dh`` is not a key of DH
        """
        # generate DH keys
        if self.ike_dh not in DH:
            raise NotImplementedError('%s not in DH group' % self.ike_dh)

        if self.dh_params is None:
            dhg = DH[self.ike_dh]
            pn = dh.DHParameterNumbers(dhg[0], dhg[1])
            self.dh_params = pn.parameters(default_backend())

        priv = self.dh_params.generate_private_key()
        pub = priv.public_key()
        x = priv.private_numbers().x
        self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
        y = pub.public_numbers().y

        if self.is_initiator:
            self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
        else:
            self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self):
        """Derive the keys of the first child SA from SK_d and the nonces.

        The key material is split, in order, into initiator encryption key,
        initiator integrity key (or salt when no integrity algorithm is
        used), responder encryption key and responder integrity key (or
        salt).
        """
        prf = self.ike_prf_alg.mod()
        s = self.i_nonce + self.r_nonce
        c = self.child_sas[0]

        encr_key_len = self.esp_crypto_key_len
        integ_key_len = self.esp_integ_alg.key_len
        # a salt is only derived when there is no integrity key
        salt_len = 0 if integ_key_len else 4

        total_len = (integ_key_len * 2 +
                     encr_key_len * 2 +
                     salt_len * 2)
        keymat = self.calc_prfplus(prf, self.sk_d, s, total_len)

        pos = 0
        c.sk_ei = keymat[pos:pos+encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ai = keymat[pos:pos+integ_key_len]
            pos += integ_key_len
        else:
            c.salt_ei = keymat[pos:pos+salt_len]
            pos += salt_len

        c.sk_er = keymat[pos:pos+encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ar = keymat[pos:pos+integ_key_len]
            pos += integ_key_len
        else:
            c.salt_er = keymat[pos:pos+salt_len]
            pos += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Expand *key* and *seed* into key material (prf+ construction).

        Block n is ``prf(key, block[n-1] | seed | n)`` with an empty
        previous block for n == 1; blocks are concatenated until at least
        *length* bytes are available.

        :returns: the concatenated blocks (possibly longer than *length*;
            callers slice what they need), or None when *length* bytes
            cannot be produced within the 255 blocks a one-byte counter
            allows
        """
        r = b''
        t = b''
        x = 1
        # the counter is a single byte, so blocks 1..255 are available
        while len(r) < length and x <= 255:
            t = self.calc_prf(prf, key, t + seed + bytes([x]))
            r += t
            x += 1

        # judge failure by the amount of output, not by the counter value
        if len(r) < length:
            return None
        return r

    def calc_prf(self, prf, key, data):
        """Return the IKE PRF of *data* keyed with *key*."""
        h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        h.update(data)
        return h.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the IKE SA keys.

        Sets ``skeyseed``, ``sk_d``, ``sk_ai``, ``sk_ar``, ``sk_ei``,
        ``sk_er``, ``sk_pi`` and ``sk_pr``.  When no integrity algorithm is
        configured the encryption keys carry 4 extra salt bytes.
        """
        prf = self.ike_prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        s = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        s = s + self.ispi + self.rspi

        prf_key_trunc = self.ike_prf_alg.trunc_len
        encr_key_len = self.ike_crypto_key_len
        tr_prf_key_len = self.ike_prf_alg.key_len
        integ_key_len = self.ike_integ_alg.key_len
        if integ_key_len == 0:
            salt_size = 4
        else:
            salt_size = 0

        total_len = (prf_key_trunc +
                     integ_key_len * 2 +
                     encr_key_len * 2 +
                     tr_prf_key_len * 2 +
                     salt_size * 2)
        keymat = self.calc_prfplus(prf, self.skeyseed, s, total_len)

        pos = 0
        self.sk_d = keymat[:pos+prf_key_trunc]
        pos += prf_key_trunc

        self.sk_ai = keymat[pos:pos+integ_key_len]
        pos += integ_key_len
        self.sk_ar = keymat[pos:pos+integ_key_len]
        pos += integ_key_len

        self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
        pos += encr_key_len + salt_size
        self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
        pos += encr_key_len + salt_size

        self.sk_pi = keymat[pos:pos+tr_prf_key_len]
        pos += tr_prf_key_len
        self.sk_pr = keymat[pos:pos+tr_prf_key_len]

    def generate_authmsg(self, prf, packet):
        """Build the octets that get authenticated for our side.

        :param prf: hash instance passed through to calc_prf()
        :param packet: raw bytes of our IKE_SA_INIT message
        :returns: ``packet | peer nonce | prf(SK_p, id_type 0 0 0 | id)``
        """
        if self.is_initiator:
            ident = self.i_id
            nonce = self.r_nonce
            key = self.sk_pi
        else:
            ident = self.r_id
            nonce = self.i_nonce
            key = self.sk_pr
        data = bytes([self.id_type, 0, 0, 0]) + ident
        id_hash = self.calc_prf(prf, key, data)
        return packet + nonce + id_hash

    def auth_init(self):
        """Compute our AUTH value and store it in ``auth_data``.

        For 'shared-key' the stored shared secret is replaced by
        ``prf(prf(secret, KEY_PAD), authmsg)``; for 'rsa-sig' the message is
        signed with ``priv_key`` (PKCS#1 v1.5, SHA-1).

        :raises TypeError: for any other authentication method
        """
        prf = self.ike_prf_alg.mod()
        if self.is_initiator:
            packet = self.init_req_packet
        else:
            packet = self.init_resp_packet
        authmsg = self.generate_authmsg(prf, raw(packet))
        if self.auth_method == 'shared-key':
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif self.auth_method == 'rsa-sig':
            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
                                                hashes.SHA1())
        else:
            raise TypeError('unknown auth method type!')

    def encrypt(self, data, aad=None):
        """Pad and encrypt *data* with our encryption key."""
        data = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key the peer uses for the messages it sends."""
        if self.is_initiator:
            return self.sk_ar
        return self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key we use for the messages we send."""
        if self.is_initiator:
            return self.sk_ai
        return self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key we use for the messages we send."""
        if self.is_initiator:
            return self.sk_ei
        return self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key the peer uses for the messages it sends."""
        if self.is_initiator:
            return self.sk_er
        return self.sk_ei

    def concat(self, alg, key_len):
        """Return ``'<alg>-<key length in bits>'``, e.g. 'AES-CBC-256'."""
        return alg + '-' + str(key_len * 8)

    @property
    def vpp_ike_cypto_alg(self):
        """IKE encryption algorithm name including the key size in bits."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP encryption algorithm name including the key size in bits."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of *ikemsg* matches our computation.

        The last ``trunc_len`` bytes are taken as the received ICV and
        compared with the truncated HMAC over the preceding bytes, keyed
        with the peer's integrity key.
        """
        integ_trunc = self.ike_integ_alg.trunc_len
        exp_hmac = ikemsg[-integ_trunc:]
        data = ikemsg[:-integ_trunc]
        computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
                                          self.peer_authkey, data)
        self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated IKE integrity MAC of *data*."""
        h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        h.update(data)
        return h.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt *data* with the peer's encryption key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Verify and decrypt the Encrypted payload of *ike*.

        For AES-GCM the IKE header plus the Encrypted payload header serve
        as additional authenticated data and the trailing GCM_ICV_SIZE
        bytes as the tag; otherwise the HMAC is verified first and the ICV
        stripped before decryption.

        :returns: the plaintext with padding and pad-length byte removed
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto == 'AES-GCM-16ICV':
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
        else:
            self.verify_hmac(raw(ike))
            integ_trunc = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            ct = ep.load[:-integ_trunc]
            plain = self.decrypt(ct)
        # remove padding
        pad_len = plain[-1]
        return plain[:-pad_len - 1]

    def build_ts_addr(self, ts, version):
        """Return the address keyword arguments of a traffic selector.

        :param ts: mapping with 'start_addr' and 'end_addr'
        :param version: '4' or '6', appended to the keyword names
        """
        return {'starting_address_v' + version: ts['start_addr'],
                'ending_address_v' + version: ts['end_addr']}

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        :param is_ip4: build IPv4 selectors when true, IPv6 otherwise
        :returns: tuple ``(tsi, tsr)`` of single-element lists; the local
            selector comes first when we are the initiator
        """
        c = self.child_sas[0]
        ts_data = {'IP_protocol_ID': 0,
                   'start_port': 0,
                   'end_port': 0xffff}
        if is_ip4:
            ts_data.update(self.build_ts_addr(c.local_ts, '4'))
            ts1 = ikev2.IPv4TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
            ts2 = ikev2.IPv4TrafficSelector(**ts_data)
        else:
            ts_data.update(self.build_ts_addr(c.local_ts, '6'))
            ts1 = ikev2.IPv6TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
            ts2 = ikev2.IPv6TrafficSelector(**ts_data)

        if self.is_initiator:
            return ([ts1], [ts2])
        return ([ts2], [ts1])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the algorithms protecting the IKE SA.

        :param crypto: key of CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key of AUTH_ALGOS ('NULL' is stored as None in
            ``ike_integ``)
        :param prf: key of PRF_ALGOS
        :param dh: key of DH
        :raises TypeError: for an unknown crypto, integ or prf name
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.ike_integ = None if integ == 'NULL' else integ
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError('unsupported prf algo %r' % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the algorithms protecting the child (ESP) SA.

        :param crypto: key of CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key of AUTH_ALGOS ('NULL' is stored as None in
            ``esp_integ``)
        :raises TypeError: for an unknown crypto or integ name
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.esp_integ = None if integ == 'NULL' else integ
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len):
        """Return the transform attribute tuple for an AES key length.

        The first element packs 0x800e in the upper 16 bits and the key
        length in bits (``key_len * 8``) in the lower bits; the second
        element is the constant 12.

        :raises Exception: when ``ike_crypto`` is not an AES variant
        """
        # NOTE(review): this checks ike_crypto even when reached through
        # esp_crypto_attr() -- confirm that is intended.
        if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
            return (0x800e << 16 | key_len << 3, 12)
        else:
            raise Exception('unsupported attribute type')

    def ike_crypto_attr(self):
        """Transform attribute for the IKE encryption key length."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """Transform attribute for the ESP encryption key length."""
        return self.crypto_attr(self.esp_crypto_key_len)

    def compute_nat_sha1(self, ip, port, rspi=None):
        """Return SHA-1 over ``ispi | rspi | ip | port``.

        :param ip: packed address bytes
        :param port: port number, hashed as 2 big-endian bytes
        :param rspi: responder SPI to use; defaults to ``self.rspi``
        """
        if rspi is None:
            rspi = self.rspi
        data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
        digest.update(data)
        return digest.finalize()
551
552
553 class IkePeer(VppTestCase):
554     """ common class for initiator and responder """
555
    @classmethod
    def setUpClass(cls):
        """Import the scapy IKEv2 layer and bring up two pg interfaces."""
        # The scapy IKEv2 contrib module is imported lazily here and
        # published as the module-level name ``ikev2`` used by the rest of
        # this file.
        import scapy.contrib.ikev2 as _ikev2
        globals()['ikev2'] = _ikev2
        super(IkePeer, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        # each interface is brought up with IPv4 and IPv6 configured and
        # its neighbour resolved
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
568
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super(IkePeer, cls).tearDownClass()
572
    def tearDown(self):
        """Delete the SA and check that VPP is left without any state."""
        super(IkePeer, self).tearDown()
        # the SA deletion is triggered from whichever side the test selected
        if self.del_sa_from_responder:
            self.initiate_del_sa_from_responder()
        else:
            self.initiate_del_sa_from_initiator()
        # neither IKE SAs nor IPsec SAs may remain after the deletion
        r = self.vapi.ikev2_sa_dump()
        self.assertEqual(len(r), 0)
        sas = self.vapi.ipsec_sa_dump()
        self.assertEqual(len(sas), 0)
        # finally remove the profile and verify that it is gone
        self.p.remove_vpp_config()
        self.assertIsNone(self.p.query_vpp_config())
585
    def setUp(self):
        """Configure the test case and install its profile in VPP."""
        super(IkePeer, self).setUp()
        # config_tc() is expected to set up self.p and self.sa, which are
        # used right below -- presumably provided by subclasses
        self.config_tc()
        self.p.add_vpp_config()
        self.assertIsNotNone(self.p.query_vpp_config())
        # only the initiator side generates its DH data up front
        if self.sa.is_initiator:
            self.sa.generate_dh_data()
        self.vapi.cli('ikev2 set logging level 4')
        # NOTE(review): 'event-lo' looks like an abbreviated CLI command
        # name -- confirm that VPP resolves it as intended
        self.vapi.cli('event-lo clear')
595
596     def assert_counter(self, count, name, version='ip4'):
597         node_name = '/err/ikev2-%s/' % version + name
598         self.assertEqual(count, self.statistics.get_err_counter(node_name))
599
600     def create_rekey_request(self):
601         sa, first_payload = self.generate_auth_payload(is_rekey=True)
602         header = ikev2.IKEv2(
603                 init_SPI=self.sa.ispi,
604                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
605                 flags='Initiator', exch_type='CREATE_CHILD_SA')
606
607         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
608         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
609                                   self.sa.dport, self.sa.natt, self.ip6)
610
611     def create_empty_request(self):
612         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
613                              id=self.sa.new_msg_id(), flags='Initiator',
614                              exch_type='INFORMATIONAL',
615                              next_payload='Encrypted')
616
617         msg = self.encrypt_ike_msg(header, b'', None)
618         return self.create_packet(self.pg0, msg, self.sa.sport,
619                                   self.sa.dport, self.sa.natt, self.ip6)
620
621     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
622                       use_ip6=False):
623         if use_ip6:
624             src_ip = src_if.remote_ip6
625             dst_ip = src_if.local_ip6
626             ip_layer = IPv6
627         else:
628             src_ip = src_if.remote_ip4
629             dst_ip = src_if.local_ip4
630             ip_layer = IP
631         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
632                ip_layer(src=src_ip, dst=dst_ip) /
633                UDP(sport=sport, dport=dport))
634         if natt:
635             # insert non ESP marker
636             res = res / Raw(b'\x00' * 4)
637         return res / msg
638
639     def verify_udp(self, udp):
640         self.assertEqual(udp.sport, self.sa.sport)
641         self.assertEqual(udp.dport, self.sa.dport)
642
643     def get_ike_header(self, packet):
644         try:
645             ih = packet[ikev2.IKEv2]
646             ih = self.verify_and_remove_non_esp_marker(ih)
647         except IndexError as e:
648             # this is a workaround for getting IKEv2 layer as both ikev2 and
649             # ipsec register for port 4500
650             esp = packet[ESP]
651             ih = self.verify_and_remove_non_esp_marker(esp)
652         self.assertEqual(ih.version, 0x20)
653         self.assertNotIn('Version', ih.flags)
654         return ih
655
656     def verify_and_remove_non_esp_marker(self, packet):
657         if self.sa.natt:
658             # if we are in nat traversal mode check for non esp marker
659             # and remove it
660             data = raw(packet)
661             self.assertEqual(data[:4], b'\x00' * 4)
662             return ikev2.IKEv2(data[4:])
663         else:
664             return packet
665
666     def encrypt_ike_msg(self, header, plain, first_payload):
667         if self.sa.ike_crypto == 'AES-GCM-16ICV':
668             data = self.sa.ike_crypto_alg.pad(raw(plain))
669             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
670                 len(ikev2.IKEv2_payload_Encrypted())
671             tlen = plen + len(ikev2.IKEv2())
672
673             # prepare aad data
674             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
675                                                  length=plen)
676             header.length = tlen
677             res = header / sk_p
678             encr = self.sa.encrypt(raw(plain), raw(res))
679             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
680                                                  length=plen, load=encr)
681             res = header / sk_p
682         else:
683             encr = self.sa.encrypt(raw(plain))
684             trunc_len = self.sa.ike_integ_alg.trunc_len
685             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
686             tlen = plen + len(ikev2.IKEv2())
687
688             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
689                                                  length=plen, load=encr)
690             header.length = tlen
691             res = header / sk_p
692
693             integ_data = raw(res)
694             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
695                                              self.sa.my_authkey, integ_data)
696             res = res / Raw(hmac_data[:trunc_len])
697         assert(len(res) == tlen)
698         return res
699
700     def verify_udp_encap(self, ipsec_sa):
701         e = VppEnum.vl_api_ipsec_sad_flags_t
702         if self.sa.udp_encap or self.sa.natt:
703             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
704         else:
705             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
706
707     def verify_ipsec_sas(self, is_rekey=False):
708         sas = self.vapi.ipsec_sa_dump()
709         if is_rekey:
710             # after rekey there is a short period of time in which old
711             # inbound SA is still present
712             sa_count = 3
713         else:
714             sa_count = 2
715         self.assertEqual(len(sas), sa_count)
716         if self.sa.is_initiator:
717             if is_rekey:
718                 sa0 = sas[0].entry
719                 sa1 = sas[2].entry
720             else:
721                 sa0 = sas[0].entry
722                 sa1 = sas[1].entry
723         else:
724             if is_rekey:
725                 sa0 = sas[2].entry
726                 sa1 = sas[0].entry
727             else:
728                 sa1 = sas[0].entry
729                 sa0 = sas[1].entry
730
731         c = self.sa.child_sas[0]
732
733         self.verify_udp_encap(sa0)
734         self.verify_udp_encap(sa1)
735         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
736         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
737         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
738
739         if self.sa.esp_integ is None:
740             vpp_integ_alg = 0
741         else:
742             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
743         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
744         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
745
746         # verify crypto keys
747         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
748         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
749         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
750         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
751
752         # verify integ keys
753         if vpp_integ_alg:
754             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
755             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
756             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
757             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
758         else:
759             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
760             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
761
762     def verify_keymat(self, api_keys, keys, name):
763         km = getattr(keys, name)
764         api_km = getattr(api_keys, name)
765         api_km_len = getattr(api_keys, name + '_len')
766         self.assertEqual(len(km), api_km_len)
767         self.assertEqual(km, api_km[:api_km_len])
768
    def verify_id(self, api_id, exp_id):
        """Compare an identity reported by the API with the expected one.

        :param api_id: identity from the API (``type``, ``data_len``,
            ``data``)
        :param exp_id: expected identity (``type`` as a name understood by
            IDType.value(), ``data_len``)
        """
        self.assertEqual(api_id.type, IDType.value(exp_id.type))
        self.assertEqual(api_id.data_len, exp_id.data_len)
        # NOTE(review): the identity data is compared against exp_id.type
        # rather than the expected data -- looks like a typo, confirm
        # against the callers before changing it.
        self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
773
774     def verify_ike_sas(self):
775         r = self.vapi.ikev2_sa_dump()
776         self.assertEqual(len(r), 1)
777         sa = r[0].sa
778         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
779         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
780         if self.ip6:
781             if self.sa.is_initiator:
782                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
783                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
784             else:
785                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
786                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
787         else:
788             if self.sa.is_initiator:
789                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
790                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
791             else:
792                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
793                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
794         self.verify_keymat(sa.keys, self.sa, 'sk_d')
795         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
796         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
797         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
798         self.verify_keymat(sa.keys, self.sa, 'sk_er')
799         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
800         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
801
802         self.assertEqual(sa.i_id.type, self.sa.id_type)
803         self.assertEqual(sa.r_id.type, self.sa.id_type)
804         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
805         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
806         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
807         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
808
809         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
810         self.assertEqual(len(r), 1)
811         csa = r[0].child_sa
812         self.assertEqual(csa.sa_index, sa.sa_index)
813         c = self.sa.child_sas[0]
814         if hasattr(c, 'sk_ai'):
815             self.verify_keymat(csa.keys, c, 'sk_ai')
816             self.verify_keymat(csa.keys, c, 'sk_ar')
817         self.verify_keymat(csa.keys, c, 'sk_ei')
818         self.verify_keymat(csa.keys, c, 'sk_er')
819         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
820         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
821
822         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
823         tsi = tsi[0]
824         tsr = tsr[0]
825         r = self.vapi.ikev2_traffic_selector_dump(
826                 is_initiator=True, sa_index=sa.sa_index,
827                 child_sa_index=csa.child_sa_index)
828         self.assertEqual(len(r), 1)
829         ts = r[0].ts
830         self.verify_ts(r[0].ts, tsi[0], True)
831
832         r = self.vapi.ikev2_traffic_selector_dump(
833                 is_initiator=False, sa_index=sa.sa_index,
834                 child_sa_index=csa.child_sa_index)
835         self.assertEqual(len(r), 1)
836         self.verify_ts(r[0].ts, tsr[0], False)
837
838         n = self.vapi.ikev2_nonce_get(is_initiator=True,
839                                       sa_index=sa.sa_index)
840         self.verify_nonce(n, self.sa.i_nonce)
841         n = self.vapi.ikev2_nonce_get(is_initiator=False,
842                                       sa_index=sa.sa_index)
843         self.verify_nonce(n, self.sa.r_nonce)
844
845     def verify_nonce(self, api_nonce, nonce):
846         self.assertEqual(api_nonce.data_len, len(nonce))
847         self.assertEqual(api_nonce.nonce, nonce)
848
849     def verify_ts(self, api_ts, ts, is_initiator):
850         if is_initiator:
851             self.assertTrue(api_ts.is_local)
852         else:
853             self.assertFalse(api_ts.is_local)
854
855         if self.p.ts_is_ip4:
856             self.assertEqual(api_ts.start_addr,
857                              IPv4Address(ts.starting_address_v4))
858             self.assertEqual(api_ts.end_addr,
859                              IPv4Address(ts.ending_address_v4))
860         else:
861             self.assertEqual(api_ts.start_addr,
862                              IPv6Address(ts.starting_address_v6))
863             self.assertEqual(api_ts.end_addr,
864                              IPv6Address(ts.ending_address_v6))
865         self.assertEqual(api_ts.start_port, ts.start_port)
866         self.assertEqual(api_ts.end_port, ts.end_port)
867         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
868
869
870 class TemplateInitiator(IkePeer):
871     """ initiator test template """
872
873     def initiate_del_sa_from_initiator(self):
874         ispi = int.from_bytes(self.sa.ispi, 'little')
875         self.pg0.enable_capture()
876         self.pg_start()
877         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
878         capture = self.pg0.get_capture(1)
879         ih = self.get_ike_header(capture[0])
880         self.assertNotIn('Response', ih.flags)
881         self.assertIn('Initiator', ih.flags)
882         self.assertEqual(ih.init_SPI, self.sa.ispi)
883         self.assertEqual(ih.resp_SPI, self.sa.rspi)
884         plain = self.sa.hmac_and_decrypt(ih)
885         d = ikev2.IKEv2_payload_Delete(plain)
886         self.assertEqual(d.proto, 1)  # proto=IKEv2
887         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
888                              flags='Response', exch_type='INFORMATIONAL',
889                              id=ih.id, next_payload='Encrypted')
890         resp = self.encrypt_ike_msg(header, b'', None)
891         self.send_and_assert_no_replies(self.pg0, resp)
892
893     def verify_del_sa(self, packet):
894         ih = self.get_ike_header(packet)
895         self.assertEqual(ih.id, self.sa.msg_id)
896         self.assertEqual(ih.exch_type, 37)  # exchange informational
897         self.assertIn('Response', ih.flags)
898         self.assertIn('Initiator', ih.flags)
899         plain = self.sa.hmac_and_decrypt(ih)
900         self.assertEqual(plain, b'')
901
902     def initiate_del_sa_from_responder(self):
903         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
904                              exch_type='INFORMATIONAL',
905                              id=self.sa.new_msg_id())
906         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
907         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
908         packet = self.create_packet(self.pg0, ike_msg,
909                                     self.sa.sport, self.sa.dport,
910                                     self.sa.natt, self.ip6)
911         self.pg0.add_stream(packet)
912         self.pg0.enable_capture()
913         self.pg_start()
914         capture = self.pg0.get_capture(1)
915         self.verify_del_sa(capture[0])
916
917     @staticmethod
918     def find_notify_payload(packet, notify_type):
919         n = packet[ikev2.IKEv2_payload_Notify]
920         while n is not None:
921             if n.type == notify_type:
922                 return n
923             n = n.payload
924         return None
925
926     def verify_nat_detection(self, packet):
927         if self.ip6:
928             iph = packet[IPv6]
929         else:
930             iph = packet[IP]
931         udp = packet[UDP]
932
933         # NAT_DETECTION_SOURCE_IP
934         s = self.find_notify_payload(packet, 16388)
935         self.assertIsNotNone(s)
936         src_sha = self.sa.compute_nat_sha1(
937                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
938         self.assertEqual(s.load, src_sha)
939
940         # NAT_DETECTION_DESTINATION_IP
941         s = self.find_notify_payload(packet, 16389)
942         self.assertIsNotNone(s)
943         dst_sha = self.sa.compute_nat_sha1(
944                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
945         self.assertEqual(s.load, dst_sha)
946
947     def verify_sa_init_request(self, packet):
948         udp = packet[UDP]
949         self.sa.dport = udp.sport
950         ih = packet[ikev2.IKEv2]
951         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
952         self.assertEqual(ih.exch_type, 34)  # SA_INIT
953         self.sa.ispi = ih.init_SPI
954         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
955         self.assertIn('Initiator', ih.flags)
956         self.assertNotIn('Response', ih.flags)
957         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
958         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
959
960         prop = packet[ikev2.IKEv2_payload_Proposal]
961         self.assertEqual(prop.proto, 1)  # proto = ikev2
962         self.assertEqual(prop.proposal, 1)
963         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
964         self.assertEqual(prop.trans[0].transform_id,
965                          self.p.ike_transforms['crypto_alg'])
966         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
967         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
968         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
969         self.assertEqual(prop.trans[2].transform_id,
970                          self.p.ike_transforms['dh_group'])
971
972         self.verify_nat_detection(packet)
973         self.sa.set_ike_props(
974                     crypto='AES-GCM-16ICV', crypto_key_len=32,
975                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
976         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
977                               integ='SHA2-256-128')
978         self.sa.generate_dh_data()
979         self.sa.complete_dh_data()
980         self.sa.calc_keys()
981
982     def update_esp_transforms(self, trans, sa):
983         while trans:
984             if trans.transform_type == 1:  # ecryption
985                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
986             elif trans.transform_type == 3:  # integrity
987                 sa.esp_integ = INTEG_IDS[trans.transform_id]
988             trans = trans.payload
989
990     def verify_sa_auth_req(self, packet):
991         udp = packet[UDP]
992         self.sa.dport = udp.sport
993         ih = self.get_ike_header(packet)
994         self.assertEqual(ih.resp_SPI, self.sa.rspi)
995         self.assertEqual(ih.init_SPI, self.sa.ispi)
996         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
997         self.assertIn('Initiator', ih.flags)
998         self.assertNotIn('Response', ih.flags)
999
1000         udp = packet[UDP]
1001         self.verify_udp(udp)
1002         self.assertEqual(ih.id, self.sa.msg_id + 1)
1003         self.sa.msg_id += 1
1004         plain = self.sa.hmac_and_decrypt(ih)
1005         idi = ikev2.IKEv2_payload_IDi(plain)
1006         idr = ikev2.IKEv2_payload_IDr(idi.payload)
1007         self.assertEqual(idi.load, self.sa.i_id)
1008         self.assertEqual(idr.load, self.sa.r_id)
1009         prop = idi[ikev2.IKEv2_payload_Proposal]
1010         c = self.sa.child_sas[0]
1011         c.ispi = prop.SPI
1012         self.update_esp_transforms(
1013                 prop[ikev2.IKEv2_payload_Transform], self.sa)
1014
1015     def send_init_response(self):
1016         tr_attr = self.sa.ike_crypto_attr()
1017         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1018                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1019                  key_length=tr_attr[0]) /
1020                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1021                  transform_id=self.sa.ike_integ) /
1022                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1023                  transform_id=self.sa.ike_prf_alg.name) /
1024                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1025                  transform_id=self.sa.ike_dh))
1026         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1027                  trans_nb=4, trans=trans))
1028
1029         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1030         if self.sa.natt:
1031             dst_address = b'\x0a\x0a\x0a\x0a'
1032         else:
1033             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1034         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1035         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1036
1037         self.sa.init_resp_packet = (
1038             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1039                         exch_type='IKE_SA_INIT', flags='Response') /
1040             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1041             ikev2.IKEv2_payload_KE(next_payload='Nonce',
1042                                    group=self.sa.ike_dh,
1043                                    load=self.sa.my_dh_pub_key) /
1044             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1045                                       next_payload='Notify') /
1046             ikev2.IKEv2_payload_Notify(
1047                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1048                     next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1049                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
1050
1051         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1052                                      self.sa.sport, self.sa.dport,
1053                                      False, self.ip6)
1054         self.pg_send(self.pg0, ike_msg)
1055         capture = self.pg0.get_capture(1)
1056         self.verify_sa_auth_req(capture[0])
1057
1058     def initiate_sa_init(self):
1059         self.pg0.enable_capture()
1060         self.pg_start()
1061         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1062
1063         capture = self.pg0.get_capture(1)
1064         self.verify_sa_init_request(capture[0])
1065         self.send_init_response()
1066
1067     def send_auth_response(self):
1068         tr_attr = self.sa.esp_crypto_attr()
1069         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1070                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1071                  key_length=tr_attr[0]) /
1072                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1073                  transform_id=self.sa.esp_integ) /
1074                  ikev2.IKEv2_payload_Transform(
1075                  transform_type='Extended Sequence Number',
1076                  transform_id='No ESN') /
1077                  ikev2.IKEv2_payload_Transform(
1078                  transform_type='Extended Sequence Number',
1079                  transform_id='ESN'))
1080
1081         c = self.sa.child_sas[0]
1082         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1083                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1084
1085         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1086         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1087                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1088                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1089                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1090                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1091                  auth_type=AuthMethod.value(self.sa.auth_method),
1092                  load=self.sa.auth_data) /
1093                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1094                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1095                  number_of_TSs=len(tsi),
1096                  traffic_selector=tsi) /
1097                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1098                  number_of_TSs=len(tsr),
1099                  traffic_selector=tsr) /
1100                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1101
1102         header = ikev2.IKEv2(
1103                 init_SPI=self.sa.ispi,
1104                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1105                 flags='Response', exch_type='IKE_AUTH')
1106
1107         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1108         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1109                                     self.sa.dport, self.sa.natt, self.ip6)
1110         self.pg_send(self.pg0, packet)
1111
1112     def test_initiator(self):
1113         self.initiate_sa_init()
1114         self.sa.auth_init()
1115         self.sa.calc_child_keys()
1116         self.send_auth_response()
1117         self.verify_ike_sas()
1118
1119
1120 class TemplateResponder(IkePeer):
1121     """ responder test template """
1122
1123     def initiate_del_sa_from_responder(self):
1124         self.pg0.enable_capture()
1125         self.pg_start()
1126         self.vapi.ikev2_initiate_del_ike_sa(
1127                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1128         capture = self.pg0.get_capture(1)
1129         ih = self.get_ike_header(capture[0])
1130         self.assertNotIn('Response', ih.flags)
1131         self.assertNotIn('Initiator', ih.flags)
1132         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1133         plain = self.sa.hmac_and_decrypt(ih)
1134         d = ikev2.IKEv2_payload_Delete(plain)
1135         self.assertEqual(d.proto, 1)  # proto=IKEv2
1136         self.assertEqual(ih.init_SPI, self.sa.ispi)
1137         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1138         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1139                              flags='Initiator+Response',
1140                              exch_type='INFORMATIONAL',
1141                              id=ih.id, next_payload='Encrypted')
1142         resp = self.encrypt_ike_msg(header, b'', None)
1143         self.send_and_assert_no_replies(self.pg0, resp)
1144
1145     def verify_del_sa(self, packet):
1146         ih = self.get_ike_header(packet)
1147         self.assertEqual(ih.id, self.sa.msg_id)
1148         self.assertEqual(ih.exch_type, 37)  # exchange informational
1149         self.assertIn('Response', ih.flags)
1150         self.assertNotIn('Initiator', ih.flags)
1151         self.assertEqual(ih.next_payload, 46)  # Encrypted
1152         self.assertEqual(ih.init_SPI, self.sa.ispi)
1153         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1154         plain = self.sa.hmac_and_decrypt(ih)
1155         self.assertEqual(plain, b'')
1156
1157     def initiate_del_sa_from_initiator(self):
1158         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1159                              flags='Initiator', exch_type='INFORMATIONAL',
1160                              id=self.sa.new_msg_id())
1161         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1162         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1163         packet = self.create_packet(self.pg0, ike_msg,
1164                                     self.sa.sport, self.sa.dport,
1165                                     self.sa.natt, self.ip6)
1166         self.pg0.add_stream(packet)
1167         self.pg0.enable_capture()
1168         self.pg_start()
1169         capture = self.pg0.get_capture(1)
1170         self.verify_del_sa(capture[0])
1171
1172     def send_sa_init_req(self):
1173         tr_attr = self.sa.ike_crypto_attr()
1174         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1175                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1176                  key_length=tr_attr[0]) /
1177                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1178                  transform_id=self.sa.ike_integ) /
1179                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1180                  transform_id=self.sa.ike_prf_alg.name) /
1181                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1182                  transform_id=self.sa.ike_dh))
1183
1184         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1185                  trans_nb=4, trans=trans))
1186
1187         next_payload = None if self.ip6 else 'Notify'
1188
1189         self.sa.init_req_packet = (
1190                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1191                             flags='Initiator', exch_type='IKE_SA_INIT') /
1192                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1193                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1194                                        group=self.sa.ike_dh,
1195                                        load=self.sa.my_dh_pub_key) /
1196                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
1197                                           load=self.sa.i_nonce))
1198
1199         if not self.ip6:
1200             if self.sa.i_natt:
1201                 src_address = b'\x0a\x0a\x0a\x01'
1202             else:
1203                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1204
1205             if self.sa.r_natt:
1206                 dst_address = b'\x0a\x0a\x0a\x0a'
1207             else:
1208                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1209
1210             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1211             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1212             nat_src_detection = ikev2.IKEv2_payload_Notify(
1213                     type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1214                     next_payload='Notify')
1215             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1216                     type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1217             self.sa.init_req_packet = (self.sa.init_req_packet /
1218                                        nat_src_detection /
1219                                        nat_dst_detection)
1220
1221         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1222                                      self.sa.sport, self.sa.dport,
1223                                      self.sa.natt, self.ip6)
1224         self.pg0.add_stream(ike_msg)
1225         self.pg0.enable_capture()
1226         self.pg_start()
1227         capture = self.pg0.get_capture(1)
1228         self.verify_sa_init(capture[0])
1229
1230     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1231         tr_attr = self.sa.esp_crypto_attr()
1232         last_payload = last_payload or 'Notify'
1233         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1234                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1235                  key_length=tr_attr[0]) /
1236                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1237                  transform_id=self.sa.esp_integ) /
1238                  ikev2.IKEv2_payload_Transform(
1239                  transform_type='Extended Sequence Number',
1240                  transform_id='No ESN') /
1241                  ikev2.IKEv2_payload_Transform(
1242                  transform_type='Extended Sequence Number',
1243                  transform_id='ESN'))
1244
1245         c = self.sa.child_sas[0]
1246         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1247                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1248
1249         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1250         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1251                  auth_type=AuthMethod.value(self.sa.auth_method),
1252                  load=self.sa.auth_data) /
1253                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1254                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1255                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1256                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1257                  number_of_TSs=len(tsr), traffic_selector=tsr))
1258
1259         if is_rekey:
1260             first_payload = 'Nonce'
1261             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1262                      next_payload='SA') / plain /
1263                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1264                      proto='ESP', SPI=c.ispi))
1265         else:
1266             first_payload = 'IDi'
1267             ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1268                    IDtype=self.sa.id_type, load=self.sa.i_id) /
1269                    ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1270                    IDtype=self.sa.id_type, load=self.sa.r_id))
1271             plain = ids / plain
1272         return plain, first_payload
1273
1274     def send_sa_auth(self):
1275         plain, first_payload = self.generate_auth_payload(
1276                     last_payload='Notify')
1277         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1278         header = ikev2.IKEv2(
1279                 init_SPI=self.sa.ispi,
1280                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1281                 flags='Initiator', exch_type='IKE_AUTH')
1282
1283         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1284         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1285                                     self.sa.dport, self.sa.natt, self.ip6)
1286         self.pg0.add_stream(packet)
1287         self.pg0.enable_capture()
1288         self.pg_start()
1289         capture = self.pg0.get_capture(1)
1290         self.verify_sa_auth_resp(capture[0])
1291
1292     def verify_sa_init(self, packet):
1293         ih = self.get_ike_header(packet)
1294
1295         self.assertEqual(ih.id, self.sa.msg_id)
1296         self.assertEqual(ih.exch_type, 34)
1297         self.assertIn('Response', ih.flags)
1298         self.assertEqual(ih.init_SPI, self.sa.ispi)
1299         self.assertNotEqual(ih.resp_SPI, 0)
1300         self.sa.rspi = ih.resp_SPI
1301         try:
1302             sa = ih[ikev2.IKEv2_payload_SA]
1303             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1304             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1305         except IndexError as e:
1306             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1307             self.logger.error(ih.show())
1308             raise
1309         self.sa.complete_dh_data()
1310         self.sa.calc_keys()
1311         self.sa.auth_init()
1312
1313     def verify_sa_auth_resp(self, packet):
1314         ike = self.get_ike_header(packet)
1315         udp = packet[UDP]
1316         self.verify_udp(udp)
1317         self.assertEqual(ike.id, self.sa.msg_id)
1318         plain = self.sa.hmac_and_decrypt(ike)
1319         idr = ikev2.IKEv2_payload_IDr(plain)
1320         prop = idr[ikev2.IKEv2_payload_Proposal]
1321         self.assertEqual(prop.SPIsize, 4)
1322         self.sa.child_sas[0].rspi = prop.SPI
1323         self.sa.calc_child_keys()
1324
1325     IKE_NODE_SUFFIX = 'ip4'
1326
1327     def verify_counters(self):
1328         self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
1329         self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
1330         self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1331
1332         r = self.vapi.ikev2_sa_dump()
1333         s = r[0].sa.stats
1334         self.assertEqual(1, s.n_sa_auth_req)
1335         self.assertEqual(1, s.n_sa_init_req)
1336
1337     def test_responder(self):
1338         self.send_sa_init_req()
1339         self.send_sa_auth()
1340         self.verify_ipsec_sas()
1341         self.verify_ike_sas()
1342         self.verify_counters()
1343
1344
class Ikev2Params(object):
    """ mixin configuring the IKEv2 profile and the expected SA """

    def config_params(self, params=None):
        """ configure self.p (profile) and self.sa from test parameters

        :param params: optional dict of overrides. Keys read here:
            'dpd_disabled', 'del_sa_from_responder', 'i_natt', 'r_natt',
            'ip6', 'auth' (with 'server-key', 'server-cert', 'client-cert'
            and 'client-key' for 'rsa-sig'), 'is_initiator', 'loc_ts',
            'rem_ts', 'responder', 'ike_transforms', 'esp_transforms',
            'udp_encap', 'responder_hostname', 'ike-crypto', 'ike-integ',
            'ike-dh', 'esp-crypto' and 'esp-integ'.
        """
        # a fresh dict per call avoids sharing a mutable default argument
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        if params.get('dpd_disabled', True):
            self.vapi.cli('ikev2 dpd disable')
        self.del_sa_from_responder = params.get('del_sa_from_responder',
                                                False)
        i_natt = params.get('i_natt', False)
        r_natt = params.get('r_natt', False)
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # context managers make sure the files are closed again
            with open(work_dir + params['server-cert']) as cert_file:
                server_pem = cert_file.read()
            with open(work_dir + params['client-key']) as key_file:
                client_priv = key_file.read()
            client_priv = load_pem_private_key(str.encode(client_priv), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        is_init = params.get('is_initiator', True)

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        # with is_init set the profile uses the responder identity as its
        # local id, otherwise the initiator identity
        if is_init:
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
        else:
            self.p.add_local_id(**idi)
            self.p.add_remote_id(**idr)

        loc_ts = params.get(
                'loc_ts',
                {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
                'rem_ts',
                {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        udp_encap = params.get('udp_encap', False)
        if udp_encap:
            self.p.set_udp_encap(True)

        if 'responder_hostname' in params:
            hn = params['responder_hostname']
            self.p.add_responder_hostname(hn)

            # configure static dns record
            self.vapi.dns_name_server_add_del(
                is_ip6=0, is_add=1,
                server_address=IPv4Address(u'8.8.8.8').packed)
            self.vapi.dns_enable_disable(enable=1)

            cmd = "dns cache add {} {}".format(hn['hostname'],
                                               self.pg0.remote_ip4)
            self.vapi.cli(cmd)

        # the traffic selectors are passed swapped: the profile's remote
        # selector becomes the SA's local one and vice versa
        self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'],
                          i_natt=i_natt, r_natt=r_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data, udp_encap=udp_encap,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
        if is_init:
            ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
            ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
            ike_dh = params.get('ike-dh', '2048MODPgr')

            esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
            esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

            self.sa.set_ike_props(
                    crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                    integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
            self.sa.set_esp_props(
                    crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                    integ=esp_integ)
1465
1466
class TestApi(VppTestCase):
    """ Test IKEV2 API """
    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        """Remove the profiles created by the test and check none remain.

        A test that fails before both profiles were created leaves ``p1``
        and/or ``p2`` unset; missing ones are skipped so that the original
        failure is not masked by an AttributeError raised from here.
        """
        super(TestApi, self).tearDown()
        for attr in ('p1', 'p2'):
            profile = getattr(self, attr, None)
            if profile is not None:
                profile.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Build a Profile from *cfg*, add it to VPP and return it.

        The keys 'name', 'loc_id', 'rem_id', 'loc_ts', 'rem_ts',
        'responder', 'ike_ts', 'esp_ts', 'auth', 'udp_encap' and
        'ipsec_over_udp_port' are read unconditionally; 'lifetime_data',
        'tun_itf' and 'natt_disabled' are optional.
        """
        loc_type, loc_data = cfg['loc_id']
        rem_type, rem_data = cfg['rem_id']

        p = Profile(self, cfg['name'])
        p.add_local_id(id_type=loc_type, data=loc_data)
        p.add_remote_id(id_type=rem_type, data=rem_data)
        p.add_local_ts(**cfg['loc_ts'])
        p.add_remote_ts(**cfg['rem_ts'])
        p.add_responder(cfg['responder'])
        p.add_ike_transforms(cfg['ike_ts'])
        p.add_esp_transforms(cfg['esp_ts'])
        p.add_auth(**cfg['auth'])
        p.set_udp_encap(cfg['udp_encap'])
        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            p.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            p.set_tunnel_interface(cfg['tun_itf'])
        # NAT traversal is only disabled when explicitly requested
        if cfg.get('natt_disabled'):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """ test profile dump API """
        # IPv4 traffic selectors used by profile p1
        loc_ts4 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': '3.3.3.2',
            'end_addr': '3.3.3.3',
        }
        rem_ts4 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': '4.5.76.80',
            'end_addr': '2.3.4.6',
        }

        # IPv6 traffic selectors used by profile p2
        loc_ts6 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': 'ab::1',
            'end_addr': 'ab::4',
        }
        rem_ts6 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': 'cd::12',
            'end_addr': 'cd::13',
        }

        conf = {
            # p1 exercises the optional 'natt_disabled' and 'lifetime_data'
            'p1': {
                'name': 'p1',
                'natt_disabled': True,
                'loc_id': ('fqdn', b'vpp.home'),
                'rem_id': ('fqdn', b'roadwarrior.example.com'),
                'loc_ts': loc_ts4,
                'rem_ts': rem_ts4,
                'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
                'ike_ts': {
                    'crypto_alg': 20,
                    'crypto_key_size': 32,
                    'integ_alg': 1,
                    'dh_group': 1},
                'esp_ts': {
                    'crypto_alg': 13,
                    'crypto_key_size': 24,
                    'integ_alg': 2},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': True,
                'ipsec_over_udp_port': 4501,
                'lifetime_data': {
                    'lifetime': 123,
                    'lifetime_maxdata': 20192,
                    'lifetime_jitter': 9,
                    'handover': 132},
            },
            # p2 exercises the optional 'tun_itf'
            'p2': {
                'name': 'p2',
                'loc_id': ('ip4-addr', b'192.168.2.1'),
                'rem_id': ('ip6-addr', b'abcd::1'),
                'loc_ts': loc_ts6,
                'rem_ts': rem_ts6,
                'responder': {'sw_if_index': 4, 'addr': 'def::10'},
                'ike_ts': {
                    'crypto_alg': 12,
                    'crypto_key_size': 16,
                    'integ_alg': 3,
                    'dh_group': 3},
                'esp_ts': {
                    'crypto_alg': 9,
                    'crypto_key_size': 24,
                    'integ_alg': 4},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': False,
                'ipsec_over_udp_port': 4600,
                'tun_itf': 0}
        }
        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        self.verify_profile(r[0].profile, conf['p1'])
        self.verify_profile(r[1].profile, conf['p2'])

    def verify_id(self, api_id, cfg_id):
        """Check a dumped identity against a (type name, data bytes) pair."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Check a dumped traffic selector against its configuration dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
        self.assertEqual(api_ts.start_addr,
                         ip_address(text_type(cfg_ts['start_addr'])))
        self.assertEqual(api_ts.end_addr,
                         ip_address(text_type(cfg_ts['end_addr'])))

    def verify_responder(self, api_r, cfg_r):
        """Check the dumped responder interface index and address."""
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the transform fields common to IKE and ESP."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check IKE transforms: the common fields plus the DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check ESP transforms (only the common fields apply)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Check the dumped auth method, data and data length."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, cfg_auth['data'])
        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))

    def verify_lifetime_data(self, p, ld):
        """Check the lifetime fields of dumped profile *p* against *ld*."""
        self.assertEqual(p.lifetime, ld['lifetime'])
        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
        self.assertEqual(p.handover, ld['handover'])

    def verify_profile(self, ap, cp):
        """Check dumped profile *ap* against the configuration dict *cp*."""
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        # a profile that does not mention NAT-T is expected to have it enabled
        natt_dis = cp.get('natt_disabled', False)
        self.assertEqual(natt_dis, ap.natt_disabled)

        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        if 'tun_itf' in cp:
            self.assertEqual(ap.tun_itf, cp['tun_itf'])
        else:
            # value expected in the dump when no tunnel interface was set
            self.assertEqual(ap.tun_itf, 0xffffffff)
1654
1655
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """ test responder - responder behind NAT """

    # NOTE(review): consumed by code outside this class; presumably picks
    # the NAT-T flavour of the IKE node for counter checks - confirm
    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        """Request NAT on the responder side; everything else is default."""
        params = {'r_natt': True}
        self.config_params(params)
1664
1665
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - NAT traversal (initiator behind NAT) """

    def config_tc(self):
        """Configure VPP as initiator with NAT on the initiator side."""
        # peer the profile points at: the remote end of pg0
        responder = {
            'sw_if_index': self.pg0.sw_if_index,
            'addr': self.pg0.remote_ip4,
        }
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            'i_natt': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        })
1690
1691
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        """Configure VPP as initiator, naming the responder by hostname."""
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        # config_params() adds a static DNS cache entry that maps this
        # hostname to pg0.remote_ip4
        responder_hostname = {
            'hostname': 'vpp.responder.org',
            'sw_if_index': self.pg0.sw_if_index,
        }
        self.config_params({
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
            'responder_hostname': responder_hostname,
        })
1715
1716
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Answer the captured rekey request *req* by echoing its SA payload.

        When *update_child_sa_data* is true, the nonce and SPI carried in
        the request are first stored on the local SA and the child keys are
        recalculated.
        """
        ike_hdr = self.get_ike_header(req)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)

        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            child_sa = self.sa.child_sas[0]
            child_sa.ispi = proposal.SPI
            child_sa.rspi = proposal.SPI
            self.sa.calc_child_keys()

        # exch_type 36 is CREATE_CHILD_SA; reuse the request's message id
        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
            flags='Response', exch_type=36,
            id=ike_hdr.id, next_payload='Encrypted')
        reply = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        pkt = self.create_packet(self.pg0, reply, self.sa.sport,
                                 self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, pkt)

    def test_initiator(self):
        # establish the SAs first, then trigger two rekeys back to back
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        first_req, second_req = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(second_req, True)
        self.rekey_respond(first_req, False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1757
1758
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Make VPP start a child SA rekey, check the request and answer it."""
        child_sa = self.sa.child_sas[0]
        ispi = int.from_bytes(child_sa.ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
        request = self.pg0.get_capture(1)[0]

        # the request must be a CHILD_SA exchange sent by the initiator
        ike_hdr = self.get_ike_header(request)
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', ike_hdr.flags)
        self.assertIn('Initiator', ike_hdr.flags)

        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        self.sa.child_sas[0].ispi = proposal.SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        self.sa.calc_child_keys()

        # answer with the same SA payload and the request's message id
        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
            flags='Response', exch_type=36,
            id=ike_hdr.id, next_payload='Encrypted')
        reply = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        pkt = self.create_packet(self.pg0, reply, self.sa.sport,
                                 self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, pkt)

    def test_initiator(self):
        # bring the SAs up, rekey the child SA, then re-verify both SA kinds
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1795
1796
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        """Configure VPP as initiator and request SA deletion by the peer."""
        # peer the profile points at: the remote end of pg0
        responder = {
            'sw_if_index': self.pg0.sw_if_index,
            'addr': self.pg0.remote_ip4,
        }
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            'del_sa_from_responder': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        })
1821
1822
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - initiator behind NAT """

    # NOTE(review): consumed by code outside this class; presumably picks
    # the NAT-T flavour of the IKE node for counter checks - confirm
    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        """Request NAT on the initiator side; everything else is default."""
        params = {'i_natt': True}
        self.config_params(params)
1832
1833
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """

    def config_tc(self):
        """Use the default parameters without any override."""
        self.config_params()
1839
1840
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """

    def config_tc(self):
        """Run with dead peer detection left enabled."""
        self.config_params({'dpd_disabled': False})

    def tearDown(self):
        # Deliberately does not call the inherited tearDown.
        # NOTE(review): presumably because the SAs are already gone by the
        # end of this test - confirm
        pass

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()

        # capture empty request but don't reply
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(probe)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')

        # wait for SA expiration
        time.sleep(3)
        remaining_ike = self.vapi.ikev2_sa_dump()
        self.assertEqual(len(remaining_ike), 0)
        remaining_ipsec = self.vapi.ipsec_sa_dump()
        self.assertEqual(len(remaining_ipsec), 0)
1869
1870
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Send a rekey request and store the nonce and SPI from the reply."""
        self.pg0.add_stream(self.create_rekey_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]

        ike_hdr = self.get_ike_header(reply)
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        # bring the SAs up, rekey, then derive the new child keys locally
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)

        # exactly one rekey request must be counted, both in the node
        # counter and in the per-SA statistics
        self.assert_counter(1, 'rekey_req', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
1898
1899
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """ test ikev2 responder - non-default table id """

    @classmethod
    def setUpClass(cls):
        """Set up pg0 inside IP table 1 instead of the default table."""
        # publish the scapy IKEv2 module under the module-level name 'ikev2'
        import scapy.contrib.ikev2 as _ikev2
        globals().update(ikev2=_ikev2)

        # start the MRO lookup after IkePeer so that IkePeer's own
        # setUpClass is not run; the interface setup is done here instead
        super(IkePeer, cls).setUpClass()
        cls.create_pg_interfaces(range(1))
        for cmd in ("ip table add 1", "set interface ip table pg0 1"):
            cls.vapi.cli(cmd)
        for intf in cls.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def config_tc(self):
        """Run with dead peer detection left enabled."""
        self.config_params({'dpd_disabled': False})

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderVrf, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()

        # an empty INFORMATIONAL request must show up on pg0
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(probe)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
1931
1932
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """

    def config_tc(self):
        """Select RSA signature auth with UDP encapsulation enabled."""
        # key and certificate file names handed to config_params()
        pem_files = {
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        params = {'udp_encap': True, 'auth': 'rsa-sig'}
        params.update(pem_files)
        self.config_params(params)
1944
1945
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """

    def config_tc(self):
        """Select the IKE and ESP algorithms named in the class docstring."""
        ike_params = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
        }
        esp_params = {
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
        }
        params = dict(ike_params)
        params.update(esp_params)
        params['ike-dh'] = '2048MODPgr'
        self.config_params(params)
1959
1960
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """

    def config_tc(self):
        """Select AES-CBC/SHA2-256 for IKE and AES-GCM for ESP."""
        ike_params = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
        }
        # no separate ESP integrity algorithm is configured with AES-GCM
        esp_params = {
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
        }
        params = dict(ike_params)
        params.update(esp_params)
        params['ike-dh'] = '3072MODPgr'
        self.config_params(params)
1975
1976
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    # NOTE(review): consumed by code outside this class; presumably picks
    # the IPv6 flavour of the IKE node for counter checks - confirm
    IKE_NODE_SUFFIX = 'ip6'

    def config_tc(self):
        """Run over IPv6 with AES-GCM for IKE and IPv6 traffic selectors."""
        local_ts = {
            'start_addr': 'ab:cd::0',
            'end_addr': 'ab:cd::10',
        }
        remote_ts = {
            'start_addr': '11::0',
            'end_addr': '11::100',
        }
        self.config_params({
            'del_sa_from_responder': True,
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_ts,
            'rem_ts': remote_ts,
        })
1997
1998
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request to VPP and check the empty reply and stats."""
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]

        # the reply carries our message id and an empty payload
        ike_hdr = self.get_ike_header(reply)
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')

        # one keepalive must be counted, both in the node counter and in
        # the per-SA statistics
        self.assert_counter(1, 'keepalive', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        # bring the SAs up first, then exercise the keepalive exchange
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
2022
2023
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # Deliberately does not call the inherited tearDown.
        pass

    def config_tc(self):
        """Use the default parameters without any override."""
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """Return an IKE_SA_INIT packet, optionally with a forced header
        *length* and an extra *payload* appended."""
        ike = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                          flags='Initiator', exch_type='IKE_SA_INIT')
        if payload is not None:
            ike /= payload
        return self.create_packet(self.pg0, ike, self.sa.sport,
                                  self.sa.dport)

    def _send_and_expect_drop(self, ike_msg, counter):
        """Send pkt_count copies of *ike_msg*, expect no reply and check
        that *counter* grew by pkt_count."""
        burst = ike_msg * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, counter)

    def verify_bad_packet_length(self):
        """A bogus length in the IKE header must hit 'bad_length'."""
        self._send_and_expect_drop(
            self.create_ike_init_msg(length=0xdead), 'bad_length')

    def verify_bad_sa_payload_length(self):
        """A bogus length in the SA payload must hit 'malformed_packet'."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        self._send_and_expect_drop(
            self.create_ike_init_msg(payload=bad_sa), 'malformed_packet')

    def test_responder(self):
        # number of copies of each malformed message to send
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
2056
2057
# Run the test cases of this file with the VPP test runner when the file
# is executed as a script.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)