ikev2: fix setting responder/initiator addresses
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from socket import inet_pton
3 from cryptography import x509
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes, hmac
6 from cryptography.hazmat.primitives.asymmetric import dh, padding
7 from cryptography.hazmat.primitives.serialization import load_pem_private_key
8 from cryptography.hazmat.primitives.ciphers import (
9     Cipher,
10     algorithms,
11     modes,
12 )
13 from ipaddress import IPv4Address, IPv6Address, ip_address
14 from scapy.layers.ipsec import ESP
15 from scapy.layers.inet import IP, UDP, Ether
16 from scapy.layers.inet6 import IPv6
17 from scapy.packet import raw, Raw
18 from scapy.utils import long_converter
19 from framework import VppTestCase, VppTestRunner
20 from vpp_ikev2 import Profile, IDType, AuthMethod
21 from vpp_papi import VppEnum
22
23 try:
24     text_type = unicode
25 except NameError:
26     text_type = str
27
28 KEY_PAD = b"Key Pad for IKEv2"
29 SALT_SIZE = 4
30 GCM_ICV_SIZE = 16
31 GCM_IV_SIZE = 8
32
33
34 # defined in rfc3526
35 # tuple structure is (p, g, key_len)
36 DH = {
37     '2048MODPgr': (long_converter("""
38     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
39     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
40     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
41     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
42     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
43     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
44     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
45     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
46     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
47     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
48     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
49
50     '3072MODPgr': (long_converter("""
51     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
62     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
63     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
64     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
65     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
66     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
67 }
68
69
70 class CryptoAlgo(object):
71     def __init__(self, name, cipher, mode):
72         self.name = name
73         self.cipher = cipher
74         self.mode = mode
75         if self.cipher is not None:
76             self.bs = self.cipher.block_size // 8
77
78             if self.name == 'AES-GCM-16ICV':
79                 self.iv_len = GCM_IV_SIZE
80             else:
81                 self.iv_len = self.bs
82
83     def encrypt(self, data, key, aad=None):
84         iv = os.urandom(self.iv_len)
85         if aad is None:
86             encryptor = Cipher(self.cipher(key), self.mode(iv),
87                                default_backend()).encryptor()
88             return iv + encryptor.update(data) + encryptor.finalize()
89         else:
90             salt = key[-SALT_SIZE:]
91             nonce = salt + iv
92             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
93                                default_backend()).encryptor()
94             encryptor.authenticate_additional_data(aad)
95             data = encryptor.update(data) + encryptor.finalize()
96             data += encryptor.tag[:GCM_ICV_SIZE]
97             return iv + data
98
99     def decrypt(self, data, key, aad=None, icv=None):
100         if aad is None:
101             iv = data[:self.iv_len]
102             ct = data[self.iv_len:]
103             decryptor = Cipher(algorithms.AES(key),
104                                self.mode(iv),
105                                default_backend()).decryptor()
106             return decryptor.update(ct) + decryptor.finalize()
107         else:
108             salt = key[-SALT_SIZE:]
109             nonce = salt + data[:GCM_IV_SIZE]
110             ct = data[GCM_IV_SIZE:]
111             key = key[:-SALT_SIZE]
112             decryptor = Cipher(algorithms.AES(key),
113                                self.mode(nonce, icv, len(icv)),
114                                default_backend()).decryptor()
115             decryptor.authenticate_additional_data(aad)
116             pt = decryptor.update(ct) + decryptor.finalize()
117             pad_len = pt[-1] + 1
118             return pt[:-pad_len]
119
120     def pad(self, data):
121         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
122         data = data + b'\x00' * (pad_len - 1)
123         return data + bytes([pad_len - 1])
124
125
126 class AuthAlgo(object):
127     def __init__(self, name, mac, mod, key_len, trunc_len=None):
128         self.name = name
129         self.mac = mac
130         self.mod = mod
131         self.key_len = key_len
132         self.trunc_len = trunc_len or key_len
133
134
135 CRYPTO_ALGOS = {
136     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
137     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
138     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
139                                 mode=modes.GCM),
140 }
141
142 AUTH_ALGOS = {
143     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
144     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
145     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
146     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
147     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
148 }
149
150 PRF_ALGOS = {
151     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
152     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
153                                   hashes.SHA256, 32),
154 }
155
156
157 class IKEv2ChildSA(object):
158     def __init__(self, local_ts, remote_ts, spi=None):
159         self.spi = spi or os.urandom(4)
160         self.local_ts = local_ts
161         self.remote_ts = remote_ts
162
163
164 class IKEv2SA(object):
165     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
166                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
167                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
168                  auth_method='shared-key', priv_key=None, natt=False):
169         self.natt = natt
170         if natt:
171             self.sport = 4500
172             self.dport = 4500
173         else:
174             self.sport = 500
175             self.dport = 500
176         self.msg_id = 0
177         self.dh_params = None
178         self.test = test
179         self.priv_key = priv_key
180         self.is_initiator = is_initiator
181         nonce = nonce or os.urandom(32)
182         self.auth_data = auth_data
183         self.i_id = i_id
184         self.r_id = r_id
185         if isinstance(id_type, str):
186             self.id_type = IDType.value(id_type)
187         else:
188             self.id_type = id_type
189         self.auth_method = auth_method
190         if self.is_initiator:
191             self.rspi = 8 * b'\x00'
192             self.ispi = spi
193             self.i_nonce = nonce
194         else:
195             self.rspi = spi
196             self.ispi = 8 * b'\x00'
197             self.r_nonce = nonce
198         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
199
200     def new_msg_id(self):
201         self.msg_id += 1
202         return self.msg_id
203
204     @property
205     def my_dh_pub_key(self):
206         if self.is_initiator:
207             return self.i_dh_data
208         return self.r_dh_data
209
210     @property
211     def peer_dh_pub_key(self):
212         if self.is_initiator:
213             return self.r_dh_data
214         return self.i_dh_data
215
216     def compute_secret(self):
217         priv = self.dh_private_key
218         peer = self.peer_dh_pub_key
219         p, g, l = self.ike_group
220         return pow(int.from_bytes(peer, 'big'),
221                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
222
223     def generate_dh_data(self):
224         # generate DH keys
225         if self.ike_dh not in DH:
226             raise NotImplementedError('%s not in DH group' % self.ike_dh)
227
228         if self.dh_params is None:
229             dhg = DH[self.ike_dh]
230             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
231             self.dh_params = pn.parameters(default_backend())
232
233         priv = self.dh_params.generate_private_key()
234         pub = priv.public_key()
235         x = priv.private_numbers().x
236         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
237         y = pub.public_numbers().y
238
239         if self.is_initiator:
240             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
241         else:
242             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
243
244     def complete_dh_data(self):
245         self.dh_shared_secret = self.compute_secret()
246
247     def calc_child_keys(self):
248         prf = self.ike_prf_alg.mod()
249         s = self.i_nonce + self.r_nonce
250         c = self.child_sas[0]
251
252         encr_key_len = self.esp_crypto_key_len
253         integ_key_len = self.esp_integ_alg.key_len
254         salt_len = 0 if integ_key_len else 4
255
256         l = (integ_key_len * 2 +
257              encr_key_len * 2 +
258              salt_len * 2)
259         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
260
261         pos = 0
262         c.sk_ei = keymat[pos:pos+encr_key_len]
263         pos += encr_key_len
264
265         if integ_key_len:
266             c.sk_ai = keymat[pos:pos+integ_key_len]
267             pos += integ_key_len
268         else:
269             c.salt_ei = keymat[pos:pos+salt_len]
270             pos += salt_len
271
272         c.sk_er = keymat[pos:pos+encr_key_len]
273         pos += encr_key_len
274
275         if integ_key_len:
276             c.sk_ar = keymat[pos:pos+integ_key_len]
277             pos += integ_key_len
278         else:
279             c.salt_er = keymat[pos:pos+salt_len]
280             pos += salt_len
281
282     def calc_prfplus(self, prf, key, seed, length):
283         r = b''
284         t = None
285         x = 1
286         while len(r) < length and x < 255:
287             if t is not None:
288                 s = t
289             else:
290                 s = b''
291             s = s + seed + bytes([x])
292             t = self.calc_prf(prf, key, s)
293             r = r + t
294             x = x + 1
295
296         if x == 255:
297             return None
298         return r
299
300     def calc_prf(self, prf, key, data):
301         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
302         h.update(data)
303         return h.finalize()
304
305     def calc_keys(self):
306         prf = self.ike_prf_alg.mod()
307         # SKEYSEED = prf(Ni | Nr, g^ir)
308         s = self.i_nonce + self.r_nonce
309         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
310
311         # calculate S = Ni | Nr | SPIi SPIr
312         s = s + self.ispi + self.rspi
313
314         prf_key_trunc = self.ike_prf_alg.trunc_len
315         encr_key_len = self.ike_crypto_key_len
316         tr_prf_key_len = self.ike_prf_alg.key_len
317         integ_key_len = self.ike_integ_alg.key_len
318         if integ_key_len == 0:
319             salt_size = 4
320         else:
321             salt_size = 0
322
323         l = (prf_key_trunc +
324              integ_key_len * 2 +
325              encr_key_len * 2 +
326              tr_prf_key_len * 2 +
327              salt_size * 2)
328         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
329
330         pos = 0
331         self.sk_d = keymat[:pos+prf_key_trunc]
332         pos += prf_key_trunc
333
334         self.sk_ai = keymat[pos:pos+integ_key_len]
335         pos += integ_key_len
336         self.sk_ar = keymat[pos:pos+integ_key_len]
337         pos += integ_key_len
338
339         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
340         pos += encr_key_len + salt_size
341         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
342         pos += encr_key_len + salt_size
343
344         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
345         pos += tr_prf_key_len
346         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
347
348     def generate_authmsg(self, prf, packet):
349         if self.is_initiator:
350             id = self.i_id
351             nonce = self.r_nonce
352             key = self.sk_pi
353         else:
354             id = self.r_id
355             nonce = self.i_nonce
356             key = self.sk_pr
357         data = bytes([self.id_type, 0, 0, 0]) + id
358         id_hash = self.calc_prf(prf, key, data)
359         return packet + nonce + id_hash
360
361     def auth_init(self):
362         prf = self.ike_prf_alg.mod()
363         if self.is_initiator:
364             packet = self.init_req_packet
365         else:
366             packet = self.init_resp_packet
367         authmsg = self.generate_authmsg(prf, raw(packet))
368         if self.auth_method == 'shared-key':
369             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
370             self.auth_data = self.calc_prf(prf, psk, authmsg)
371         elif self.auth_method == 'rsa-sig':
372             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
373                                                 hashes.SHA1())
374         else:
375             raise TypeError('unknown auth method type!')
376
377     def encrypt(self, data, aad=None):
378         data = self.ike_crypto_alg.pad(data)
379         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
380
381     @property
382     def peer_authkey(self):
383         if self.is_initiator:
384             return self.sk_ar
385         return self.sk_ai
386
387     @property
388     def my_authkey(self):
389         if self.is_initiator:
390             return self.sk_ai
391         return self.sk_ar
392
393     @property
394     def my_cryptokey(self):
395         if self.is_initiator:
396             return self.sk_ei
397         return self.sk_er
398
399     @property
400     def peer_cryptokey(self):
401         if self.is_initiator:
402             return self.sk_er
403         return self.sk_ei
404
405     def concat(self, alg, key_len):
406         return alg + '-' + str(key_len * 8)
407
408     @property
409     def vpp_ike_cypto_alg(self):
410         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
411
412     @property
413     def vpp_esp_cypto_alg(self):
414         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
415
416     def verify_hmac(self, ikemsg):
417         integ_trunc = self.ike_integ_alg.trunc_len
418         exp_hmac = ikemsg[-integ_trunc:]
419         data = ikemsg[:-integ_trunc]
420         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
421                                           self.peer_authkey, data)
422         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
423
424     def compute_hmac(self, integ, key, data):
425         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
426         h.update(data)
427         return h.finalize()
428
429     def decrypt(self, data, aad=None, icv=None):
430         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
431
432     def hmac_and_decrypt(self, ike):
433         ep = ike[ikev2.IKEv2_payload_Encrypted]
434         if self.ike_crypto == 'AES-GCM-16ICV':
435             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
436             ct = ep.load[:-GCM_ICV_SIZE]
437             tag = ep.load[-GCM_ICV_SIZE:]
438             return self.decrypt(ct, raw(ike)[:aad_len], tag)
439         else:
440             self.verify_hmac(raw(ike))
441             integ_trunc = self.ike_integ_alg.trunc_len
442
443             # remove ICV and decrypt payload
444             ct = ep.load[:-integ_trunc]
445             return self.decrypt(ct)
446
447     def build_ts_addr(self, ts, version):
448         return {'starting_address_v' + version: ts['start_addr'],
449                 'ending_address_v' + version: ts['end_addr']}
450
451     def generate_ts(self, is_ip4):
452         c = self.child_sas[0]
453         ts_data = {'IP_protocol_ID': 0,
454                    'start_port': 0,
455                    'end_port': 0xffff}
456         if is_ip4:
457             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
458             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
459             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
460             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
461         else:
462             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
463             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
464             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
465             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
466
467         if self.is_initiator:
468             return ([ts1], [ts2])
469         return ([ts2], [ts1])
470
471     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
472         if crypto not in CRYPTO_ALGOS:
473             raise TypeError('unsupported encryption algo %r' % crypto)
474         self.ike_crypto = crypto
475         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
476         self.ike_crypto_key_len = crypto_key_len
477
478         if integ not in AUTH_ALGOS:
479             raise TypeError('unsupported auth algo %r' % integ)
480         self.ike_integ = None if integ == 'NULL' else integ
481         self.ike_integ_alg = AUTH_ALGOS[integ]
482
483         if prf not in PRF_ALGOS:
484             raise TypeError('unsupported prf algo %r' % prf)
485         self.ike_prf = prf
486         self.ike_prf_alg = PRF_ALGOS[prf]
487         self.ike_dh = dh
488         self.ike_group = DH[self.ike_dh]
489
490     def set_esp_props(self, crypto, crypto_key_len, integ):
491         self.esp_crypto_key_len = crypto_key_len
492         if crypto not in CRYPTO_ALGOS:
493             raise TypeError('unsupported encryption algo %r' % crypto)
494         self.esp_crypto = crypto
495         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
496
497         if integ not in AUTH_ALGOS:
498             raise TypeError('unsupported auth algo %r' % integ)
499         self.esp_integ = None if integ == 'NULL' else integ
500         self.esp_integ_alg = AUTH_ALGOS[integ]
501
502     def crypto_attr(self, key_len):
503         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
504             return (0x800e << 16 | key_len << 3, 12)
505         else:
506             raise Exception('unsupported attribute type')
507
508     def ike_crypto_attr(self):
509         return self.crypto_attr(self.ike_crypto_key_len)
510
511     def esp_crypto_attr(self):
512         return self.crypto_attr(self.esp_crypto_key_len)
513
514     def compute_nat_sha1(self, ip, port, rspi=None):
515         if rspi is None:
516             rspi = self.rspi
517         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
518         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
519         digest.update(data)
520         return digest.finalize()
521
522
523 class IkePeer(VppTestCase):
524     """ common class for initiator and responder """
525
526     @classmethod
527     def setUpClass(cls):
528         import scapy.contrib.ikev2 as _ikev2
529         globals()['ikev2'] = _ikev2
530         super(IkePeer, cls).setUpClass()
531         cls.create_pg_interfaces(range(2))
532         for i in cls.pg_interfaces:
533             i.admin_up()
534             i.config_ip4()
535             i.resolve_arp()
536             i.config_ip6()
537             i.resolve_ndp()
538
539     @classmethod
540     def tearDownClass(cls):
541         super(IkePeer, cls).tearDownClass()
542
543     def setUp(self):
544         super(IkePeer, self).setUp()
545         self.config_tc()
546         self.p.add_vpp_config()
547         self.assertIsNotNone(self.p.query_vpp_config())
548         self.sa.generate_dh_data()
549         self.vapi.cli('ikev2 set logging level 4')
550         self.vapi.cli('event-lo clear')
551
552     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
553                       use_ip6=False):
554         if use_ip6:
555             src_ip = src_if.remote_ip6
556             dst_ip = src_if.local_ip6
557             ip_layer = IPv6
558         else:
559             src_ip = src_if.remote_ip4
560             dst_ip = src_if.local_ip4
561             ip_layer = IP
562         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
563                ip_layer(src=src_ip, dst=dst_ip) /
564                UDP(sport=sport, dport=dport))
565         if natt:
566             # insert non ESP marker
567             res = res / Raw(b'\x00' * 4)
568         return res / msg
569
570     def verify_udp(self, udp):
571         self.assertEqual(udp.sport, self.sa.sport)
572         self.assertEqual(udp.dport, self.sa.dport)
573
574     def get_ike_header(self, packet):
575         try:
576             ih = packet[ikev2.IKEv2]
577         except IndexError as e:
578             # this is a workaround for getting IKEv2 layer as both ikev2 and
579             # ipsec register for port 4500
580             esp = packet[ESP]
581             ih = self.verify_and_remove_non_esp_marker(esp)
582         self.assertEqual(ih.version, 0x20)
583         return ih
584
585     def verify_and_remove_non_esp_marker(self, packet):
586         if self.sa.natt:
587             # if we are in nat traversal mode check for non esp marker
588             # and remove it
589             data = raw(packet)
590             self.assertEqual(data[:4], b'\x00' * 4)
591             return ikev2.IKEv2(data[4:])
592         else:
593             return packet
594
595     def encrypt_ike_msg(self, header, plain, first_payload):
596         if self.sa.ike_crypto == 'AES-GCM-16ICV':
597             data = self.sa.ike_crypto_alg.pad(raw(plain))
598             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
599                 len(ikev2.IKEv2_payload_Encrypted())
600             tlen = plen + len(ikev2.IKEv2())
601
602             # prepare aad data
603             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
604                                                  length=plen)
605             header.length = tlen
606             res = header / sk_p
607             encr = self.sa.encrypt(raw(plain), raw(res))
608             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
609                                                  length=plen, load=encr)
610             res = header / sk_p
611         else:
612             encr = self.sa.encrypt(raw(plain))
613             trunc_len = self.sa.ike_integ_alg.trunc_len
614             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
615             tlen = plen + len(ikev2.IKEv2())
616
617             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
618                                                  length=plen, load=encr)
619             header.length = tlen
620             res = header / sk_p
621
622             integ_data = raw(res)
623             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
624                                              self.sa.my_authkey, integ_data)
625             res = res / Raw(hmac_data[:trunc_len])
626         assert(len(res) == tlen)
627         return res
628
629     def verify_ipsec_sas(self):
630         sas = self.vapi.ipsec_sa_dump()
631         self.assertEqual(len(sas), 2)
632         e = VppEnum.vl_api_ipsec_sad_flags_t
633         if self.sa.is_initiator:
634             sa0 = sas[0].entry
635             sa1 = sas[1].entry
636         else:
637             sa1 = sas[0].entry
638             sa0 = sas[1].entry
639
640         c = self.sa.child_sas[0]
641
642         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
643         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
644         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
645
646         if self.sa.esp_integ is None:
647             vpp_integ_alg = 0
648         else:
649             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
650         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
651         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
652
653         # verify crypto keys
654         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
655         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
656         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
657         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
658
659         # verify integ keys
660         if vpp_integ_alg:
661             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
662             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
663             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
664             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
665         else:
666             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
667             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
668
669     def verify_keymat(self, api_keys, keys, name):
670         km = getattr(keys, name)
671         api_km = getattr(api_keys, name)
672         api_km_len = getattr(api_keys, name + '_len')
673         self.assertEqual(len(km), api_km_len)
674         self.assertEqual(km, api_km[:api_km_len])
675
676     def verify_id(self, api_id, exp_id):
677         self.assertEqual(api_id.type, IDType.value(exp_id.type))
678         self.assertEqual(api_id.data_len, exp_id.data_len)
679         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
680
681     def verify_ike_sas(self):
682         r = self.vapi.ikev2_sa_dump()
683         self.assertEqual(len(r), 1)
684         sa = r[0].sa
685         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
686         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
687         if self.ip6:
688             if self.sa.is_initiator:
689                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
690                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
691             else:
692                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
693                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
694         else:
695             if self.sa.is_initiator:
696                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
697                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
698             else:
699                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
700                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
701         self.verify_keymat(sa.keys, self.sa, 'sk_d')
702         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
703         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
704         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
705         self.verify_keymat(sa.keys, self.sa, 'sk_er')
706         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
707         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
708
709         self.assertEqual(sa.i_id.type, self.sa.id_type)
710         self.assertEqual(sa.r_id.type, self.sa.id_type)
711         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
712         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
713         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
714         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
715
716         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
717         self.assertEqual(len(r), 1)
718         csa = r[0].child_sa
719         self.assertEqual(csa.sa_index, sa.sa_index)
720         c = self.sa.child_sas[0]
721         if hasattr(c, 'sk_ai'):
722             self.verify_keymat(csa.keys, c, 'sk_ai')
723             self.verify_keymat(csa.keys, c, 'sk_ar')
724         self.verify_keymat(csa.keys, c, 'sk_ei')
725         self.verify_keymat(csa.keys, c, 'sk_er')
726
727         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
728         tsi = tsi[0]
729         tsr = tsr[0]
730         r = self.vapi.ikev2_traffic_selector_dump(
731                 is_initiator=True, sa_index=sa.sa_index,
732                 child_sa_index=csa.child_sa_index)
733         self.assertEqual(len(r), 1)
734         ts = r[0].ts
735         self.verify_ts(r[0].ts, tsi[0], True)
736
737         r = self.vapi.ikev2_traffic_selector_dump(
738                 is_initiator=False, sa_index=sa.sa_index,
739                 child_sa_index=csa.child_sa_index)
740         self.assertEqual(len(r), 1)
741         self.verify_ts(r[0].ts, tsr[0], False)
742
743         n = self.vapi.ikev2_nonce_get(is_initiator=True,
744                                       sa_index=sa.sa_index)
745         self.verify_nonce(n, self.sa.i_nonce)
746         n = self.vapi.ikev2_nonce_get(is_initiator=False,
747                                       sa_index=sa.sa_index)
748         self.verify_nonce(n, self.sa.r_nonce)
749
750     def verify_nonce(self, api_nonce, nonce):
751         self.assertEqual(api_nonce.data_len, len(nonce))
752         self.assertEqual(api_nonce.nonce, nonce)
753
754     def verify_ts(self, api_ts, ts, is_initiator):
755         if is_initiator:
756             self.assertTrue(api_ts.is_local)
757         else:
758             self.assertFalse(api_ts.is_local)
759
760         if self.p.ts_is_ip4:
761             self.assertEqual(api_ts.start_addr,
762                              IPv4Address(ts.starting_address_v4))
763             self.assertEqual(api_ts.end_addr,
764                              IPv4Address(ts.ending_address_v4))
765         else:
766             self.assertEqual(api_ts.start_addr,
767                              IPv6Address(ts.starting_address_v6))
768             self.assertEqual(api_ts.end_addr,
769                              IPv6Address(ts.ending_address_v6))
770         self.assertEqual(api_ts.start_port, ts.start_port)
771         self.assertEqual(api_ts.end_port, ts.end_port)
772         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
773
774
775 class TemplateInitiator(IkePeer):
776     """ initiator test template """
777
778     def tearDown(self):
779         super(TemplateInitiator, self).tearDown()
780
781     @staticmethod
782     def find_notify_payload(packet, notify_type):
783         n = packet[ikev2.IKEv2_payload_Notify]
784         while n is not None:
785             if n.type == notify_type:
786                 return n
787             n = n.payload
788         return None
789
790     def verify_nat_detection(self, packet):
791         if self.ip6:
792             iph = packet[IPv6]
793         else:
794             iph = packet[IP]
795         udp = packet[UDP]
796
797         # NAT_DETECTION_SOURCE_IP
798         s = self.find_notify_payload(packet, 16388)
799         self.assertIsNotNone(s)
800         src_sha = self.sa.compute_nat_sha1(
801                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
802         self.assertEqual(s.load, src_sha)
803
804         # NAT_DETECTION_DESTINATION_IP
805         s = self.find_notify_payload(packet, 16389)
806         self.assertIsNotNone(s)
807         dst_sha = self.sa.compute_nat_sha1(
808                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
809         self.assertEqual(s.load, dst_sha)
810
811     def verify_sa_init_request(self, packet):
812         ih = packet[ikev2.IKEv2]
813         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
814         self.assertEqual(ih.exch_type, 34)  # SA_INIT
815         self.sa.ispi = ih.init_SPI
816         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
817         self.assertIn('Initiator', ih.flags)
818         self.assertNotIn('Response', ih.flags)
819         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
820         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
821
822         prop = packet[ikev2.IKEv2_payload_Proposal]
823         self.assertEqual(prop.proto, 1)  # proto = ikev2
824         self.assertEqual(prop.proposal, 1)
825         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
826         self.assertEqual(prop.trans[0].transform_id,
827                          self.p.ike_transforms['crypto_alg'])
828         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
829         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
830         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
831         self.assertEqual(prop.trans[2].transform_id,
832                          self.p.ike_transforms['dh_group'])
833
834         self.verify_nat_detection(packet)
835         self.sa.complete_dh_data()
836         self.sa.calc_keys()
837
838     def verify_sa_auth_req(self, packet):
839         ih = self.get_ike_header(packet)
840         self.assertEqual(ih.resp_SPI, self.sa.rspi)
841         self.assertEqual(ih.init_SPI, self.sa.ispi)
842         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
843         self.assertIn('Initiator', ih.flags)
844         self.assertNotIn('Response', ih.flags)
845
846         udp = packet[UDP]
847         self.verify_udp(udp)
848         self.assertEqual(ih.id, self.sa.msg_id + 1)
849         self.sa.msg_id += 1
850         plain = self.sa.hmac_and_decrypt(ih)
851         idi = ikev2.IKEv2_payload_IDi(plain)
852         idr = ikev2.IKEv2_payload_IDr(idi.payload)
853         self.assertEqual(idi.load, self.sa.i_id)
854         self.assertEqual(idr.load, self.sa.r_id)
855
856     def send_init_response(self):
857         tr_attr = self.sa.ike_crypto_attr()
858         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
859                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
860                  key_length=tr_attr[0]) /
861                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
862                  transform_id=self.sa.ike_integ) /
863                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
864                  transform_id=self.sa.ike_prf_alg.name) /
865                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
866                  transform_id=self.sa.ike_dh))
867         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
868                  trans_nb=4, trans=trans))
869         self.sa.init_resp_packet = (
870             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
871                         exch_type='IKE_SA_INIT', flags='Response') /
872             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
873             ikev2.IKEv2_payload_KE(next_payload='Nonce',
874                                    group=self.sa.ike_dh,
875                                    load=self.sa.my_dh_pub_key) /
876             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
877
878         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
879                                      self.sa.sport, self.sa.dport,
880                                      self.sa.natt, self.ip6)
881         self.pg_send(self.pg0, ike_msg)
882         capture = self.pg0.get_capture(1)
883         self.verify_sa_auth_req(capture[0])
884
885     def initiate_sa_init(self):
886         self.pg0.enable_capture()
887         self.pg_start()
888         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
889
890         capture = self.pg0.get_capture(1)
891         self.verify_sa_init_request(capture[0])
892         self.send_init_response()
893
894     def send_auth_response(self):
895         tr_attr = self.sa.esp_crypto_attr()
896         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
897                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
898                  key_length=tr_attr[0]) /
899                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
900                  transform_id=self.sa.esp_integ) /
901                  ikev2.IKEv2_payload_Transform(
902                  transform_type='Extended Sequence Number',
903                  transform_id='No ESN') /
904                  ikev2.IKEv2_payload_Transform(
905                  transform_type='Extended Sequence Number',
906                  transform_id='ESN'))
907
908         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
909                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
910
911         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
912         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
913                  IDtype=self.sa.id_type, load=self.sa.i_id) /
914                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
915                  IDtype=self.sa.id_type, load=self.sa.r_id) /
916                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
917                  auth_type=AuthMethod.value(self.sa.auth_method),
918                  load=self.sa.auth_data) /
919                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
920                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
921                  number_of_TSs=len(tsi),
922                  traffic_selector=tsi) /
923                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
924                  number_of_TSs=len(tsr),
925                  traffic_selector=tsr) /
926                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
927
928         header = ikev2.IKEv2(
929                 init_SPI=self.sa.ispi,
930                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
931                 flags='Response', exch_type='IKE_AUTH')
932
933         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
934         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
935                                     self.sa.dport, self.sa.natt, self.ip6)
936         self.pg_send(self.pg0, packet)
937
938     def test_initiator(self):
939         self.initiate_sa_init()
940         self.sa.auth_init()
941         self.sa.calc_child_keys()
942         self.send_auth_response()
943         self.verify_ike_sas()
944
945
946 class TemplateResponder(IkePeer):
947     """ responder test template """
948
949     def tearDown(self):
950         super(TemplateResponder, self).tearDown()
951         if self.sa.is_initiator:
952             self.initiate_del_sa()
953             r = self.vapi.ikev2_sa_dump()
954             self.assertEqual(len(r), 0)
955
956         self.p.remove_vpp_config()
957         self.assertIsNone(self.p.query_vpp_config())
958
959     def verify_del_sa(self, packet):
960         ih = self.get_ike_header(packet)
961         self.assertEqual(ih.id, self.sa.msg_id)
962         self.assertEqual(ih.exch_type, 37)  # exchange informational
963
964     def initiate_del_sa(self):
965         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
966                              flags='Initiator', exch_type='INFORMATIONAL',
967                              id=self.sa.new_msg_id())
968         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
969         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
970         packet = self.create_packet(self.pg0, ike_msg,
971                                     self.sa.sport, self.sa.dport,
972                                     self.sa.natt, self.ip6)
973         self.pg0.add_stream(packet)
974         self.pg0.enable_capture()
975         self.pg_start()
976         capture = self.pg0.get_capture(1)
977         self.verify_del_sa(capture[0])
978
979     def send_sa_init_req(self, behind_nat=False):
980         tr_attr = self.sa.ike_crypto_attr()
981         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
982                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
983                  key_length=tr_attr[0]) /
984                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
985                  transform_id=self.sa.ike_integ) /
986                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
987                  transform_id=self.sa.ike_prf_alg.name) /
988                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
989                  transform_id=self.sa.ike_dh))
990
991         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
992                  trans_nb=4, trans=trans))
993
994         self.sa.init_req_packet = (
995                 ikev2.IKEv2(init_SPI=self.sa.ispi,
996                             flags='Initiator', exch_type='IKE_SA_INIT') /
997                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
998                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
999                                        group=self.sa.ike_dh,
1000                                        load=self.sa.my_dh_pub_key) /
1001                 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1002                                           load=self.sa.i_nonce))
1003
1004         if behind_nat:
1005             src_address = b'\x0a\x0a\x0a\x01'
1006         else:
1007             src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1008
1009         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1010         dst_nat = self.sa.compute_nat_sha1(
1011                 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1012                 self.sa.dport)
1013         nat_src_detection = ikev2.IKEv2_payload_Notify(
1014                 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1015                 next_payload='Notify')
1016         nat_dst_detection = ikev2.IKEv2_payload_Notify(
1017                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1018         self.sa.init_req_packet = (self.sa.init_req_packet /
1019                                    nat_src_detection /
1020                                    nat_dst_detection)
1021
1022         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1023                                      self.sa.sport, self.sa.dport,
1024                                      self.sa.natt, self.ip6)
1025         self.pg0.add_stream(ike_msg)
1026         self.pg0.enable_capture()
1027         self.pg_start()
1028         capture = self.pg0.get_capture(1)
1029         self.verify_sa_init(capture[0])
1030
1031     def send_sa_auth(self):
1032         tr_attr = self.sa.esp_crypto_attr()
1033         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1034                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1035                  key_length=tr_attr[0]) /
1036                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1037                  transform_id=self.sa.esp_integ) /
1038                  ikev2.IKEv2_payload_Transform(
1039                  transform_type='Extended Sequence Number',
1040                  transform_id='No ESN') /
1041                  ikev2.IKEv2_payload_Transform(
1042                  transform_type='Extended Sequence Number',
1043                  transform_id='ESN'))
1044
1045         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1046                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
1047
1048         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1049         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1050                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1051                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1052                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1053                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1054                  auth_type=AuthMethod.value(self.sa.auth_method),
1055                  load=self.sa.auth_data) /
1056                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1057                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1058                  number_of_TSs=len(tsi),
1059                  traffic_selector=tsi) /
1060                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1061                  number_of_TSs=len(tsr),
1062                  traffic_selector=tsr) /
1063                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1064
1065         header = ikev2.IKEv2(
1066                 init_SPI=self.sa.ispi,
1067                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1068                 flags='Initiator', exch_type='IKE_AUTH')
1069
1070         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1071         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1072                                     self.sa.dport, self.sa.natt, self.ip6)
1073         self.pg0.add_stream(packet)
1074         self.pg0.enable_capture()
1075         self.pg_start()
1076         capture = self.pg0.get_capture(1)
1077         self.verify_sa_auth_resp(capture[0])
1078
1079     def verify_sa_init(self, packet):
1080         ih = self.get_ike_header(packet)
1081
1082         self.assertEqual(ih.id, self.sa.msg_id)
1083         self.assertEqual(ih.exch_type, 34)
1084         self.assertTrue('Response' in ih.flags)
1085         self.assertEqual(ih.init_SPI, self.sa.ispi)
1086         self.assertNotEqual(ih.resp_SPI, 0)
1087         self.sa.rspi = ih.resp_SPI
1088         try:
1089             sa = ih[ikev2.IKEv2_payload_SA]
1090             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1091             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1092         except IndexError as e:
1093             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1094             self.logger.error(ih.show())
1095             raise
1096         self.sa.complete_dh_data()
1097         self.sa.calc_keys()
1098         self.sa.auth_init()
1099
1100     def verify_sa_auth_resp(self, packet):
1101         ike = self.get_ike_header(packet)
1102         udp = packet[UDP]
1103         self.verify_udp(udp)
1104         self.assertEqual(ike.id, self.sa.msg_id)
1105         plain = self.sa.hmac_and_decrypt(ike)
1106         self.sa.calc_child_keys()
1107
1108     def test_responder(self):
1109         self.send_sa_init_req(self.sa.natt)
1110         self.send_sa_auth()
1111         self.verify_ipsec_sas()
1112         self.verify_ike_sas()
1113
1114
1115 class Ikev2Params(object):
1116     def config_params(self, params={}):
1117         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1118         ei = VppEnum.vl_api_ipsec_integ_alg_t
1119         self.vpp_enums = {
1120                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1121                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1122                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1123                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1124                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1125                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1126
1127                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1128                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1129                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1130                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1131
1132         is_natt = 'natt' in params and params['natt'] or False
1133         self.p = Profile(self, 'pr1')
1134         self.ip6 = False if 'ip6' not in params else params['ip6']
1135
1136         if 'auth' in params and params['auth'] == 'rsa-sig':
1137             auth_method = 'rsa-sig'
1138             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1139             self.vapi.ikev2_set_local_key(
1140                     key_file=work_dir + params['server-key'])
1141
1142             client_file = work_dir + params['client-cert']
1143             server_pem = open(work_dir + params['server-cert']).read()
1144             client_priv = open(work_dir + params['client-key']).read()
1145             client_priv = load_pem_private_key(str.encode(client_priv), None,
1146                                                default_backend())
1147             self.peer_cert = x509.load_pem_x509_certificate(
1148                     str.encode(server_pem),
1149                     default_backend())
1150             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1151             auth_data = None
1152         else:
1153             auth_data = b'$3cr3tpa$$w0rd'
1154             self.p.add_auth(method='shared-key', data=auth_data)
1155             auth_method = 'shared-key'
1156             client_priv = None
1157
1158         is_init = True if 'is_initiator' not in params else\
1159             params['is_initiator']
1160
1161         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1162         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1163         if is_init:
1164             self.p.add_local_id(**idr)
1165             self.p.add_remote_id(**idi)
1166         else:
1167             self.p.add_local_id(**idi)
1168             self.p.add_remote_id(**idr)
1169
1170         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1171             'loc_ts' not in params else params['loc_ts']
1172         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1173             'rem_ts' not in params else params['rem_ts']
1174         self.p.add_local_ts(**loc_ts)
1175         self.p.add_remote_ts(**rem_ts)
1176         if 'responder' in params:
1177             self.p.add_responder(params['responder'])
1178         if 'ike_transforms' in params:
1179             self.p.add_ike_transforms(params['ike_transforms'])
1180         if 'esp_transforms' in params:
1181             self.p.add_esp_transforms(params['esp_transforms'])
1182
1183         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1184                           is_initiator=is_init,
1185                           id_type=self.p.local_id['id_type'], natt=is_natt,
1186                           priv_key=client_priv, auth_method=auth_method,
1187                           auth_data=auth_data,
1188                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1189
1190         ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1191             params['ike-crypto']
1192         ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1193             params['ike-integ']
1194         ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1195             params['ike-dh']
1196
1197         esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1198             params['esp-crypto']
1199         esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1200             params['esp-integ']
1201
1202         self.sa.set_ike_props(
1203                 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1204                 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1205         self.sa.set_esp_props(
1206                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1207                 integ=esp_integ)
1208
1209
1210 class TestApi(VppTestCase):
1211     """ Test IKEV2 API """
1212     @classmethod
1213     def setUpClass(cls):
1214         super(TestApi, cls).setUpClass()
1215
1216     @classmethod
1217     def tearDownClass(cls):
1218         super(TestApi, cls).tearDownClass()
1219
1220     def tearDown(self):
1221         super(TestApi, self).tearDown()
1222         self.p1.remove_vpp_config()
1223         self.p2.remove_vpp_config()
1224         r = self.vapi.ikev2_profile_dump()
1225         self.assertEqual(len(r), 0)
1226
1227     def configure_profile(self, cfg):
1228         p = Profile(self, cfg['name'])
1229         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1230         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1231         p.add_local_ts(**cfg['loc_ts'])
1232         p.add_remote_ts(**cfg['rem_ts'])
1233         p.add_responder(cfg['responder'])
1234         p.add_ike_transforms(cfg['ike_ts'])
1235         p.add_esp_transforms(cfg['esp_ts'])
1236         p.add_auth(**cfg['auth'])
1237         p.set_udp_encap(cfg['udp_encap'])
1238         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1239         if 'lifetime_data' in cfg:
1240             p.set_lifetime_data(cfg['lifetime_data'])
1241         if 'tun_itf' in cfg:
1242             p.set_tunnel_interface(cfg['tun_itf'])
1243         p.add_vpp_config()
1244         return p
1245
1246     def test_profile_api(self):
1247         """ test profile dump API """
1248         loc_ts4 = {
1249                     'proto': 8,
1250                     'start_port': 1,
1251                     'end_port': 19,
1252                     'start_addr': '3.3.3.2',
1253                     'end_addr': '3.3.3.3',
1254                 }
1255         rem_ts4 = {
1256                     'proto': 9,
1257                     'start_port': 10,
1258                     'end_port': 119,
1259                     'start_addr': '4.5.76.80',
1260                     'end_addr': '2.3.4.6',
1261                 }
1262
1263         loc_ts6 = {
1264                     'proto': 8,
1265                     'start_port': 1,
1266                     'end_port': 19,
1267                     'start_addr': 'ab::1',
1268                     'end_addr': 'ab::4',
1269                 }
1270         rem_ts6 = {
1271                     'proto': 9,
1272                     'start_port': 10,
1273                     'end_port': 119,
1274                     'start_addr': 'cd::12',
1275                     'end_addr': 'cd::13',
1276                 }
1277
1278         conf = {
1279             'p1': {
1280                 'name': 'p1',
1281                 'loc_id': ('fqdn', b'vpp.home'),
1282                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1283                 'loc_ts': loc_ts4,
1284                 'rem_ts': rem_ts4,
1285                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1286                 'ike_ts': {
1287                         'crypto_alg': 20,
1288                         'crypto_key_size': 32,
1289                         'integ_alg': 1,
1290                         'dh_group': 1},
1291                 'esp_ts': {
1292                         'crypto_alg': 13,
1293                         'crypto_key_size': 24,
1294                         'integ_alg': 2},
1295                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1296                 'udp_encap': True,
1297                 'ipsec_over_udp_port': 4501,
1298                 'lifetime_data': {
1299                     'lifetime': 123,
1300                     'lifetime_maxdata': 20192,
1301                     'lifetime_jitter': 9,
1302                     'handover': 132},
1303             },
1304             'p2': {
1305                 'name': 'p2',
1306                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1307                 'rem_id': ('ip6-addr', b'abcd::1'),
1308                 'loc_ts': loc_ts6,
1309                 'rem_ts': rem_ts6,
1310                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1311                 'ike_ts': {
1312                         'crypto_alg': 12,
1313                         'crypto_key_size': 16,
1314                         'integ_alg': 3,
1315                         'dh_group': 3},
1316                 'esp_ts': {
1317                         'crypto_alg': 9,
1318                         'crypto_key_size': 24,
1319                         'integ_alg': 4},
1320                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1321                 'udp_encap': False,
1322                 'ipsec_over_udp_port': 4600,
1323                 'tun_itf': 0}
1324         }
1325         self.p1 = self.configure_profile(conf['p1'])
1326         self.p2 = self.configure_profile(conf['p2'])
1327
1328         r = self.vapi.ikev2_profile_dump()
1329         self.assertEqual(len(r), 2)
1330         self.verify_profile(r[0].profile, conf['p1'])
1331         self.verify_profile(r[1].profile, conf['p2'])
1332
1333     def verify_id(self, api_id, cfg_id):
1334         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1335         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1336
1337     def verify_ts(self, api_ts, cfg_ts):
1338         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1339         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1340         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1341         self.assertEqual(api_ts.start_addr,
1342                          ip_address(text_type(cfg_ts['start_addr'])))
1343         self.assertEqual(api_ts.end_addr,
1344                          ip_address(text_type(cfg_ts['end_addr'])))
1345
1346     def verify_responder(self, api_r, cfg_r):
1347         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1348         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1349
1350     def verify_transforms(self, api_ts, cfg_ts):
1351         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1352         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1353         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1354
1355     def verify_ike_transforms(self, api_ts, cfg_ts):
1356         self.verify_transforms(api_ts, cfg_ts)
1357         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1358
1359     def verify_esp_transforms(self, api_ts, cfg_ts):
1360         self.verify_transforms(api_ts, cfg_ts)
1361
1362     def verify_auth(self, api_auth, cfg_auth):
1363         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1364         self.assertEqual(api_auth.data, cfg_auth['data'])
1365         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1366
1367     def verify_lifetime_data(self, p, ld):
1368         self.assertEqual(p.lifetime, ld['lifetime'])
1369         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1370         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1371         self.assertEqual(p.handover, ld['handover'])
1372
1373     def verify_profile(self, ap, cp):
1374         self.assertEqual(ap.name, cp['name'])
1375         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1376         self.verify_id(ap.loc_id, cp['loc_id'])
1377         self.verify_id(ap.rem_id, cp['rem_id'])
1378         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1379         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1380         self.verify_responder(ap.responder, cp['responder'])
1381         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1382         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1383         self.verify_auth(ap.auth, cp['auth'])
1384         if 'lifetime_data' in cp:
1385             self.verify_lifetime_data(ap, cp['lifetime_data'])
1386         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1387         if 'tun_itf' in cp:
1388             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1389         else:
1390             self.assertEqual(ap.tun_itf, 0xffffffff)
1391
1392
1393 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1394     """ test ikev2 initiator - pre shared key auth """
1395     def config_tc(self):
1396         self.config_params({
1397             'is_initiator': False,  # seen from test case perspective
1398                                     # thus vpp is initiator
1399             'responder': {'sw_if_index': self.pg0.sw_if_index,
1400                            'addr': self.pg0.remote_ip4},
1401             'ike-crypto': ('AES-GCM-16ICV', 32),
1402             'ike-integ': 'NULL',
1403             'ike-dh': '3072MODPgr',
1404             'ike_transforms': {
1405                 'crypto_alg': 20,  # "aes-gcm-16"
1406                 'crypto_key_size': 256,
1407                 'dh_group': 15,  # "modp-3072"
1408             },
1409             'esp_transforms': {
1410                 'crypto_alg': 12,  # "aes-cbc"
1411                 'crypto_key_size': 256,
1412                 # "hmac-sha2-256-128"
1413                 'integ_alg': 12}})
1414
1415
1416 class TestResponderNATT(TemplateResponder, Ikev2Params):
1417     """ test ikev2 responder - nat traversal """
1418     def config_tc(self):
1419         self.config_params(
1420                 {'natt': True})
1421
1422
1423 class TestResponderPsk(TemplateResponder, Ikev2Params):
1424     """ test ikev2 responder - pre shared key auth """
1425     def config_tc(self):
1426         self.config_params()
1427
1428
1429 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1430     """ test ikev2 responder - cert based auth """
1431     def config_tc(self):
1432         self.config_params({
1433             'auth': 'rsa-sig',
1434             'server-key': 'server-key.pem',
1435             'client-key': 'client-key.pem',
1436             'client-cert': 'client-cert.pem',
1437             'server-cert': 'server-cert.pem'})
1438
1439
1440 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1441         (TemplateResponder, Ikev2Params):
1442     """
1443     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1444     """
1445     def config_tc(self):
1446         self.config_params({
1447             'ike-crypto': ('AES-CBC', 16),
1448             'ike-integ': 'SHA2-256-128',
1449             'esp-crypto': ('AES-CBC', 24),
1450             'esp-integ': 'SHA2-384-192',
1451             'ike-dh': '2048MODPgr'})
1452
1453
1454 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1455         (TemplateResponder, Ikev2Params):
1456     """
1457     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1458     """
1459     def config_tc(self):
1460         self.config_params({
1461             'ike-crypto': ('AES-CBC', 32),
1462             'ike-integ': 'SHA2-256-128',
1463             'esp-crypto': ('AES-GCM-16ICV', 32),
1464             'esp-integ': 'NULL',
1465             'ike-dh': '3072MODPgr'})
1466
1467
1468 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1469     """
1470     IKE:AES_GCM_16_256
1471     """
1472     def config_tc(self):
1473         self.config_params({
1474             'ip6': True,
1475             'natt': True,
1476             'ike-crypto': ('AES-GCM-16ICV', 32),
1477             'ike-integ': 'NULL',
1478             'ike-dh': '2048MODPgr',
1479             'loc_ts': {'start_addr': 'ab:cd::0',
1480                        'end_addr': 'ab:cd::10'},
1481             'rem_ts': {'start_addr': '11::0',
1482                        'end_addr': '11::100'}})
1483
1484
1485 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1486     """ malformed packet test """
1487
1488     def tearDown(self):
1489         pass
1490
1491     def config_tc(self):
1492         self.config_params()
1493
1494     def assert_counter(self, count, name, version='ip4'):
1495         node_name = '/err/ikev2-%s/' % version + name
1496         self.assertEqual(count, self.statistics.get_err_counter(node_name))
1497
1498     def create_ike_init_msg(self, length=None, payload=None):
1499         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1500                           flags='Initiator', exch_type='IKE_SA_INIT')
1501         if payload is not None:
1502             msg /= payload
1503         return self.create_packet(self.pg0, msg, self.sa.sport,
1504                                   self.sa.dport)
1505
1506     def verify_bad_packet_length(self):
1507         ike_msg = self.create_ike_init_msg(length=0xdead)
1508         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1509         self.assert_counter(self.pkt_count, 'Bad packet length')
1510
1511     def verify_bad_sa_payload_length(self):
1512         p = ikev2.IKEv2_payload_SA(length=0xdead)
1513         ike_msg = self.create_ike_init_msg(payload=p)
1514         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1515         self.assert_counter(self.pkt_count, 'Malformed packet')
1516
1517     def test_responder(self):
1518         self.pkt_count = 254
1519         self.verify_bad_packet_length()
1520         self.verify_bad_sa_payload_length()
1521
1522
1523 if __name__ == '__main__':
1524     unittest.main(testRunner=VppTestRunner)