ikev2: fix initial contact cleanup
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
8     Cipher,
9     algorithms,
10     modes,
11 )
12 from ipaddress import IPv4Address, IPv6Address, ip_address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.layers.inet6 import IPv6
16 from scapy.packet import raw, Raw
17 from scapy.utils import long_converter
18 from framework import VppTestCase, VppTestRunner
19 from vpp_ikev2 import Profile, IDType, AuthMethod
20 from vpp_papi import VppEnum
21
# Python 2/3 compatibility shim: use ``unicode`` where that builtin exists,
# otherwise fall back to ``str``.
try:
    text_type = unicode
except NameError:
    text_type = str

# Constant pad hashed together with the pre-shared key in
# IKEv2SA.auth_init() for 'shared-key' authentication.
KEY_PAD = b"Key Pad for IKEv2"
# Number of trailing key bytes that CryptoAlgo treats as the AEAD salt.
SALT_SIZE = 4
# Number of tag (ICV) bytes appended to / split off AES-GCM ciphertext.
GCM_ICV_SIZE = 16
# Number of IV bytes carried in front of AES-GCM ciphertext.
GCM_IV_SIZE = 8
31
32
# Diffie-Hellman MODP groups, defined in rfc3526
# tuple structure is (p, g, key_len); key_len is used by
# IKEv2SA.compute_secret() as the byte length of the shared secret
# (256 bytes for the 2048-bit prime, 384 bytes for the 3072-bit prime)
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
67
68
class CryptoAlgo(object):
    """Encryption transform used by the test peer.

    Wraps a cipher class and a mode class (both expected to follow the
    ``cryptography`` hazmat API) and implements the two framings used in
    this file: plain ``IV | ciphertext`` when no AAD is given, and the
    AEAD framing ``IV | ciphertext | tag`` when AAD is given.

    :param name: transform name, e.g. 'AES-CBC' or 'AES-GCM-16ICV'
    :param cipher: cipher algorithm class, or None for the NULL transform
    :param mode: cipher mode class, or None for the NULL transform
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        # the NULL transform has no cipher, so bs / iv_len are left unset
        if self.cipher is not None:
            # block_size is divided by 8 to obtain the block size in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` with a fresh random IV.

        :param data: plaintext; callers pad it beforehand (see pad())
        :param key: cipher key; in the AEAD case the last SALT_SIZE bytes
            are the salt and are not part of the cipher key
        :param aad: additional authenticated data, selects the AEAD path
        :returns: ``iv + ciphertext`` or, with ``aad``,
            ``iv + ciphertext + tag[:GCM_ICV_SIZE]``
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            # nonce = salt (taken from the key tail) | per-message IV
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                               default_backend()).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` produced by the peer.

        :param data: ``iv + ciphertext`` (the ICV/tag already removed)
        :param key: cipher key, with trailing salt in the AEAD case
        :param aad: additional authenticated data, selects the AEAD path
        :param icv: authentication tag, required when ``aad`` is given
        :returns: without ``aad`` the raw decrypted bytes (padding is NOT
            removed); with ``aad`` the plaintext with the trailing padding
            and pad-length byte stripped
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            # use the configured cipher, mirroring encrypt()
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + data[:GCM_IV_SIZE]
            ct = data[GCM_IV_SIZE:]
            key = key[:-SALT_SIZE]
            # use the configured cipher, mirroring encrypt()
            decryptor = Cipher(self.cipher(key),
                               self.mode(nonce, icv, len(icv)),
                               default_backend()).decryptor()
            decryptor.authenticate_additional_data(aad)
            pt = decryptor.update(ct) + decryptor.finalize()
            # last byte holds the pad length; drop padding plus that byte
            pad_len = pt[-1] + 1
            return pt[:-pad_len]

    def pad(self, data):
        """Pad ``data`` up to the next multiple of the block size.

        Zero bytes are appended followed by one byte holding the number of
        zero bytes added; input that is already aligned grows by a full
        block.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
