ikev2: fix udp encap
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from socket import inet_pton
3 from cryptography import x509
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes, hmac
6 from cryptography.hazmat.primitives.asymmetric import dh, padding
7 from cryptography.hazmat.primitives.serialization import load_pem_private_key
8 from cryptography.hazmat.primitives.ciphers import (
9     Cipher,
10     algorithms,
11     modes,
12 )
13 from ipaddress import IPv4Address, IPv6Address, ip_address
14 from scapy.layers.ipsec import ESP
15 from scapy.layers.inet import IP, UDP, Ether
16 from scapy.layers.inet6 import IPv6
17 from scapy.packet import raw, Raw
18 from scapy.utils import long_converter
19 from framework import VppTestCase, VppTestRunner
20 from vpp_ikev2 import Profile, IDType, AuthMethod
21 from vpp_papi import VppEnum
22
23 try:
24     text_type = unicode
25 except NameError:
26     text_type = str
27
28 KEY_PAD = b"Key Pad for IKEv2"
29 SALT_SIZE = 4
30 GCM_ICV_SIZE = 16
31 GCM_IV_SIZE = 8
32
33
34 # defined in rfc3526
35 # tuple structure is (p, g, key_len)
36 DH = {
37     '2048MODPgr': (long_converter("""
38     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
39     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
40     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
41     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
42     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
43     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
44     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
45     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
46     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
47     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
48     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
49
50     '3072MODPgr': (long_converter("""
51     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
62     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
63     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
64     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
65     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
66     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
67 }
68
69
70 class CryptoAlgo(object):
71     def __init__(self, name, cipher, mode):
72         self.name = name
73         self.cipher = cipher
74         self.mode = mode
75         if self.cipher is not None:
76             self.bs = self.cipher.block_size // 8
77
78             if self.name == 'AES-GCM-16ICV':
79                 self.iv_len = GCM_IV_SIZE
80             else:
81                 self.iv_len = self.bs
82
83     def encrypt(self, data, key, aad=None):
84         iv = os.urandom(self.iv_len)
85         if aad is None:
86             encryptor = Cipher(self.cipher(key), self.mode(iv),
87                                default_backend()).encryptor()
88             return iv + encryptor.update(data) + encryptor.finalize()
89         else:
90             salt = key[-SALT_SIZE:]
91             nonce = salt + iv
92             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
93                                default_backend()).encryptor()
94             encryptor.authenticate_additional_data(aad)
95             data = encryptor.update(data) + encryptor.finalize()
96             data += encryptor.tag[:GCM_ICV_SIZE]
97             return iv + data
98
99     def decrypt(self, data, key, aad=None, icv=None):
100         if aad is None:
101             iv = data[:self.iv_len]
102             ct = data[self.iv_len:]
103             decryptor = Cipher(algorithms.AES(key),
104                                self.mode(iv),
105                                default_backend()).decryptor()
106             return decryptor.update(ct) + decryptor.finalize()
107         else:
108             salt = key[-SALT_SIZE:]
109             nonce = salt + data[:GCM_IV_SIZE]
110             ct = data[GCM_IV_SIZE:]
111             key = key[:-SALT_SIZE]
112             decryptor = Cipher(algorithms.AES(key),
113                                self.mode(nonce, icv, len(icv)),
114                                default_backend()).decryptor()
115             decryptor.authenticate_additional_data(aad)
116             return decryptor.update(ct) + decryptor.finalize()
117
118     def pad(self, data):
119         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
120         data = data + b'\x00' * (pad_len - 1)
121         return data + bytes([pad_len - 1])
122
123
124 class AuthAlgo(object):
125     def __init__(self, name, mac, mod, key_len, trunc_len=None):
126         self.name = name
127         self.mac = mac
128         self.mod = mod
129         self.key_len = key_len
130         self.trunc_len = trunc_len or key_len
131
132
133 CRYPTO_ALGOS = {
134     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
135     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
136     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
137                                 mode=modes.GCM),
138 }
139
140 AUTH_ALGOS = {
141     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
142     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
143     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
144     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
145     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
146 }
147
148 PRF_ALGOS = {
149     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
150     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
151                                   hashes.SHA256, 32),
152 }
153
154 CRYPTO_IDS = {
155     12: 'AES-CBC',
156     20: 'AES-GCM-16ICV',
157 }
158
159 INTEG_IDS = {
160     2: 'HMAC-SHA1-96',
161     12: 'SHA2-256-128',
162     13: 'SHA2-384-192',
163     14: 'SHA2-512-256',
164 }
165
166
167 class IKEv2ChildSA(object):
168     def __init__(self, local_ts, remote_ts, is_initiator):
169         spi = os.urandom(4)
170         if is_initiator:
171             self.ispi = spi
172             self.rspi = None
173         else:
174             self.rspi = spi
175             self.ispi = None
176         self.local_ts = local_ts
177         self.remote_ts = remote_ts
178
179
180 class IKEv2SA(object):
181     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
182                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
183                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
184                  auth_method='shared-key', priv_key=None, natt=False,
185                  udp_encap=False):
186         self.udp_encap = udp_encap
187         self.natt = natt
188         if natt:
189             self.sport = 4500
190             self.dport = 4500
191         else:
192             self.sport = 500
193             self.dport = 500
194         self.msg_id = 0
195         self.dh_params = None
196         self.test = test
197         self.priv_key = priv_key
198         self.is_initiator = is_initiator
199         nonce = nonce or os.urandom(32)
200         self.auth_data = auth_data
201         self.i_id = i_id
202         self.r_id = r_id
203         if isinstance(id_type, str):
204             self.id_type = IDType.value(id_type)
205         else:
206             self.id_type = id_type
207         self.auth_method = auth_method
208         if self.is_initiator:
209             self.rspi = 8 * b'\x00'
210             self.ispi = spi
211             self.i_nonce = nonce
212         else:
213             self.rspi = spi
214             self.ispi = 8 * b'\x00'
215             self.r_nonce = nonce
216         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
217                           self.is_initiator)]
218
219     def new_msg_id(self):
220         self.msg_id += 1
221         return self.msg_id
222
223     @property
224     def my_dh_pub_key(self):
225         if self.is_initiator:
226             return self.i_dh_data
227         return self.r_dh_data
228
229     @property
230     def peer_dh_pub_key(self):
231         if self.is_initiator:
232             return self.r_dh_data
233         return self.i_dh_data
234
235     def compute_secret(self):
236         priv = self.dh_private_key
237         peer = self.peer_dh_pub_key
238         p, g, l = self.ike_group
239         return pow(int.from_bytes(peer, 'big'),
240                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
241
242     def generate_dh_data(self):
243         # generate DH keys
244         if self.ike_dh not in DH:
245             raise NotImplementedError('%s not in DH group' % self.ike_dh)
246
247         if self.dh_params is None:
248             dhg = DH[self.ike_dh]
249             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
250             self.dh_params = pn.parameters(default_backend())
251
252         priv = self.dh_params.generate_private_key()
253         pub = priv.public_key()
254         x = priv.private_numbers().x
255         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
256         y = pub.public_numbers().y
257
258         if self.is_initiator:
259             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
260         else:
261             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
262
263     def complete_dh_data(self):
264         self.dh_shared_secret = self.compute_secret()
265
266     def calc_child_keys(self):
267         prf = self.ike_prf_alg.mod()
268         s = self.i_nonce + self.r_nonce
269         c = self.child_sas[0]
270
271         encr_key_len = self.esp_crypto_key_len
272         integ_key_len = self.esp_integ_alg.key_len
273         salt_len = 0 if integ_key_len else 4
274
275         l = (integ_key_len * 2 +
276              encr_key_len * 2 +
277              salt_len * 2)
278         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
279
280         pos = 0
281         c.sk_ei = keymat[pos:pos+encr_key_len]
282         pos += encr_key_len
283
284         if integ_key_len:
285             c.sk_ai = keymat[pos:pos+integ_key_len]
286             pos += integ_key_len
287         else:
288             c.salt_ei = keymat[pos:pos+salt_len]
289             pos += salt_len
290
291         c.sk_er = keymat[pos:pos+encr_key_len]
292         pos += encr_key_len
293
294         if integ_key_len:
295             c.sk_ar = keymat[pos:pos+integ_key_len]
296             pos += integ_key_len
297         else:
298             c.salt_er = keymat[pos:pos+salt_len]
299             pos += salt_len
300
301     def calc_prfplus(self, prf, key, seed, length):
302         r = b''
303         t = None
304         x = 1
305         while len(r) < length and x < 255:
306             if t is not None:
307                 s = t
308             else:
309                 s = b''
310             s = s + seed + bytes([x])
311             t = self.calc_prf(prf, key, s)
312             r = r + t
313             x = x + 1
314
315         if x == 255:
316             return None
317         return r
318
319     def calc_prf(self, prf, key, data):
320         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
321         h.update(data)
322         return h.finalize()
323
324     def calc_keys(self):
325         prf = self.ike_prf_alg.mod()
326         # SKEYSEED = prf(Ni | Nr, g^ir)
327         s = self.i_nonce + self.r_nonce
328         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
329
330         # calculate S = Ni | Nr | SPIi SPIr
331         s = s + self.ispi + self.rspi
332
333         prf_key_trunc = self.ike_prf_alg.trunc_len
334         encr_key_len = self.ike_crypto_key_len
335         tr_prf_key_len = self.ike_prf_alg.key_len
336         integ_key_len = self.ike_integ_alg.key_len
337         if integ_key_len == 0:
338             salt_size = 4
339         else:
340             salt_size = 0
341
342         l = (prf_key_trunc +
343              integ_key_len * 2 +
344              encr_key_len * 2 +
345              tr_prf_key_len * 2 +
346              salt_size * 2)
347         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
348
349         pos = 0
350         self.sk_d = keymat[:pos+prf_key_trunc]
351         pos += prf_key_trunc
352
353         self.sk_ai = keymat[pos:pos+integ_key_len]
354         pos += integ_key_len
355         self.sk_ar = keymat[pos:pos+integ_key_len]
356         pos += integ_key_len
357
358         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
359         pos += encr_key_len + salt_size
360         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
361         pos += encr_key_len + salt_size
362
363         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
364         pos += tr_prf_key_len
365         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
366
367     def generate_authmsg(self, prf, packet):
368         if self.is_initiator:
369             id = self.i_id
370             nonce = self.r_nonce
371             key = self.sk_pi
372         else:
373             id = self.r_id
374             nonce = self.i_nonce
375             key = self.sk_pr
376         data = bytes([self.id_type, 0, 0, 0]) + id
377         id_hash = self.calc_prf(prf, key, data)
378         return packet + nonce + id_hash
379
380     def auth_init(self):
381         prf = self.ike_prf_alg.mod()
382         if self.is_initiator:
383             packet = self.init_req_packet
384         else:
385             packet = self.init_resp_packet
386         authmsg = self.generate_authmsg(prf, raw(packet))
387         if self.auth_method == 'shared-key':
388             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
389             self.auth_data = self.calc_prf(prf, psk, authmsg)
390         elif self.auth_method == 'rsa-sig':
391             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
392                                                 hashes.SHA1())
393         else:
394             raise TypeError('unknown auth method type!')
395
396     def encrypt(self, data, aad=None):
397         data = self.ike_crypto_alg.pad(data)
398         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
399
400     @property
401     def peer_authkey(self):
402         if self.is_initiator:
403             return self.sk_ar
404         return self.sk_ai
405
406     @property
407     def my_authkey(self):
408         if self.is_initiator:
409             return self.sk_ai
410         return self.sk_ar
411
412     @property
413     def my_cryptokey(self):
414         if self.is_initiator:
415             return self.sk_ei
416         return self.sk_er
417
418     @property
419     def peer_cryptokey(self):
420         if self.is_initiator:
421             return self.sk_er
422         return self.sk_ei
423
424     def concat(self, alg, key_len):
425         return alg + '-' + str(key_len * 8)
426
427     @property
428     def vpp_ike_cypto_alg(self):
429         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
430
431     @property
432     def vpp_esp_cypto_alg(self):
433         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
434
435     def verify_hmac(self, ikemsg):
436         integ_trunc = self.ike_integ_alg.trunc_len
437         exp_hmac = ikemsg[-integ_trunc:]
438         data = ikemsg[:-integ_trunc]
439         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
440                                           self.peer_authkey, data)
441         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
442
443     def compute_hmac(self, integ, key, data):
444         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
445         h.update(data)
446         return h.finalize()
447
448     def decrypt(self, data, aad=None, icv=None):
449         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
450
451     def hmac_and_decrypt(self, ike):
452         ep = ike[ikev2.IKEv2_payload_Encrypted]
453         if self.ike_crypto == 'AES-GCM-16ICV':
454             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
455             ct = ep.load[:-GCM_ICV_SIZE]
456             tag = ep.load[-GCM_ICV_SIZE:]
457             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
458         else:
459             self.verify_hmac(raw(ike))
460             integ_trunc = self.ike_integ_alg.trunc_len
461
462             # remove ICV and decrypt payload
463             ct = ep.load[:-integ_trunc]
464             plain = self.decrypt(ct)
465         # remove padding
466         pad_len = plain[-1]
467         return plain[:-pad_len - 1]
468
469     def build_ts_addr(self, ts, version):
470         return {'starting_address_v' + version: ts['start_addr'],
471                 'ending_address_v' + version: ts['end_addr']}
472
473     def generate_ts(self, is_ip4):
474         c = self.child_sas[0]
475         ts_data = {'IP_protocol_ID': 0,
476                    'start_port': 0,
477                    'end_port': 0xffff}
478         if is_ip4:
479             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
480             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
481             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
482             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
483         else:
484             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
485             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
486             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
487             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
488
489         if self.is_initiator:
490             return ([ts1], [ts2])
491         return ([ts2], [ts1])
492
493     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
494         if crypto not in CRYPTO_ALGOS:
495             raise TypeError('unsupported encryption algo %r' % crypto)
496         self.ike_crypto = crypto
497         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
498         self.ike_crypto_key_len = crypto_key_len
499
500         if integ not in AUTH_ALGOS:
501             raise TypeError('unsupported auth algo %r' % integ)
502         self.ike_integ = None if integ == 'NULL' else integ
503         self.ike_integ_alg = AUTH_ALGOS[integ]
504
505         if prf not in PRF_ALGOS:
506             raise TypeError('unsupported prf algo %r' % prf)
507         self.ike_prf = prf
508         self.ike_prf_alg = PRF_ALGOS[prf]
509         self.ike_dh = dh
510         self.ike_group = DH[self.ike_dh]
511
512     def set_esp_props(self, crypto, crypto_key_len, integ):
513         self.esp_crypto_key_len = crypto_key_len
514         if crypto not in CRYPTO_ALGOS:
515             raise TypeError('unsupported encryption algo %r' % crypto)
516         self.esp_crypto = crypto
517         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
518
519         if integ not in AUTH_ALGOS:
520             raise TypeError('unsupported auth algo %r' % integ)
521         self.esp_integ = None if integ == 'NULL' else integ
522         self.esp_integ_alg = AUTH_ALGOS[integ]
523
524     def crypto_attr(self, key_len):
525         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
526             return (0x800e << 16 | key_len << 3, 12)
527         else:
528             raise Exception('unsupported attribute type')
529
530     def ike_crypto_attr(self):
531         return self.crypto_attr(self.ike_crypto_key_len)
532
533     def esp_crypto_attr(self):
534         return self.crypto_attr(self.esp_crypto_key_len)
535
536     def compute_nat_sha1(self, ip, port, rspi=None):
537         if rspi is None:
538             rspi = self.rspi
539         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
540         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
541         digest.update(data)
542         return digest.finalize()
543
544
545 class IkePeer(VppTestCase):
546     """ common class for initiator and responder """
547
548     @classmethod
549     def setUpClass(cls):
550         import scapy.contrib.ikev2 as _ikev2
551         globals()['ikev2'] = _ikev2
552         super(IkePeer, cls).setUpClass()
553         cls.create_pg_interfaces(range(2))
554         for i in cls.pg_interfaces:
555             i.admin_up()
556             i.config_ip4()
557             i.resolve_arp()
558             i.config_ip6()
559             i.resolve_ndp()
560
561     @classmethod
562     def tearDownClass(cls):
563         super(IkePeer, cls).tearDownClass()
564
565     def tearDown(self):
566         super(IkePeer, self).tearDown()
567         if self.del_sa_from_responder:
568             self.initiate_del_sa_from_responder()
569         else:
570             self.initiate_del_sa_from_initiator()
571         r = self.vapi.ikev2_sa_dump()
572         self.assertEqual(len(r), 0)
573         sas = self.vapi.ipsec_sa_dump()
574         self.assertEqual(len(sas), 0)
575         self.p.remove_vpp_config()
576         self.assertIsNone(self.p.query_vpp_config())
577
578     def setUp(self):
579         super(IkePeer, self).setUp()
580         self.config_tc()
581         self.p.add_vpp_config()
582         self.assertIsNotNone(self.p.query_vpp_config())
583         if self.sa.is_initiator:
584             self.sa.generate_dh_data()
585         self.vapi.cli('ikev2 set logging level 4')
586         self.vapi.cli('event-lo clear')
587         self.vapi.cli('ikev2 dpd disable')
588
589     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
590                       use_ip6=False):
591         if use_ip6:
592             src_ip = src_if.remote_ip6
593             dst_ip = src_if.local_ip6
594             ip_layer = IPv6
595         else:
596             src_ip = src_if.remote_ip4
597             dst_ip = src_if.local_ip4
598             ip_layer = IP
599         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
600                ip_layer(src=src_ip, dst=dst_ip) /
601                UDP(sport=sport, dport=dport))
602         if natt:
603             # insert non ESP marker
604             res = res / Raw(b'\x00' * 4)
605         return res / msg
606
607     def verify_udp(self, udp):
608         self.assertEqual(udp.sport, self.sa.sport)
609         self.assertEqual(udp.dport, self.sa.dport)
610
611     def get_ike_header(self, packet):
612         try:
613             ih = packet[ikev2.IKEv2]
614         except IndexError as e:
615             # this is a workaround for getting IKEv2 layer as both ikev2 and
616             # ipsec register for port 4500
617             esp = packet[ESP]
618             ih = self.verify_and_remove_non_esp_marker(esp)
619         self.assertEqual(ih.version, 0x20)
620         self.assertNotIn('Version', ih.flags)
621         return ih
622
623     def verify_and_remove_non_esp_marker(self, packet):
624         if self.sa.natt:
625             # if we are in nat traversal mode check for non esp marker
626             # and remove it
627             data = raw(packet)
628             self.assertEqual(data[:4], b'\x00' * 4)
629             return ikev2.IKEv2(data[4:])
630         else:
631             return packet
632
633     def encrypt_ike_msg(self, header, plain, first_payload):
634         if self.sa.ike_crypto == 'AES-GCM-16ICV':
635             data = self.sa.ike_crypto_alg.pad(raw(plain))
636             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
637                 len(ikev2.IKEv2_payload_Encrypted())
638             tlen = plen + len(ikev2.IKEv2())
639
640             # prepare aad data
641             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
642                                                  length=plen)
643             header.length = tlen
644             res = header / sk_p
645             encr = self.sa.encrypt(raw(plain), raw(res))
646             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
647                                                  length=plen, load=encr)
648             res = header / sk_p
649         else:
650             encr = self.sa.encrypt(raw(plain))
651             trunc_len = self.sa.ike_integ_alg.trunc_len
652             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
653             tlen = plen + len(ikev2.IKEv2())
654
655             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
656                                                  length=plen, load=encr)
657             header.length = tlen
658             res = header / sk_p
659
660             integ_data = raw(res)
661             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
662                                              self.sa.my_authkey, integ_data)
663             res = res / Raw(hmac_data[:trunc_len])
664         assert(len(res) == tlen)
665         return res
666
667     def verify_udp_encap(self, ipsec_sa):
668         e = VppEnum.vl_api_ipsec_sad_flags_t
669         if self.sa.udp_encap or self.sa.natt:
670             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
671         else:
672             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
673
674     def verify_ipsec_sas(self, is_rekey=False):
675         sas = self.vapi.ipsec_sa_dump()
676         if is_rekey:
677             # after rekey there is a short period of time in which old
678             # inbound SA is still present
679             sa_count = 3
680         else:
681             sa_count = 2
682         self.assertEqual(len(sas), sa_count)
683         if self.sa.is_initiator:
684             if is_rekey:
685                 sa0 = sas[0].entry
686                 sa1 = sas[2].entry
687             else:
688                 sa0 = sas[0].entry
689                 sa1 = sas[1].entry
690         else:
691             if is_rekey:
692                 sa0 = sas[2].entry
693                 sa1 = sas[0].entry
694             else:
695                 sa1 = sas[0].entry
696                 sa0 = sas[1].entry
697
698         c = self.sa.child_sas[0]
699
700         self.verify_udp_encap(sa0)
701         self.verify_udp_encap(sa1)
702         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
703         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
704         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
705
706         if self.sa.esp_integ is None:
707             vpp_integ_alg = 0
708         else:
709             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
710         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
711         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
712
713         # verify crypto keys
714         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
715         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
716         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
717         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
718
719         # verify integ keys
720         if vpp_integ_alg:
721             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
722             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
723             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
724             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
725         else:
726             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
727             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
728
729     def verify_keymat(self, api_keys, keys, name):
730         km = getattr(keys, name)
731         api_km = getattr(api_keys, name)
732         api_km_len = getattr(api_keys, name + '_len')
733         self.assertEqual(len(km), api_km_len)
734         self.assertEqual(km, api_km[:api_km_len])
735
736     def verify_id(self, api_id, exp_id):
737         self.assertEqual(api_id.type, IDType.value(exp_id.type))
738         self.assertEqual(api_id.data_len, exp_id.data_len)
739         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
740
741     def verify_ike_sas(self):
742         r = self.vapi.ikev2_sa_dump()
743         self.assertEqual(len(r), 1)
744         sa = r[0].sa
745         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
746         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
747         if self.ip6:
748             if self.sa.is_initiator:
749                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
750                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
751             else:
752                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
753                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
754         else:
755             if self.sa.is_initiator:
756                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
757                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
758             else:
759                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
760                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
761         self.verify_keymat(sa.keys, self.sa, 'sk_d')
762         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
763         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
764         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
765         self.verify_keymat(sa.keys, self.sa, 'sk_er')
766         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
767         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
768
769         self.assertEqual(sa.i_id.type, self.sa.id_type)
770         self.assertEqual(sa.r_id.type, self.sa.id_type)
771         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
772         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
773         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
774         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
775
776         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
777         self.assertEqual(len(r), 1)
778         csa = r[0].child_sa
779         self.assertEqual(csa.sa_index, sa.sa_index)
780         c = self.sa.child_sas[0]
781         if hasattr(c, 'sk_ai'):
782             self.verify_keymat(csa.keys, c, 'sk_ai')
783             self.verify_keymat(csa.keys, c, 'sk_ar')
784         self.verify_keymat(csa.keys, c, 'sk_ei')
785         self.verify_keymat(csa.keys, c, 'sk_er')
786         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
787         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
788
789         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
790         tsi = tsi[0]
791         tsr = tsr[0]
792         r = self.vapi.ikev2_traffic_selector_dump(
793                 is_initiator=True, sa_index=sa.sa_index,
794                 child_sa_index=csa.child_sa_index)
795         self.assertEqual(len(r), 1)
796         ts = r[0].ts
797         self.verify_ts(r[0].ts, tsi[0], True)
798
799         r = self.vapi.ikev2_traffic_selector_dump(
800                 is_initiator=False, sa_index=sa.sa_index,
801                 child_sa_index=csa.child_sa_index)
802         self.assertEqual(len(r), 1)
803         self.verify_ts(r[0].ts, tsr[0], False)
804
805         n = self.vapi.ikev2_nonce_get(is_initiator=True,
806                                       sa_index=sa.sa_index)
807         self.verify_nonce(n, self.sa.i_nonce)
808         n = self.vapi.ikev2_nonce_get(is_initiator=False,
809                                       sa_index=sa.sa_index)
810         self.verify_nonce(n, self.sa.r_nonce)
811
812     def verify_nonce(self, api_nonce, nonce):
813         self.assertEqual(api_nonce.data_len, len(nonce))
814         self.assertEqual(api_nonce.nonce, nonce)
815
816     def verify_ts(self, api_ts, ts, is_initiator):
817         if is_initiator:
818             self.assertTrue(api_ts.is_local)
819         else:
820             self.assertFalse(api_ts.is_local)
821
822         if self.p.ts_is_ip4:
823             self.assertEqual(api_ts.start_addr,
824                              IPv4Address(ts.starting_address_v4))
825             self.assertEqual(api_ts.end_addr,
826                              IPv4Address(ts.ending_address_v4))
827         else:
828             self.assertEqual(api_ts.start_addr,
829                              IPv6Address(ts.starting_address_v6))
830             self.assertEqual(api_ts.end_addr,
831                              IPv6Address(ts.ending_address_v6))
832         self.assertEqual(api_ts.start_port, ts.start_port)
833         self.assertEqual(api_ts.end_port, ts.end_port)
834         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
835
836
837 class TemplateInitiator(IkePeer):
838     """ initiator test template """
839
840     def initiate_del_sa_from_initiator(self):
841         ispi = int.from_bytes(self.sa.ispi, 'little')
842         self.pg0.enable_capture()
843         self.pg_start()
844         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
845         capture = self.pg0.get_capture(1)
846         ih = self.get_ike_header(capture[0])
847         self.assertNotIn('Response', ih.flags)
848         self.assertIn('Initiator', ih.flags)
849         self.assertEqual(ih.init_SPI, self.sa.ispi)
850         self.assertEqual(ih.resp_SPI, self.sa.rspi)
851         plain = self.sa.hmac_and_decrypt(ih)
852         d = ikev2.IKEv2_payload_Delete(plain)
853         self.assertEqual(d.proto, 1)  # proto=IKEv2
854         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
855                              flags='Response', exch_type='INFORMATIONAL',
856                              id=ih.id, next_payload='Encrypted')
857         resp = self.encrypt_ike_msg(header, b'', None)
858         self.send_and_assert_no_replies(self.pg0, resp)
859
860     def verify_del_sa(self, packet):
861         ih = self.get_ike_header(packet)
862         self.assertEqual(ih.id, self.sa.msg_id)
863         self.assertEqual(ih.exch_type, 37)  # exchange informational
864         self.assertIn('Response', ih.flags)
865         self.assertIn('Initiator', ih.flags)
866         plain = self.sa.hmac_and_decrypt(ih)
867         self.assertEqual(plain, b'')
868
869     def initiate_del_sa_from_responder(self):
870         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
871                              exch_type='INFORMATIONAL',
872                              id=self.sa.new_msg_id())
873         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
874         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
875         packet = self.create_packet(self.pg0, ike_msg,
876                                     self.sa.sport, self.sa.dport,
877                                     self.sa.natt, self.ip6)
878         self.pg0.add_stream(packet)
879         self.pg0.enable_capture()
880         self.pg_start()
881         capture = self.pg0.get_capture(1)
882         self.verify_del_sa(capture[0])
883
884     @staticmethod
885     def find_notify_payload(packet, notify_type):
886         n = packet[ikev2.IKEv2_payload_Notify]
887         while n is not None:
888             if n.type == notify_type:
889                 return n
890             n = n.payload
891         return None
892
893     def verify_nat_detection(self, packet):
894         if self.ip6:
895             iph = packet[IPv6]
896         else:
897             iph = packet[IP]
898         udp = packet[UDP]
899
900         # NAT_DETECTION_SOURCE_IP
901         s = self.find_notify_payload(packet, 16388)
902         self.assertIsNotNone(s)
903         src_sha = self.sa.compute_nat_sha1(
904                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
905         self.assertEqual(s.load, src_sha)
906
907         # NAT_DETECTION_DESTINATION_IP
908         s = self.find_notify_payload(packet, 16389)
909         self.assertIsNotNone(s)
910         dst_sha = self.sa.compute_nat_sha1(
911                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
912         self.assertEqual(s.load, dst_sha)
913
914     def verify_sa_init_request(self, packet):
915         ih = packet[ikev2.IKEv2]
916         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
917         self.assertEqual(ih.exch_type, 34)  # SA_INIT
918         self.sa.ispi = ih.init_SPI
919         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
920         self.assertIn('Initiator', ih.flags)
921         self.assertNotIn('Response', ih.flags)
922         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
923         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
924
925         prop = packet[ikev2.IKEv2_payload_Proposal]
926         self.assertEqual(prop.proto, 1)  # proto = ikev2
927         self.assertEqual(prop.proposal, 1)
928         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
929         self.assertEqual(prop.trans[0].transform_id,
930                          self.p.ike_transforms['crypto_alg'])
931         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
932         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
933         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
934         self.assertEqual(prop.trans[2].transform_id,
935                          self.p.ike_transforms['dh_group'])
936
937         self.verify_nat_detection(packet)
938         self.sa.set_ike_props(
939                     crypto='AES-GCM-16ICV', crypto_key_len=32,
940                     integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
941         self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
942                               integ='SHA2-256-128')
943         self.sa.generate_dh_data()
944         self.sa.complete_dh_data()
945         self.sa.calc_keys()
946
947     def update_esp_transforms(self, trans, sa):
948         while trans:
949             if trans.transform_type == 1:  # ecryption
950                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
951             elif trans.transform_type == 3:  # integrity
952                 sa.esp_integ = INTEG_IDS[trans.transform_id]
953             trans = trans.payload
954
955     def verify_sa_auth_req(self, packet):
956         ih = self.get_ike_header(packet)
957         self.assertEqual(ih.resp_SPI, self.sa.rspi)
958         self.assertEqual(ih.init_SPI, self.sa.ispi)
959         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
960         self.assertIn('Initiator', ih.flags)
961         self.assertNotIn('Response', ih.flags)
962
963         udp = packet[UDP]
964         self.verify_udp(udp)
965         self.assertEqual(ih.id, self.sa.msg_id + 1)
966         self.sa.msg_id += 1
967         plain = self.sa.hmac_and_decrypt(ih)
968         idi = ikev2.IKEv2_payload_IDi(plain)
969         idr = ikev2.IKEv2_payload_IDr(idi.payload)
970         self.assertEqual(idi.load, self.sa.i_id)
971         self.assertEqual(idr.load, self.sa.r_id)
972         prop = idi[ikev2.IKEv2_payload_Proposal]
973         c = self.sa.child_sas[0]
974         c.ispi = prop.SPI
975         self.update_esp_transforms(
976                 prop[ikev2.IKEv2_payload_Transform], self.sa)
977
978     def send_init_response(self):
979         tr_attr = self.sa.ike_crypto_attr()
980         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
981                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
982                  key_length=tr_attr[0]) /
983                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
984                  transform_id=self.sa.ike_integ) /
985                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
986                  transform_id=self.sa.ike_prf_alg.name) /
987                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
988                  transform_id=self.sa.ike_dh))
989         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
990                  trans_nb=4, trans=trans))
991         self.sa.init_resp_packet = (
992             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
993                         exch_type='IKE_SA_INIT', flags='Response') /
994             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
995             ikev2.IKEv2_payload_KE(next_payload='Nonce',
996                                    group=self.sa.ike_dh,
997                                    load=self.sa.my_dh_pub_key) /
998             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
999
1000         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1001                                      self.sa.sport, self.sa.dport,
1002                                      self.sa.natt, self.ip6)
1003         self.pg_send(self.pg0, ike_msg)
1004         capture = self.pg0.get_capture(1)
1005         self.verify_sa_auth_req(capture[0])
1006
1007     def initiate_sa_init(self):
1008         self.pg0.enable_capture()
1009         self.pg_start()
1010         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1011
1012         capture = self.pg0.get_capture(1)
1013         self.verify_sa_init_request(capture[0])
1014         self.send_init_response()
1015
1016     def send_auth_response(self):
1017         tr_attr = self.sa.esp_crypto_attr()
1018         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1019                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1020                  key_length=tr_attr[0]) /
1021                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1022                  transform_id=self.sa.esp_integ) /
1023                  ikev2.IKEv2_payload_Transform(
1024                  transform_type='Extended Sequence Number',
1025                  transform_id='No ESN') /
1026                  ikev2.IKEv2_payload_Transform(
1027                  transform_type='Extended Sequence Number',
1028                  transform_id='ESN'))
1029
1030         c = self.sa.child_sas[0]
1031         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1032                  SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1033
1034         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1035         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1036                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1037                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1038                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1039                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1040                  auth_type=AuthMethod.value(self.sa.auth_method),
1041                  load=self.sa.auth_data) /
1042                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1043                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1044                  number_of_TSs=len(tsi),
1045                  traffic_selector=tsi) /
1046                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1047                  number_of_TSs=len(tsr),
1048                  traffic_selector=tsr) /
1049                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1050
1051         header = ikev2.IKEv2(
1052                 init_SPI=self.sa.ispi,
1053                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1054                 flags='Response', exch_type='IKE_AUTH')
1055
1056         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1057         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1058                                     self.sa.dport, self.sa.natt, self.ip6)
1059         self.pg_send(self.pg0, packet)
1060
1061     def test_initiator(self):
1062         self.initiate_sa_init()
1063         self.sa.auth_init()
1064         self.sa.calc_child_keys()
1065         self.send_auth_response()
1066         self.verify_ike_sas()
1067
1068
1069 class TemplateResponder(IkePeer):
1070     """ responder test template """
1071
1072     def initiate_del_sa_from_responder(self):
1073         self.pg0.enable_capture()
1074         self.pg_start()
1075         self.vapi.ikev2_initiate_del_ike_sa(
1076                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1077         capture = self.pg0.get_capture(1)
1078         ih = self.get_ike_header(capture[0])
1079         self.assertNotIn('Response', ih.flags)
1080         self.assertNotIn('Initiator', ih.flags)
1081         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1082         plain = self.sa.hmac_and_decrypt(ih)
1083         d = ikev2.IKEv2_payload_Delete(plain)
1084         self.assertEqual(d.proto, 1)  # proto=IKEv2
1085         self.assertEqual(ih.init_SPI, self.sa.ispi)
1086         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1087         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1088                              flags='Initiator+Response',
1089                              exch_type='INFORMATIONAL',
1090                              id=ih.id, next_payload='Encrypted')
1091         resp = self.encrypt_ike_msg(header, b'', None)
1092         self.send_and_assert_no_replies(self.pg0, resp)
1093
1094     def verify_del_sa(self, packet):
1095         ih = self.get_ike_header(packet)
1096         self.assertEqual(ih.id, self.sa.msg_id)
1097         self.assertEqual(ih.exch_type, 37)  # exchange informational
1098         self.assertIn('Response', ih.flags)
1099         self.assertNotIn('Initiator', ih.flags)
1100         self.assertEqual(ih.next_payload, 46)  # Encrypted
1101         self.assertEqual(ih.init_SPI, self.sa.ispi)
1102         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1103         plain = self.sa.hmac_and_decrypt(ih)
1104         self.assertEqual(plain, b'')
1105
1106     def initiate_del_sa_from_initiator(self):
1107         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1108                              flags='Initiator', exch_type='INFORMATIONAL',
1109                              id=self.sa.new_msg_id())
1110         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1111         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1112         packet = self.create_packet(self.pg0, ike_msg,
1113                                     self.sa.sport, self.sa.dport,
1114                                     self.sa.natt, self.ip6)
1115         self.pg0.add_stream(packet)
1116         self.pg0.enable_capture()
1117         self.pg_start()
1118         capture = self.pg0.get_capture(1)
1119         self.verify_del_sa(capture[0])
1120
1121     def send_sa_init_req(self, behind_nat=False):
1122         tr_attr = self.sa.ike_crypto_attr()
1123         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1124                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1125                  key_length=tr_attr[0]) /
1126                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1127                  transform_id=self.sa.ike_integ) /
1128                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1129                  transform_id=self.sa.ike_prf_alg.name) /
1130                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1131                  transform_id=self.sa.ike_dh))
1132
1133         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1134                  trans_nb=4, trans=trans))
1135
1136         self.sa.init_req_packet = (
1137                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1138                             flags='Initiator', exch_type='IKE_SA_INIT') /
1139                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1140                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1141                                        group=self.sa.ike_dh,
1142                                        load=self.sa.my_dh_pub_key) /
1143                 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1144                                           load=self.sa.i_nonce))
1145
1146         if behind_nat:
1147             src_address = b'\x0a\x0a\x0a\x01'
1148         else:
1149             src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1150
1151         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1152         dst_nat = self.sa.compute_nat_sha1(
1153                 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1154                 self.sa.dport)
1155         nat_src_detection = ikev2.IKEv2_payload_Notify(
1156                 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1157                 next_payload='Notify')
1158         nat_dst_detection = ikev2.IKEv2_payload_Notify(
1159                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1160         self.sa.init_req_packet = (self.sa.init_req_packet /
1161                                    nat_src_detection /
1162                                    nat_dst_detection)
1163
1164         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1165                                      self.sa.sport, self.sa.dport,
1166                                      self.sa.natt, self.ip6)
1167         self.pg0.add_stream(ike_msg)
1168         self.pg0.enable_capture()
1169         self.pg_start()
1170         capture = self.pg0.get_capture(1)
1171         self.verify_sa_init(capture[0])
1172
1173     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1174         tr_attr = self.sa.esp_crypto_attr()
1175         last_payload = last_payload or 'Notify'
1176         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1177                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1178                  key_length=tr_attr[0]) /
1179                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1180                  transform_id=self.sa.esp_integ) /
1181                  ikev2.IKEv2_payload_Transform(
1182                  transform_type='Extended Sequence Number',
1183                  transform_id='No ESN') /
1184                  ikev2.IKEv2_payload_Transform(
1185                  transform_type='Extended Sequence Number',
1186                  transform_id='ESN'))
1187
1188         c = self.sa.child_sas[0]
1189         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1190                  SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1191
1192         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1193         plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1194                  auth_type=AuthMethod.value(self.sa.auth_method),
1195                  load=self.sa.auth_data) /
1196                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1197                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1198                  number_of_TSs=len(tsi), traffic_selector=tsi) /
1199                  ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1200                  number_of_TSs=len(tsr), traffic_selector=tsr))
1201
1202         if is_rekey:
1203             first_payload = 'Nonce'
1204             plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1205                      next_payload='SA') / plain /
1206                      ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1207                      proto='ESP', SPI=c.ispi))
1208         else:
1209             first_payload = 'IDi'
1210             ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1211                    IDtype=self.sa.id_type, load=self.sa.i_id) /
1212                    ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1213                    IDtype=self.sa.id_type, load=self.sa.r_id))
1214             plain = ids / plain
1215         return plain, first_payload
1216
1217     def send_sa_auth(self):
1218         plain, first_payload = self.generate_auth_payload(
1219                     last_payload='Notify')
1220         plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1221         header = ikev2.IKEv2(
1222                 init_SPI=self.sa.ispi,
1223                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1224                 flags='Initiator', exch_type='IKE_AUTH')
1225
1226         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1227         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1228                                     self.sa.dport, self.sa.natt, self.ip6)
1229         self.pg0.add_stream(packet)
1230         self.pg0.enable_capture()
1231         self.pg_start()
1232         capture = self.pg0.get_capture(1)
1233         self.verify_sa_auth_resp(capture[0])
1234
1235     def verify_sa_init(self, packet):
1236         ih = self.get_ike_header(packet)
1237
1238         self.assertEqual(ih.id, self.sa.msg_id)
1239         self.assertEqual(ih.exch_type, 34)
1240         self.assertIn('Response', ih.flags)
1241         self.assertEqual(ih.init_SPI, self.sa.ispi)
1242         self.assertNotEqual(ih.resp_SPI, 0)
1243         self.sa.rspi = ih.resp_SPI
1244         try:
1245             sa = ih[ikev2.IKEv2_payload_SA]
1246             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1247             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1248         except IndexError as e:
1249             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1250             self.logger.error(ih.show())
1251             raise
1252         self.sa.complete_dh_data()
1253         self.sa.calc_keys()
1254         self.sa.auth_init()
1255
1256     def verify_sa_auth_resp(self, packet):
1257         ike = self.get_ike_header(packet)
1258         udp = packet[UDP]
1259         self.verify_udp(udp)
1260         self.assertEqual(ike.id, self.sa.msg_id)
1261         plain = self.sa.hmac_and_decrypt(ike)
1262         idr = ikev2.IKEv2_payload_IDr(plain)
1263         prop = idr[ikev2.IKEv2_payload_Proposal]
1264         self.assertEqual(prop.SPIsize, 4)
1265         self.sa.child_sas[0].rspi = prop.SPI
1266         self.sa.calc_child_keys()
1267
1268     def test_responder(self):
1269         self.send_sa_init_req(self.sa.natt)
1270         self.send_sa_auth()
1271         self.verify_ipsec_sas()
1272         self.verify_ike_sas()
1273
1274
1275 class Ikev2Params(object):
1276     def config_params(self, params={}):
1277         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1278         ei = VppEnum.vl_api_ipsec_integ_alg_t
1279         self.vpp_enums = {
1280                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1281                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1282                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1283                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1284                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1285                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1286
1287                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1288                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1289                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1290                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1291
1292         self.del_sa_from_responder = False if 'del_sa_from_responder'\
1293             not in params else params['del_sa_from_responder']
1294         is_natt = 'natt' in params and params['natt'] or False
1295         self.p = Profile(self, 'pr1')
1296         self.ip6 = False if 'ip6' not in params else params['ip6']
1297
1298         if 'auth' in params and params['auth'] == 'rsa-sig':
1299             auth_method = 'rsa-sig'
1300             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1301             self.vapi.ikev2_set_local_key(
1302                     key_file=work_dir + params['server-key'])
1303
1304             client_file = work_dir + params['client-cert']
1305             server_pem = open(work_dir + params['server-cert']).read()
1306             client_priv = open(work_dir + params['client-key']).read()
1307             client_priv = load_pem_private_key(str.encode(client_priv), None,
1308                                                default_backend())
1309             self.peer_cert = x509.load_pem_x509_certificate(
1310                     str.encode(server_pem),
1311                     default_backend())
1312             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1313             auth_data = None
1314         else:
1315             auth_data = b'$3cr3tpa$$w0rd'
1316             self.p.add_auth(method='shared-key', data=auth_data)
1317             auth_method = 'shared-key'
1318             client_priv = None
1319
1320         is_init = True if 'is_initiator' not in params else\
1321             params['is_initiator']
1322
1323         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1324         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1325         if is_init:
1326             self.p.add_local_id(**idr)
1327             self.p.add_remote_id(**idi)
1328         else:
1329             self.p.add_local_id(**idi)
1330             self.p.add_remote_id(**idr)
1331
1332         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1333             'loc_ts' not in params else params['loc_ts']
1334         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1335             'rem_ts' not in params else params['rem_ts']
1336         self.p.add_local_ts(**loc_ts)
1337         self.p.add_remote_ts(**rem_ts)
1338         if 'responder' in params:
1339             self.p.add_responder(params['responder'])
1340         if 'ike_transforms' in params:
1341             self.p.add_ike_transforms(params['ike_transforms'])
1342         if 'esp_transforms' in params:
1343             self.p.add_esp_transforms(params['esp_transforms'])
1344
1345         udp_encap = False if 'udp_encap' not in params else\
1346             params['udp_encap']
1347         if udp_encap:
1348             self.p.set_udp_encap(True)
1349
1350         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1351                           is_initiator=is_init,
1352                           id_type=self.p.local_id['id_type'], natt=is_natt,
1353                           priv_key=client_priv, auth_method=auth_method,
1354                           auth_data=auth_data, udp_encap=udp_encap,
1355                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1356         if is_init:
1357             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1358                 params['ike-crypto']
1359             ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1360                 params['ike-integ']
1361             ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1362                 params['ike-dh']
1363
1364             esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1365                 params['esp-crypto']
1366             esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1367                 params['esp-integ']
1368
1369             self.sa.set_ike_props(
1370                     crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1371                     integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1372             self.sa.set_esp_props(
1373                     crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1374                     integ=esp_integ)
1375
1376
1377 class TestApi(VppTestCase):
1378     """ Test IKEV2 API """
1379     @classmethod
1380     def setUpClass(cls):
1381         super(TestApi, cls).setUpClass()
1382
1383     @classmethod
1384     def tearDownClass(cls):
1385         super(TestApi, cls).tearDownClass()
1386
1387     def tearDown(self):
1388         super(TestApi, self).tearDown()
1389         self.p1.remove_vpp_config()
1390         self.p2.remove_vpp_config()
1391         r = self.vapi.ikev2_profile_dump()
1392         self.assertEqual(len(r), 0)
1393
1394     def configure_profile(self, cfg):
1395         p = Profile(self, cfg['name'])
1396         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1397         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1398         p.add_local_ts(**cfg['loc_ts'])
1399         p.add_remote_ts(**cfg['rem_ts'])
1400         p.add_responder(cfg['responder'])
1401         p.add_ike_transforms(cfg['ike_ts'])
1402         p.add_esp_transforms(cfg['esp_ts'])
1403         p.add_auth(**cfg['auth'])
1404         p.set_udp_encap(cfg['udp_encap'])
1405         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1406         if 'lifetime_data' in cfg:
1407             p.set_lifetime_data(cfg['lifetime_data'])
1408         if 'tun_itf' in cfg:
1409             p.set_tunnel_interface(cfg['tun_itf'])
1410         if 'natt_disabled' in cfg and cfg['natt_disabled']:
1411             p.disable_natt()
1412         p.add_vpp_config()
1413         return p
1414
1415     def test_profile_api(self):
1416         """ test profile dump API """
1417         loc_ts4 = {
1418                     'proto': 8,
1419                     'start_port': 1,
1420                     'end_port': 19,
1421                     'start_addr': '3.3.3.2',
1422                     'end_addr': '3.3.3.3',
1423                 }
1424         rem_ts4 = {
1425                     'proto': 9,
1426                     'start_port': 10,
1427                     'end_port': 119,
1428                     'start_addr': '4.5.76.80',
1429                     'end_addr': '2.3.4.6',
1430                 }
1431
1432         loc_ts6 = {
1433                     'proto': 8,
1434                     'start_port': 1,
1435                     'end_port': 19,
1436                     'start_addr': 'ab::1',
1437                     'end_addr': 'ab::4',
1438                 }
1439         rem_ts6 = {
1440                     'proto': 9,
1441                     'start_port': 10,
1442                     'end_port': 119,
1443                     'start_addr': 'cd::12',
1444                     'end_addr': 'cd::13',
1445                 }
1446
1447         conf = {
1448             'p1': {
1449                 'name': 'p1',
1450                 'natt_disabled': True,
1451                 'loc_id': ('fqdn', b'vpp.home'),
1452                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1453                 'loc_ts': loc_ts4,
1454                 'rem_ts': rem_ts4,
1455                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1456                 'ike_ts': {
1457                         'crypto_alg': 20,
1458                         'crypto_key_size': 32,
1459                         'integ_alg': 1,
1460                         'dh_group': 1},
1461                 'esp_ts': {
1462                         'crypto_alg': 13,
1463                         'crypto_key_size': 24,
1464                         'integ_alg': 2},
1465                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1466                 'udp_encap': True,
1467                 'ipsec_over_udp_port': 4501,
1468                 'lifetime_data': {
1469                     'lifetime': 123,
1470                     'lifetime_maxdata': 20192,
1471                     'lifetime_jitter': 9,
1472                     'handover': 132},
1473             },
1474             'p2': {
1475                 'name': 'p2',
1476                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1477                 'rem_id': ('ip6-addr', b'abcd::1'),
1478                 'loc_ts': loc_ts6,
1479                 'rem_ts': rem_ts6,
1480                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1481                 'ike_ts': {
1482                         'crypto_alg': 12,
1483                         'crypto_key_size': 16,
1484                         'integ_alg': 3,
1485                         'dh_group': 3},
1486                 'esp_ts': {
1487                         'crypto_alg': 9,
1488                         'crypto_key_size': 24,
1489                         'integ_alg': 4},
1490                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1491                 'udp_encap': False,
1492                 'ipsec_over_udp_port': 4600,
1493                 'tun_itf': 0}
1494         }
1495         self.p1 = self.configure_profile(conf['p1'])
1496         self.p2 = self.configure_profile(conf['p2'])
1497
1498         r = self.vapi.ikev2_profile_dump()
1499         self.assertEqual(len(r), 2)
1500         self.verify_profile(r[0].profile, conf['p1'])
1501         self.verify_profile(r[1].profile, conf['p2'])
1502
1503     def verify_id(self, api_id, cfg_id):
1504         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1505         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1506
1507     def verify_ts(self, api_ts, cfg_ts):
1508         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1509         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1510         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1511         self.assertEqual(api_ts.start_addr,
1512                          ip_address(text_type(cfg_ts['start_addr'])))
1513         self.assertEqual(api_ts.end_addr,
1514                          ip_address(text_type(cfg_ts['end_addr'])))
1515
1516     def verify_responder(self, api_r, cfg_r):
1517         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1518         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1519
1520     def verify_transforms(self, api_ts, cfg_ts):
1521         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1522         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1523         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1524
1525     def verify_ike_transforms(self, api_ts, cfg_ts):
1526         self.verify_transforms(api_ts, cfg_ts)
1527         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1528
1529     def verify_esp_transforms(self, api_ts, cfg_ts):
1530         self.verify_transforms(api_ts, cfg_ts)
1531
1532     def verify_auth(self, api_auth, cfg_auth):
1533         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1534         self.assertEqual(api_auth.data, cfg_auth['data'])
1535         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1536
1537     def verify_lifetime_data(self, p, ld):
1538         self.assertEqual(p.lifetime, ld['lifetime'])
1539         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1540         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1541         self.assertEqual(p.handover, ld['handover'])
1542
1543     def verify_profile(self, ap, cp):
1544         self.assertEqual(ap.name, cp['name'])
1545         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1546         self.verify_id(ap.loc_id, cp['loc_id'])
1547         self.verify_id(ap.rem_id, cp['rem_id'])
1548         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1549         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1550         self.verify_responder(ap.responder, cp['responder'])
1551         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1552         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1553         self.verify_auth(ap.auth, cp['auth'])
1554         natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1555         self.assertTrue(natt_dis == ap.natt_disabled)
1556
1557         if 'lifetime_data' in cp:
1558             self.verify_lifetime_data(ap, cp['lifetime_data'])
1559         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1560         if 'tun_itf' in cp:
1561             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1562         else:
1563             self.assertEqual(ap.tun_itf, 0xffffffff)
1564
1565
1566 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1567     """ test ikev2 initiator - pre shared key auth """
1568
1569     def config_tc(self):
1570         self.config_params({
1571             'is_initiator': False,  # seen from test case perspective
1572                                     # thus vpp is initiator
1573             'responder': {'sw_if_index': self.pg0.sw_if_index,
1574                            'addr': self.pg0.remote_ip4},
1575             'ike-crypto': ('AES-GCM-16ICV', 32),
1576             'ike-integ': 'NULL',
1577             'ike-dh': '3072MODPgr',
1578             'ike_transforms': {
1579                 'crypto_alg': 20,  # "aes-gcm-16"
1580                 'crypto_key_size': 256,
1581                 'dh_group': 15,  # "modp-3072"
1582             },
1583             'esp_transforms': {
1584                 'crypto_alg': 12,  # "aes-cbc"
1585                 'crypto_key_size': 256,
1586                 # "hmac-sha2-256-128"
1587                 'integ_alg': 12}})
1588
1589
1590 class TestInitiatorRekey(TestInitiatorPsk):
1591     """ test ikev2 initiator - rekey """
1592
1593     def rekey_from_initiator(self):
1594         ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1595         self.pg0.enable_capture()
1596         self.pg_start()
1597         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1598         capture = self.pg0.get_capture(1)
1599         ih = self.get_ike_header(capture[0])
1600         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1601         self.assertNotIn('Response', ih.flags)
1602         self.assertIn('Initiator', ih.flags)
1603         plain = self.sa.hmac_and_decrypt(ih)
1604         sa = ikev2.IKEv2_payload_SA(plain)
1605         prop = sa[ikev2.IKEv2_payload_Proposal]
1606         nonce = sa[ikev2.IKEv2_payload_Nonce]
1607         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1608         self.sa.r_nonce = self.sa.i_nonce
1609         # update new responder SPI
1610         self.sa.child_sas[0].ispi = prop.SPI
1611         self.sa.child_sas[0].rspi = prop.SPI
1612         self.sa.calc_child_keys()
1613         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1614                              flags='Response', exch_type=36,
1615                              id=ih.id, next_payload='Encrypted')
1616         resp = self.encrypt_ike_msg(header, sa, 'SA')
1617         packet = self.create_packet(self.pg0, resp, self.sa.sport,
1618                                     self.sa.dport, self.sa.natt, self.ip6)
1619         self.send_and_assert_no_replies(self.pg0, packet)
1620
1621     def test_initiator(self):
1622         super(TestInitiatorRekey, self).test_initiator()
1623         self.rekey_from_initiator()
1624         self.verify_ike_sas()
1625         self.verify_ipsec_sas(is_rekey=True)
1626
1627
1628 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1629     """ test ikev2 initiator - delete IKE SA from responder """
1630
1631     def config_tc(self):
1632         self.config_params({
1633             'del_sa_from_responder': True,
1634             'is_initiator': False,  # seen from test case perspective
1635                                     # thus vpp is initiator
1636             'responder': {'sw_if_index': self.pg0.sw_if_index,
1637                            'addr': self.pg0.remote_ip4},
1638             'ike-crypto': ('AES-GCM-16ICV', 32),
1639             'ike-integ': 'NULL',
1640             'ike-dh': '3072MODPgr',
1641             'ike_transforms': {
1642                 'crypto_alg': 20,  # "aes-gcm-16"
1643                 'crypto_key_size': 256,
1644                 'dh_group': 15,  # "modp-3072"
1645             },
1646             'esp_transforms': {
1647                 'crypto_alg': 12,  # "aes-cbc"
1648                 'crypto_key_size': 256,
1649                 # "hmac-sha2-256-128"
1650                 'integ_alg': 12}})
1651
1652
1653 class TestResponderNATT(TemplateResponder, Ikev2Params):
1654     """ test ikev2 responder - nat traversal """
1655     def config_tc(self):
1656         self.config_params(
1657                 {'natt': True})
1658
1659
1660 class TestResponderPsk(TemplateResponder, Ikev2Params):
1661     """ test ikev2 responder - pre shared key auth """
1662     def config_tc(self):
1663         self.config_params()
1664
1665
1666 class TestResponderRekey(TestResponderPsk):
1667     """ test ikev2 responder - rekey """
1668
1669     def rekey_from_initiator(self):
1670         sa, first_payload = self.generate_auth_payload(is_rekey=True)
1671         header = ikev2.IKEv2(
1672                 init_SPI=self.sa.ispi,
1673                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1674                 flags='Initiator', exch_type='CREATE_CHILD_SA')
1675
1676         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
1677         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1678                                     self.sa.dport, self.sa.natt, self.ip6)
1679         self.pg0.add_stream(packet)
1680         self.pg0.enable_capture()
1681         self.pg_start()
1682         capture = self.pg0.get_capture(1)
1683         ih = self.get_ike_header(capture[0])
1684         plain = self.sa.hmac_and_decrypt(ih)
1685         sa = ikev2.IKEv2_payload_SA(plain)
1686         prop = sa[ikev2.IKEv2_payload_Proposal]
1687         nonce = sa[ikev2.IKEv2_payload_Nonce]
1688         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1689         # update new responder SPI
1690         self.sa.child_sas[0].rspi = prop.SPI
1691
1692     def test_responder(self):
1693         super(TestResponderRekey, self).test_responder()
1694         self.rekey_from_initiator()
1695         self.sa.calc_child_keys()
1696         self.verify_ike_sas()
1697         self.verify_ipsec_sas(is_rekey=True)
1698
1699
1700 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1701     """ test ikev2 responder - cert based auth """
1702     def config_tc(self):
1703         self.config_params({
1704             'udp_encap': True,
1705             'auth': 'rsa-sig',
1706             'server-key': 'server-key.pem',
1707             'client-key': 'client-key.pem',
1708             'client-cert': 'client-cert.pem',
1709             'server-cert': 'server-cert.pem'})
1710
1711
1712 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1713         (TemplateResponder, Ikev2Params):
1714     """
1715     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1716     """
1717     def config_tc(self):
1718         self.config_params({
1719             'ike-crypto': ('AES-CBC', 16),
1720             'ike-integ': 'SHA2-256-128',
1721             'esp-crypto': ('AES-CBC', 24),
1722             'esp-integ': 'SHA2-384-192',
1723             'ike-dh': '2048MODPgr'})
1724
1725
1726 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1727         (TemplateResponder, Ikev2Params):
1728     """
1729     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1730     """
1731     def config_tc(self):
1732         self.config_params({
1733             'ike-crypto': ('AES-CBC', 32),
1734             'ike-integ': 'SHA2-256-128',
1735             'esp-crypto': ('AES-GCM-16ICV', 32),
1736             'esp-integ': 'NULL',
1737             'ike-dh': '3072MODPgr'})
1738
1739
1740 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1741     """
1742     IKE:AES_GCM_16_256
1743     """
1744     def config_tc(self):
1745         self.config_params({
1746             'del_sa_from_responder': True,
1747             'ip6': True,
1748             'natt': True,
1749             'ike-crypto': ('AES-GCM-16ICV', 32),
1750             'ike-integ': 'NULL',
1751             'ike-dh': '2048MODPgr',
1752             'loc_ts': {'start_addr': 'ab:cd::0',
1753                        'end_addr': 'ab:cd::10'},
1754             'rem_ts': {'start_addr': '11::0',
1755                        'end_addr': '11::100'}})
1756
1757
1758 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1759     """ malformed packet test """
1760
1761     def tearDown(self):
1762         pass
1763
1764     def config_tc(self):
1765         self.config_params()
1766
1767     def assert_counter(self, count, name, version='ip4'):
1768         node_name = '/err/ikev2-%s/' % version + name
1769         self.assertEqual(count, self.statistics.get_err_counter(node_name))
1770
1771     def create_ike_init_msg(self, length=None, payload=None):
1772         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1773                           flags='Initiator', exch_type='IKE_SA_INIT')
1774         if payload is not None:
1775             msg /= payload
1776         return self.create_packet(self.pg0, msg, self.sa.sport,
1777                                   self.sa.dport)
1778
1779     def verify_bad_packet_length(self):
1780         ike_msg = self.create_ike_init_msg(length=0xdead)
1781         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1782         self.assert_counter(self.pkt_count, 'Bad packet length')
1783
1784     def verify_bad_sa_payload_length(self):
1785         p = ikev2.IKEv2_payload_SA(length=0xdead)
1786         ike_msg = self.create_ike_init_msg(payload=p)
1787         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1788         self.assert_counter(self.pkt_count, 'Malformed packet')
1789
1790     def test_responder(self):
1791         self.pkt_count = 254
1792         self.verify_bad_packet_length()
1793         self.verify_bad_sa_payload_length()
1794
1795
1796 if __name__ == '__main__':
1797     unittest.main(testRunner=VppTestRunner)