tests: make tests less make dependent
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
26
# Compatibility shim: use ``unicode`` when the interpreter defines it,
# otherwise fall back to ``str``.
try:
    text_type = unicode
except NameError:
    text_type = str

# Pad fed through the PRF together with the shared secret in
# IKEv2SA.auth_init() (shared-key authentication).
KEY_PAD = b"Key Pad for IKEv2"
# Number of trailing key bytes CryptoAlgo treats as the AEAD salt.
SALT_SIZE = 4
# Number of authentication-tag bytes appended to AEAD ciphertext.
GCM_ICV_SIZE = 16
# IV length (bytes) used by the 'AES-GCM-16ICV' algorithm.
GCM_IV_SIZE = 8


# defined in rfc3526
# tuple structure is (p, g, key_len)
# key_len is the byte length the shared secret is serialized to
# (see IKEv2SA.compute_secret).
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
72
73
class CryptoAlgo(object):
    """Symmetric cipher helper used to protect IKEv2 payloads in the tests.

    Wraps a cipher class and a mode class and implements two layouts:
    ``IV | ciphertext`` when no AAD is given and
    ``IV | ciphertext | tag`` when AAD is given (AEAD).
    """

    def __init__(self, name, cipher, mode):
        """
        :param name: algorithm name, e.g. 'AES-CBC' or 'AES-GCM-16ICV'
        :param cipher: cipher class exposing ``block_size``, or None
        :param mode: cipher mode class, or None
        """
        self.name = name
        self.cipher = cipher
        self.mode = mode
        # NOTE: when cipher is None neither ``bs`` nor ``iv_len`` is set
        if self.cipher is not None:
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` under ``key`` with a freshly generated IV.

        :param data: plaintext bytes (pad beforehand if the mode needs it)
        :param key: key bytes; in AEAD mode the last SALT_SIZE bytes are
                    used as the salt and stripped from the cipher key
        :param aad: additional authenticated data; selects AEAD mode
        :returns: ``iv + ciphertext`` or, in AEAD mode,
                  ``iv + ciphertext + tag[:GCM_ICV_SIZE]``
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                               default_backend()).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` produced by the peer (mirror of encrypt()).

        The cipher and IV length configured on this object are used, so
        decryption always matches what encrypt() would have produced.

        :param data: ``iv + ciphertext`` (the tag is passed via ``icv``)
        :param key: key bytes; in AEAD mode the trailing SALT_SIZE bytes
                    are the salt
        :param aad: additional authenticated data; selects AEAD mode
        :param icv: authentication tag, required in AEAD mode
        :returns: plaintext bytes, padding still included
        """
        iv = data[:self.iv_len]
        ct = data[self.iv_len:]
        if aad is None:
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            decryptor = Cipher(self.cipher(key[:-SALT_SIZE]),
                               self.mode(nonce, icv, len(icv)),
                               default_backend()).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` up to the next multiple of the block size.

        At least one byte is always added: zero bytes followed by a final
        byte holding the number of zero bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
126
127
class AuthAlgo(object):
    """Plain record describing an integrity or PRF algorithm."""

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        """
        :param name: algorithm name
        :param mac: MAC constructor (or None)
        :param mod: hash constructor handed to ``mac`` (or None)
        :param key_len: key length
        :param trunc_len: truncated output length; any falsy value
                          (None, 0) falls back to ``key_len``
        """
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
135
136
# Encryption algorithms, keyed by the name used throughout the tests.
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}

# Integrity algorithms: (name, mac, hash, key_len, trunc_len).
# Each HMAC-SHA2 variant must use the hash it is named after (RFC 4868);
# the 384 and 512 entries previously pointed at SHA256.
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}

# Pseudo-random functions; trunc_len defaults to key_len.
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}

# Numeric encryption transform ID -> CRYPTO_ALGOS key.
CRYPTO_IDS = {
    12: 'AES-CBC',
    20: 'AES-GCM-16ICV',
}

# Numeric integrity transform ID -> AUTH_ALGOS key.
INTEG_IDS = {
    2: 'HMAC-SHA1-96',
    12: 'SHA2-256-128',
    13: 'SHA2-384-192',
    14: 'SHA2-512-256',
}
169
170
class IKEv2ChildSA(object):
    """State of one child SA: both SPIs and the two traffic selectors."""

    def __init__(self, local_ts, remote_ts, is_initiator):
        """Generate a random 4-byte SPI for our side; the peer's is None.

        :param local_ts: local traffic selector description
        :param remote_ts: remote traffic selector description
        :param is_initiator: True stores the SPI as ``ispi``,
                             False stores it as ``rspi``
        """
        own_spi = os.urandom(4)
        self.ispi, self.rspi = (
            (own_spi, None) if is_initiator else (None, own_spi))
        self.local_ts = local_ts
        self.remote_ts = remote_ts
182
183
184 class IKEv2SA(object):
185     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
186                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
187                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
188                  auth_method='shared-key', priv_key=None, i_natt=False,
189                  r_natt=False, udp_encap=False):
190         self.udp_encap = udp_encap
191         self.i_natt = i_natt
192         self.r_natt = r_natt
193         if i_natt or r_natt:
194             self.sport = 4500
195             self.dport = 4500
196         else:
197             self.sport = 500
198             self.dport = 500
199         self.msg_id = 0
200         self.dh_params = None
201         self.test = test
202         self.priv_key = priv_key
203         self.is_initiator = is_initiator
204         nonce = nonce or os.urandom(32)
205         self.auth_data = auth_data
206         self.i_id = i_id
207         self.r_id = r_id
208         if isinstance(id_type, str):
209             self.id_type = IDType.value(id_type)
210         else:
211             self.id_type = id_type
212         self.auth_method = auth_method
213         if self.is_initiator:
214             self.rspi = 8 * b'\x00'
215             self.ispi = spi
216             self.i_nonce = nonce
217         else:
218             self.rspi = spi
219             self.ispi = 8 * b'\x00'
220             self.r_nonce = nonce
221         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
222                           self.is_initiator)]
223
224     def new_msg_id(self):
225         self.msg_id += 1
226         return self.msg_id
227
228     @property
229     def my_dh_pub_key(self):
230         if self.is_initiator:
231             return self.i_dh_data
232         return self.r_dh_data
233
234     @property
235     def peer_dh_pub_key(self):
236         if self.is_initiator:
237             return self.r_dh_data
238         return self.i_dh_data
239
240     @property
241     def natt(self):
242         return self.i_natt or self.r_natt
243
244     def compute_secret(self):
245         priv = self.dh_private_key
246         peer = self.peer_dh_pub_key
247         p, g, l = self.ike_group
248         return pow(int.from_bytes(peer, 'big'),
249                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
250
251     def generate_dh_data(self):
252         # generate DH keys
253         if self.ike_dh not in DH:
254             raise NotImplementedError('%s not in DH group' % self.ike_dh)
255
256         if self.dh_params is None:
257             dhg = DH[self.ike_dh]
258             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
259             self.dh_params = pn.parameters(default_backend())
260
261         priv = self.dh_params.generate_private_key()
262         pub = priv.public_key()
263         x = priv.private_numbers().x
264         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
265         y = pub.public_numbers().y
266
267         if self.is_initiator:
268             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
269         else:
270             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
271
272     def complete_dh_data(self):
273         self.dh_shared_secret = self.compute_secret()
274
275     def calc_child_keys(self):
276         prf = self.ike_prf_alg.mod()
277         s = self.i_nonce + self.r_nonce
278         c = self.child_sas[0]
279
280         encr_key_len = self.esp_crypto_key_len
281         integ_key_len = self.esp_integ_alg.key_len
282         salt_len = 0 if integ_key_len else 4
283
284         l = (integ_key_len * 2 +
285              encr_key_len * 2 +
286              salt_len * 2)
287         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
288
289         pos = 0
290         c.sk_ei = keymat[pos:pos+encr_key_len]
291         pos += encr_key_len
292
293         if integ_key_len:
294             c.sk_ai = keymat[pos:pos+integ_key_len]
295             pos += integ_key_len
296         else:
297             c.salt_ei = keymat[pos:pos+salt_len]
298             pos += salt_len
299
300         c.sk_er = keymat[pos:pos+encr_key_len]
301         pos += encr_key_len
302
303         if integ_key_len:
304             c.sk_ar = keymat[pos:pos+integ_key_len]
305             pos += integ_key_len
306         else:
307             c.salt_er = keymat[pos:pos+salt_len]
308             pos += salt_len
309
310     def calc_prfplus(self, prf, key, seed, length):
311         r = b''
312         t = None
313         x = 1
314         while len(r) < length and x < 255:
315             if t is not None:
316                 s = t
317             else:
318                 s = b''
319             s = s + seed + bytes([x])
320             t = self.calc_prf(prf, key, s)
321             r = r + t
322             x = x + 1
323
324         if x == 255:
325             return None
326         return r
327
328     def calc_prf(self, prf, key, data):
329         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
330         h.update(data)
331         return h.finalize()
332
333     def calc_keys(self):
334         prf = self.ike_prf_alg.mod()
335         # SKEYSEED = prf(Ni | Nr, g^ir)
336         s = self.i_nonce + self.r_nonce
337         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
338
339         # calculate S = Ni | Nr | SPIi SPIr
340         s = s + self.ispi + self.rspi
341
342         prf_key_trunc = self.ike_prf_alg.trunc_len
343         encr_key_len = self.ike_crypto_key_len
344         tr_prf_key_len = self.ike_prf_alg.key_len
345         integ_key_len = self.ike_integ_alg.key_len
346         if integ_key_len == 0:
347             salt_size = 4
348         else:
349             salt_size = 0
350
351         l = (prf_key_trunc +
352              integ_key_len * 2 +
353              encr_key_len * 2 +
354              tr_prf_key_len * 2 +
355              salt_size * 2)
356         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
357
358         pos = 0
359         self.sk_d = keymat[:pos+prf_key_trunc]
360         pos += prf_key_trunc
361
362         self.sk_ai = keymat[pos:pos+integ_key_len]
363         pos += integ_key_len
364         self.sk_ar = keymat[pos:pos+integ_key_len]
365         pos += integ_key_len
366
367         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
368         pos += encr_key_len + salt_size
369         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
370         pos += encr_key_len + salt_size
371
372         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
373         pos += tr_prf_key_len
374         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
375
376     def generate_authmsg(self, prf, packet):
377         if self.is_initiator:
378             id = self.i_id
379             nonce = self.r_nonce
380             key = self.sk_pi
381         else:
382             id = self.r_id
383             nonce = self.i_nonce
384             key = self.sk_pr
385         data = bytes([self.id_type, 0, 0, 0]) + id
386         id_hash = self.calc_prf(prf, key, data)
387         return packet + nonce + id_hash
388
389     def auth_init(self):
390         prf = self.ike_prf_alg.mod()
391         if self.is_initiator:
392             packet = self.init_req_packet
393         else:
394             packet = self.init_resp_packet
395         authmsg = self.generate_authmsg(prf, raw(packet))
396         if self.auth_method == 'shared-key':
397             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
398             self.auth_data = self.calc_prf(prf, psk, authmsg)
399         elif self.auth_method == 'rsa-sig':
400             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
401                                                 hashes.SHA1())
402         else:
403             raise TypeError('unknown auth method type!')
404
405     def encrypt(self, data, aad=None):
406         data = self.ike_crypto_alg.pad(data)
407         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
408
409     @property
410     def peer_authkey(self):
411         if self.is_initiator:
412             return self.sk_ar
413         return self.sk_ai
414
415     @property
416     def my_authkey(self):
417         if self.is_initiator:
418             return self.sk_ai
419         return self.sk_ar
420
421     @property
422     def my_cryptokey(self):
423         if self.is_initiator:
424             return self.sk_ei
425         return self.sk_er
426
427     @property
428     def peer_cryptokey(self):
429         if self.is_initiator:
430             return self.sk_er
431         return self.sk_ei
432
433     def concat(self, alg, key_len):
434         return alg + '-' + str(key_len * 8)
435
436     @property
437     def vpp_ike_cypto_alg(self):
438         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
439
440     @property
441     def vpp_esp_cypto_alg(self):
442         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
443
444     def verify_hmac(self, ikemsg):
445         integ_trunc = self.ike_integ_alg.trunc_len
446         exp_hmac = ikemsg[-integ_trunc:]
447         data = ikemsg[:-integ_trunc]
448         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
449                                           self.peer_authkey, data)
450         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
451
452     def compute_hmac(self, integ, key, data):
453         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
454         h.update(data)
455         return h.finalize()
456
457     def decrypt(self, data, aad=None, icv=None):
458         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
459
460     def hmac_and_decrypt(self, ike):
461         ep = ike[ikev2.IKEv2_payload_Encrypted]
462         if self.ike_crypto == 'AES-GCM-16ICV':
463             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
464             ct = ep.load[:-GCM_ICV_SIZE]
465             tag = ep.load[-GCM_ICV_SIZE:]
466             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
467         else:
468             self.verify_hmac(raw(ike))
469             integ_trunc = self.ike_integ_alg.trunc_len
470
471             # remove ICV and decrypt payload
472             ct = ep.load[:-integ_trunc]
473             plain = self.decrypt(ct)
474         # remove padding
475         pad_len = plain[-1]
476         return plain[:-pad_len - 1]
477
478     def build_ts_addr(self, ts, version):
479         return {'starting_address_v' + version: ts['start_addr'],
480                 'ending_address_v' + version: ts['end_addr']}
481
482     def generate_ts(self, is_ip4):
483         c = self.child_sas[0]
484         ts_data = {'IP_protocol_ID': 0,
485                    'start_port': 0,
486                    'end_port': 0xffff}
487         if is_ip4:
488             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
489             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
490             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
491             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
492         else:
493             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
494             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
495             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
496             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
497
498         if self.is_initiator:
499             return ([ts1], [ts2])
500         return ([ts2], [ts1])
501
502     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
503         if crypto not in CRYPTO_ALGOS:
504             raise TypeError('unsupported encryption algo %r' % crypto)
505         self.ike_crypto = crypto
506         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
507         self.ike_crypto_key_len = crypto_key_len
508
509         if integ not in AUTH_ALGOS:
510             raise TypeError('unsupported auth algo %r' % integ)
511         self.ike_integ = None if integ == 'NULL' else integ
512         self.ike_integ_alg = AUTH_ALGOS[integ]
513
514         if prf not in PRF_ALGOS:
515             raise TypeError('unsupported prf algo %r' % prf)
516         self.ike_prf = prf
517         self.ike_prf_alg = PRF_ALGOS[prf]
518         self.ike_dh = dh
519         self.ike_group = DH[self.ike_dh]
520
521     def set_esp_props(self, crypto, crypto_key_len, integ):
522         self.esp_crypto_key_len = crypto_key_len
523         if crypto not in CRYPTO_ALGOS:
524             raise TypeError('unsupported encryption algo %r' % crypto)
525         self.esp_crypto = crypto
526         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
527
528         if integ not in AUTH_ALGOS:
529             raise TypeError('unsupported auth algo %r' % integ)
530         self.esp_integ = None if integ == 'NULL' else integ
531         self.esp_integ_alg = AUTH_ALGOS[integ]
532
533     def crypto_attr(self, key_len):
534         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
535             return (0x800e << 16 | key_len << 3, 12)
536         else:
537             raise Exception('unsupported attribute type')
538
539     def ike_crypto_attr(self):
540         return self.crypto_attr(self.ike_crypto_key_len)
541
542     def esp_crypto_attr(self):
543         return self.crypto_attr(self.esp_crypto_key_len)
544
545     def compute_nat_sha1(self, ip, port, rspi=None):
546         if rspi is None:
547             rspi = self.rspi
548         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
549         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
550         digest.update(data)
551         return digest.finalize()
552
553
554 class IkePeer(VppTestCase):
555     """ common class for initiator and responder """
556
557     @classmethod
558     def setUpClass(cls):
559         import scapy.contrib.ikev2 as _ikev2
560         globals()['ikev2'] = _ikev2
561         super(IkePeer, cls).setUpClass()
562         cls.create_pg_interfaces(range(2))
563         for i in cls.pg_interfaces:
564             i.admin_up()
565             i.config_ip4()
566             i.resolve_arp()
567             i.config_ip6()
568             i.resolve_ndp()
569
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(IkePeer, cls).tearDownClass()
573
574     def tearDown(self):
575         super(IkePeer, self).tearDown()
576         if self.del_sa_from_responder:
577             self.initiate_del_sa_from_responder()
578         else:
579             self.initiate_del_sa_from_initiator()
580         r = self.vapi.ikev2_sa_dump()
581         self.assertEqual(len(r), 0)
582         sas = self.vapi.ipsec_sa_dump()
583         self.assertEqual(len(sas), 0)
584         self.p.remove_vpp_config()
585         self.assertIsNone(self.p.query_vpp_config())
586
587     def setUp(self):
588         super(IkePeer, self).setUp()
589         self.config_tc()
590         self.p.add_vpp_config()
591         self.assertIsNotNone(self.p.query_vpp_config())
592         if self.sa.is_initiator:
593             self.sa.generate_dh_data()
594         self.vapi.cli('ikev2 set logging level 4')
595         self.vapi.cli('event-lo clear')
596
597     def assert_counter(self, count, name, version='ip4'):
598         node_name = '/err/ikev2-%s/' % version + name
599         self.assertEqual(count, self.statistics.get_err_counter(node_name))
600
601     def create_rekey_request(self):
602         sa, first_payload = self.generate_auth_payload(is_rekey=True)
603         header = ikev2.IKEv2(
604                 init_SPI=self.sa.ispi,
605                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
606                 flags='Initiator', exch_type='CREATE_CHILD_SA')
607
608         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
609         return self.create_packet(self.pg0, ike_msg, self.sa.sport,
610                                   self.sa.dport, self.sa.natt, self.ip6)
611
612     def create_empty_request(self):
613         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
614                              id=self.sa.new_msg_id(), flags='Initiator',
615                              exch_type='INFORMATIONAL',
616                              next_payload='Encrypted')
617
618         msg = self.encrypt_ike_msg(header, b'', None)
619         return self.create_packet(self.pg0, msg, self.sa.sport,
620                                   self.sa.dport, self.sa.natt, self.ip6)
621
622     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
623                       use_ip6=False):
624         if use_ip6:
625             src_ip = src_if.remote_ip6
626             dst_ip = src_if.local_ip6
627             ip_layer = IPv6
628         else:
629             src_ip = src_if.remote_ip4
630             dst_ip = src_if.local_ip4
631             ip_layer = IP
632         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
633                ip_layer(src=src_ip, dst=dst_ip) /
634                UDP(sport=sport, dport=dport))
635         if natt:
636             # insert non ESP marker
637             res = res / Raw(b'\x00' * 4)
638         return res / msg
639
640     def verify_udp(self, udp):
641         self.assertEqual(udp.sport, self.sa.sport)
642         self.assertEqual(udp.dport, self.sa.dport)
643
644     def get_ike_header(self, packet):
645         try:
646             ih = packet[ikev2.IKEv2]
647             ih = self.verify_and_remove_non_esp_marker(ih)
648         except IndexError as e:
649             # this is a workaround for getting IKEv2 layer as both ikev2 and
650             # ipsec register for port 4500
651             esp = packet[ESP]
652             ih = self.verify_and_remove_non_esp_marker(esp)
653         self.assertEqual(ih.version, 0x20)
654         self.assertNotIn('Version', ih.flags)
655         return ih
656
657     def verify_and_remove_non_esp_marker(self, packet):
658         if self.sa.natt:
659             # if we are in nat traversal mode check for non esp marker
660             # and remove it
661             data = raw(packet)
662             self.assertEqual(data[:4], b'\x00' * 4)
663             return ikev2.IKEv2(data[4:])
664         else:
665             return packet
666
667     def encrypt_ike_msg(self, header, plain, first_payload):
668         if self.sa.ike_crypto == 'AES-GCM-16ICV':
669             data = self.sa.ike_crypto_alg.pad(raw(plain))
670             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
671                 len(ikev2.IKEv2_payload_Encrypted())
672             tlen = plen + len(ikev2.IKEv2())
673
674             # prepare aad data
675             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
676                                                  length=plen)
677             header.length = tlen
678             res = header / sk_p
679             encr = self.sa.encrypt(raw(plain), raw(res))
680             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
681                                                  length=plen, load=encr)
682             res = header / sk_p
683         else:
684             encr = self.sa.encrypt(raw(plain))
685             trunc_len = self.sa.ike_integ_alg.trunc_len
686             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
687             tlen = plen + len(ikev2.IKEv2())
688
689             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
690                                                  length=plen, load=encr)
691             header.length = tlen
692             res = header / sk_p
693
694             integ_data = raw(res)
695             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
696                                              self.sa.my_authkey, integ_data)
697             res = res / Raw(hmac_data[:trunc_len])
698         assert(len(res) == tlen)
699         return res
700
701     def verify_udp_encap(self, ipsec_sa):
702         e = VppEnum.vl_api_ipsec_sad_flags_t
703         if self.sa.udp_encap or self.sa.natt:
704             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
705         else:
706             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
707
708     def verify_ipsec_sas(self, is_rekey=False):
709         sas = self.vapi.ipsec_sa_dump()
710         if is_rekey:
711             # after rekey there is a short period of time in which old
712             # inbound SA is still present
713             sa_count = 3
714         else:
715             sa_count = 2
716         self.assertEqual(len(sas), sa_count)
717         if self.sa.is_initiator:
718             if is_rekey:
719                 sa0 = sas[0].entry
720                 sa1 = sas[2].entry
721             else:
722                 sa0 = sas[0].entry
723                 sa1 = sas[1].entry
724         else:
725             if is_rekey:
726                 sa0 = sas[2].entry
727                 sa1 = sas[0].entry
728             else:
729                 sa1 = sas[0].entry
730                 sa0 = sas[1].entry
731
732         c = self.sa.child_sas[0]
733
734         self.verify_udp_encap(sa0)
735         self.verify_udp_encap(sa1)
736         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
737         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
738         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
739
740         if self.sa.esp_integ is None:
741             vpp_integ_alg = 0
742         else:
743             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
744         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
745         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
746
747         # verify crypto keys
748         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
749         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
750         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
751         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
752
753         # verify integ keys
754         if vpp_integ_alg:
755             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
756             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
757             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
758             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
759         else:
760             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
761             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
762
763     def verify_keymat(self, api_keys, keys, name):
764         km = getattr(keys, name)
765         api_km = getattr(api_keys, name)
766         api_km_len = getattr(api_keys, name + '_len')
767         self.assertEqual(len(km), api_km_len)
768         self.assertEqual(km, api_km[:api_km_len])
769
    def verify_id(self, api_id, exp_id):
        """Assert that an identity reported by the API matches ``exp_id``.

        :param api_id: identity from the API (``type``, ``data_len``, ``data``)
        :param exp_id: expected identity (``type``, ``data_len``)
        """
        self.assertEqual(api_id.type, IDType.value(exp_id.type))
        self.assertEqual(api_id.data_len, exp_id.data_len)
        # NOTE(review): this compares the API's data against exp_id.type;
        # presumably exp_id's data was intended - confirm against callers.
        self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
774
775     def verify_ike_sas(self):
776         r = self.vapi.ikev2_sa_dump()
777         self.assertEqual(len(r), 1)
778         sa = r[0].sa
779         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
780         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
781         if self.ip6:
782             if self.sa.is_initiator:
783                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
784                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
785             else:
786                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
787                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
788         else:
789             if self.sa.is_initiator:
790                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
791                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
792             else:
793                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
794                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
795         self.verify_keymat(sa.keys, self.sa, 'sk_d')
796         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
797         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
798         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
799         self.verify_keymat(sa.keys, self.sa, 'sk_er')
800         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
801         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
802
803         self.assertEqual(sa.i_id.type, self.sa.id_type)
804         self.assertEqual(sa.r_id.type, self.sa.id_type)
805         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
806         self.assertEqual(sa.r_id.data_len, len(self.idr))
807         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
808         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.idr)
809
810         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
811         self.assertEqual(len(r), 1)
812         csa = r[0].child_sa
813         self.assertEqual(csa.sa_index, sa.sa_index)
814         c = self.sa.child_sas[0]
815         if hasattr(c, 'sk_ai'):
816             self.verify_keymat(csa.keys, c, 'sk_ai')
817             self.verify_keymat(csa.keys, c, 'sk_ar')
818         self.verify_keymat(csa.keys, c, 'sk_ei')
819         self.verify_keymat(csa.keys, c, 'sk_er')
820         self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
821         self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
822
823         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
824         tsi = tsi[0]
825         tsr = tsr[0]
826         r = self.vapi.ikev2_traffic_selector_dump(
827                 is_initiator=True, sa_index=sa.sa_index,
828                 child_sa_index=csa.child_sa_index)
829         self.assertEqual(len(r), 1)
830         ts = r[0].ts
831         self.verify_ts(r[0].ts, tsi[0], True)
832
833         r = self.vapi.ikev2_traffic_selector_dump(
834                 is_initiator=False, sa_index=sa.sa_index,
835                 child_sa_index=csa.child_sa_index)
836         self.assertEqual(len(r), 1)
837         self.verify_ts(r[0].ts, tsr[0], False)
838
839         n = self.vapi.ikev2_nonce_get(is_initiator=True,
840                                       sa_index=sa.sa_index)
841         self.verify_nonce(n, self.sa.i_nonce)
842         n = self.vapi.ikev2_nonce_get(is_initiator=False,
843                                       sa_index=sa.sa_index)
844         self.verify_nonce(n, self.sa.r_nonce)
845
846     def verify_nonce(self, api_nonce, nonce):
847         self.assertEqual(api_nonce.data_len, len(nonce))
848         self.assertEqual(api_nonce.nonce, nonce)
849
850     def verify_ts(self, api_ts, ts, is_initiator):
851         if is_initiator:
852             self.assertTrue(api_ts.is_local)
853         else:
854             self.assertFalse(api_ts.is_local)
855
856         if self.p.ts_is_ip4:
857             self.assertEqual(api_ts.start_addr,
858                              IPv4Address(ts.starting_address_v4))
859             self.assertEqual(api_ts.end_addr,
860                              IPv4Address(ts.ending_address_v4))
861         else:
862             self.assertEqual(api_ts.start_addr,
863                              IPv6Address(ts.starting_address_v6))
864             self.assertEqual(api_ts.end_addr,
865                              IPv6Address(ts.ending_address_v6))
866         self.assertEqual(api_ts.start_port, ts.start_port)
867         self.assertEqual(api_ts.end_port, ts.end_port)
868         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
869
870
class TemplateInitiator(IkePeer):
    """ initiator test template """

    def initiate_del_sa_from_initiator(self):
        """Ask VPP to delete the IKE SA and acknowledge its request.

        The delete request captured on pg0 is validated and then answered
        with an empty INFORMATIONAL response.
        """
        ispi = int.from_bytes(self.sa.ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Response', exch_type='INFORMATIONAL',
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, b'', None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Validate the reply to a delete request sent by the test.

        The reply must be an INFORMATIONAL response carrying the current
        message id, both flags and an empty decrypted payload.
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn('Response', ih.flags)
        self.assertIn('Initiator', ih.flags)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def initiate_del_sa_from_responder(self):
        """Send a delete request for the IKE SA and verify the reply."""
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    @staticmethod
    def find_notify_payload(packet, notify_type):
        """Return the first Notify payload of ``notify_type`` in ``packet``.

        :returns: the matching payload, or ``None`` when the packet has no
            such notify.
        """
        # getlayer() yields None instead of raising when there is no Notify
        # layer at all.
        n = packet.getlayer(ikev2.IKEv2_payload_Notify)
        while n is not None:
            # Layers further down the chain may lack a ``type`` field, and
            # the chain terminator has neither ``type`` nor ``payload``;
            # getattr defaults keep the walk from raising and end it cleanly.
            if getattr(n, 'type', None) == notify_type:
                return n
            n = getattr(n, 'payload', None)
        return None

    def verify_nat_detection(self, packet):
        """Check both NAT detection notifies carried by ``packet``.

        The expected hashes are computed from the packet's own source and
        destination address/port with an all-zero 8 byte value.
        """
        # Function-scope import: the visible module header only imports
        # inet_pton from the socket module.
        import socket

        if self.ip6:
            iph = packet[IPv6]
            # IPv6 addresses cannot be packed with AF_INET.
            family = socket.AF_INET6
        else:
            iph = packet[IP]
            family = socket.AF_INET
        udp = packet[UDP]

        # NAT_DETECTION_SOURCE_IP
        s = self.find_notify_payload(packet, 16388)
        self.assertIsNotNone(s)
        src_sha = self.sa.compute_nat_sha1(
                inet_pton(family, iph.src), udp.sport, b'\x00' * 8)
        self.assertEqual(s.load, src_sha)

        # NAT_DETECTION_DESTINATION_IP
        s = self.find_notify_payload(packet, 16389)
        self.assertIsNotNone(s)
        dst_sha = self.sa.compute_nat_sha1(
                inet_pton(family, iph.dst), udp.dport, b'\x00' * 8)
        self.assertEqual(s.load, dst_sha)

    def verify_sa_init_request(self, packet):
        """Validate the captured SA_INIT request and derive the IKE keys.

        Records the peer's port, SPI, nonce and DH data on ``self.sa``,
        checks the proposed transforms against the profile and then sets
        the IKE/ESP properties used for the rest of the exchange.
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = packet[ikev2.IKEv2]
        self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
        self.assertEqual(ih.exch_type, 34)  # SA_INIT
        self.sa.ispi = ih.init_SPI
        self.assertEqual(ih.resp_SPI, 8 * b'\x00')
        self.assertIn('Initiator', ih.flags)
        self.assertNotIn('Response', ih.flags)
        self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load

        prop = packet[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.proto, 1)  # proto = ikev2
        self.assertEqual(prop.proposal, 1)
        self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
        self.assertEqual(prop.trans[0].transform_id,
                         self.p.ike_transforms['crypto_alg'])
        self.assertEqual(prop.trans[1].transform_type, 2)  # prf
        self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
        self.assertEqual(prop.trans[2].transform_type, 4)  # dh
        self.assertEqual(prop.trans[2].transform_id,
                         self.p.ike_transforms['dh_group'])

        self.verify_nat_detection(packet)
        self.sa.set_ike_props(
                    crypto='AES-GCM-16ICV', crypto_key_len=32,
                    integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
        self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
                              integ='SHA2-256-128')
        self.sa.generate_dh_data()
        self.sa.complete_dh_data()
        self.sa.calc_keys()

    def update_esp_transforms(self, trans, sa):
        """Copy the ESP crypto/integrity choice from ``trans`` onto ``sa``.

        Walks the transform chain starting at ``trans`` until it ends.
        """
        while trans:
            if trans.transform_type == 1:  # encryption
                sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
            elif trans.transform_type == 3:  # integrity
                sa.esp_integ = INTEG_IDS[trans.transform_id]
            trans = trans.payload

    def verify_sa_auth_req(self, packet):
        """Validate the captured IKE_AUTH request.

        Advances the tracked message id, checks the identities and stores
        the child SA initiator SPI and ESP transforms from the proposal.
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
        self.assertIn('Initiator', ih.flags)
        self.assertNotIn('Response', ih.flags)

        self.verify_udp(udp)
        self.assertEqual(ih.id, self.sa.msg_id + 1)
        self.sa.msg_id += 1
        plain = self.sa.hmac_and_decrypt(ih)
        idi = ikev2.IKEv2_payload_IDi(plain)
        self.assertEqual(idi.load, self.sa.i_id)
        if self.no_idr_auth:
            self.assertEqual(idi.next_payload, 39)  # AUTH
        else:
            idr = ikev2.IKEv2_payload_IDr(idi.payload)
            self.assertEqual(idr.load, self.sa.r_id)
        prop = idi[ikev2.IKEv2_payload_Proposal]
        c = self.sa.child_sas[0]
        c.ispi = prop.SPI
        self.update_esp_transforms(
                prop[ikev2.IKEv2_payload_Transform], self.sa)

    def send_init_response(self):
        """Send the SA_INIT response and verify the IKE_AUTH request.

        The response is kept in ``self.sa.init_resp_packet``.
        """
        # Function-scope import: the visible module header only imports
        # inet_pton from the socket module.
        import socket

        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))

        src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
        if self.sa.natt:
            # With NAT traversal enabled a fixed address (10.10.10.10) is
            # hashed instead of pg0's local address.
            dst_address = b'\x0a\x0a\x0a\x0a'
        else:
            dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
        dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)

        self.sa.init_resp_packet = (
            ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                        exch_type='IKE_SA_INIT', flags='Response') /
            ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
            ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                   group=self.sa.ike_dh,
                                   load=self.sa.my_dh_pub_key) /
            ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
                                      next_payload='Notify') /
            ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
                    next_payload='Notify') / ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))

        ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
                                     self.sa.sport, self.sa.dport,
                                     False, self.ip6)
        self.pg_send(self.pg0, ike_msg)
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_req(capture[0])

    def initiate_sa_init(self):
        """Make VPP start SA_INIT for the profile and answer its request."""
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)

        capture = self.pg0.get_capture(1)
        self.verify_sa_init_request(capture[0])
        self.send_init_response()

    def send_auth_response(self):
        """Build, encrypt and send the IKE_AUTH response on pg0."""
        tr_attr = self.sa.esp_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        c = self.sa.child_sas[0]
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                 IDtype=self.sa.id_type, load=self.sa.i_id) /
                 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                 IDtype=self.sa.id_type, load=self.sa.r_id) /
                 ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi),
                 traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload='Notify',
                 number_of_TSs=len(tsr),
                 traffic_selector=tsr) /
                 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))

        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Response', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg_send(self.pg0, packet)

    def test_initiator(self):
        # Full exchange: SA_INIT, then IKE_AUTH, then compare VPP's SA state.
        self.initiate_sa_init()
        self.sa.auth_init()
        self.sa.calc_child_keys()
        self.send_auth_response()
        self.verify_ike_sas()
1122
1123
class TemplateResponder(IkePeer):
    """ responder test template """

    def initiate_del_sa_from_responder(self):
        """Ask VPP to delete the IKE SA and acknowledge its request.

        The delete request captured on pg0 is validated and then answered
        with an empty INFORMATIONAL response.
        """
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(
                ispi=int.from_bytes(self.sa.ispi, 'little'))
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn('Response', ih.flags)
        self.assertNotIn('Initiator', ih.flags)
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator+Response',
                             exch_type='INFORMATIONAL',
                             id=ih.id, next_payload='Encrypted')
        resp = self.encrypt_ike_msg(header, b'', None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Validate the reply to a delete request sent by the test.

        The reply must be an encrypted INFORMATIONAL response with the
        current message id, matching SPIs and an empty decrypted payload.
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn('Response', ih.flags)
        self.assertNotIn('Initiator', ih.flags)
        self.assertEqual(ih.next_payload, 46)  # Encrypted
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b'')

    def initiate_del_sa_from_initiator(self):
        """Send a delete request for the IKE SA and verify the reply."""
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator', exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    def send_sa_init_req(self):
        """Send the SA_INIT request and verify the captured response.

        NAT detection notifies are only appended for IPv4. The request is
        kept in ``self.sa.init_req_packet``.
        """
        # Function-scope import: the visible module header only imports
        # inet_pton from the socket module.
        import socket

        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))

        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))

        # The nonce is the last payload for IPv6, where no notifies follow.
        next_payload = None if self.ip6 else 'Notify'

        self.sa.init_req_packet = (
                ikev2.IKEv2(init_SPI=self.sa.ispi,
                            flags='Initiator', exch_type='IKE_SA_INIT') /
                ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
                ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                       group=self.sa.ike_dh,
                                       load=self.sa.my_dh_pub_key) /
                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
                                          load=self.sa.i_nonce))

        if not self.ip6:
            # When a side is flagged as NATed, a fixed address is hashed
            # instead of the real pg0 address.
            if self.sa.i_natt:
                src_address = b'\x0a\x0a\x0a\x01'
            else:
                src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)

            if self.sa.r_natt:
                dst_address = b'\x0a\x0a\x0a\x0a'
            else:
                dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)

            src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
            dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
            nat_src_detection = ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
                    next_payload='Notify')
            nat_dst_detection = ikev2.IKEv2_payload_Notify(
                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
            self.sa.init_req_packet = (self.sa.init_req_packet /
                                       nat_src_detection /
                                       nat_dst_detection)

        ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
                                     self.sa.sport, self.sa.dport,
                                     self.sa.natt, self.ip6)
        self.pg0.add_stream(ike_msg)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_init(capture[0])

    def generate_auth_payload(self, last_payload=None, is_rekey=False):
        """Build the plaintext payload chain for an auth or rekey request.

        :param last_payload: ``next_payload`` value of the TSr payload;
            defaults to ``'Notify'``.
        :param is_rekey: when true, prefix a Nonce and append a REKEY_SA
            notify instead of prefixing the identity payloads.
        :returns: tuple ``(plain, first_payload)`` where ``first_payload``
            names the first payload of the chain.
        """
        tr_attr = self.sa.esp_crypto_attr()
        last_payload = last_payload or 'Notify'
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        c = self.sa.child_sas[0]
        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi), traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
                 number_of_TSs=len(tsr), traffic_selector=tsr))

        if is_rekey:
            first_payload = 'Nonce'
            plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
                     next_payload='SA') / plain /
                     ikev2.IKEv2_payload_Notify(type='REKEY_SA',
                     proto='ESP', SPI=c.ispi))
        else:
            first_payload = 'IDi'
            if self.no_idr_auth:
                ids = ikev2.IKEv2_payload_IDi(next_payload='AUTH',
                                              IDtype=self.sa.id_type,
                                              load=self.sa.i_id)
            else:
                ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                       IDtype=self.sa.id_type, load=self.sa.i_id) /
                       ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                       IDtype=self.sa.id_type, load=self.sa.r_id))
            plain = ids / plain
        return plain, first_payload

    def send_sa_auth(self):
        """Send the encrypted IKE_AUTH request and verify the response."""
        plain, first_payload = self.generate_auth_payload(
                    last_payload='Notify')
        plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Initiator', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_resp(capture[0])

    def verify_sa_init(self, packet):
        """Validate the SA_INIT response and derive the IKE keys.

        Stores the responder SPI, nonce and DH data on ``self.sa``.

        :raises IndexError: when the SA, Nonce or KE payload is missing;
            the packet is logged before re-raising.
        """
        ih = self.get_ike_header(packet)

        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 34)
        self.assertIn('Response', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        # SPIs are 8 byte strings (see the other SPI checks in this file),
        # so comparing with the integer 0 could never fail.
        self.assertNotEqual(ih.resp_SPI, 8 * b'\x00')
        self.sa.rspi = ih.resp_SPI
        try:
            # Lookup only: raises IndexError when the SA payload is absent.
            ih[ikev2.IKEv2_payload_SA]
            self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
        except IndexError:
            self.logger.error(
                "unexpected reply: SA/Nonce/KE payload not found!")
            # show() prints and returns None unless dump=True is given.
            self.logger.error(ih.show(dump=True))
            raise
        self.sa.complete_dh_data()
        self.sa.calc_keys()
        self.sa.auth_init()

    def verify_sa_auth_resp(self, packet):
        """Validate the IKE_AUTH response and derive the child SA keys.

        Stores the responder SPI from the proposal on the first child SA.
        """
        ike = self.get_ike_header(packet)
        udp = packet[UDP]
        self.verify_udp(udp)
        self.assertEqual(ike.id, self.sa.msg_id)
        plain = self.sa.hmac_and_decrypt(ike)
        idr = ikev2.IKEv2_payload_IDr(plain)
        prop = idr[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.SPIsize, 4)
        self.sa.child_sas[0].rspi = prop.SPI
        self.sa.calc_child_keys()

    # Passed to assert_counter() together with each counter name.
    IKE_NODE_SUFFIX = 'ip4'

    def verify_counters(self):
        """Check the node counters and per-SA stats after one exchange."""
        self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
        self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
        self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)

        r = self.vapi.ikev2_sa_dump()
        s = r[0].sa.stats
        self.assertEqual(1, s.n_sa_auth_req)
        self.assertEqual(1, s.n_sa_init_req)

    def test_responder(self):
        # Full exchange: SA_INIT, then IKE_AUTH, then compare VPP's state.
        self.send_sa_init_req()
        self.send_sa_auth()
        self.verify_ipsec_sas()
        self.verify_ike_sas()
        self.verify_counters()
1352
1353
class Ikev2Params(object):
    """Mixin that configures the IKEv2 profile and the local SA model."""

    def config_params(self, params=None):
        """Configure ``self.p`` (VPP profile) and ``self.sa`` from ``params``.

        :param params: optional dict of overrides. Keys read here are:
            ``dpd_disabled``, ``del_sa_from_responder``, ``i_natt``,
            ``r_natt``, ``ip6``, ``auth``, ``server-key``, ``server-cert``,
            ``client-key``, ``client-cert``, ``is_initiator``,
            ``no_idr_in_auth``, ``loc_ts``, ``rem_ts``, ``responder``,
            ``ike_transforms``, ``esp_transforms``, ``udp_encap``,
            ``responder_hostname``, ``nonce``, ``ike-crypto``, ``ike-integ``,
            ``ike-dh``, ``esp-crypto`` and ``esp-integ``.
        """
        # None instead of a shared mutable default dict.
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        # Dead peer detection is switched off unless asked otherwise.
        if params.get('dpd_disabled', True):
            self.vapi.cli('ikev2 dpd disable')
        self.del_sa_from_responder = params.get('del_sa_from_responder',
                                                False)
        i_natt = params.get('i_natt', False)
        r_natt = params.get('r_natt', False)
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # Context managers make sure both files are closed again.
            with open(work_dir + params['server-cert']) as cert_file:
                server_pem = cert_file.read()
            with open(work_dir + params['client-key']) as key_file:
                client_priv = key_file.read()
            client_priv = load_pem_private_key(str.encode(client_priv), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        is_init = params.get('is_initiator', True)
        self.no_idr_auth = params.get('no_idr_in_auth', False)

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        r_id = self.idr = idr['data']
        i_id = self.idi = idi['data']
        if is_init:
            # scapy is initiator, VPP is responder
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
            if self.no_idr_auth:
                r_id = None
        else:
            # VPP is initiator, scapy is responder
            self.p.add_local_id(**idi)
            if not self.no_idr_auth:
                self.p.add_remote_id(**idr)

        loc_ts = params.get(
            'loc_ts', {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
            'rem_ts', {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        udp_encap = params.get('udp_encap', False)
        if udp_encap:
            self.p.set_udp_encap(True)

        if 'responder_hostname' in params:
            hn = params['responder_hostname']
            self.p.add_responder_hostname(hn)

            # configure static dns record
            self.vapi.dns_name_server_add_del(
                is_ip6=0, is_add=1,
                server_address=IPv4Address(u'8.8.8.8').packed)
            self.vapi.dns_enable_disable(enable=1)

            cmd = "dns cache add {} {}".format(hn['hostname'],
                                               self.pg0.remote_ip4)
            self.vapi.cli(cmd)

        # The profile's remote selectors are handed over as the SA's local
        # ones and vice versa.
        self.sa = IKEv2SA(self, i_id=i_id, r_id=r_id,
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'],
                          i_natt=i_natt, r_natt=r_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          nonce=params.get('nonce'),
                          auth_data=auth_data, udp_encap=udp_encap,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)

        if is_init:
            ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
            ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
            ike_dh = params.get('ike-dh', '2048MODPgr')

            esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
            esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

            self.sa.set_ike_props(
                    crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                    integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
            self.sa.set_esp_props(
                    crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                    integ=esp_integ)
1484
1485
class TestApi(VppTestCase):
    """ Test IKEV2 API """
    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        super(TestApi, self).tearDown()
        # A test may fail before both profiles were created; only remove
        # what actually exists so the original failure is not masked by an
        # AttributeError raised from the cleanup code.
        for attr in ('p1', 'p2'):
            profile = getattr(self, attr, None)
            if profile is not None:
                profile.remove_vpp_config()
        # No profile may be left behind once the test is done.
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Create a Profile from the dict *cfg*, push it to VPP, return it.

        Mandatory keys: 'name', 'loc_id', 'rem_id', 'loc_ts', 'rem_ts',
        'responder', 'ike_ts', 'esp_ts', 'auth', 'udp_encap' and
        'ipsec_over_udp_port'.  Optional keys: 'lifetime_data', 'tun_itf'
        and 'natt_disabled'.
        """
        p = Profile(self, cfg['name'])
        # The ids are (id_type, data) tuples.
        p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
        p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
        p.add_local_ts(**cfg['loc_ts'])
        p.add_remote_ts(**cfg['rem_ts'])
        p.add_responder(cfg['responder'])
        p.add_ike_transforms(cfg['ike_ts'])
        p.add_esp_transforms(cfg['esp_ts'])
        p.add_auth(**cfg['auth'])
        p.set_udp_encap(cfg['udp_encap'])
        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            p.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            p.set_tunnel_interface(cfg['tun_itf'])
        if cfg.get('natt_disabled'):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """ test profile dump API """
        loc_ts4 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': '3.3.3.2',
            'end_addr': '3.3.3.3',
        }
        rem_ts4 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': '4.5.76.80',
            'end_addr': '2.3.4.6',
        }

        loc_ts6 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': 'ab::1',
            'end_addr': 'ab::4',
        }
        rem_ts6 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': 'cd::12',
            'end_addr': 'cd::13',
        }

        # p1: IPv4 selectors, NAT-T disabled, explicit lifetime data.
        # p2: IPv6 selectors, explicit tunnel interface, NAT-T left alone.
        conf = {
            'p1': {
                'name': 'p1',
                'natt_disabled': True,
                'loc_id': ('fqdn', b'vpp.home'),
                'rem_id': ('fqdn', b'roadwarrior.example.com'),
                'loc_ts': loc_ts4,
                'rem_ts': rem_ts4,
                'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
                'ike_ts': {
                    'crypto_alg': 20,
                    'crypto_key_size': 32,
                    'integ_alg': 0,
                    'dh_group': 1},
                'esp_ts': {
                    'crypto_alg': 13,
                    'crypto_key_size': 24,
                    'integ_alg': 2},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': True,
                'ipsec_over_udp_port': 4501,
                'lifetime_data': {
                    'lifetime': 123,
                    'lifetime_maxdata': 20192,
                    'lifetime_jitter': 9,
                    'handover': 132},
            },
            'p2': {
                'name': 'p2',
                'loc_id': ('ip4-addr', b'192.168.2.1'),
                'rem_id': ('ip6-addr', b'abcd::1'),
                'loc_ts': loc_ts6,
                'rem_ts': rem_ts6,
                'responder': {'sw_if_index': 4, 'addr': 'def::10'},
                'ike_ts': {
                    'crypto_alg': 12,
                    'crypto_key_size': 16,
                    'integ_alg': 3,
                    'dh_group': 3},
                'esp_ts': {
                    'crypto_alg': 9,
                    'crypto_key_size': 24,
                    'integ_alg': 4},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': False,
                'ipsec_over_udp_port': 4600,
                'tun_itf': 0}
        }
        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        # The dump is expected to list the profiles in creation order.
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        self.verify_profile(r[0].profile, conf['p1'])
        self.verify_profile(r[1].profile, conf['p2'])

    def verify_id(self, api_id, cfg_id):
        """Compare a dumped id with the configured (id_type, data) tuple."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Compare a dumped traffic selector with its configuration dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
        self.assertEqual(api_ts.start_addr,
                         ip_address(text_type(cfg_ts['start_addr'])))
        self.assertEqual(api_ts.end_addr,
                         ip_address(text_type(cfg_ts['end_addr'])))

    def verify_responder(self, api_r, cfg_r):
        """Compare the dumped responder with its configuration dict."""
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the fields shared by the IKE and ESP transform sets."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check the common transform fields plus the IKE-only DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check the ESP transform set (common fields only)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Compare dumped auth method, data and data length with config."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, cfg_auth['data'])
        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))

    def verify_lifetime_data(self, p, ld):
        """Compare the lifetime fields of dumped profile *p* with dict *ld*."""
        self.assertEqual(p.lifetime, ld['lifetime'])
        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
        self.assertEqual(p.handover, ld['handover'])

    def verify_profile(self, ap, cp):
        """Verify the dumped profile *ap* against the configuration *cp*."""
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        # A profile that never asked for it is expected to report NAT-T as
        # not disabled.  assertEqual prints both values on a mismatch.
        natt_dis = cp.get('natt_disabled', False)
        self.assertEqual(natt_dis, ap.natt_disabled)

        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        if 'tun_itf' in cp:
            self.assertEqual(ap.tun_itf, cp['tun_itf'])
        else:
            # Expected value when no tunnel interface was configured.
            self.assertEqual(ap.tun_itf, 0xffffffff)
1673
1674
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """ test responder - responder behind NAT """

    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        # Only the responder-side NAT flag deviates from the defaults.
        overrides = {'r_natt': True}
        self.config_params(overrides)
1683
1684
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - NAT traversal (initiator behind NAT) """

    def config_tc(self):
        # Peer that VPP, acting as initiator, is pointed at.
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        # Transform sets handed to the VPP profile.
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            'i_natt': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms})
1709
1710
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        # Transform sets handed to the VPP profile.
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        # The responder is given by host name instead of by address.
        responder_hostname = {'hostname': 'vpp.responder.org',
                              'sw_if_index': self.pg0.sw_if_index}
        self.config_params({
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
            'responder_hostname': responder_hostname})