123
124
class AuthAlgo(object):
    """Description of an integrity or PRF transform.

    :param name: transform name
    :param mac: MAC constructor (None for the NULL transform)
    :param mod: hash algorithm class handed to ``mac``
    :param key_len: key length in bytes
    :param trunc_len: truncated output length; a falsy value (None or 0)
        makes it fall back to ``key_len``
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        if trunc_len:
            self.trunc_len = trunc_len
        else:
            self.trunc_len = key_len
132
133
# Encryption transforms known to this test, keyed by transform name.
CRYPTO_ALGOS = {
    algo.name: algo for algo in (
        CryptoAlgo('NULL', cipher=None, mode=None),
        CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
        CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES, mode=modes.GCM),
    )
}
140
# Integrity transforms known to this test, keyed by transform name.
# AuthAlgo arguments are (name, mac, hash, key_len, trunc_len), lengths in
# bytes.  Each HMAC-SHA2 variant must be built on its own hash function:
# SHA2-384-192 on SHA-384 and SHA2-512-256 on SHA-512 (see RFC 4868).
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}
148
# Pseudo-random functions known to this test, keyed by transform name.
PRF_ALGOS = {
    algo.name: algo for algo in (
        AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
        AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC, hashes.SHA256, 32),
    )
}
154
155
class IKEv2ChildSA(object):
    """Container for the state of one child SA.

    :param local_ts: local traffic selector description
    :param remote_ts: remote traffic selector description
    :param spi: SPI bytes; a falsy value is replaced by 4 random bytes
    """

    def __init__(self, local_ts, remote_ts, spi=None):
        self.local_ts = local_ts
        self.remote_ts = remote_ts
        self.spi = spi if spi else os.urandom(4)
161
162
class IKEv2SA(object):
    """State and crypto helpers for the IKE SA kept by the test peer.

    The object stores identities, nonces, SPIs and Diffie-Hellman data of
    both sides, derives the IKE and child SA key material and offers the
    encrypt / decrypt / integrity helpers used to build and check IKEv2
    messages.  Negotiated algorithms are configured afterwards through
    set_ike_props() and set_esp_props().
    """

    def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
                 auth_method='shared-key', priv_key=None, natt=False):
        """Create the SA state.

        :param test: test case whose assert methods are used for checks
        :param is_initiator: True when this peer plays the initiator role
        :param i_id: initiator identity bytes
        :param r_id: responder identity bytes
        :param spi: this peer's SPI; the other side's SPI starts as zeros
        :param id_type: identity type, either a name understood by
            IDType.value() or an already resolved value
        :param nonce: this peer's nonce; 32 random bytes when falsy
        :param auth_data: authentication input (e.g. the shared key)
        :param local_ts: local traffic selector of the first child SA
        :param remote_ts: remote traffic selector of the first child SA
        :param auth_method: 'shared-key' or 'rsa-sig'
        :param priv_key: private key used by 'rsa-sig'
        :param natt: use UDP port 4500 instead of 500 on both sides
        """
        self.natt = natt
        self.sport = self.dport = 4500 if natt else 500
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        own_nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        # a string is translated through IDType, anything else is kept
        self.id_type = (IDType.value(id_type) if isinstance(id_type, str)
                        else id_type)
        self.auth_method = auth_method

        zero_spi = 8 * b'\x00'
        if self.is_initiator:
            self.rspi, self.ispi = zero_spi, spi
            self.i_nonce = own_nonce
        else:
            self.rspi, self.ispi = spi, zero_spi
            self.r_nonce = own_nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]

    def new_msg_id(self):
        """Advance the message id counter and return the new value."""
        self.msg_id = self.msg_id + 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """DH public value of this peer."""
        return self.i_dh_data if self.is_initiator else self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """DH public value of the remote peer."""
        return self.r_dh_data if self.is_initiator else self.i_dh_data

    def compute_secret(self):
        """Return the DH shared secret as big-endian bytes.

        Computes ``peer_pub ** priv mod p`` and encodes the result with the
        byte length stored as third element of ``self.ike_group``.
        """
        exponent = int.from_bytes(self.dh_private_key, 'big')
        base = int.from_bytes(self.peer_dh_pub_key, 'big')
        prime, _generator, out_len = self.ike_group
        return pow(base, exponent, prime).to_bytes(out_len, 'big')

    def generate_dh_data(self):
        """Generate this peer's DH key pair for the configured group.

        Stores the private exponent in ``dh_private_key`` and the public
        value in ``i_dh_data`` or ``r_dh_data`` depending on the role.

        :raises NotImplementedError: when ``ike_dh`` is not a key of DH
        """
        if self.ike_dh not in DH:
            raise NotImplementedError('%s not in DH group' % self.ike_dh)

        # DH parameters are built once and reused on later calls
        if self.dh_params is None:
            prime, generator = DH[self.ike_dh][:2]
            numbers = dh.DHParameterNumbers(prime, generator)
            self.dh_params = numbers.parameters(default_backend())

        private = self.dh_params.generate_private_key()
        public = private.public_key()
        self.dh_private_key = private.private_numbers().x.to_bytes(
            private.key_size // 8, 'big')
        public_bytes = public.public_numbers().y.to_bytes(
            public.key_size // 8, 'big')

        if self.is_initiator:
            self.i_dh_data = public_bytes
        else:
            self.r_dh_data = public_bytes

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        secret = self.compute_secret()
        self.dh_shared_secret = secret

    def calc_child_keys(self):
        """Derive the key material of the first child SA.

        Key material is generated from ``sk_d`` and ``Ni | Nr`` and cut, in
        order, into the initiator encryption key, initiator integrity key
        (or 4-byte salt when the ESP integrity key length is 0), then the
        same pair for the responder.  Results are stored on the child SA.
        """
        prf = self.ike_prf_alg.mod()
        seed = self.i_nonce + self.r_nonce
        child = self.child_sas[0]

        encr_len = self.esp_crypto_key_len
        integ_len = self.esp_integ_alg.key_len
        salt_len = 4 if not integ_len else 0

        total = 2 * (integ_len + encr_len + salt_len)
        keymat = self.calc_prfplus(prf, self.sk_d, seed, total)

        offset = 0
        for side in ('i', 'r'):
            setattr(child, 'sk_e' + side, keymat[offset:offset + encr_len])
            offset += encr_len
            if integ_len:
                setattr(child, 'sk_a' + side,
                        keymat[offset:offset + integ_len])
                offset += integ_len
            else:
                setattr(child, 'salt_e' + side,
                        keymat[offset:offset + salt_len])
                offset += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Generate at least ``length`` bytes of key material.

        Each round feeds ``previous_output | seed | counter`` into
        calc_prf() and appends the output; the counter starts at 1.

        :returns: the concatenated outputs (possibly longer than
            ``length``), or None when the counter reaches 255
        """
        output = b''
        previous = b''
        counter = 1
        while counter < 255 and len(output) < length:
            previous = self.calc_prf(prf, key,
                                     previous + seed + bytes([counter]))
            output += previous
            counter += 1

        return None if counter == 255 else output

    def calc_prf(self, prf, key, data):
        """Return the MAC of ``data`` under ``key`` using the IKE PRF."""
        ctx = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        ctx.update(data)
        return ctx.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the IKE SA keys.

        Stores ``skeyseed`` and, cut from the generated key material in
        this order: sk_d, sk_ai, sk_ar, sk_ei, sk_er, sk_pi, sk_pr.  When
        the integrity key length is 0, each encryption key is extended by
        4 bytes of salt.
        """
        prf_alg = self.ike_prf_alg
        prf = prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        nonces = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, nonces, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        seed = nonces + self.ispi + self.rspi

        sk_d_len = prf_alg.trunc_len
        encr_len = self.ike_crypto_key_len
        sk_p_len = prf_alg.key_len
        integ_len = self.ike_integ_alg.key_len
        salt_len = 4 if integ_len == 0 else 0

        # (attribute, size) pairs in the order they are cut from keymat
        layout = (
            ('sk_d', sk_d_len),
            ('sk_ai', integ_len),
            ('sk_ar', integ_len),
            ('sk_ei', encr_len + salt_len),
            ('sk_er', encr_len + salt_len),
            ('sk_pi', sk_p_len),
            ('sk_pr', sk_p_len),
        )
        total = sum(size for _name, size in layout)
        keymat = self.calc_prfplus(prf, self.skeyseed, seed, total)

        offset = 0
        for attr, size in layout:
            setattr(self, attr, keymat[offset:offset + size])
            offset += size

    def generate_authmsg(self, prf, packet):
        """Build the octets that get signed / MACed for authentication.

        :param prf: hash instance handed on to calc_prf()
        :param packet: raw bytes of this peer's IKE_SA_INIT message
        :returns: ``packet | peer nonce | prf(SK_p, ID payload body)``
        """
        if self.is_initiator:
            ident, peer_nonce, prf_key = self.i_id, self.r_nonce, self.sk_pi
        else:
            ident, peer_nonce, prf_key = self.r_id, self.i_nonce, self.sk_pr
        id_body = bytes([self.id_type, 0, 0, 0]) + ident
        id_hash = self.calc_prf(prf, prf_key, id_body)
        return packet + peer_nonce + id_hash

    def auth_init(self):
        """Replace ``auth_data`` with the computed AUTH payload content.

        :raises TypeError: when ``auth_method`` is neither 'shared-key'
            nor 'rsa-sig'
        """
        prf = self.ike_prf_alg.mod()
        packet = (self.init_req_packet if self.is_initiator
                  else self.init_resp_packet)
        authmsg = self.generate_authmsg(prf, raw(packet))
        method = self.auth_method
        if method == 'shared-key':
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
            return
        if method == 'rsa-sig':
            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
                                                hashes.SHA1())
            return
        raise TypeError('unknown auth method type!')

    def encrypt(self, data, aad=None):
        """Pad and encrypt ``data`` with this peer's IKE encryption key."""
        padded = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(padded, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key used by the remote peer."""
        return self.sk_ar if self.is_initiator else self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key used by this peer."""
        return self.sk_ai if self.is_initiator else self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key used by this peer."""
        return self.sk_ei if self.is_initiator else self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key used by the remote peer."""
        return self.sk_er if self.is_initiator else self.sk_ei

    def concat(self, alg, key_len):
        """Return ``'<alg>-<key length in bits>'`` for a byte key length."""
        return '-'.join((alg, str(key_len * 8)))

    @property
    def vpp_ike_cypto_alg(self):
        """IKE encryption algorithm name including the key size in bits."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP encryption algorithm name including the key size in bits."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of ``ikemsg`` matches its content.

        The last ``trunc_len`` bytes are taken as the received ICV and
        compared with the truncated HMAC computed with the peer's key.
        """
        icv_len = self.ike_integ_alg.trunc_len
        received = ikemsg[-icv_len:]
        covered = ikemsg[:-icv_len]
        computed = self.compute_hmac(self.ike_integ_alg.mod(),
                                     self.peer_authkey, covered)
        self.test.assertEqual(computed[:icv_len], received)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated MAC of ``data`` with the IKE integrity
        algorithm."""
        ctx = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        ctx.update(data)
        return ctx.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt ``data`` with the remote peer's IKE encryption key."""
        peer_key = self.peer_cryptokey
        return self.ike_crypto_alg.decrypt(data, peer_key, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Authenticate and decrypt the Encrypted payload of ``ike``.

        For 'AES-GCM-16ICV' the IKE header plus the Encrypted payload
        header serve as AAD and the last GCM_ICV_SIZE bytes as tag;
        otherwise the HMAC is verified first and the ICV is cut off before
        decrypting.
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto != 'AES-GCM-16ICV':
            self.verify_hmac(raw(ike))
            icv_len = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            return self.decrypt(ep.load[:-icv_len])

        aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
        ciphertext = ep.load[:-GCM_ICV_SIZE]
        tag = ep.load[-GCM_ICV_SIZE:]
        return self.decrypt(ciphertext, raw(ike)[:aad_len], tag)

    def build_ts_addr(self, ts, version):
        """Map ``ts['start_addr']`` / ``ts['end_addr']`` to the address
        field names of the traffic selector for IP ``version`` ('4'/'6')."""
        fields = (('starting_address_v', 'start_addr'),
                  ('ending_address_v', 'end_addr'))
        return {prefix + version: ts[key] for prefix, key in fields}

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        :returns: tuple ``(TSi list, TSr list)``; the local selector is
            TSi when this peer is the initiator, TSr otherwise
        """
        child = self.child_sas[0]
        version = '4' if is_ip4 else '6'

        def make_selector(ts):
            fields = {'IP_protocol_ID': 0,
                      'start_port': 0,
                      'end_port': 0xffff}
            fields.update(self.build_ts_addr(ts, version))
            if is_ip4:
                return ikev2.IPv4TrafficSelector(**fields)
            return ikev2.IPv6TrafficSelector(**fields)

        local = make_selector(child.local_ts)
        remote = make_selector(child.remote_ts)

        if self.is_initiator:
            return ([local], [remote])
        return ([remote], [local])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Configure the IKE SA transforms.

        :raises TypeError: for an encryption, integrity or PRF name that
            is not present in the corresponding algorithm table
        """
        crypto_alg = CRYPTO_ALGOS.get(crypto)
        if crypto_alg is None:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = crypto_alg
        self.ike_crypto_key_len = crypto_key_len

        integ_alg = AUTH_ALGOS.get(integ)
        if integ_alg is None:
            raise TypeError('unsupported auth algo %r' % integ)
        # the NULL integrity transform is recorded as None
        self.ike_integ = integ if integ != 'NULL' else None
        self.ike_integ_alg = integ_alg

        prf_alg = PRF_ALGOS.get(prf)
        if prf_alg is None:
            raise TypeError('unsupported prf algo %r' % prf)
        self.ike_prf = prf
        self.ike_prf_alg = prf_alg
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Configure the ESP (child SA) transforms.

        :raises TypeError: for an encryption or integrity name that is not
            present in the corresponding algorithm table
        """
        self.esp_crypto_key_len = crypto_key_len
        crypto_alg = CRYPTO_ALGOS.get(crypto)
        if crypto_alg is None:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = crypto_alg

        integ_alg = AUTH_ALGOS.get(integ)
        if integ_alg is None:
            raise TypeError('unsupported auth algo %r' % integ)
        # the NULL integrity transform is recorded as None
        self.esp_integ = integ if integ != 'NULL' else None
        self.esp_integ_alg = integ_alg

    def crypto_attr(self, key_len):
        """Return ``(key_length attribute, transform length)``.

        The attribute combines 0x800e in the upper 16 bits with the key
        length in bits (``key_len`` is given in bytes).

        :raises Exception: when the IKE encryption algorithm is neither
            'AES-CBC' nor 'AES-GCM-16ICV'
        """
        # NOTE(review): the check always looks at ike_crypto, also when
        # called through esp_crypto_attr() - confirm this is intended
        if self.ike_crypto not in ('AES-CBC', 'AES-GCM-16ICV'):
            raise Exception('unsupported attribute type')
        return ((0x800e << 16) | (key_len << 3), 12)

    def ike_crypto_attr(self):
        """Key length attribute for the IKE encryption transform."""
        key_len = self.ike_crypto_key_len
        return self.crypto_attr(key_len)

    def esp_crypto_attr(self):
        """Key length attribute for the ESP encryption transform."""
        key_len = self.esp_crypto_key_len
        return self.crypto_attr(key_len)

    def compute_nat_sha1(self, ip, port):
        """Return SHA1(SPIi | SPIr | ip | port) with a 2-byte big-endian
        port, where ``ip`` is the address as bytes."""
        payload = self.ispi + self.rspi + ip + (port).to_bytes(2, 'big')
        sha1 = hashes.Hash(hashes.SHA1(), backend=default_backend())
        sha1.update(payload)
        return sha1.finalize()
518
519
# Shared base for the initiator and responder test templates.  Subclasses
# are expected to provide config_tc(), which must set up at least self.p
# (the IKEv2 profile) and self.sa (the IKEv2SA state); several verifiers
# also read self.ip6 and self.vpp_enums, which are not defined here.
class IkePeer(VppTestCase):
    """ common class for initiator and responder """

    @classmethod
    def setUpClass(cls):
        """Load scapy's IKEv2 layer and bring up two pg interfaces."""
        # the module is published as the global name 'ikev2' so that the
        # rest of this file can refer to it; it is imported lazily here
        import scapy.contrib.ikev2 as _ikev2
        globals()['ikev2'] = _ikev2
        super(IkePeer, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    @classmethod
    def tearDownClass(cls):
        """Delegate class level cleanup to the framework."""
        super(IkePeer, cls).tearDownClass()

    def setUp(self):
        """Configure the test case, push the profile to VPP and prepare
        this peer's DH key pair."""
        super(IkePeer, self).setUp()
        self.config_tc()
        self.p.add_vpp_config()
        self.assertIsNotNone(self.p.query_vpp_config())
        self.sa.generate_dh_data()
        self.vapi.cli('ikev2 set logging level 4')
        self.vapi.cli('event-lo clear')

    def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
                      use_ip6=False):
        """Wrap ``msg`` into Ether / IP(v6) / UDP sent from the peer side
        of ``src_if`` towards VPP.

        :param src_if: pg interface the packet is injected on
        :param msg: payload placed after the UDP header
        :param sport: UDP source port
        :param dport: UDP destination port
        :param natt: insert the 4-byte non-ESP marker before ``msg``
        :param use_ip6: build an IPv6 instead of an IPv4 packet
        """
        if use_ip6:
            src_ip = src_if.remote_ip6
            dst_ip = src_if.local_ip6
            ip_layer = IPv6
        else:
            src_ip = src_if.remote_ip4
            dst_ip = src_if.local_ip4
            ip_layer = IP
        res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
               ip_layer(src=src_ip, dst=dst_ip) /
               UDP(sport=sport, dport=dport))
        if natt:
            # insert non ESP marker
            res = res / Raw(b'\x00' * 4)
        return res / msg

    def verify_udp(self, udp):
        """Check that both UDP ports match the ports stored in the SA."""
        self.assertEqual(udp.sport, self.sa.sport)
        self.assertEqual(udp.dport, self.sa.dport)

    def get_ike_header(self, packet):
        """Return the IKEv2 header of ``packet`` and check its version."""
        try:
            ih = packet[ikev2.IKEv2]
        except IndexError:
            # this is a workaround for getting IKEv2 layer as both ikev2 and
            # ipsec register for port 4500
            esp = packet[ESP]
            ih = self.verify_and_remove_non_esp_marker(esp)
        self.assertEqual(ih.version, 0x20)
        return ih

    def verify_and_remove_non_esp_marker(self, packet):
        """In NAT traversal mode check the leading 4 zero bytes and return
        the rest parsed as IKEv2; otherwise return ``packet`` unchanged."""
        if self.sa.natt:
            # if we are in nat traversal mode check for non esp marker
            # and remove it
            data = raw(packet)
            self.assertEqual(data[:4], b'\x00' * 4)
            return ikev2.IKEv2(data[4:])
        else:
            return packet

    def encrypt_ike_msg(self, header, plain, first_payload):
        """Encrypt ``plain`` and return ``header`` followed by the
        resulting Encrypted payload.

        ``header.length`` is updated to the total message length.  For
        'AES-GCM-16ICV' the header and the Encrypted payload header are
        authenticated as AAD; for other ciphers a truncated HMAC over the
        message is appended.

        :param header: IKEv2 header packet
        :param plain: payload chain to protect
        :param first_payload: next_payload value of the Encrypted payload
        """
        if self.sa.ike_crypto == 'AES-GCM-16ICV':
            # padded only to learn the ciphertext size; self.sa.encrypt()
            # pads the plaintext itself
            data = self.sa.ike_crypto_alg.pad(raw(plain))
            plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
                len(ikev2.IKEv2_payload_Encrypted())
            tlen = plen + len(ikev2.IKEv2())

            # prepare aad data
            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen)
            header.length = tlen
            res = header / sk_p
            encr = self.sa.encrypt(raw(plain), raw(res))
            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen, load=encr)
            res = header / sk_p
        else:
            encr = self.sa.encrypt(raw(plain))
            trunc_len = self.sa.ike_integ_alg.trunc_len
            plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
            tlen = plen + len(ikev2.IKEv2())

            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                                 length=plen, load=encr)
            header.length = tlen
            res = header / sk_p

            integ_data = raw(res)
            hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
                                             self.sa.my_authkey, integ_data)
            res = res / Raw(hmac_data[:trunc_len])
        # unlike a bare assert this check also runs under python -O
        self.assertEqual(len(res), tlen)
        return res

    def verify_ipsec_sas(self):
        """Compare the two IPsec SAs dumped from VPP with the child SA
        keys derived locally."""
        sas = self.vapi.ipsec_sa_dump()
        self.assertEqual(len(sas), 2)
        # sa0 is checked against the responder keys, sa1 against the
        # initiator keys; which dump entry is which depends on our role
        if self.sa.is_initiator:
            sa0 = sas[0].entry
            sa1 = sas[1].entry
        else:
            sa1 = sas[0].entry
            sa0 = sas[1].entry

        c = self.sa.child_sas[0]

        vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
        self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
        self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)

        if self.sa.esp_integ is None:
            vpp_integ_alg = 0
        else:
            vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
        self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
        self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)

        # verify crypto keys
        self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
        self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
        self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
        self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)

        # verify integ keys
        if vpp_integ_alg:
            self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
            self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
            self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
            self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
        else:
            # without an integrity algorithm the salts are compared
            self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
            self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)

    def verify_keymat(self, api_keys, keys, name):
        """Check that key ``name`` reported by the API equals the locally
        derived one.

        :param api_keys: API object holding ``<name>`` and ``<name>_len``
        :param keys: local object holding ``<name>``
        :param name: attribute name of the key, e.g. 'sk_ei'
        """
        km = getattr(keys, name)
        api_km = getattr(api_keys, name)
        api_km_len = getattr(api_keys, name + '_len')
        self.assertEqual(len(km), api_km_len)
        self.assertEqual(km, api_km[:api_km_len])

    def verify_id(self, api_id, exp_id):
        """Check the type, length and data of an identity from the API."""
        self.assertEqual(api_id.type, IDType.value(exp_id.type))
        self.assertEqual(api_id.data_len, exp_id.data_len)
        # the identity data was previously compared with exp_id.type,
        # which could not match.
        # NOTE(review): assumes exp_id exposes the identity bytes as
        # ``data`` next to ``data_len`` - confirm against the callers
        self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.data)

    def verify_ike_sas(self):
        """Compare the IKE SA, child SA, traffic selectors and nonces
        dumped from VPP with the locally kept state."""
        r = self.vapi.ikev2_sa_dump()
        self.assertEqual(len(r), 1)
        sa = r[0].sa
        self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
        self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))

        # the tested peer sits on the remote side of pg0, VPP on the
        # local side; who is initiator decides which address goes where
        if self.ip6:
            addr_type = IPv6Address
            vpp_addr = self.pg0.local_ip6
            peer_addr = self.pg0.remote_ip6
        else:
            addr_type = IPv4Address
            vpp_addr = self.pg0.local_ip4
            peer_addr = self.pg0.remote_ip4
        if self.sa.is_initiator:
            exp_iaddr, exp_raddr = peer_addr, vpp_addr
        else:
            exp_iaddr, exp_raddr = vpp_addr, peer_addr
        self.assertEqual(sa.iaddr, addr_type(exp_iaddr))
        self.assertEqual(sa.raddr, addr_type(exp_raddr))

        for name in ('sk_d', 'sk_ai', 'sk_ar', 'sk_ei', 'sk_er',
                     'sk_pi', 'sk_pr'):
            self.verify_keymat(sa.keys, self.sa, name)

        self.assertEqual(sa.i_id.type, self.sa.id_type)
        self.assertEqual(sa.r_id.type, self.sa.id_type)
        self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
        self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
        self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
        self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)

        r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
        self.assertEqual(len(r), 1)
        csa = r[0].child_sa
        self.assertEqual(csa.sa_index, sa.sa_index)
        c = self.sa.child_sas[0]
        # integrity keys exist only when calc_child_keys() derived them
        if hasattr(c, 'sk_ai'):
            self.verify_keymat(csa.keys, c, 'sk_ai')
            self.verify_keymat(csa.keys, c, 'sk_ar')
        self.verify_keymat(csa.keys, c, 'sk_ei')
        self.verify_keymat(csa.keys, c, 'sk_er')

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        tsi = tsi[0]
        tsr = tsr[0]
        r = self.vapi.ikev2_traffic_selector_dump(
                is_initiator=True, sa_index=sa.sa_index,
                child_sa_index=csa.child_sa_index)
        self.assertEqual(len(r), 1)
        self.verify_ts(r[0].ts, tsi[0], True)

        r = self.vapi.ikev2_traffic_selector_dump(
                is_initiator=False, sa_index=sa.sa_index,
                child_sa_index=csa.child_sa_index)
        self.assertEqual(len(r), 1)
        self.verify_ts(r[0].ts, tsr[0], False)

        n = self.vapi.ikev2_nonce_get(is_initiator=True,
                                      sa_index=sa.sa_index)
        self.verify_nonce(n, self.sa.i_nonce)
        n = self.vapi.ikev2_nonce_get(is_initiator=False,
                                      sa_index=sa.sa_index)
        self.verify_nonce(n, self.sa.r_nonce)

    def verify_nonce(self, api_nonce, nonce):
        """Check the length and value of a nonce returned by the API."""
        self.assertEqual(api_nonce.data_len, len(nonce))
        self.assertEqual(api_nonce.nonce, nonce)

    def verify_ts(self, api_ts, ts, is_initiator):
        """Compare a traffic selector from the API with the expected one.

        :param api_ts: traffic selector returned by the API
        :param ts: expected scapy traffic selector
        :param is_initiator: True when checking the initiator selector,
            which is expected to be flagged as local by the API
        """
        if is_initiator:
            self.assertTrue(api_ts.is_local)
        else:
            self.assertFalse(api_ts.is_local)

        if self.p.ts_is_ip4:
            self.assertEqual(api_ts.start_addr,
                             IPv4Address(ts.starting_address_v4))
            self.assertEqual(api_ts.end_addr,
                             IPv4Address(ts.ending_address_v4))
        else:
            self.assertEqual(api_ts.start_addr,
                             IPv6Address(ts.starting_address_v6))
            self.assertEqual(api_ts.end_addr,
                             IPv6Address(ts.ending_address_v6))
        self.assertEqual(api_ts.start_port, ts.start_port)
        self.assertEqual(api_ts.end_port, ts.end_port)
        self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
