4f41ad272baac44c08c4b9c0aa67a2d828e5767d
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from scapy.layers.ipsec import ESP
17 from scapy.layers.inet import IP, UDP, Ether
18 from scapy.layers.inet6 import IPv6
19 from scapy.packet import raw, Raw
20 from scapy.utils import long_converter
21 from framework import tag_fixme_vpp_workers
22 from framework import VppTestCase, VppTestRunner
23 from vpp_ikev2 import Profile, IDType, AuthMethod
24 from vpp_papi import VppEnum
25
# Python 2/3 compatibility shim: `unicode` only exists on Python 2, so fall
# back to `str` when the name is not defined.
try:
    text_type = unicode
except NameError:
    text_type = str

# constant passed through the PRF together with the shared secret when
# computing shared-key AUTH data (see IKEv2SA.auth_init)
KEY_PAD = b"Key Pad for IKEv2"
# number of trailing key bytes CryptoAlgo treats as the AEAD salt
SALT_SIZE = 4
# number of authentication tag bytes appended to / stripped from GCM output
GCM_ICV_SIZE = 16
# number of IV bytes carried in front of the GCM ciphertext
GCM_IV_SIZE = 8
35
36
# MODP Diffie-Hellman groups defined in rfc3526, keyed by group name.
# tuple structure is (p, g, key_len):
#   p       - prime modulus, parsed from the hex dump by long_converter
#   g       - generator
#   key_len - length of the modulus in bytes (2048 bits -> 256,
#             3072 bits -> 384); used to size the shared secret
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
71
72
class CryptoAlgo(object):
    """Description of a symmetric cipher plus encrypt/decrypt/pad helpers.

    :param name: algorithm name; 'AES-GCM-16ICV' selects the GCM IV length
    :param cipher: cipher class called as ``cipher(key)``, or None
    :param mode: mode class called with the IV/nonce, or None

    When ``cipher`` is None neither ``bs`` nor ``iv_len`` is set, so the
    helpers below must not be used on such an instance.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is divided by 8 to get the block size in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` under ``key`` with a fresh random IV.

        :param data: plaintext bytes (already padded by the caller)
        :param key: cipher key; in AEAD mode the last SALT_SIZE bytes are
            the salt and are not part of the cipher key
        :param aad: additional authenticated data; None selects the
            non-AEAD path
        :returns: ``iv + ciphertext`` or, in AEAD mode,
            ``iv + ciphertext + tag[:GCM_ICV_SIZE]``
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()

        # AEAD: nonce is salt (tail of the key) followed by the explicit IV
        salt = key[-SALT_SIZE:]
        nonce = salt + iv
        encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                           default_backend()).encryptor()
        encryptor.authenticate_additional_data(aad)
        data = encryptor.update(data) + encryptor.finalize()
        data += encryptor.tag[:GCM_ICV_SIZE]
        return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` produced in the layout used by :meth:`encrypt`.

        :param data: ``iv + ciphertext`` (the AEAD tag is passed separately)
        :param key: cipher key, with the salt appended in AEAD mode
        :param aad: additional authenticated data; None selects the
            non-AEAD path
        :param icv: authentication tag, required when ``aad`` is given
        :returns: decrypted bytes, padding still attached
        """
        # use the configured cipher and IV length, mirroring encrypt(),
        # instead of hard-coding AES and the GCM IV size
        iv = data[:self.iv_len]
        ct = data[self.iv_len:]
        if aad is None:
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()

        salt = key[-SALT_SIZE:]
        nonce = salt + iv
        key = key[:-SALT_SIZE]
        decryptor = Cipher(self.cipher(key),
                           self.mode(nonce, icv, len(icv)),
                           default_backend()).decryptor()
        decryptor.authenticate_additional_data(aad)
        return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` up to the next multiple of the block size.

        At least one byte is always added: zero bytes followed by a final
        byte holding the number of zero bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
125
126
class AuthAlgo(object):
    """Description of an integrity or PRF algorithm.

    :param name: algorithm name
    :param mac: MAC constructor (or None)
    :param mod: hash algorithm class handed to the MAC (or None)
    :param key_len: key length in bytes
    :param trunc_len: truncated output length; a falsy value (None or 0)
        falls back to ``key_len``
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
134
135
# encryption algorithms by name
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}

# integrity algorithms by name: (name, mac, hash, key_len, trunc_len).
# Each SHA2 variant must use the hash matching its key length
# (SHA-384 for the 48 byte key, SHA-512 for the 64 byte key).
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}

# pseudo-random functions by name
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}

# numeric encryption transform id -> key into CRYPTO_ALGOS
CRYPTO_IDS = {
    12: 'AES-CBC',
    20: 'AES-GCM-16ICV',
}

# numeric integrity transform id -> key into AUTH_ALGOS
INTEG_IDS = {
    2: 'HMAC-SHA1-96',
    12: 'SHA2-256-128',
    13: 'SHA2-384-192',
    14: 'SHA2-512-256',
}
168
169
class IKEv2ChildSA(object):
    """Child SA state: one locally generated SPI and the traffic selectors.

    :param local_ts: local traffic selector, stored as given
    :param remote_ts: remote traffic selector, stored as given
    :param is_initiator: when true the random 4 byte SPI is stored as
        ``ispi`` and ``rspi`` is None; otherwise the other way round
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        spi = os.urandom(4)
        self.ispi, self.rspi = (spi, None) if is_initiator else (None, spi)
        self.local_ts = local_ts
        self.remote_ts = remote_ts
181
182
class IKEv2SA(object):
    """Test-side state of one IKEv2 SA plus its key derivation helpers.

    Keeps SPIs, nonces, identities, Diffie-Hellman material and the derived
    SK_* keys, and offers encrypt/decrypt/HMAC helpers built on the module
    level algorithm tables (CRYPTO_ALGOS, AUTH_ALGOS, PRF_ALGOS, DH).
    """

    def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
                 auth_method='shared-key', priv_key=None, i_natt=False,
                 r_natt=False, udp_encap=False):
        """
        :param test: test case object, used for its assert methods
        :param is_initiator: whether this side acts as the initiator
        :param i_id: initiator identity (bytes)
        :param r_id: responder identity (bytes)
        :param spi: 8 byte SPI of this side; the peer SPI starts as zeros
        :param id_type: identity type, either a name resolved through
            IDType.value or an already numeric value
        :param nonce: own nonce; 32 random bytes when not given
        :param auth_data: shared secret; replaced by the computed AUTH
            data in auth_init
        :param local_ts: local traffic selector for the first child SA
        :param remote_ts: remote traffic selector for the first child SA
        :param auth_method: 'shared-key' or 'rsa-sig'
        :param priv_key: private key used for 'rsa-sig'
        :param i_natt: initiator side NAT traversal flag
        :param r_natt: responder side NAT traversal flag
        :param udp_encap: UDP encapsulation flag
        """
        self.udp_encap = udp_encap
        self.i_natt = i_natt
        self.r_natt = r_natt
        # with NAT traversal on either side both ports are 4500, else 500
        self.sport = self.dport = 4500 if (i_natt or r_natt) else 500
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        if isinstance(id_type, str):
            self.id_type = IDType.value(id_type)
        else:
            self.id_type = id_type
        self.auth_method = auth_method
        # only our own SPI and nonce are known at this point
        if self.is_initiator:
            self.rspi = 8 * b'\x00'
            self.ispi = spi
            self.i_nonce = nonce
        else:
            self.rspi = spi
            self.ispi = 8 * b'\x00'
            self.r_nonce = nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
                          self.is_initiator)]

    def new_msg_id(self):
        """Increment the message id counter and return the new value."""
        self.msg_id += 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """DH public value of this side."""
        if self.is_initiator:
            return self.i_dh_data
        return self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """DH public value of the peer."""
        if self.is_initiator:
            return self.r_dh_data
        return self.i_dh_data

    @property
    def natt(self):
        """Truthy when either side has NAT traversal enabled."""
        return self.i_natt or self.r_natt

    def compute_secret(self):
        """Return peer_public ** own_private mod p as big-endian bytes,
        sized to the key length of the configured DH group."""
        priv = int.from_bytes(self.dh_private_key, 'big')
        peer = int.from_bytes(self.peer_dh_pub_key, 'big')
        p, _g, key_len = self.ike_group
        return pow(peer, priv, p).to_bytes(key_len, 'big')

    def generate_dh_data(self):
        """Generate a DH key pair for the configured group.

        Stores the private value in ``dh_private_key`` and the public value
        in ``i_dh_data`` or ``r_dh_data`` depending on the role.

        :raises NotImplementedError: if the group is not in DH
        """
        if self.ike_dh not in DH:
            raise NotImplementedError('%s not in DH group' % self.ike_dh)

        # parameters are built once and reused for later key pairs
        if self.dh_params is None:
            dhg = DH[self.ike_dh]
            pn = dh.DHParameterNumbers(dhg[0], dhg[1])
            self.dh_params = pn.parameters(default_backend())

        priv = self.dh_params.generate_private_key()
        pub = priv.public_key()
        x = priv.private_numbers().x
        self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
        y = pub.public_numbers().y
        pub_bytes = y.to_bytes(pub.key_size // 8, 'big')

        if self.is_initiator:
            self.i_dh_data = pub_bytes
        else:
            self.r_dh_data = pub_bytes

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self):
        """Derive keys of the first child SA from SK_d and both nonces.

        The key material is consumed per direction (initiator first) as
        encryption key followed by either the integrity key or, when the
        ESP integrity algorithm has a zero key length, a 4 byte salt.
        """
        prf = self.ike_prf_alg.mod()
        seed = self.i_nonce + self.r_nonce
        child = self.child_sas[0]

        encr_key_len = self.esp_crypto_key_len
        integ_key_len = self.esp_integ_alg.key_len
        salt_len = 0 if integ_key_len else 4

        total_len = 2 * (integ_key_len + encr_key_len + salt_len)
        keymat = self.calc_prfplus(prf, self.sk_d, seed, total_len)

        pos = 0
        for side in ('i', 'r'):
            setattr(child, 'sk_e' + side, keymat[pos:pos + encr_key_len])
            pos += encr_key_len
            if integ_key_len:
                setattr(child, 'sk_a' + side,
                        keymat[pos:pos + integ_key_len])
                pos += integ_key_len
            else:
                setattr(child, 'salt_e' + side, keymat[pos:pos + salt_len])
                pos += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Expand ``key`` and ``seed`` into at least ``length`` bytes.

        Block n is prf(key, previous_block | seed | n) with a one byte
        counter starting at 1 and an empty previous block for n == 1.

        :returns: the concatenated blocks (not truncated, callers slice
            what they need), or None if 255 blocks are not enough
        """
        result = b''
        block = b''
        # the counter is a single octet, so at most 255 blocks exist
        for counter in range(1, 256):
            if len(result) >= length:
                break
            block = self.calc_prf(prf, key, block + seed + bytes([counter]))
            result += block

        # fail only when the output really is too short; reaching the last
        # counter value with enough material is a success
        if len(result) < length:
            return None
        return result

    def calc_prf(self, prf, key, data):
        """Return the IKE PRF MAC of ``data`` keyed with ``key``."""
        h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        h.update(data)
        return h.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the SK_d/ai/ar/ei/er/pi/pr keys.

        When the IKE integrity algorithm has a zero key length each
        encryption key is extended by a 4 byte salt.
        """
        prf = self.ike_prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        seed = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, seed, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        seed = seed + self.ispi + self.rspi

        prf_key_trunc = self.ike_prf_alg.trunc_len
        encr_key_len = self.ike_crypto_key_len
        tr_prf_key_len = self.ike_prf_alg.key_len
        integ_key_len = self.ike_integ_alg.key_len
        salt_size = 4 if integ_key_len == 0 else 0

        total_len = (prf_key_trunc +
                     integ_key_len * 2 +
                     encr_key_len * 2 +
                     tr_prf_key_len * 2 +
                     salt_size * 2)
        keymat = self.calc_prfplus(prf, self.skeyseed, seed, total_len)

        # slice the key material in order, advancing the offset as we go
        encr_len = encr_key_len + salt_size
        layout = (('sk_d', prf_key_trunc),
                  ('sk_ai', integ_key_len),
                  ('sk_ar', integ_key_len),
                  ('sk_ei', encr_len),
                  ('sk_er', encr_len),
                  ('sk_pi', tr_prf_key_len),
                  ('sk_pr', tr_prf_key_len))
        pos = 0
        for attr, size in layout:
            setattr(self, attr, keymat[pos:pos + size])
            pos += size

    def generate_authmsg(self, prf, packet):
        """Build the octets that get authenticated by this side.

        :param prf: PRF hash instance passed to calc_prf
        :param packet: raw bytes of this side's IKE_SA_INIT message
        :returns: packet | peer nonce | prf(SK_p, id_type 0 0 0 | identity)
        """
        if self.is_initiator:
            ident = self.i_id
            nonce = self.r_nonce
            key = self.sk_pi
        else:
            ident = self.r_id
            nonce = self.i_nonce
            key = self.sk_pr
        data = bytes([self.id_type, 0, 0, 0]) + ident
        id_hash = self.calc_prf(prf, key, data)
        return packet + nonce + id_hash

    def auth_init(self):
        """Replace ``auth_data`` with the computed AUTH payload data.

        :raises TypeError: for an unknown authentication method
        """
        prf = self.ike_prf_alg.mod()
        if self.is_initiator:
            packet = self.init_req_packet
        else:
            packet = self.init_resp_packet
        authmsg = self.generate_authmsg(prf, raw(packet))
        if self.auth_method == 'shared-key':
            # AUTH = prf(prf(secret, KEY_PAD), signed octets)
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif self.auth_method == 'rsa-sig':
            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
                                                hashes.SHA1())
        else:
            raise TypeError('unknown auth method type!')

    def encrypt(self, data, aad=None):
        """Pad ``data`` and encrypt it with this side's SK_e key."""
        data = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key used by the peer."""
        if self.is_initiator:
            return self.sk_ar
        return self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key used by this side."""
        if self.is_initiator:
            return self.sk_ai
        return self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key used by this side."""
        if self.is_initiator:
            return self.sk_ei
        return self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key used by the peer."""
        if self.is_initiator:
            return self.sk_er
        return self.sk_ei

    def concat(self, alg, key_len):
        """Return '<alg>-<key length in bits>' for a key length in bytes."""
        return alg + '-' + str(key_len * 8)

    @property
    def vpp_ike_cypto_alg(self):
        """IKE encryption algorithm name with the key size in bits."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP encryption algorithm name with the key size in bits."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of ``ikemsg`` matches the HMAC of
        the preceding bytes computed with the peer's integrity key."""
        integ_trunc = self.ike_integ_alg.trunc_len
        exp_hmac = ikemsg[-integ_trunc:]
        data = ikemsg[:-integ_trunc]
        computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
                                          self.peer_authkey, data)
        self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated IKE integrity MAC of ``data``."""
        h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        h.update(data)
        return h.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt ``data`` with the peer's SK_e key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Authenticate and decrypt the Encrypted payload of ``ike``.

        For AES-GCM the IKE header plus the Encrypted payload header are
        the AAD and the trailing GCM_ICV_SIZE bytes are the tag; otherwise
        the HMAC is verified first and the ICV stripped before decrypting.

        :returns: plaintext with the padding and pad-length byte removed
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto == 'AES-GCM-16ICV':
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
        else:
            self.verify_hmac(raw(ike))
            integ_trunc = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            ct = ep.load[:-integ_trunc]
            plain = self.decrypt(ct)
        # remove padding
        pad_len = plain[-1]
        return plain[:-pad_len - 1]

    def build_ts_addr(self, ts, version):
        """Map ``ts['start_addr']``/``ts['end_addr']`` to the address field
        names of the traffic selector for IP ``version`` ('4' or '6')."""
        return {'starting_address_v' + version: ts['start_addr'],
                'ending_address_v' + version: ts['end_addr']}

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        :param is_ip4: build IPv4 selectors when true, IPv6 otherwise
        :returns: ``(initiator_selectors, responder_selectors)``, each a
            single element list; the local selector is the initiator one
            when this side is the initiator
        """
        child = self.child_sas[0]
        if is_ip4:
            selector, version = ikev2.IPv4TrafficSelector, '4'
        else:
            selector, version = ikev2.IPv6TrafficSelector, '6'

        def build(ts):
            fields = {'IP_protocol_ID': 0,
                      'start_port': 0,
                      'end_port': 0xffff}
            fields.update(self.build_ts_addr(ts, version))
            return selector(**fields)

        ts1 = build(child.local_ts)
        ts2 = build(child.remote_ts)

        if self.is_initiator:
            return ([ts1], [ts2])
        return ([ts2], [ts1])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the IKE SA algorithms by name.

        :raises TypeError: for an unsupported crypto, integ or prf name
        :raises KeyError: for a DH group missing from DH
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.ike_integ = None if integ == 'NULL' else integ
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError('unsupported prf algo %r' % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the ESP (child SA) algorithms by name.

        :raises TypeError: for an unsupported crypto or integ name
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.esp_integ = None if integ == 'NULL' else integ
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len):
        """Return ``(attribute, transform_id)`` for an encryption transform
        with ``key_len`` given in bytes.

        :raises Exception: when the IKE cipher is not AES-CBC or AES-GCM
        """
        # NOTE(review): the check looks at ike_crypto even when called via
        # esp_crypto_attr - confirm this is intended
        if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
            return (0x800e << 16 | key_len << 3, 12)
        else:
            raise Exception('unsupported attribute type')

    def ike_crypto_attr(self):
        """Encryption transform attribute for the IKE key length."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """Encryption transform attribute for the ESP key length."""
        return self.crypto_attr(self.esp_crypto_key_len)

    def compute_nat_sha1(self, ip, port, rspi=None):
        """Return SHA1(SPIi | SPIr | ip | port) with a 2 byte big-endian
        port; ``rspi`` defaults to the responder SPI of this SA."""
        if rspi is None:
            rspi = self.rspi
        data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
        digest.update(data)
        return digest.finalize()
551
552
553 class IkePeer(VppTestCase):
554     """ common class for initiator and responder """
555
556     @classmethod
557     def setUpClass(cls):
558         import scapy.contrib.ikev2 as _ikev2
559         globals()['ikev2'] = _ikev2
560         super(IkePeer, cls).setUpClass()
561         cls.create_pg_interfaces(range(2))
562         for i in cls.pg_interfaces:
563             i.admin_up()
564             i.config_ip4()
565             i.resolve_arp()
566             i.config_ip6()
567             i.resolve_ndp()
568
569     @classmethod
570     def tearDownClass(cls):
571         super(IkePeer, cls).tearDownClass()
572
573     def tearDown(self):
574         super(IkePeer, self).tearDown()
575         if self.del_sa_from_responder:
576             self.initiate_del_sa_from_responder()
577         else:
578             self.initiate_del_sa_from_initiator()
579         r = self.vapi.ikev2_sa_dump()
580         self.assertEqual(len(r), 0)
581         sas = self.vapi.ipsec_sa_dump()
582         self.assertEqual(len(sas), 0)
583         self.p.remove_vpp_config()
584         self.assertIsNone(self.p.query_vpp_config())
585
586     def setUp(self):
587         super(IkePeer, self).setUp()
588         self.config_tc()
589         self.p.add_vpp_config()
590         self.assertIsNotNone(self.p.query_vpp_config())
591         if self.sa.is_initiator:
592             self.sa.generate_dh_data()
593         self.vapi.cli('ikev2 set logging level 4')
594         self.vapi.cli('event-lo clear')
595
596     def assert_counter(self, count, name, version='ip4'):
597         node_name = '/err/ikev2-%s/' % version + name
598         self.assertEqual(count, self.statistics.get_err_counter(node_name))
599
600     def create_rekey_request(self):
601         sa, first_payload = self.generate_auth_payload(is_rekey=True)
602         header = ikev2.IKEv2(
603                 init_SPI=self.sa.ispi,
604                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
605                 flags='Initiator', exch_type='CREATE_CHILD_SA')
606
607         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
608         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
609                                   self.sa.dport, self.sa.natt, self.ip6)
610
611     def create_empty_request(self):
612         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
613                              id=self.sa.new_msg_id(), flags='Initiator',
614                              exch_type='INFORMATIONAL',
615                              next_payload='Encrypted')
616
617         msg = self.encrypt_ike_msg(header, b'', None)
618         return self.create_packet(self.pg0, msg, self.sa.sport,
619                                   self.sa.dport, self.sa.natt, self.ip6)
620
621     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
622                       use_ip6=False):
623         if use_ip6:
624             src_ip = src_if.remote_ip6
625             dst_ip = src_if.local_ip6
626             ip_layer = IPv6
627         else:
628             src_ip = src_if.remote_ip4
629             dst_ip = src_if.local_ip4
630             ip_layer = IP
631         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
632                ip_layer(src=src_ip, dst=dst_ip) /
633                UDP(sport=sport, dport=dport))
634         if natt:
635             # insert non ESP marker
636             res = res / Raw(b'\x00' * 4)
637         return res / msg
638
639     def verify_udp(self, udp):
640         self.assertEqual(udp.sport, self.sa.sport)
641         self.assertEqual(udp.dport, self.sa.dport)
642
643     def get_ike_header(self, packet):
644         try:
645             ih = packet[ikev2.IKEv2]
646             ih = self.verify_and_remove_non_esp_marker(ih)
647         except IndexError as e:
648             # this is a workaround for getting IKEv2 layer as both ikev2 and
649             # ipsec register for port 4500
650             esp = packet[ESP]
651             ih = self.verify_and_remove_non_esp_marker(esp)
652         self.assertEqual(ih.version, 0x20)
653         self.assertNotIn('Version', ih.flags)
654         return ih
655
656     def verify_and_remove_non_esp_marker(self, packet):
657         if self.sa.natt:
658             # if we are in nat traversal mode check for non esp marker
659             # and remove it
660             data = raw(packet)
661             self.assertEqual(data[:4], b'\x00' * 4)
662             return ikev2.IKEv2(data[4:])
663         else:
664             return packet
665
666     def encrypt_ike_msg(self, header, plain, first_payload):
667         if self.sa.ike_crypto == 'AES-GCM-16ICV':
668             data = self.sa.ike_crypto_alg.pad(raw(plain))
669             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
670                 len(ikev2.IKEv2_payload_Encrypted())
671             tlen = plen + len(ikev2.IKEv2())
672
673             # prepare aad data
674             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
675                                                  length=plen)
676             header.length = tlen
677             res = header / sk_p
678             encr = self.sa.encrypt(raw(plain), raw(res))
679             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
680                                                  length=plen, load=encr)
681             res = header / sk_p
682         else:
683             encr = self.sa.encrypt(raw(plain))
684             trunc_len = self.sa.ike_integ_alg.trunc_len
685             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
686             tlen = plen + len(ikev2.IKEv2())
687
688             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
689                                                  length=plen, load=encr)
690             header.length = tlen
691             res = header / sk_p
692
693             integ_data = raw(res)
694             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
695                                              self.sa.my_authkey, integ_data)
696             res = res / Raw(hmac_data[:trunc_len])
697         assert(len(res) == tlen)
698         return res
699
700     def verify_udp_encap(self, ipsec_sa):
701         e = VppEnum.vl_api_ipsec_sad_flags_t
702         if self.sa.udp_encap or self.sa.natt:
703             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
704         else:
705             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
706
707     def verify_ipsec_sas(self, is_rekey=False):
708         sas = self.vapi.ipsec_sa_dump()
709         if is_rekey:
710             # after rekey there is a short period of time in which old
711             # inbound SA is still present
712             sa_count = 3
713         else:
714             sa_count = 2
715         self.assertEqual(len(sas), sa_count)
716         if self.sa.is_initiator:
717             if is_rekey:
718                 sa0 = sas[0].entry
719                 sa1 = sas[2].entry
720             else:
721                 sa0 = sas[0].entry
722                 sa1 = sas[1].entry
723         else:
724             if is_rekey:
725                 sa0 = sas[2].entry
726                 sa1 = sas[0].entry
727             else:
728                 sa1 = sas[0].entry
729                 sa0 = sas[1].entry
730
731         c = self.sa.child_sas[0]
732
733         self.verify_udp_encap(sa0)
734         self.verify_udp_encap(sa1)
735         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
736         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
737         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
738
739         if self.sa.esp_integ is None:
740             vpp_integ_alg = 0
741         else:
742             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
743         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
744         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
745
746         # verify crypto keys
747         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
748         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
749         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
750         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
751
752         # verify integ keys
753         if vpp_integ_alg:
754             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
755             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
756             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
757             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
758         else:
759             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
760             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
761
762     def verify_keymat(self, api_keys, keys, name):
763         km = getattr(keys, name)
764         api_km = getattr(api_keys, name)
765         api_km_len = getattr(api_keys, name + '_len')
766         self.assertEqual(len(km), api_km_len)
767         self.assertEqual(km, api_km[:api_km_len])
768
769     def verify_id(self, api_id, exp_id):
770         self.assertEqual(api_id.type, IDType.value(exp_id.type))
771         self.assertEqual(api_id.data_len, exp_id.data_len)
772         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
773
774     def verify_ike_sas(self):
775         r = self.vapi.ikev2_sa_dump()
776         self.assertEqual(len(r), 1)
777         sa = r[0].sa
778         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
779         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
780         if self.ip6:
781             if self.sa.is_initiator:
782                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
783                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
784             else:
785                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
786                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
787         else:
788             if self.sa.is_initiator:
789                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
790                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
791             else:
792                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
793                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
794         self.verify_keymat(sa.keys, self.sa, 'sk_d')
795         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
796         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
797         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
798         self.verify_keymat(sa.keys, self.sa, 'sk_er')
799         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
800         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
801
802         self.assertEqual(sa.i_id.type, self.sa.id_type)
803         self.assertEqual(sa.r_id.type, self.sa.id_type)
804         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
805         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
806         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
807         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
808
809         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
810         self.assertEqual(len(r), 1)
811         csa = r[0].child_sa
812         self.assertEqual(csa.sa_index, sa.sa_index)
813         c = self.sa.child_sas[0]
814         if hasattr(c, 'sk_ai'):
815             self.verify_keymat(csa.keys, c, 'sk_ai')
816             self.verify_keymat(csa.keys, c, 'sk_ar')
817         self.verify_keymat(csa.keys, c, 'sk_ei')
818         self.verify_keymat(csa.keys, c, 'sk_er')
819         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
820         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
821
822         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
823         tsi = tsi[0]
824         tsr = tsr[0]
825         r = self.vapi.ikev2_traffic_selector_dump(
826                 is_initiator=True, sa_index=sa.sa_index,
827                 child_sa_index=csa.child_sa_index)
828         self.assertEqual(len(r), 1)
829         ts = r[0].ts
830         self.verify_ts(r[0].ts, tsi[0], True)
831
832         r = self.vapi.ikev2_traffic_selector_dump(
833                 is_initiator=False, sa_index=sa.sa_index,
834                 child_sa_index=csa.child_sa_index)
835         self.assertEqual(len(r), 1)
836         self.verify_ts(r[0].ts, tsr[0], False)
837
838         n = self.vapi.ikev2_nonce_get(is_initiator=True,
839                                       sa_index=sa.sa_index)
840         self.verify_nonce(n, self.sa.i_nonce)
841         n = self.vapi.ikev2_nonce_get(is_initiator=False,
842                                       sa_index=sa.sa_index)
843         self.verify_nonce(n, self.sa.r_nonce)
844
845     def verify_nonce(self, api_nonce, nonce):
846         self.assertEqual(api_nonce.data_len, len(nonce))
847         self.assertEqual(api_nonce.nonce, nonce)
848
849     def verify_ts(self, api_ts, ts, is_initiator):
850         if is_initiator:
851             self.assertTrue(api_ts.is_local)
852         else:
853             self.assertFalse(api_ts.is_local)
854
855         if self.p.ts_is_ip4:
856             self.assertEqual(api_ts.start_addr,
857                              IPv4Address(ts.starting_address_v4))
858             self.assertEqual(api_ts.end_addr,
859                              IPv4Address(ts.ending_address_v4))
860         else:
861             self.assertEqual(api_ts.start_addr,
862                              IPv6Address(ts.starting_address_v6))
863             self.assertEqual(api_ts.end_addr,
864                              IPv6Address(ts.ending_address_v6))
865         self.assertEqual(api_ts.start_port, ts.start_port)
866         self.assertEqual(api_ts.end_port, ts.end_port)
867         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
868
869
class TemplateInitiator(IkePeer):
    """ initiator test template """

    def initiate_del_sa_from_initiator(self):
        """Make VPP delete the IKE SA and acknowledge its Delete request.

        The deletion is requested through the API; the captured request is
        checked (flags, SPIs, Delete payload) and answered with an empty
        encrypted response which must not trigger any further packet.
        """
        ispi = int.from_bytes(self.sa.ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Response', exch_type='INFORMATIONAL',
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, b'', None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check that ``packet`` is an empty response to our Delete request.

        :param packet: captured packet carrying the IKE message
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def initiate_del_sa_from_responder(self):
        """Send a Delete request for the IKE SA and verify VPP's answer."""
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    @staticmethod
    def find_notify_payload(packet, notify_type):
        """Return the first Notify payload of ``notify_type`` or ``None``.

        :param packet: packet (or IKE layer) to search
        :param notify_type: numeric notify message type to look for
        :returns: the matching Notify payload, ``None`` when there is none
        """
        # getlayer() yields None instead of raising when no Notify exists
        n = packet.getlayer(ikev2.IKEv2_payload_Notify)
        # a scapy payload chain is terminated by a falsy NoPayload object,
        # never by None, so walk it by truthiness
        while n:
            if (isinstance(n, ikev2.IKEv2_payload_Notify) and
                    n.type == notify_type):
                return n
            n = n.payload
        return None

    def verify_nat_detection(self, packet):
        """Verify both NAT detection notifies of an IKE_SA_INIT request.

        The hashes carried in the notifies are recomputed from the addresses
        and ports of ``packet`` with an all-zero responder SPI.

        :param packet: captured IKE_SA_INIT request
        """
        # the address family has to follow the IP header actually parsed,
        # inet_pton(AF_INET, ...) rejects IPv6 addresses
        if self.ip6:
            iph = packet[IPv6]
            family = socket.AF_INET6
        else:
            iph = packet[IP]
            family = socket.AF_INET
        udp = packet[UDP]

        # NAT_DETECTION_SOURCE_IP
        s = self.find_notify_payload(packet, 16388)
        self.assertIsNotNone(s)
        src_sha = self.sa.compute_nat_sha1(
                inet_pton(family, iph.src), udp.sport, b'\x00' * 8)
        self.assertEqual(s.load, src_sha)

        # NAT_DETECTION_DESTINATION_IP
        s = self.find_notify_payload(packet, 16389)
        self.assertIsNotNone(s)
        dst_sha = self.sa.compute_nat_sha1(
                inet_pton(family, iph.dst), udp.dport, b'\x00' * 8)
        self.assertEqual(s.load, dst_sha)

    def verify_sa_init_request(self, packet):
        """Validate the IKE_SA_INIT request sent by VPP and derive keys.

        Stores the peer port, initiator SPI, nonce and KE data in
        ``self.sa``, checks the proposed transforms and NAT detection
        notifies, then completes the DH exchange and computes the keys.

        :param packet: captured IKE_SA_INIT request
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = packet[ikev2.IKEv2]
        self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
        self.assertEqual(ih.exch_type, 34)  # SA_INIT
        self.sa.ispi = ih.init_SPI
        self.assertEqual(ih.resp_SPI, 8 * b'\x00')
        self.assertIn('Initiator', ih.flags)
        self.assertNotIn('Response', ih.flags)
        self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load

        prop = packet[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.proto, 1)  # proto = ikev2
        self.assertEqual(prop.proposal, 1)
        self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
        self.assertEqual(prop.trans[0].transform_id,
                         self.p.ike_transforms['crypto_alg'])
        self.assertEqual(prop.trans[1].transform_type, 2)  # prf
        self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
        self.assertEqual(prop.trans[2].transform_type, 4)  # dh
        self.assertEqual(prop.trans[2].transform_id,
                         self.p.ike_transforms['dh_group'])

        self.verify_nat_detection(packet)
        # NOTE(review): the algorithms below are fixed here and not taken
        # from the proposal verified above
        self.sa.set_ike_props(
                    crypto='AES-GCM-16ICV', crypto_key_len=32,
                    integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
        self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
                              integ='SHA2-256-128')
        self.sa.generate_dh_data()
        self.sa.complete_dh_data()
        self.sa.calc_keys()

    def update_esp_transforms(self, trans, sa):
        """Copy ESP encryption/integrity algorithms from a transform chain.

        :param trans: first transform payload of the chain to walk
        :param sa: SA object whose ``esp_crypto``/``esp_integ`` get updated
        """
        while trans:
            if trans.transform_type == 1:  # encryption
                sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
            elif trans.transform_type == 3:  # integrity
                sa.esp_integ = INTEG_IDS[trans.transform_id]
            trans = trans.payload

    def verify_sa_auth_req(self, packet):
        """Validate the IKE_AUTH request sent by VPP.

        Checks header, UDP ports, message id and identities, and records the
        child SA initiator SPI and ESP transforms proposed by VPP.

        :param packet: captured IKE_AUTH request
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
        self.assertIn('Initiator', ih.flags)
        self.assertNotIn('Response', ih.flags)

        self.verify_udp(udp)
        self.assertEqual(ih.id, self.sa.msg_id + 1)
        self.sa.msg_id += 1
        plain = self.sa.hmac_and_decrypt(ih)
        idi = ikev2.IKEv2_payload_IDi(plain)
        idr = ikev2.IKEv2_payload_IDr(idi.payload)
        self.assertEqual(idi.load, self.sa.i_id)
        self.assertEqual(idr.load, self.sa.r_id)
        prop = idi[ikev2.IKEv2_payload_Proposal]
        c = self.sa.child_sas[0]
        c.ispi = prop.SPI
        self.update_esp_transforms(
                prop[ikev2.IKEv2_payload_Transform], self.sa)

    def send_init_response(self):
        """Answer VPP's IKE_SA_INIT and verify the IKE_AUTH request.

        The response is kept in ``self.sa.init_resp_packet``.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))

        src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
        if self.sa.natt:
            # hash a different destination address when NAT-T is requested
            dst_address = b'\x0a\x0a\x0a\x0a'
        else:
            dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
        dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)

        self.sa.init_resp_packet = (
            ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                        exch_type='IKE_SA_INIT', flags='Response') /
            ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
            ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                   group=self.sa.ike_dh,
                                   load=self.sa.my_dh_pub_key) /
            ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
                                      next_payload='Notify') /
            ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
                    next_payload='Notify') / ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))

        ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
                                     self.sa.sport, self.sa.dport,
                                     False, self.ip6)
        self.pg_send(self.pg0, ike_msg)
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_req(capture[0])

    def initiate_sa_init(self):
        """Start the exchange from VPP and handle the IKE_SA_INIT phase."""
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)

        capture = self.pg0.get_capture(1)
        self.verify_sa_init_request(capture[0])
        self.send_init_response()

    def send_auth_response(self):
        """Build, encrypt and send the IKE_AUTH response to VPP."""
        tr_attr = self.sa.esp_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        c = self.sa.child_sas[0]
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                 IDtype=self.sa.id_type, load=self.sa.i_id) /
                 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                 IDtype=self.sa.id_type, load=self.sa.r_id) /
                 ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi),
                 traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload='Notify',
                 number_of_TSs=len(tsr),
                 traffic_selector=tsr) /
                 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))

        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Response', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg_send(self.pg0, packet)

    def test_initiator(self):
        # no docstring on purpose: unittest would use it as the test name
        # full exchange with VPP starting it, followed by an API state check
        self.initiate_sa_init()
        self.sa.auth_init()
        self.sa.calc_child_keys()
        self.send_auth_response()
        self.verify_ike_sas()
1117         self.verify_ike_sas()
1118
1119
class TemplateResponder(IkePeer):
    """ responder test template """

    def initiate_del_sa_from_responder(self):
        """Make VPP delete the IKE SA and acknowledge its Delete request.

        The deletion is requested through the API; the captured request is
        checked and answered with an empty encrypted response which must not
        trigger any further packet.
        """
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(
                ispi=int.from_bytes(self.sa.ispi, 'little'))
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn('Response', ih.flags)
        self.assertNotIn('Initiator', ih.flags)
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator+Response',
                             exch_type='INFORMATIONAL',
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, b'', None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check that ``packet`` is an empty response to our Delete request.

        :param packet: captured packet carrying the IKE message
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn('Response', ih.flags)
        self.assertNotIn('Initiator', ih.flags)
        self.assertEqual(ih.next_payload, 46)  # Encrypted
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def initiate_del_sa_from_initiator(self):
        """Send a Delete request for the IKE SA and verify VPP's answer."""
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator', exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    def send_sa_init_req(self):
        """Send an IKE_SA_INIT request to VPP and verify the response.

        The request is kept in ``self.sa.init_req_packet``.  NAT detection
        notifies are only appended when the test does not run over IPv6.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))

        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))

        # without the notifies the Nonce is the last payload of the message
        next_payload = None if self.ip6 else 'Notify'

        self.sa.init_req_packet = (
                ikev2.IKEv2(init_SPI=self.sa.ispi,
                            flags='Initiator', exch_type='IKE_SA_INIT') /
                ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
                ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                       group=self.sa.ike_dh,
                                       load=self.sa.my_dh_pub_key) /
                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
                                          load=self.sa.i_nonce))

        if not self.ip6:
            # a hashed address differing from the real one is used for the
            # side that is flagged as being behind NAT
            if self.sa.i_natt:
                src_address = b'\x0a\x0a\x0a\x01'
            else:
                src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)

            if self.sa.r_natt:
                dst_address = b'\x0a\x0a\x0a\x0a'
            else:
                dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)

            src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
            dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
            nat_src_detection = ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
                    next_payload='Notify')
            nat_dst_detection = ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
            self.sa.init_req_packet = (self.sa.init_req_packet /
                                       nat_src_detection /
                                       nat_dst_detection)

        ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
                                     self.sa.sport, self.sa.dport,
                                     self.sa.natt, self.ip6)
        self.pg0.add_stream(ike_msg)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_init(capture[0])

    def generate_auth_payload(self, last_payload=None, is_rekey=False):
        """Build the plaintext payload chain for IKE_AUTH or a child rekey.

        :param last_payload: ``next_payload`` value put into the TSr
            payload, ``'Notify'`` when not given
        :param is_rekey: prepend a Nonce and append a REKEY_SA notify
            instead of prepending the IDi/IDr payloads
        :returns: tuple ``(payload chain, name of its first payload)``
        """
        tr_attr = self.sa.esp_crypto_attr()
        last_payload = last_payload or 'Notify'
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        c = self.sa.child_sas[0]
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi), traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
                 number_of_TSs=len(tsr), traffic_selector=tsr))

        if is_rekey:
            first_payload = 'Nonce'
            plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
                     next_payload='SA') / plain /
                     ikev2.IKEv2_payload_Notify(type='REKEY_SA',
                     proto='ESP', SPI=c.ispi))
        else:
            first_payload = 'IDi'
            ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                   IDtype=self.sa.id_type, load=self.sa.i_id) /
                   ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                   IDtype=self.sa.id_type, load=self.sa.r_id))
            plain = ids / plain
        return plain, first_payload

    def send_sa_auth(self):
        """Send the IKE_AUTH request to VPP and verify the response."""
        plain, first_payload = self.generate_auth_payload(
                    last_payload='Notify')
        plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Initiator', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_resp(capture[0])

    def verify_sa_init(self, packet):
        """Validate VPP's IKE_SA_INIT response and derive the keys.

        Stores responder SPI, nonce and KE data in ``self.sa``.

        :param packet: captured IKE_SA_INIT response
        :raises IndexError: when the SA, Nonce or KE payload is missing
        """
        ih = self.get_ike_header(packet)

        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 34)
        self.assertIn('Response', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        # SPIs are compared as byte strings elsewhere in these tests; a
        # check against the integer 0 alone could never fail for bytes
        self.assertNotIn(ih.resp_SPI, (0, 8 * b'\x00'))
        self.sa.rspi = ih.resp_SPI
        try:
            # only looked up to make sure the SA payload is present
            ih[ikev2.IKEv2_payload_SA]
            self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
        except IndexError:
            self.logger.error(
                "unexpected reply: SA/Nonce/KE payload not found!")
            # show() prints and returns None unless a dump is requested
            self.logger.error(ih.show(dump=True))
            raise
        self.sa.complete_dh_data()
        self.sa.calc_keys()
        self.sa.auth_init()

    def verify_sa_auth_resp(self, packet):
        """Validate VPP's IKE_AUTH response and derive the child SA keys.

        :param packet: captured IKE_AUTH response
        """
        ike = self.get_ike_header(packet)
        udp = packet[UDP]
        self.verify_udp(udp)
        self.assertEqual(ike.id, self.sa.msg_id)
        plain = self.sa.hmac_and_decrypt(ike)
        idr = ikev2.IKEv2_payload_IDr(plain)
        prop = idr[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.SPIsize, 4)
        self.sa.child_sas[0].rspi = prop.SPI
        self.sa.calc_child_keys()

    # suffix handed to assert_counter() by verify_counters()
    IKE_NODE_SUFFIX = 'ip4'

    def verify_counters(self):
        """Check node counters and per-SA statistics after one exchange."""
        self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
        self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
        self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)

        r = self.vapi.ikev2_sa_dump()
        s = r[0].sa.stats
        self.assertEqual(1, s.n_sa_auth_req)
        self.assertEqual(1, s.n_sa_init_req)

    def test_responder(self):
        # no docstring on purpose: unittest would use it as the test name
        # full exchange started by the test, followed by state checks
        self.send_sa_init_req()
        self.send_sa_auth()
        self.verify_ipsec_sas()
        self.verify_ike_sas()
        self.verify_counters()
1342         self.verify_counters()
1343
1344
class Ikev2Params(object):
    """Mixin configuring the IKEv2 profile and the SA model of a test."""

    def config_params(self, params=None):
        """Create profile ``self.p`` and SA model ``self.sa`` from params.

        :param params: optional dict of overrides.  Keys read here:
            ``dpd_disabled``, ``del_sa_from_responder``, ``i_natt``,
            ``r_natt``, ``ip6``, ``auth`` (``'rsa-sig'`` needs
            ``server-key``, ``server-cert``, ``client-key`` and
            ``client-cert``), ``is_initiator``, ``loc_ts``, ``rem_ts``,
            ``responder``, ``ike_transforms``, ``esp_transforms``,
            ``udp_encap``, ``responder_hostname``, ``nonce``,
            ``ike-crypto``, ``ike-integ``, ``ike-dh``, ``esp-crypto`` and
            ``esp-integ``.
        """
        # a None default avoids sharing one dict object between calls
        params = {} if params is None else params
        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        if params.get('dpd_disabled', True):
            self.vapi.cli('ikev2 dpd disable')
        self.del_sa_from_responder = params.get('del_sa_from_responder',
                                                False)
        i_natt = params.get('i_natt', False)
        r_natt = params.get('r_natt', False)
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # read through context managers so the files get closed
            with open(work_dir + params['server-cert']) as cert_file:
                server_pem = cert_file.read()
            with open(work_dir + params['client-key']) as key_file:
                client_pem = key_file.read()
            client_priv = load_pem_private_key(str.encode(client_pem), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        is_init = params.get('is_initiator', True)

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        if is_init:
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
        else:
            self.p.add_local_id(**idi)
            self.p.add_remote_id(**idr)

        loc_ts = params.get(
            'loc_ts', {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
            'rem_ts', {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        udp_encap = params.get('udp_encap', False)
        if udp_encap:
            self.p.set_udp_encap(True)

        if 'responder_hostname' in params:
            hn = params['responder_hostname']
            self.p.add_responder_hostname(hn)

            # configure static dns record
            self.vapi.dns_name_server_add_del(
                is_ip6=0, is_add=1,
                server_address=IPv4Address(u'8.8.8.8').packed)
            self.vapi.dns_enable_disable(enable=1)

            cmd = "dns cache add {} {}".format(hn['hostname'],
                                               self.pg0.remote_ip4)
            self.vapi.cli(cmd)

        # the profile's remote selectors are passed as the SA's local ones
        # and vice versa
        self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'],
                          i_natt=i_natt, r_natt=r_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          nonce=params.get('nonce'),
                          auth_data=auth_data, udp_encap=udp_encap,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
        if is_init:
            ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
            ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
            ike_dh = params.get('ike-dh', '2048MODPgr')

            esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
            esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

            self.sa.set_ike_props(
                    crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                    integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
            self.sa.set_esp_props(
                    crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                    integ=esp_integ)
1464                     crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1465                     integ=esp_integ)
1466
1467
class TestApi(VppTestCase):
    """ Test IKEV2 API """
    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        """Remove the profiles created by the test and check none remain."""
        super(TestApi, self).tearDown()
        # A test can fail before both profiles have been created; only
        # remove what actually exists so that the original failure is not
        # accompanied by an AttributeError raised from the teardown.
        for attr in ('p1', 'p2'):
            profile = getattr(self, attr, None)
            if profile is not None:
                profile.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """
        Create a Profile from a configuration dict and add it to VPP.

        :param cfg: profile description; 'lifetime_data', 'tun_itf' and
            'natt_disabled' are optional, every other key read below is
            required
        :returns: the configured Profile object
        """
        p = Profile(self, cfg['name'])
        p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
        p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
        p.add_local_ts(**cfg['loc_ts'])
        p.add_remote_ts(**cfg['rem_ts'])
        p.add_responder(cfg['responder'])
        p.add_ike_transforms(cfg['ike_ts'])
        p.add_esp_transforms(cfg['esp_ts'])
        p.add_auth(**cfg['auth'])
        p.set_udp_encap(cfg['udp_encap'])
        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            p.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            p.set_tunnel_interface(cfg['tun_itf'])
        if cfg.get('natt_disabled'):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """ test profile dump API """
        loc_ts4 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': '3.3.3.2',
            'end_addr': '3.3.3.3',
        }
        # NOTE(review): start_addr is numerically greater than end_addr
        # here; this test only compares the dump against the configured
        # values - confirm that VPP is not expected to validate the range.
        rem_ts4 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': '4.5.76.80',
            'end_addr': '2.3.4.6',
        }

        loc_ts6 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': 'ab::1',
            'end_addr': 'ab::4',
        }
        rem_ts6 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': 'cd::12',
            'end_addr': 'cd::13',
        }

        conf = {
            # IPv4 profile exercising the optional lifetime data and the
            # NAT-T disable flag
            'p1': {
                'name': 'p1',
                'natt_disabled': True,
                'loc_id': ('fqdn', b'vpp.home'),
                'rem_id': ('fqdn', b'roadwarrior.example.com'),
                'loc_ts': loc_ts4,
                'rem_ts': rem_ts4,
                'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
                'ike_ts': {
                    'crypto_alg': 20,
                    'crypto_key_size': 32,
                    'integ_alg': 0,
                    'dh_group': 1},
                'esp_ts': {
                    'crypto_alg': 13,
                    'crypto_key_size': 24,
                    'integ_alg': 2},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': True,
                'ipsec_over_udp_port': 4501,
                'lifetime_data': {
                    'lifetime': 123,
                    'lifetime_maxdata': 20192,
                    'lifetime_jitter': 9,
                    'handover': 132},
            },
            # IPv6 profile exercising the optional tunnel interface
            'p2': {
                'name': 'p2',
                'loc_id': ('ip4-addr', b'192.168.2.1'),
                'rem_id': ('ip6-addr', b'abcd::1'),
                'loc_ts': loc_ts6,
                'rem_ts': rem_ts6,
                'responder': {'sw_if_index': 4, 'addr': 'def::10'},
                'ike_ts': {
                    'crypto_alg': 12,
                    'crypto_key_size': 16,
                    'integ_alg': 3,
                    'dh_group': 3},
                'esp_ts': {
                    'crypto_alg': 9,
                    'crypto_key_size': 24,
                    'integ_alg': 4},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': False,
                'ipsec_over_udp_port': 4600,
                'tun_itf': 0}
        }
        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        self.verify_profile(r[0].profile, conf['p1'])
        self.verify_profile(r[1].profile, conf['p2'])

    def verify_id(self, api_id, cfg_id):
        """Compare a dumped identity with its (type name, data) config."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Compare a dumped traffic selector with its config dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
        self.assertEqual(api_ts.start_addr,
                         ip_address(text_type(cfg_ts['start_addr'])))
        self.assertEqual(api_ts.end_addr,
                         ip_address(text_type(cfg_ts['end_addr'])))

    def verify_responder(self, api_r, cfg_r):
        """Compare the dumped responder with its config dict."""
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))

    def verify_transforms(self, api_ts, cfg_ts):
        """Compare the transform fields shared by IKE and ESP proposals."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Compare IKE transforms: the common fields plus the DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Compare ESP transforms (common fields only)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Compare dumped auth method, data and data length with config."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, cfg_auth['data'])
        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))

    def verify_lifetime_data(self, p, ld):
        """Compare the lifetime fields of a dumped profile with config."""
        self.assertEqual(p.lifetime, ld['lifetime'])
        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
        self.assertEqual(p.handover, ld['handover'])

    def verify_profile(self, ap, cp):
        """
        Check one dumped profile against the dict it was configured from.

        :param ap: profile as returned by the dump API
        :param cp: configuration dict passed to configure_profile()
        """
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        # assertEqual (rather than assertTrue on a comparison) reports both
        # values when the check fails
        natt_dis = cp.get('natt_disabled', False)
        self.assertEqual(natt_dis, ap.natt_disabled)

        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        # a profile without a configured tunnel interface is expected to
        # report the all-ones index
        expected_tun_itf = cp.get('tun_itf', 0xffffffff)
        self.assertEqual(ap.tun_itf, expected_tun_itf)
1655
1656
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """ test responder - responder behind NAT """

    # presumably selects the NAT-T IPv4 IKE node for the inherited
    # checks - TODO confirm against the template classes
    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        """Configure the test case with only the 'r_natt' flag set."""
        params = dict(r_natt=True)
        self.config_params(params)
1665
1666
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - NAT traversal (initiator behind NAT) """

    def config_tc(self):
        """Assemble the parameter set of this scenario and apply it."""
        responder = {
            'sw_if_index': self.pg0.sw_if_index,
            'addr': self.pg0.remote_ip4,
        }
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            'i_natt': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        })
1691
1692
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        """Assemble the parameter set of this scenario and apply it."""
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        # the responder is given by name instead of by address
        responder_hostname = {
            'hostname': 'vpp.responder.org',
            'sw_if_index': self.pg0.sw_if_index,
        }
        self.config_params({
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
            'responder_hostname': responder_hostname,
        })
1716
1717
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """
        Reply to a captured rekey request by sending its SA payload back.

        :param req: captured packet holding the request
        :param update_child_sa_data: when true, take over the nonce and the
            proposal SPI of the request into self.sa and recompute the
            child keys before the response is built
        """
        ike_hdr = self.get_ike_header(req)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            # the request nonce is used for both directions
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            # likewise the proposed SPI becomes both child SPIs
            child_sa = self.sa.child_sas[0]
            child_sa.ispi = proposal.SPI
            child_sa.rspi = proposal.SPI
            self.sa.calc_child_keys()

        response_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags='Response',
            exch_type=36,
            id=ih_id(ike_hdr) if False else ike_hdr.id,
            next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(response_hdr, sa_payload, 'SA')
        reply = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        # request the same rekey twice so that two requests are outstanding
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        requests = self.pg0.get_capture(2)
        first_req = requests[0]
        second_req = requests[1]

        # reply in reverse order
        self.rekey_respond(second_req, True)
        self.rekey_respond(first_req, False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1757         self.verify_ipsec_sas(is_rekey=True)
1758
1759
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """
        Make VPP start a child SA rekey and answer the request it sends.

        The captured request must be a CHILD_SA exchange carrying the
        Initiator flag and no Response flag. Its nonce and proposal SPI are
        copied into self.sa, the child keys are recomputed and the same SA
        payload is sent back as the response.
        """
        current_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=current_ispi)
        request = self.pg0.get_capture(1)[0]

        ike_hdr = self.get_ike_header(request)
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', ike_hdr.flags)
        self.assertIn('Initiator', ike_hdr.flags)

        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        child_sa = self.sa.child_sas[0]
        child_sa.ispi = proposal.SPI
        child_sa.rspi = proposal.SPI
        self.sa.calc_child_keys()

        response_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags='Response',
            exch_type=36,
            id=ike_hdr.id,
            next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(response_hdr, sa_payload, 'SA')
        reply = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # run the inherited scenario first, then rekey and re-verify the SAs
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1796
1797
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        """Assemble the parameter set of this scenario and apply it."""
        responder = {
            'sw_if_index': self.pg0.sw_if_index,
            'addr': self.pg0.remote_ip4,
        }
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            'del_sa_from_responder': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        })
1822
1823
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - initiator behind NAT """

    # presumably selects the NAT-T IPv4 IKE node for the inherited
    # checks - TODO confirm against the template classes
    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        """Configure the test case with only the 'i_natt' flag set."""
        params = dict(i_natt=True)
        self.config_params(params)
1833
1834
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """

    def config_tc(self):
        """Configure the test case without passing any overrides."""
        self.config_params()
1840
1841
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """

    def config_tc(self):
        """Configure the test case with 'dpd_disabled' explicitly False."""
        params = {'dpd_disabled': False}
        self.config_params(params)

    def tearDown(self):
        # Intentionally empty: the inherited teardown is not run.
        # NOTE(review): presumably because the SAs it would act on have
        # already expired by the end of this test - confirm.
        return None

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # capture empty request but don't reply
        probes = self.pg0.get_capture(expected_count=1, timeout=5)
        ike_hdr = self.get_ike_header(probes[0])
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        self.assertEqual(decrypted, b'')
        # wait for SA expiration
        time.sleep(3)
        # neither IKE nor IPsec SAs may be left after the peer went silent
        remaining_ike = self.vapi.ikev2_sa_dump()
        self.assertEqual(len(remaining_ike), 0)
        remaining_ipsec = self.vapi.ipsec_sa_dump()
        self.assertEqual(len(remaining_ipsec), 0)
1870
1871
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """
        Send a rekey request to VPP and record the answer in self.sa.

        The nonce of the reply becomes the responder nonce and the SPI of
        its proposal becomes the responder SPI of the first child SA.
        """
        self.pg0.add_stream(self.create_rekey_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        # run the inherited scenario, rekey, then verify SAs and counters
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
        self.assert_counter(1, 'rekey_req', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
1899
1900
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """ test ikev2 responder - non-default table id """

    @classmethod
    def setUpClass(cls):
        """Set up pg0 inside IP table 1 instead of the default table."""
        # publish the scapy IKEv2 module under the module-level name
        # 'ikev2' used throughout this file
        import scapy.contrib.ikev2 as scapy_ikev2
        globals()['ikev2'] = scapy_ikev2
        # super(IkePeer, cls) starts the lookup after IkePeer in the MRO,
        # so IkePeer's own setUpClass is bypassed and the interface setup
        # is done here instead
        super(IkePeer, cls).setUpClass()
        cls.create_pg_interfaces(range(1))
        for command in ("ip table add 1", "set interface ip table pg0 1"):
            cls.vapi.cli(command)
        for intf in cls.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def config_tc(self):
        """Configure the test case with 'dpd_disabled' explicitly False."""
        params = {'dpd_disabled': False}
        self.config_params(params)

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderVrf, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # expect one empty INFORMATIONAL request from VPP
        probes = self.pg0.get_capture(expected_count=1, timeout=5)
        ike_hdr = self.get_ike_header(probes[0])
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        self.assertEqual(decrypted, b'')
1932
1933
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """

    def config_tc(self):
        """Configure RSA signature auth with UDP encapsulation enabled."""
        # key and certificate file names for both peers
        credentials = {
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        params = {'udp_encap': True, 'auth': 'rsa-sig'}
        params.update(credentials)
        self.config_params(params)
1945
1946
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """

    def config_tc(self):
        """Select the IKE/ESP algorithms named above and a random nonce."""
        ike_settings = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
        }
        esp_settings = {
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
        }
        params = {}
        params.update(ike_settings)
        params.update(esp_settings)
        params['ike-dh'] = '2048MODPgr'
        # 256 random bytes used as the nonce
        params['nonce'] = os.urandom(256)
        self.config_params(params)
1961
1962
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """

    def config_tc(self):
        """Select the IKE/ESP algorithms for this combination."""
        # NOTE(review): the class name and docstring say AES_CBC_128, yet
        # the IKE key length passed here is 32 while the sibling AES_CBC_128
        # test passes 16 - confirm which one is intended.
        ike_settings = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
        }
        esp_settings = {
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
        }
        params = {}
        params.update(ike_settings)
        params.update(esp_settings)
        params['ike-dh'] = '3072MODPgr'
        self.config_params(params)
1977
1978
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    # presumably selects the IPv6 IKE node for the inherited checks -
    # TODO confirm against the template classes
    IKE_NODE_SUFFIX = 'ip6'

    def config_tc(self):
        """Configure an IPv6 scenario using AES-GCM for IKE."""
        local_ts = {'start_addr': 'ab:cd::0', 'end_addr': 'ab:cd::10'}
        remote_ts = {'start_addr': '11::0', 'end_addr': '11::100'}
        self.config_params({
            'del_sa_from_responder': True,
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_ts,
            'rem_ts': remote_ts,
        })
1999
2000
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """
        Send an empty request to VPP and check the empty reply.

        The reply must carry the current message id of self.sa and decrypt
        to an empty payload; the keepalive counter and the per-SA keepalive
        statistic must both read 1 afterwards.
        """
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        self.assertEqual(decrypted, b'')
        self.assert_counter(1, 'keepalive', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        # establish the SA through the inherited scenario, then probe it
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
2024
2025
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # Intentionally empty: the inherited teardown is not run.
        # NOTE(review): presumably because no SA is ever established by
        # this test - confirm.
        return None

    def config_tc(self):
        """Configure the test case without passing any overrides."""
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """
        Build an IKE_SA_INIT packet for pg0.

        :param length: value for the IKE header length field (None leaves
            it to scapy)
        :param payload: optional payload appended to the IKE header
        :returns: whatever create_packet() builds from the message
        """
        header = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                             flags='Initiator', exch_type='IKE_SA_INIT')
        msg = header if payload is None else header / payload
        return self.create_packet(self.pg0, msg, self.sa.sport,
                                  self.sa.dport)

    def verify_bad_packet_length(self):
        """Send headers with a bogus length and check 'bad_length'."""
        bad_msg = self.create_ike_init_msg(length=0xdead)
        self.send_and_assert_no_replies(self.pg0, bad_msg * self.pkt_count)
        self.assert_counter(self.pkt_count, 'bad_length')

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with a bogus length and check the counter."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        bad_msg = self.create_ike_init_msg(payload=bad_sa)
        self.send_and_assert_no_replies(self.pg0, bad_msg * self.pkt_count)
        self.assert_counter(self.pkt_count, 'malformed_packet')

    def test_responder(self):
        # number of copies of each malformed message to send
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
2058
2059
# Run the tests with the VPP runner when this file is executed directly.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)