1734
1735
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """Answer one captured rekey request by echoing its SA payload.

        When *update_child_sa_data* is true, the nonce and SPI carried by
        the request are also stored in the local SA model and the child
        keys are recalculated.
        """
        ike_hdr = self.get_ike_header(req)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            # The same nonce is used for both directions.
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            # Both SPIs of the child SA take the proposed SPI.
            self.sa.child_sas[0].ispi = proposal.SPI
            self.sa.child_sas[0].rspi = proposal.SPI
            self.sa.calc_child_keys()

        # Response header mirrors the request's message id; exchange
        # type 36 is the CHILD_SA exchange.
        reply_hdr = ikev2.IKEv2(init_SPI=self.sa.ispi,
                                resp_SPI=self.sa.rspi,
                                flags='Response', exch_type=36,
                                id=ike_hdr.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        # Trigger two rekeys of the same child SA back to back.
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        requests = self.pg0.get_capture(2)
        first_req = requests[0]
        second_req = requests[1]

        # reply in reverse order
        self.rekey_respond(second_req, True)
        self.rekey_respond(first_req, False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1776
1777
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """Make VPP start a child SA rekey and answer its request."""
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        request = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(request)

        # The captured packet must be a request sent by the initiator.
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', ike_hdr.flags)
        self.assertIn('Initiator', ike_hdr.flags)

        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        # The same nonce is used for both directions.
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # both SPIs of the child SA take the proposed SPI
        self.sa.child_sas[0].ispi = proposal.SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        self.sa.calc_child_keys()

        # Echo the SA payload back in a response with the same message id.
        reply_hdr = ikev2.IKEv2(init_SPI=self.sa.ispi,
                                resp_SPI=self.sa.rspi,
                                flags='Response', exch_type=36,
                                id=ike_hdr.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        reply = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # Establish the SAs first, then rekey and re-verify them.
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1814
1815
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        # Peer that VPP, acting as initiator, is pointed at.
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        # Transform sets handed to the VPP profile.
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            'del_sa_from_responder': True,
            # seen from test case perspective, thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
            'no_idr_in_auth': True})
1841
1842
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - initiator behind NAT """

    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        # Only the initiator-side NAT flag deviates from the defaults.
        overrides = {'i_natt': True}
        self.config_params(overrides)
1852
1853
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """

    def config_tc(self):
        # No overrides: run with whatever config_params uses by default.
        self.config_params()
1859
1860
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """

    def config_tc(self):
        self.config_params({'dpd_disabled': False})

    def tearDown(self):
        # Deliberately overrides the inherited tearDown with a no-op.
        # NOTE(review): presumably because the SAs are already gone when
        # the test ends - confirm against the base class tearDown.
        pass

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()

        # capture empty request but don't reply
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(probe)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        payload = self.sa.hmac_and_decrypt(ike_hdr)
        self.assertEqual(payload, b'')

        # wait for SA expiration
        time.sleep(3)
        remaining_ike_sas = self.vapi.ikev2_sa_dump()
        self.assertEqual(len(remaining_ike_sas), 0)
        remaining_ipsec_sas = self.vapi.ipsec_sa_dump()
        self.assertEqual(len(remaining_ipsec_sas), 0)
1889
1890
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """Send a rekey request to VPP and record the data of its reply."""
        request = self.create_rekey_request()
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        # Keys must be derived from the nonce/SPI taken from the reply.
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
        # Exactly one rekey request must be accounted for, both in the
        # node counter and in the per-SA statistics.
        self.assert_counter(1, 'rekey_req', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
1918
1919
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """ test ikev2 responder - non-default table id """

    @classmethod
    def setUpClass(cls):
        # Publish scapy's ikev2 module as the module-level name `ikev2`.
        import scapy.contrib.ikev2 as _ikev2
        globals()['ikev2'] = _ikev2
        # Start the MRO lookup after IkePeer, so IkePeer's own setUpClass
        # is bypassed and the interface setup below is used instead.
        super(IkePeer, cls).setUpClass()
        cls.create_pg_interfaces(range(1))
        # Move pg0 into table 1 before any address is configured on it.
        cls.vapi.cli("ip table add 1")
        cls.vapi.cli("set interface ip table pg0 1")
        for intf in cls.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def config_tc(self):
        self.config_params({'dpd_disabled': False})

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderVrf, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # An empty INFORMATIONAL request is expected from VPP within the
        # capture timeout.
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(probe)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        payload = self.sa.hmac_and_decrypt(ike_hdr)
        self.assertEqual(payload, b'')
1951
1952
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """

    def config_tc(self):
        # Key and certificate file names for both peers.
        pem_files = {
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        params = {'udp_encap': True, 'auth': 'rsa-sig'}
        params.update(pem_files)
        self.config_params(params)
1964
1965
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
        TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """

    def config_tc(self):
        # Crypto entries are (algorithm, key length in bytes) tuples.
        ike_settings = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'ike-dh': '2048MODPgr',
        }
        esp_settings = {
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
        }
        params = {}
        params.update(ike_settings)
        params.update(esp_settings)
        # A random 256 byte nonce is supplied explicitly.
        params['nonce'] = os.urandom(256)
        params['no_idr_in_auth'] = True
        self.config_params(params)
1981
1982
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
        TemplateResponder, Ikev2Params):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """

    def config_tc(self):
        # NOTE(review): class name and docstring say AES_CBC_128, yet the
        # IKE key length configured here is 32 - confirm which is intended.
        ike_settings = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'ike-dh': '3072MODPgr',
        }
        esp_settings = {
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
        }
        params = {}
        params.update(ike_settings)
        params.update(esp_settings)
        self.config_params(params)
1997
1998
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    IKE_NODE_SUFFIX = 'ip6'

    def config_tc(self):
        # IPv6 traffic selector ranges for both ends.
        local_selector = {'start_addr': 'ab:cd::0',
                          'end_addr': 'ab:cd::10'}
        remote_selector = {'start_addr': '11::0',
                           'end_addr': '11::100'}
        self.config_params({
            'del_sa_from_responder': True,
            'ip6': True,
            # NOTE(review): other cases in this file pass 'i_natt' or
            # 'r_natt'; confirm that plain 'natt' is consumed at all.
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_selector,
            'rem_ts': remote_selector})
2019
2020
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request to VPP and check the empty reply."""
        request = self.create_empty_request()
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(reply)
        # The reply must carry the expected message id and no payload.
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        payload = self.sa.hmac_and_decrypt(ike_hdr)
        self.assertEqual(payload, b'')
        # The keepalive must show up in the node counter and in the
        # per-SA statistics.
        self.assert_counter(1, 'keepalive', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        # Establish the SAs first, then exercise the keepalive exchange.
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
2044
2045
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # Deliberately overrides the inherited tearDown with a no-op.
        pass

    def config_tc(self):
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT request packet for pg0.

        *length* is passed to the IKEv2 header as given and *payload*,
        when not None, is appended to the header.
        """
        ike_hdr = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                              flags='Initiator', exch_type='IKE_SA_INIT')
        if payload is not None:
            ike_hdr /= payload
        return self.create_packet(self.pg0, ike_hdr, self.sa.sport,
                                  self.sa.dport)

    def verify_bad_packet_length(self):
        """Send headers with a bogus length; expect no reply, only counts."""
        bad_pkt = self.create_ike_init_msg(length=0xdead)
        burst = bad_pkt * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, 'bad_length')

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with a bogus length; expect no reply."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        bad_pkt = self.create_ike_init_msg(payload=bad_sa)
        burst = bad_pkt * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, 'malformed_packet')

    def test_responder(self):
        # Number of copies of each malformed packet that gets sent.
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
2078
2079
# Allow running this file directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)