770
771
class TemplateInitiator(IkePeer):
    """ initiator test template """

    def tearDown(self):
        # No initiator-specific cleanup: just run the inherited teardown.
        super(TemplateInitiator, self).tearDown()

    def verify_sa_init_request(self, packet):
        """Validate a captured IKE_SA_INIT request and derive the keys.

        The initiator SPI, nonce and KE data carried by *packet* are stored
        on ``self.sa``; afterwards the DH data is completed and the keys are
        calculated.
        """
        header = packet[ikev2.IKEv2]
        zero_spi = 8 * b'\x00'
        self.assertNotEqual(header.init_SPI, zero_spi)
        self.assertEqual(header.exch_type, 34)  # SA_INIT
        self.sa.ispi = header.init_SPI
        self.assertEqual(header.resp_SPI, zero_spi)
        self.assertIn('Initiator', header.flags)
        self.assertNotIn('Response', header.flags)
        self.sa.i_nonce = header[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_dh_data = header[ikev2.IKEv2_payload_KE].load

        proposal = packet[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(proposal.proto, 1)  # proto = ikev2
        self.assertEqual(proposal.proposal, 1)

        # (position in the proposal, transform type, profile key); a key of
        # None means the transform id is fixed rather than read from the
        # profile.
        checks = (
            (0, 1, 'crypto_alg'),   # encryption
            (1, 2, None),           # prf, always 5 ("hmac-sha2-256")
            (2, 4, 'dh_group'),     # dh
        )
        for position, kind, profile_key in checks:
            self.assertEqual(proposal.trans[position].transform_type, kind)
            if profile_key is None:
                wanted_id = 5
            else:
                wanted_id = self.p.ike_transforms[profile_key]
            self.assertEqual(proposal.trans[position].transform_id,
                             wanted_id)

        self.sa.complete_dh_data()
        self.sa.calc_keys()

    def verify_sa_auth_req(self, packet):
        """Validate a captured IKE_AUTH request.

        Checks SPIs, exchange type, flags, UDP header and message id, then
        decrypts the message and compares the IDi/IDr payloads with the ids
        stored on ``self.sa``. ``self.sa.msg_id`` is advanced by one.
        """
        header = self.get_ike_header(packet)
        self.assertEqual(header.resp_SPI, self.sa.rspi)
        self.assertEqual(header.init_SPI, self.sa.ispi)
        self.assertEqual(header.exch_type, 35)  # IKE_AUTH
        self.assertIn('Initiator', header.flags)
        self.assertNotIn('Response', header.flags)

        self.verify_udp(packet[UDP])
        next_id = self.sa.msg_id + 1
        self.assertEqual(header.id, next_id)
        self.sa.msg_id = next_id

        decrypted = self.sa.hmac_and_decrypt(header)
        id_initiator = ikev2.IKEv2_payload_IDi(decrypted)
        id_responder = ikev2.IKEv2_payload_IDr(id_initiator.payload)
        self.assertEqual(id_initiator.load, self.sa.i_id)
        self.assertEqual(id_responder.load, self.sa.r_id)

    def send_init_response(self):
        """Send the IKE_SA_INIT response and verify the IKE_AUTH request.

        The response is kept in ``self.sa.init_resp_packet``; the single
        packet captured afterwards is handed to verify_sa_auth_req().
        """
        crypto_attr = self.sa.ike_crypto_attr()
        make_transform = ikev2.IKEv2_payload_Transform
        encr = make_transform(transform_type='Encryption',
                              transform_id=self.sa.ike_crypto,
                              length=crypto_attr[1],
                              key_length=crypto_attr[0])
        integ = make_transform(transform_type='Integrity',
                               transform_id=self.sa.ike_integ)
        prf = make_transform(transform_type='PRF',
                             transform_id=self.sa.ike_prf_alg.name)
        dh_group = make_transform(transform_type='GroupDesc',
                                  transform_id=self.sa.ike_dh)
        transforms = encr / integ / prf / dh_group

        proposal = ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                                                trans_nb=4,
                                                trans=transforms)

        ike_header = ikev2.IKEv2(init_SPI=self.sa.ispi,
                                 resp_SPI=self.sa.rspi,
                                 exch_type='IKE_SA_INIT', flags='Response')
        sa_payload = ikev2.IKEv2_payload_SA(next_payload='KE',
                                            prop=proposal)
        ke_payload = ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                            group=self.sa.ike_dh,
                                            load=self.sa.my_dh_pub_key)
        nonce_payload = ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce)
        self.sa.init_resp_packet = (ike_header / sa_payload / ke_payload /
                                    nonce_payload)

        frame = self.create_packet(self.pg0, self.sa.init_resp_packet,
                                   self.sa.sport, self.sa.dport,
                                   self.sa.natt, self.ip6)
        self.pg_send(self.pg0, frame)
        auth_request = self.pg0.get_capture(1)[0]
        self.verify_sa_auth_req(auth_request)

    def initiate_sa_init(self):
        """Ask VPP to initiate the SA and answer its IKE_SA_INIT request."""
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)

        init_request = self.pg0.get_capture(1)[0]
        self.verify_sa_init_request(init_request)
        self.send_init_response()

    def send_auth_response(self):
        """Build, encrypt and send the IKE_AUTH response."""
        crypto_attr = self.sa.esp_crypto_attr()
        make_transform = ikev2.IKEv2_payload_Transform
        encr = make_transform(transform_type='Encryption',
                              transform_id=self.sa.esp_crypto,
                              length=crypto_attr[1],
                              key_length=crypto_attr[0])
        integ = make_transform(transform_type='Integrity',
                               transform_id=self.sa.esp_integ)
        no_esn = make_transform(transform_type='Extended Sequence Number',
                                transform_id='No ESN')
        esn = make_transform(transform_type='Extended Sequence Number',
                             transform_id='ESN')
        transforms = encr / integ / no_esn / esn

        proposal = ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                                                SPIsize=4,
                                                SPI=os.urandom(4),
                                                trans_nb=4,
                                                trans=transforms)

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        id_i = ikev2.IKEv2_payload_IDi(next_payload='IDr',
                                       IDtype=self.sa.id_type,
                                       load=self.sa.i_id)
        id_r = ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                                       IDtype=self.sa.id_type,
                                       load=self.sa.r_id)
        auth = ikev2.IKEv2_payload_AUTH(
            next_payload='SA',
            auth_type=AuthMethod.value(self.sa.auth_method),
            load=self.sa.auth_data)
        sa_payload = ikev2.IKEv2_payload_SA(next_payload='TSi',
                                            prop=proposal)
        ts_i = ikev2.IKEv2_payload_TSi(next_payload='TSr',
                                       number_of_TSs=len(tsi),
                                       traffic_selector=tsi)
        ts_r = ikev2.IKEv2_payload_TSr(next_payload='Notify',
                                       number_of_TSs=len(tsr),
                                       traffic_selector=tsr)
        notify = ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
        plain = id_i / id_r / auth / sa_payload / ts_i / ts_r / notify

        ike_header = ikev2.IKEv2(init_SPI=self.sa.ispi,
                                 resp_SPI=self.sa.rspi,
                                 id=self.sa.new_msg_id(),
                                 flags='Response', exch_type='IKE_AUTH')

        encrypted = self.encrypt_ike_msg(ike_header, plain, 'IDi')
        frame = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                   self.sa.dport, self.sa.natt, self.ip6)
        self.pg_send(self.pg0, frame)

    def test_initiator(self):
        # 1. VPP starts the exchange; answer its SA_INIT and check its AUTH
        self.initiate_sa_init()
        # 2. prepare the test-side auth data and child SA keys
        self.sa.auth_init()
        self.sa.calc_child_keys()
        # 3. finish the exchange and check the SAs reported by VPP
        self.send_auth_response()
        self.verify_ike_sas()
910
911
class TemplateResponder(IkePeer):
    """ responder test template """

    def tearDown(self):
        """Delete the SA (when the test side initiated it) and the profile."""
        super(TemplateResponder, self).tearDown()
        if self.sa.is_initiator:
            self.initiate_del_sa()
            r = self.vapi.ikev2_sa_dump()
            self.assertEqual(len(r), 0)

        self.p.remove_vpp_config()
        self.assertIsNone(self.p.query_vpp_config())

    def verify_del_sa(self, packet):
        """Check that *packet* is the INFORMATIONAL reply to our delete."""
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational

    def initiate_del_sa(self):
        """Send an encrypted IKE SA delete request and verify the reply."""
        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                             flags='Initiator', exch_type='INFORMATIONAL',
                             id=self.sa.new_msg_id())
        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
        packet = self.create_packet(self.pg0, ike_msg,
                                    self.sa.sport, self.sa.dport,
                                    self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    def send_sa_init_req(self, behind_nat=False):
        """Send the IKE_SA_INIT request and verify the captured response.

        :param behind_nat: when true, the source NAT detection hash is
            computed over a fixed address (10.10.10.1) instead of
            ``pg0.local_ip4`` and the Nonce payload explicitly announces a
            following Notify payload.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.ike_integ) /
                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
                 transform_id=self.sa.ike_prf_alg.name) /
                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
                 transform_id=self.sa.ike_dh))

        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                 trans_nb=4, trans=trans))

        next_payload = 'Notify' if behind_nat else None

        self.sa.init_req_packet = (
                ikev2.IKEv2(init_SPI=self.sa.ispi,
                            flags='Initiator', exch_type='IKE_SA_INIT') /
                ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
                ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                       group=self.sa.ike_dh,
                                       load=self.sa.my_dh_pub_key) /
                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
                                          load=self.sa.i_nonce))

        if behind_nat:
            src_address = b'\x0a\x0a\x0a\x01'
        else:
            src_address = bytes(self.pg0.local_ip4, 'ascii')

        # NOTE(review): both hashes are computed with self.sa.sport; confirm
        # that the destination hash is not meant to use self.sa.dport.
        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
        dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
                                           self.sa.sport)
        nat_src_detection = ikev2.IKEv2_payload_Notify(
                type='NAT_DETECTION_SOURCE_IP', load=src_nat)
        nat_dst_detection = ikev2.IKEv2_payload_Notify(
                type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
        # the NAT detection notifies are appended in both cases
        self.sa.init_req_packet = (self.sa.init_req_packet /
                                   nat_src_detection /
                                   nat_dst_detection)

        ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
                                     self.sa.sport, self.sa.dport,
                                     self.sa.natt, self.ip6)
        self.pg0.add_stream(ike_msg)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_init(capture[0])

    def send_sa_auth(self):
        """Send the encrypted IKE_AUTH request and verify the response."""
        tr_attr = self.sa.esp_crypto_attr()
        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
                 key_length=tr_attr[0]) /
                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
                 transform_id=self.sa.esp_integ) /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='No ESN') /
                 ikev2.IKEv2_payload_Transform(
                 transform_type='Extended Sequence Number',
                 transform_id='ESN'))

        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
                 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
                 IDtype=self.sa.id_type, load=self.sa.i_id) /
                 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                 IDtype=self.sa.id_type, load=self.sa.r_id) /
                 ikev2.IKEv2_payload_AUTH(next_payload='SA',
                 auth_type=AuthMethod.value(self.sa.auth_method),
                 load=self.sa.auth_data) /
                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
                 number_of_TSs=len(tsi),
                 traffic_selector=tsi) /
                 ikev2.IKEv2_payload_TSr(next_payload='Notify',
                 number_of_TSs=len(tsr),
                 traffic_selector=tsr) /
                 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))

        header = ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                flags='Initiator', exch_type='IKE_AUTH')

        ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                    self.sa.dport, self.sa.natt, self.ip6)
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_resp(capture[0])

    def verify_sa_init(self, packet):
        """Validate the IKE_SA_INIT response and derive the keys.

        Stores the responder SPI, nonce and KE data on ``self.sa``, then
        completes the DH exchange, calculates the keys and prepares the
        auth data.

        :raises IndexError: re-raised (after logging the packet) when the
            SA, Nonce or KE payload is missing from the response.
        """
        ih = self.get_ike_header(packet)

        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 34)  # SA_INIT
        self.assertIn('Response', ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        # compare with an all-zero SPI the same way TemplateInitiator does;
        # NOTE(review): assumes SPIs are 8-byte strings, in which case the
        # previous comparison with the integer 0 could never fail
        self.assertNotEqual(ih.resp_SPI, 8 * b'\x00')
        self.sa.rspi = ih.resp_SPI
        try:
            # the SA payload is looked up only to make sure it is present
            ih[ikev2.IKEv2_payload_SA]
            self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
        except IndexError:
            self.logger.error(
                "unexpected reply: SA/Nonce/KE payload not found!")
            # show() prints and returns None unless dump=True is given
            self.logger.error(ih.show(dump=True))
            raise
        self.sa.complete_dh_data()
        self.sa.calc_keys()
        self.sa.auth_init()

    def verify_sa_auth_resp(self, packet):
        """Validate the IKE_AUTH response and calculate the child keys."""
        ike = self.get_ike_header(packet)
        udp = packet[UDP]
        self.verify_udp(udp)
        self.assertEqual(ike.id, self.sa.msg_id)
        # the decrypted payload is not inspected here, so it is not kept
        self.sa.hmac_and_decrypt(ike)
        self.sa.calc_child_keys()

    def test_responder(self):
        # full exchange driven from the test side, then check VPP's state
        self.send_sa_init_req(self.sa.natt)
        self.send_sa_auth()
        self.verify_ipsec_sas()
        self.verify_ike_sas()
1082
1083
class Ikev2Params(object):
    # Mixin for the test cases below: config_params() creates the VPP
    # profile (self.p) and the test-side SA (self.sa) from a dict of
    # optional overrides.
    def config_params(self, params=None):
        """Create ``self.p`` and ``self.sa`` from *params*.

        :param params: optional dict of overrides; a missing key selects the
            default noted in parentheses. Keys read here:

            * ``natt`` (False), ``ip6`` (False), ``is_initiator`` (True)
            * ``auth``: ``'rsa-sig'`` selects certificate authentication and
              additionally requires ``server-key``, ``server-cert``,
              ``client-key`` and ``client-cert`` (file names below the certs
              directory); anything else selects a shared key
            * ``loc_ts`` / ``rem_ts``: traffic selector dicts
            * ``responder``, ``ike_transforms``, ``esp_transforms``: passed
              to the profile only when present
            * ``ike-crypto`` / ``esp-crypto``: ``(name, key_len)`` tuples
              (``('AES-CBC', 32)``)
            * ``ike-integ`` / ``esp-integ`` (``'HMAC-SHA1-96'``)
            * ``ike-dh`` (``'2048MODPgr'``)
        """
        # a None default avoids sharing one dict object between calls
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        # any falsy 'natt' value is normalised to False, as before
        is_natt = params.get('natt') or False
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            # BR must be set in the environment for the cert-based tests
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                    key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # context managers make sure both files are closed again
            with open(work_dir + params['server-cert']) as cert_file:
                server_pem = cert_file.read()
            with open(work_dir + params['client-key']) as key_file:
                client_priv_pem = key_file.read()
            client_priv = load_pem_private_key(str.encode(client_priv_pem),
                                               None, default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                    str.encode(server_pem),
                    default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        is_init = params.get('is_initiator', True)

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        # the profile's local id is the responder id when the test side
        # initiates, and the initiator id otherwise
        if is_init:
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
        else:
            self.p.add_local_id(**idi)
            self.p.add_remote_id(**idr)

        loc_ts = params.get(
            'loc_ts', {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
            'rem_ts', {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        # the SA's traffic selectors are the profile's, swapped
        self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'], natt=is_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          auth_data=auth_data,
                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)

        ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
        ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
        ike_dh = params.get('ike-dh', '2048MODPgr')

        esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
        esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

        self.sa.set_ike_props(
                crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
        self.sa.set_esp_props(
                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                integ=esp_integ)
1177
1178
class TestApi(VppTestCase):
    """ Test IKEV2 API """
    @classmethod
    def setUpClass(cls):
        # nothing beyond the framework's class-level setup
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing beyond the framework's class-level teardown
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        """Remove both profiles and make sure VPP reports none afterwards."""
        super(TestApi, self).tearDown()
        self.p1.remove_vpp_config()
        self.p2.remove_vpp_config()
        remaining = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(remaining), 0)

    def configure_profile(self, cfg):
        """Create a profile from the *cfg* dict, add it to VPP, return it.

        ``lifetime_data`` and ``tun_itf`` are optional keys; every other
        key read below is required.
        """
        profile = Profile(self, cfg['name'])

        loc_id = cfg['loc_id']
        profile.add_local_id(id_type=loc_id[0], data=loc_id[1])
        rem_id = cfg['rem_id']
        profile.add_remote_id(id_type=rem_id[0], data=rem_id[1])

        profile.add_local_ts(**cfg['loc_ts'])
        profile.add_remote_ts(**cfg['rem_ts'])
        profile.add_responder(cfg['responder'])
        profile.add_ike_transforms(cfg['ike_ts'])
        profile.add_esp_transforms(cfg['esp_ts'])
        profile.add_auth(**cfg['auth'])
        profile.set_udp_encap(cfg['udp_encap'])
        profile.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])

        if 'lifetime_data' in cfg:
            profile.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            profile.set_tunnel_interface(cfg['tun_itf'])

        profile.add_vpp_config()
        return profile

    def test_profile_api(self):
        """ test profile dump API """
        loc_ts4 = dict(proto=8, start_port=1, end_port=19,
                       start_addr='3.3.3.2', end_addr='3.3.3.3')
        rem_ts4 = dict(proto=9, start_port=10, end_port=119,
                       start_addr='4.5.76.80', end_addr='2.3.4.6')

        loc_ts6 = dict(proto=8, start_port=1, end_port=19,
                       start_addr='ab::1', end_addr='ab::4')
        rem_ts6 = dict(proto=9, start_port=10, end_port=119,
                       start_addr='cd::12', end_addr='cd::13')

        # p1: IPv4 selectors, UDP encapsulation, explicit lifetime data
        p1_cfg = {
            'name': 'p1',
            'loc_id': ('fqdn', b'vpp.home'),
            'rem_id': ('fqdn', b'roadwarrior.example.com'),
            'loc_ts': loc_ts4,
            'rem_ts': rem_ts4,
            'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
            'ike_ts': {
                'crypto_alg': 20,
                'crypto_key_size': 32,
                'integ_alg': 1,
                'dh_group': 1,
            },
            'esp_ts': {
                'crypto_alg': 13,
                'crypto_key_size': 24,
                'integ_alg': 2,
            },
            'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
            'udp_encap': True,
            'ipsec_over_udp_port': 4501,
            'lifetime_data': {
                'lifetime': 123,
                'lifetime_maxdata': 20192,
                'lifetime_jitter': 9,
                'handover': 132,
            },
        }
        # p2: IPv6 selectors, no UDP encapsulation, explicit tunnel interface
        p2_cfg = {
            'name': 'p2',
            'loc_id': ('ip4-addr', b'192.168.2.1'),
            'rem_id': ('ip6-addr', b'abcd::1'),
            'loc_ts': loc_ts6,
            'rem_ts': rem_ts6,
            'responder': {'sw_if_index': 4, 'addr': 'def::10'},
            'ike_ts': {
                'crypto_alg': 12,
                'crypto_key_size': 16,
                'integ_alg': 3,
                'dh_group': 3,
            },
            'esp_ts': {
                'crypto_alg': 9,
                'crypto_key_size': 24,
                'integ_alg': 4,
            },
            'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
            'udp_encap': False,
            'ipsec_over_udp_port': 4600,
            'tun_itf': 0,
        }
        conf = {'p1': p1_cfg, 'p2': p2_cfg}

        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        dumped = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(dumped), 2)
        # profiles are expected back in the order they were configured
        for position, name in enumerate(('p1', 'p2')):
            self.verify_profile(dumped[position].profile, conf[name])

    def verify_id(self, api_id, cfg_id):
        """Compare an API id with a ``(type name, data bytes)`` tuple."""
        id_type, id_data = cfg_id[0], cfg_id[1]
        self.assertEqual(api_id.type, IDType.value(id_type))
        self.assertEqual(bytes(api_id.data, 'ascii'), id_data)

    def verify_ts(self, api_ts, cfg_ts):
        """Compare an API traffic selector with its configuration dict."""
        # (attribute on the API object, key in the configuration)
        for api_field, cfg_key in (('protocol_id', 'proto'),
                                   ('start_port', 'start_port'),
                                   ('end_port', 'end_port')):
            self.assertEqual(getattr(api_ts, api_field), cfg_ts[cfg_key])
        for addr_key in ('start_addr', 'end_addr'):
            self.assertEqual(getattr(api_ts, addr_key),
                             ip_address(text_type(cfg_ts[addr_key])))

    def verify_responder(self, api_r, cfg_r):
        """Compare the API responder with its configuration dict."""
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        configured_addr = ip_address(cfg_r['addr'])
        self.assertEqual(api_r.addr, configured_addr)

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the fields shared by IKE and ESP transforms."""
        for field in ('crypto_alg', 'crypto_key_size', 'integ_alg'):
            self.assertEqual(getattr(api_ts, field), cfg_ts[field])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check the common transform fields plus the DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """ESP transforms carry only the common transform fields."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Compare auth method, data and data length with the config."""
        secret = cfg_auth['data']
        self.assertEqual(api_auth.method,
                         AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, secret)
        self.assertEqual(api_auth.data_len, len(secret))

    def verify_lifetime_data(self, p, ld):
        """Compare the lifetime related profile fields with dict *ld*."""
        for field in ('lifetime', 'lifetime_maxdata', 'lifetime_jitter',
                      'handover'):
            self.assertEqual(getattr(p, field), ld[field])

    def verify_profile(self, ap, cp):
        """Compare a dumped profile *ap* with its configuration dict *cp*."""
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        for id_key in ('loc_id', 'rem_id'):
            self.verify_id(getattr(ap, id_key), cp[id_key])
        for ts_key in ('loc_ts', 'rem_ts'):
            self.verify_ts(getattr(ap, ts_key), cp[ts_key])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        # 0xffffffff is what the dump is expected to report when no tunnel
        # interface was configured
        self.assertEqual(ap.tun_itf, cp.get('tun_itf', 0xffffffff))
1360
1361
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """
    def config_tc(self):
        # 'is_initiator' is given from the test case's perspective: it is
        # False because VPP is the initiator in this test.
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        self.config_params({
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
        })
1383
1384
class TestResponderNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - nat traversal """
    def config_tc(self):
        # default parameters, with NAT traversal switched on
        overrides = {'natt': True}
        self.config_params(overrides)
1390
1391
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """
    def config_tc(self):
        # No overrides: config_params() falls back to its defaults, which
        # select shared-key authentication.
        self.config_params()
1396
1397
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """
    def config_tc(self):
        # file names are resolved by config_params() relative to the
        # plugin's test certs directory
        cert_params = {
            'auth': 'rsa-sig',
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem',
        }
        self.config_params(cert_params)
1407
1408
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        # crypto entries are (algorithm, key length) tuples
        algorithms_under_test = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192',
            'ike-dh': '2048MODPgr',
        }
        self.config_params(algorithms_under_test)
1421
1422
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        # NOTE(review): the class name and docstring say AES_CBC_128, but
        # the IKE key length passed here is 32, while the sibling
        # AES_CBC_128 test case passes 16 - confirm which one is intended.
        algorithms_under_test = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL',
            'ike-dh': '3072MODPgr',
        }
        self.config_params(algorithms_under_test)
1435
1436
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """
    def config_tc(self):
        # IPv6 traffic selectors to go with 'ip6': True
        local_selector = {'start_addr': 'ab:cd::0',
                          'end_addr': 'ab:cd::10'}
        remote_selector = {'start_addr': '11::0',
                           'end_addr': '11::100'}
        self.config_params({
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_selector,
            'rem_ts': remote_selector,
        })
1452
1453
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # Overrides the inherited tearDown with a no-op.
        # NOTE(review): presumably because no SA is ever established by
        # this test case - confirm that skipping the cleanup is intended.
        pass

    def config_tc(self):
        # default parameters are sufficient for the malformed packet checks
        self.config_params()

    def assert_counter(self, count, name, version='ip4'):
        """Assert that error counter *name* of the ikev2 node equals *count*.

        *version* selects the node the counter is read from
        (``/err/ikev2-<version>/``).
        """
        node_prefix = '/err/ikev2-%s/' % version
        counter_path = node_prefix + name
        self.assertEqual(count,
                         self.statistics.get_err_counter(counter_path))

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT frame, optionally followed by *payload*.

        *length* is passed to the IKEv2 header as its ``length`` field.
        """
        ike = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                          flags='Initiator', exch_type='IKE_SA_INIT')
        if payload is not None:
            ike = ike / payload
        return self.create_packet(self.pg0, ike, self.sa.sport,
                                  self.sa.dport)

    def _send_and_count(self, ike_msg, counter_name):
        # Send pkt_count copies of ike_msg, expect no reply at all and
        # expect the named error counter to equal pkt_count.
        self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
        self.assert_counter(self.pkt_count, counter_name)

    def verify_bad_packet_length(self):
        """An IKE header announcing a bogus length must be counted."""
        bogus_header = self.create_ike_init_msg(length=0xdead)
        self._send_and_count(bogus_header, 'Bad packet length')

    def verify_bad_sa_payload_length(self):
        """An SA payload announcing a bogus length must be counted."""
        bogus_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        ike_msg = self.create_ike_init_msg(payload=bogus_sa)
        self._send_and_count(ike_msg, 'Malformed packet')

    def test_responder(self):
        # replaces the inherited responder test with the malformed checks
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
1490
1491
if __name__ == '__main__':
    # unittest is not among this file's top-level imports, so bring it into
    # scope here; otherwise running the file directly raises NameError.
    import unittest
    unittest.main(testRunner=VppTestRunner)