ikev2: better packet parsing functions
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from cryptography import x509
3 from cryptography.hazmat.backends import default_backend
4 from cryptography.hazmat.primitives import hashes, hmac
5 from cryptography.hazmat.primitives.asymmetric import dh, padding
6 from cryptography.hazmat.primitives.serialization import load_pem_private_key
7 from cryptography.hazmat.primitives.ciphers import (
8     Cipher,
9     algorithms,
10     modes,
11 )
12 from ipaddress import IPv4Address
13 from scapy.layers.ipsec import ESP
14 from scapy.layers.inet import IP, UDP, Ether
15 from scapy.packet import raw, Raw
16 from scapy.utils import long_converter
17 from framework import VppTestCase, VppTestRunner
18 from vpp_ikev2 import Profile, IDType, AuthMethod
19 from vpp_papi import VppEnum
20
21
22 KEY_PAD = b"Key Pad for IKEv2"
23 SALT_SIZE = 4
24 GCM_ICV_SIZE = 16
25 GCM_IV_SIZE = 8
26
27
28 # defined in rfc3526
29 # tuple structure is (p, g, key_len)
30 DH = {
31     '2048MODPgr': (long_converter("""
32     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
33     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
34     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
35     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
36     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
37     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
38     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
39     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
40     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
41     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
42     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
43
44     '3072MODPgr': (long_converter("""
45     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
46     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
47     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
48     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
49     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
50     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
51     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
52     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
53     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
54     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
55     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
56     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
57     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
58     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
59     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
60     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
61 }
62
63
64 class CryptoAlgo(object):
65     def __init__(self, name, cipher, mode):
66         self.name = name
67         self.cipher = cipher
68         self.mode = mode
69         if self.cipher is not None:
70             self.bs = self.cipher.block_size // 8
71
72             if self.name == 'AES-GCM-16ICV':
73                 self.iv_len = GCM_IV_SIZE
74             else:
75                 self.iv_len = self.bs
76
77     def encrypt(self, data, key, aad=None):
78         iv = os.urandom(self.iv_len)
79         if aad is None:
80             encryptor = Cipher(self.cipher(key), self.mode(iv),
81                                default_backend()).encryptor()
82             return iv + encryptor.update(data) + encryptor.finalize()
83         else:
84             salt = key[-SALT_SIZE:]
85             nonce = salt + iv
86             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
87                                default_backend()).encryptor()
88             encryptor.authenticate_additional_data(aad)
89             data = encryptor.update(data) + encryptor.finalize()
90             data += encryptor.tag[:GCM_ICV_SIZE]
91             return iv + data
92
93     def decrypt(self, data, key, aad=None, icv=None):
94         if aad is None:
95             iv = data[:self.iv_len]
96             ct = data[self.iv_len:]
97             decryptor = Cipher(algorithms.AES(key),
98                                self.mode(iv),
99                                default_backend()).decryptor()
100             return decryptor.update(ct) + decryptor.finalize()
101         else:
102             salt = key[-SALT_SIZE:]
103             nonce = salt + data[:GCM_IV_SIZE]
104             ct = data[GCM_IV_SIZE:]
105             key = key[:-SALT_SIZE]
106             decryptor = Cipher(algorithms.AES(key),
107                                self.mode(nonce, icv, len(icv)),
108                                default_backend()).decryptor()
109             decryptor.authenticate_additional_data(aad)
110             pt = decryptor.update(ct) + decryptor.finalize()
111             pad_len = pt[-1] + 1
112             return pt[:-pad_len]
113
114     def pad(self, data):
115         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
116         data = data + b'\x00' * (pad_len - 1)
117         return data + bytes([pad_len - 1])
118
119
120 class AuthAlgo(object):
121     def __init__(self, name, mac, mod, key_len, trunc_len=None):
122         self.name = name
123         self.mac = mac
124         self.mod = mod
125         self.key_len = key_len
126         self.trunc_len = trunc_len or key_len
127
128
129 CRYPTO_ALGOS = {
130     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
131     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
132     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
133                                 mode=modes.GCM),
134 }
135
136 AUTH_ALGOS = {
137     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
138     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
139     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
140     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
141     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
142 }
143
144 PRF_ALGOS = {
145     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
146     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
147                                   hashes.SHA256, 32),
148 }
149
150
151 class IKEv2ChildSA(object):
152     def __init__(self, local_ts, remote_ts, spi=None):
153         self.spi = spi or os.urandom(4)
154         self.local_ts = local_ts
155         self.remote_ts = remote_ts
156
157
158 class IKEv2SA(object):
159     def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
160                  i_id=None, r_id=None, id_type='fqdn', nonce=None,
161                  auth_data=None, local_ts=None, remote_ts=None,
162                  auth_method='shared-key', priv_key=None, natt=False):
163         self.natt = natt
164         if natt:
165             self.sport = 4500
166             self.dport = 4500
167         else:
168             self.sport = 500
169             self.dport = 500
170         self.msg_id = 0
171         self.dh_params = None
172         self.test = test
173         self.priv_key = priv_key
174         self.is_initiator = is_initiator
175         nonce = nonce or os.urandom(32)
176         self.auth_data = auth_data
177         self.i_id = i_id
178         self.r_id = r_id
179         if isinstance(id_type, str):
180             self.id_type = IDType.value(id_type)
181         else:
182             self.id_type = id_type
183         self.auth_method = auth_method
184         if self.is_initiator:
185             self.rspi = None
186             self.ispi = spi
187             self.i_nonce = nonce
188         else:
189             self.rspi = spi
190             self.ispi = None
191             self.r_nonce = None
192         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
193
194     def new_msg_id(self):
195         self.msg_id += 1
196         return self.msg_id
197
198     def dh_pub_key(self):
199         return self.i_dh_data
200
201     def compute_secret(self):
202         priv = self.dh_private_key
203         peer = self.r_dh_data
204         p, g, l = self.ike_group
205         return pow(int.from_bytes(peer, 'big'),
206                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
207
208     def generate_dh_data(self):
209         # generate DH keys
210         if self.is_initiator:
211             if self.ike_dh not in DH:
212                 raise NotImplementedError('%s not in DH group' % self.ike_dh)
213             if self.dh_params is None:
214                 dhg = DH[self.ike_dh]
215                 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
216                 self.dh_params = pn.parameters(default_backend())
217             priv = self.dh_params.generate_private_key()
218             pub = priv.public_key()
219             x = priv.private_numbers().x
220             self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
221             y = pub.public_numbers().y
222             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
223
224     def complete_dh_data(self):
225         self.dh_shared_secret = self.compute_secret()
226
227     def calc_child_keys(self):
228         prf = self.ike_prf_alg.mod()
229         s = self.i_nonce + self.r_nonce
230         c = self.child_sas[0]
231
232         encr_key_len = self.esp_crypto_key_len
233         integ_key_len = self.esp_integ_alg.key_len
234         salt_len = 0 if integ_key_len else 4
235
236         l = (integ_key_len * 2 +
237              encr_key_len * 2 +
238              salt_len * 2)
239         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
240
241         pos = 0
242         c.sk_ei = keymat[pos:pos+encr_key_len]
243         pos += encr_key_len
244
245         if integ_key_len:
246             c.sk_ai = keymat[pos:pos+integ_key_len]
247             pos += integ_key_len
248         else:
249             c.salt_ei = keymat[pos:pos+salt_len]
250             pos += salt_len
251
252         c.sk_er = keymat[pos:pos+encr_key_len]
253         pos += encr_key_len
254
255         if integ_key_len:
256             c.sk_ar = keymat[pos:pos+integ_key_len]
257             pos += integ_key_len
258         else:
259             c.salt_er = keymat[pos:pos+salt_len]
260             pos += salt_len
261
262     def calc_prfplus(self, prf, key, seed, length):
263         r = b''
264         t = None
265         x = 1
266         while len(r) < length and x < 255:
267             if t is not None:
268                 s = t
269             else:
270                 s = b''
271             s = s + seed + bytes([x])
272             t = self.calc_prf(prf, key, s)
273             r = r + t
274             x = x + 1
275
276         if x == 255:
277             return None
278         return r
279
280     def calc_prf(self, prf, key, data):
281         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
282         h.update(data)
283         return h.finalize()
284
285     def calc_keys(self):
286         prf = self.ike_prf_alg.mod()
287         # SKEYSEED = prf(Ni | Nr, g^ir)
288         s = self.i_nonce + self.r_nonce
289         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
290
291         # calculate S = Ni | Nr | SPIi SPIr
292         s = s + self.ispi + self.rspi
293
294         prf_key_trunc = self.ike_prf_alg.trunc_len
295         encr_key_len = self.ike_crypto_key_len
296         tr_prf_key_len = self.ike_prf_alg.key_len
297         integ_key_len = self.ike_integ_alg.key_len
298         if integ_key_len == 0:
299             salt_size = 4
300         else:
301             salt_size = 0
302
303         l = (prf_key_trunc +
304              integ_key_len * 2 +
305              encr_key_len * 2 +
306              tr_prf_key_len * 2 +
307              salt_size * 2)
308         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
309
310         pos = 0
311         self.sk_d = keymat[:pos+prf_key_trunc]
312         pos += prf_key_trunc
313
314         self.sk_ai = keymat[pos:pos+integ_key_len]
315         pos += integ_key_len
316         self.sk_ar = keymat[pos:pos+integ_key_len]
317         pos += integ_key_len
318
319         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
320         pos += encr_key_len + salt_size
321         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
322         pos += encr_key_len + salt_size
323
324         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
325         pos += tr_prf_key_len
326         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
327
328     def generate_authmsg(self, prf, packet):
329         if self.is_initiator:
330             id = self.i_id
331             nonce = self.r_nonce
332             key = self.sk_pi
333         data = bytes([self.id_type, 0, 0, 0]) + id
334         id_hash = self.calc_prf(prf, key, data)
335         return packet + nonce + id_hash
336
337     def auth_init(self):
338         prf = self.ike_prf_alg.mod()
339         authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
340         if self.auth_method == 'shared-key':
341             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
342             self.auth_data = self.calc_prf(prf, psk, authmsg)
343         elif self.auth_method == 'rsa-sig':
344             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
345                                                 hashes.SHA1())
346         else:
347             raise TypeError('unknown auth method type!')
348
349     def encrypt(self, data, aad=None):
350         data = self.ike_crypto_alg.pad(data)
351         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
352
353     @property
354     def peer_authkey(self):
355         if self.is_initiator:
356             return self.sk_ar
357         return self.sk_ai
358
359     @property
360     def my_authkey(self):
361         if self.is_initiator:
362             return self.sk_ai
363         return self.sk_ar
364
365     @property
366     def my_cryptokey(self):
367         if self.is_initiator:
368             return self.sk_ei
369         return self.sk_er
370
371     @property
372     def peer_cryptokey(self):
373         if self.is_initiator:
374             return self.sk_er
375         return self.sk_ei
376
377     def concat(self, alg, key_len):
378         return alg + '-' + str(key_len * 8)
379
380     @property
381     def vpp_ike_cypto_alg(self):
382         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
383
384     @property
385     def vpp_esp_cypto_alg(self):
386         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
387
388     def verify_hmac(self, ikemsg):
389         integ_trunc = self.ike_integ_alg.trunc_len
390         exp_hmac = ikemsg[-integ_trunc:]
391         data = ikemsg[:-integ_trunc]
392         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
393                                           self.peer_authkey, data)
394         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
395
396     def compute_hmac(self, integ, key, data):
397         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
398         h.update(data)
399         return h.finalize()
400
401     def decrypt(self, data, aad=None, icv=None):
402         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
403
404     def hmac_and_decrypt(self, ike):
405         ep = ike[ikev2.IKEv2_payload_Encrypted]
406         if self.ike_crypto == 'AES-GCM-16ICV':
407             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
408             ct = ep.load[:-GCM_ICV_SIZE]
409             tag = ep.load[-GCM_ICV_SIZE:]
410             return self.decrypt(ct, raw(ike)[:aad_len], tag)
411         else:
412             self.verify_hmac(raw(ike))
413             integ_trunc = self.ike_integ_alg.trunc_len
414
415             # remove ICV and decrypt payload
416             ct = ep.load[:-integ_trunc]
417             return self.decrypt(ct)
418
419     def generate_ts(self):
420         c = self.child_sas[0]
421         ts1 = ikev2.IPv4TrafficSelector(
422                 IP_protocol_ID=0,
423                 start_port=0,
424                 end_port=0xffff,
425                 starting_address_v4=c.local_ts['start_addr'],
426                 ending_address_v4=c.local_ts['end_addr'])
427         ts2 = ikev2.IPv4TrafficSelector(
428                 IP_protocol_ID=0,
429                 starting_address_v4=c.remote_ts['start_addr'],
430                 ending_address_v4=c.remote_ts['end_addr'])
431         return ([ts1], [ts2])
432
433     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
434         if crypto not in CRYPTO_ALGOS:
435             raise TypeError('unsupported encryption algo %r' % crypto)
436         self.ike_crypto = crypto
437         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
438         self.ike_crypto_key_len = crypto_key_len
439
440         if integ not in AUTH_ALGOS:
441             raise TypeError('unsupported auth algo %r' % integ)
442         self.ike_integ = None if integ == 'NULL' else integ
443         self.ike_integ_alg = AUTH_ALGOS[integ]
444
445         if prf not in PRF_ALGOS:
446             raise TypeError('unsupported prf algo %r' % prf)
447         self.ike_prf = prf
448         self.ike_prf_alg = PRF_ALGOS[prf]
449         self.ike_dh = dh
450         self.ike_group = DH[self.ike_dh]
451
452     def set_esp_props(self, crypto, crypto_key_len, integ):
453         self.esp_crypto_key_len = crypto_key_len
454         if crypto not in CRYPTO_ALGOS:
455             raise TypeError('unsupported encryption algo %r' % crypto)
456         self.esp_crypto = crypto
457         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
458
459         if integ not in AUTH_ALGOS:
460             raise TypeError('unsupported auth algo %r' % integ)
461         self.esp_integ = None if integ == 'NULL' else integ
462         self.esp_integ_alg = AUTH_ALGOS[integ]
463
464     def crypto_attr(self, key_len):
465         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
466             return (0x800e << 16 | key_len << 3, 12)
467         else:
468             raise Exception('unsupported attribute type')
469
470     def ike_crypto_attr(self):
471         return self.crypto_attr(self.ike_crypto_key_len)
472
473     def esp_crypto_attr(self):
474         return self.crypto_attr(self.esp_crypto_key_len)
475
476     def compute_nat_sha1(self, ip, port):
477         data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
478         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
479         digest.update(data)
480         return digest.finalize()
481
482
483 class TemplateResponder(VppTestCase):
484     """ responder test template """
485
486     @classmethod
487     def setUpClass(cls):
488         import scapy.contrib.ikev2 as _ikev2
489         globals()['ikev2'] = _ikev2
490         super(TemplateResponder, cls).setUpClass()
491         cls.create_pg_interfaces(range(2))
492         for i in cls.pg_interfaces:
493             i.admin_up()
494             i.config_ip4()
495             i.resolve_arp()
496
497     @classmethod
498     def tearDownClass(cls):
499         super(TemplateResponder, cls).tearDownClass()
500
501     def setUp(self):
502         super(TemplateResponder, self).setUp()
503         self.config_tc()
504         self.p.add_vpp_config()
505         self.assertIsNotNone(self.p.query_vpp_config())
506         self.sa.generate_dh_data()
507
508     def tearDown(self):
509         super(TemplateResponder, self).tearDown()
510         if self.sa.is_initiator:
511             self.initiate_del_sa()
512             r = self.vapi.ikev2_sa_dump()
513             self.assertEqual(len(r), 0)
514
515         self.p.remove_vpp_config()
516         self.assertIsNone(self.p.query_vpp_config())
517
518     def verify_del_sa(self, packet):
519         ih = self.get_ike_header(packet)
520         self.assertEqual(ih.id, self.sa.msg_id)
521         self.assertEqual(ih.exch_type, 37)  # exchange informational
522
523     def initiate_del_sa(self):
524         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
525                              flags='Initiator', exch_type='INFORMATIONAL',
526                              id=self.sa.new_msg_id())
527         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
528         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
529         packet = self.create_packet(self.pg0, ike_msg,
530                                     self.sa.sport, self.sa.dport,
531                                     self.sa.natt)
532         self.pg0.add_stream(packet)
533         self.pg0.enable_capture()
534         self.pg_start()
535         capture = self.pg0.get_capture(1)
536         self.verify_del_sa(capture[0])
537
538     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False):
539         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
540                IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
541                UDP(sport=sport, dport=dport))
542         if natt:
543             # insert non ESP marker
544             res = res / Raw(b'\x00' * 4)
545         return res / msg
546
547     def send_sa_init(self, behind_nat=False):
548         tr_attr = self.sa.ike_crypto_attr()
549         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
550                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
551                  key_length=tr_attr[0]) /
552                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
553                  transform_id=self.sa.ike_integ) /
554                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
555                  transform_id=self.sa.ike_prf_alg.name) /
556                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
557                  transform_id=self.sa.ike_dh))
558
559         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
560                  trans_nb=4, trans=trans))
561
562         if behind_nat:
563             next_payload = 'Notify'
564         else:
565             next_payload = None
566
567         self.sa.init_req_packet = (
568                 ikev2.IKEv2(init_SPI=self.sa.ispi,
569                             flags='Initiator', exch_type='IKE_SA_INIT') /
570                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
571                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
572                                        group=self.sa.ike_dh,
573                                        load=self.sa.dh_pub_key()) /
574                 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
575                                           load=self.sa.i_nonce))
576
577         if behind_nat:
578             src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
579                                                self.sa.sport)
580             nat_detection = ikev2.IKEv2_payload_Notify(
581                     type='NAT_DETECTION_SOURCE_IP',
582                     load=src_nat)
583             self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
584
585         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
586                                      self.sa.sport, self.sa.dport,
587                                      self.sa.natt)
588         self.pg0.add_stream(ike_msg)
589         self.pg0.enable_capture()
590         self.pg_start()
591         capture = self.pg0.get_capture(1)
592         self.verify_sa_init(capture[0])
593
594     def encrypt_ike_msg(self, header, plain, first_payload):
595         if self.sa.ike_crypto == 'AES-GCM-16ICV':
596             data = self.sa.ike_crypto_alg.pad(raw(plain))
597             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
598                 len(ikev2.IKEv2_payload_Encrypted())
599             tlen = plen + len(ikev2.IKEv2())
600
601             # prepare aad data
602             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
603                                                  length=plen)
604             header.length = tlen
605             res = header / sk_p
606             encr = self.sa.encrypt(raw(plain), raw(res))
607             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
608                                                  length=plen, load=encr)
609             res = header / sk_p
610         else:
611             encr = self.sa.encrypt(raw(plain))
612             trunc_len = self.sa.ike_integ_alg.trunc_len
613             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
614             tlen = plen + len(ikev2.IKEv2())
615
616             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
617                                                  length=plen, load=encr)
618             header.length = tlen
619             res = header / sk_p
620
621             integ_data = raw(res)
622             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
623                                              self.sa.my_authkey, integ_data)
624             res = res / Raw(hmac_data[:trunc_len])
625         assert(len(res) == tlen)
626         return res
627
628     def send_sa_auth(self):
629         tr_attr = self.sa.esp_crypto_attr()
630         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
631                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
632                  key_length=tr_attr[0]) /
633                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
634                  transform_id=self.sa.esp_integ) /
635                  ikev2.IKEv2_payload_Transform(
636                  transform_type='Extended Sequence Number',
637                  transform_id='No ESN') /
638                  ikev2.IKEv2_payload_Transform(
639                  transform_type='Extended Sequence Number',
640                  transform_id='ESN'))
641
642         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
643                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
644
645         tsi, tsr = self.sa.generate_ts()
646         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
647                  IDtype=self.sa.id_type, load=self.sa.i_id) /
648                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
649                  IDtype=self.sa.id_type, load=self.sa.r_id) /
650                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
651                  auth_type=AuthMethod.value(self.sa.auth_method),
652                  load=self.sa.auth_data) /
653                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
654                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
655                  number_of_TSs=len(tsi),
656                  traffic_selector=tsi) /
657                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
658                  number_of_TSs=len(tsr),
659                  traffic_selector=tsr) /
660                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
661
662         header = ikev2.IKEv2(
663                 init_SPI=self.sa.ispi,
664                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
665                 flags='Initiator', exch_type='IKE_AUTH')
666
667         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
668         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
669                                     self.sa.dport, self.sa.natt)
670         self.pg0.add_stream(packet)
671         self.pg0.enable_capture()
672         self.pg_start()
673         capture = self.pg0.get_capture(1)
674         self.verify_sa_auth(capture[0])
675
676     def get_ike_header(self, packet):
677         try:
678             ih = packet[ikev2.IKEv2]
679         except IndexError as e:
680             # this is a workaround for getting IKEv2 layer as both ikev2 and
681             # ipsec register for port 4500
682             esp = packet[ESP]
683             ih = self.verify_and_remove_non_esp_marker(esp)
684         return ih
685
686     def verify_sa_init(self, packet):
687         ih = self.get_ike_header(packet)
688
689         self.assertEqual(ih.id, self.sa.msg_id)
690         self.assertEqual(ih.exch_type, 34)
691         self.assertTrue('Response' in ih.flags)
692         self.assertEqual(ih.init_SPI, self.sa.ispi)
693         self.assertNotEqual(ih.resp_SPI, 0)
694         self.sa.rspi = ih.resp_SPI
695         try:
696             sa = ih[ikev2.IKEv2_payload_SA]
697             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
698             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
699         except IndexError as e:
700             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
701             self.logger.error(ih.show())
702             raise
703         self.sa.complete_dh_data()
704         self.sa.calc_keys()
705         self.sa.auth_init()
706
707     def verify_and_remove_non_esp_marker(self, packet):
708         if self.sa.natt:
709             # if we are in nat traversal mode check for non esp marker
710             # and remove it
711             data = raw(packet)
712             self.assertEqual(data[:4], b'\x00' * 4)
713             return ikev2.IKEv2(data[4:])
714         else:
715             return packet
716
717     def verify_udp(self, udp):
718         self.assertEqual(udp.sport, self.sa.sport)
719         self.assertEqual(udp.dport, self.sa.dport)
720
721     def verify_sa_auth(self, packet):
722         ike = self.get_ike_header(packet)
723         udp = packet[UDP]
724         self.verify_udp(udp)
725         self.assertEqual(ike.id, self.sa.msg_id)
726         plain = self.sa.hmac_and_decrypt(ike)
727         self.sa.calc_child_keys()
728
729     def verify_ipsec_sas(self):
730         sas = self.vapi.ipsec_sa_dump()
731         self.assertEqual(len(sas), 2)
732         sa0 = sas[0].entry
733         sa1 = sas[1].entry
734         c = self.sa.child_sas[0]
735
736         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
737         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
738         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
739
740         if self.sa.esp_integ is None:
741             vpp_integ_alg = 0
742         else:
743             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
744         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
745         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
746
747         # verify crypto keys
748         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
749         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
750         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
751         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
752
753         # verify integ keys
754         if vpp_integ_alg:
755             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
756             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
757             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
758             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
759         else:
760             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
761             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
762
763     def verify_keymat(self, api_keys, keys, name):
764         km = getattr(keys, name)
765         api_km = getattr(api_keys, name)
766         api_km_len = getattr(api_keys, name + '_len')
767         self.assertEqual(len(km), api_km_len)
768         self.assertEqual(km, api_km[:api_km_len])
769
770     def verify_id(self, api_id, exp_id):
771         self.assertEqual(api_id.type, IDType.value(exp_id.type))
772         self.assertEqual(api_id.data_len, exp_id.data_len)
773         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
774
775     def verify_ike_sas(self):
776         r = self.vapi.ikev2_sa_dump()
777         self.assertEqual(len(r), 1)
778         sa = r[0].sa
779         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'little'))
780         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
781         self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
782         self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
783         self.verify_keymat(sa.keys, self.sa, 'sk_d')
784         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
785         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
786         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
787         self.verify_keymat(sa.keys, self.sa, 'sk_er')
788         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
789         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
790
791         self.assertEqual(sa.i_id.type, self.sa.id_type)
792         self.assertEqual(sa.r_id.type, self.sa.id_type)
793         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
794         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
795         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
796         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
797
798         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
799         self.assertEqual(len(r), 1)
800         csa = r[0].child_sa
801         self.assertEqual(csa.sa_index, sa.sa_index)
802         c = self.sa.child_sas[0]
803         if hasattr(c, 'sk_ai'):
804             self.verify_keymat(csa.keys, c, 'sk_ai')
805             self.verify_keymat(csa.keys, c, 'sk_ar')
806         self.verify_keymat(csa.keys, c, 'sk_ei')
807         self.verify_keymat(csa.keys, c, 'sk_er')
808
809         tsi, tsr = self.sa.generate_ts()
810         tsi = tsi[0]
811         tsr = tsr[0]
812         r = self.vapi.ikev2_traffic_selector_dump(
813                 is_initiator=True, sa_index=sa.sa_index,
814                 child_sa_index=csa.child_sa_index)
815         self.assertEqual(len(r), 1)
816         ts = r[0].ts
817         self.verify_ts(r[0].ts, tsi[0], True)
818
819         r = self.vapi.ikev2_traffic_selector_dump(
820                 is_initiator=False, sa_index=sa.sa_index,
821                 child_sa_index=csa.child_sa_index)
822         self.assertEqual(len(r), 1)
823         self.verify_ts(r[0].ts, tsr[0], False)
824
825         n = self.vapi.ikev2_nonce_get(is_initiator=True,
826                                       sa_index=sa.sa_index)
827         self.verify_nonce(n, self.sa.i_nonce)
828         n = self.vapi.ikev2_nonce_get(is_initiator=False,
829                                       sa_index=sa.sa_index)
830         self.verify_nonce(n, self.sa.r_nonce)
831
832     def verify_nonce(self, api_nonce, nonce):
833         self.assertEqual(api_nonce.data_len, len(nonce))
834         self.assertEqual(api_nonce.nonce, nonce)
835
836     def verify_ts(self, api_ts, ts, is_initiator):
837         if is_initiator:
838             self.assertTrue(api_ts.is_local)
839         else:
840             self.assertFalse(api_ts.is_local)
841         self.assertEqual(api_ts.start_addr,
842                          IPv4Address(ts.starting_address_v4))
843         self.assertEqual(api_ts.end_addr,
844                          IPv4Address(ts.ending_address_v4))
845         self.assertEqual(api_ts.start_port, ts.start_port)
846         self.assertEqual(api_ts.end_port, ts.end_port)
847         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
848
849     def test_responder(self):
850         self.send_sa_init(self.sa.natt)
851         self.send_sa_auth()
852         self.verify_ipsec_sas()
853         self.verify_ike_sas()
854
855
856 class Ikev2Params(object):
857     def config_params(self, params={}):
858         ec = VppEnum.vl_api_ipsec_crypto_alg_t
859         ei = VppEnum.vl_api_ipsec_integ_alg_t
860         self.vpp_enums = {
861                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
862                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
863                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
864                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
865                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
866                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
867
868                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
869                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
870                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
871                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
872
873         is_natt = 'natt' in params and params['natt'] or False
874         self.p = Profile(self, 'pr1')
875
876         if 'auth' in params and params['auth'] == 'rsa-sig':
877             auth_method = 'rsa-sig'
878             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
879             self.vapi.ikev2_set_local_key(
880                     key_file=work_dir + params['server-key'])
881
882             client_file = work_dir + params['client-cert']
883             server_pem = open(work_dir + params['server-cert']).read()
884             client_priv = open(work_dir + params['client-key']).read()
885             client_priv = load_pem_private_key(str.encode(client_priv), None,
886                                                default_backend())
887             self.peer_cert = x509.load_pem_x509_certificate(
888                     str.encode(server_pem),
889                     default_backend())
890             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
891             auth_data = None
892         else:
893             auth_data = b'$3cr3tpa$$w0rd'
894             self.p.add_auth(method='shared-key', data=auth_data)
895             auth_method = 'shared-key'
896             client_priv = None
897
898         self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
899         self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
900         self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
901         self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')
902
903         self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
904                           r_id=self.p.local_id['data'],
905                           id_type=self.p.local_id['id_type'], natt=is_natt,
906                           priv_key=client_priv, auth_method=auth_method,
907                           auth_data=auth_data,
908                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
909
910         ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
911             params['ike-crypto']
912         ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
913             params['ike-integ']
914         ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
915
916         esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
917             params['esp-crypto']
918         esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
919             params['esp-integ']
920
921         self.sa.set_ike_props(
922                 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
923                 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
924         self.sa.set_esp_props(
925                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
926                 integ=esp_integ)
927
928
929 class TestApi(VppTestCase):
930     """ Test IKEV2 API """
931     @classmethod
932     def setUpClass(cls):
933         super(TestApi, cls).setUpClass()
934
935     @classmethod
936     def tearDownClass(cls):
937         super(TestApi, cls).tearDownClass()
938
939     def tearDown(self):
940         super(TestApi, self).tearDown()
941         self.p1.remove_vpp_config()
942         self.p2.remove_vpp_config()
943         r = self.vapi.ikev2_profile_dump()
944         self.assertEqual(len(r), 0)
945
946     def configure_profile(self, cfg):
947         p = Profile(self, cfg['name'])
948         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
949         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
950         p.add_local_ts(**cfg['loc_ts'])
951         p.add_remote_ts(**cfg['rem_ts'])
952         p.add_responder(cfg['responder'])
953         p.add_ike_transforms(cfg['ike_ts'])
954         p.add_esp_transforms(cfg['esp_ts'])
955         p.add_auth(**cfg['auth'])
956         p.set_udp_encap(cfg['udp_encap'])
957         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
958         if 'lifetime_data' in cfg:
959             p.set_lifetime_data(cfg['lifetime_data'])
960         if 'tun_itf' in cfg:
961             p.set_tunnel_interface(cfg['tun_itf'])
962         p.add_vpp_config()
963         return p
964
965     def test_profile_api(self):
966         """ test profile dump API """
967         loc_ts = {
968                     'proto': 8,
969                     'start_port': 1,
970                     'end_port': 19,
971                     'start_addr': '3.3.3.2',
972                     'end_addr': '3.3.3.3',
973                 }
974         rem_ts = {
975                     'proto': 9,
976                     'start_port': 10,
977                     'end_port': 119,
978                     'start_addr': '4.5.76.80',
979                     'end_addr': '2.3.4.6',
980                 }
981
982         conf = {
983             'p1': {
984                 'name': 'p1',
985                 'loc_id': ('fqdn', b'vpp.home'),
986                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
987                 'loc_ts': loc_ts,
988                 'rem_ts': rem_ts,
989                 'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
990                 'ike_ts': {
991                         'crypto_alg': 20,
992                         'crypto_key_size': 32,
993                         'integ_alg': 1,
994                         'dh_group': 1},
995                 'esp_ts': {
996                         'crypto_alg': 13,
997                         'crypto_key_size': 24,
998                         'integ_alg': 2},
999                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1000                 'udp_encap': True,
1001                 'ipsec_over_udp_port': 4501,
1002                 'lifetime_data': {
1003                     'lifetime': 123,
1004                     'lifetime_maxdata': 20192,
1005                     'lifetime_jitter': 9,
1006                     'handover': 132},
1007             },
1008             'p2': {
1009                 'name': 'p2',
1010                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1011                 'rem_id': ('ip4-addr', b'192.168.2.2'),
1012                 'loc_ts': loc_ts,
1013                 'rem_ts': rem_ts,
1014                 'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
1015                 'ike_ts': {
1016                         'crypto_alg': 12,
1017                         'crypto_key_size': 16,
1018                         'integ_alg': 3,
1019                         'dh_group': 3},
1020                 'esp_ts': {
1021                         'crypto_alg': 9,
1022                         'crypto_key_size': 24,
1023                         'integ_alg': 4},
1024                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1025                 'udp_encap': False,
1026                 'ipsec_over_udp_port': 4600,
1027                 'tun_itf': 0}
1028         }
1029         self.p1 = self.configure_profile(conf['p1'])
1030         self.p2 = self.configure_profile(conf['p2'])
1031
1032         r = self.vapi.ikev2_profile_dump()
1033         self.assertEqual(len(r), 2)
1034         self.verify_profile(r[0].profile, conf['p1'])
1035         self.verify_profile(r[1].profile, conf['p2'])
1036
1037     def verify_id(self, api_id, cfg_id):
1038         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1039         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1040
1041     def verify_ts(self, api_ts, cfg_ts):
1042         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1043         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1044         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1045         self.assertEqual(api_ts.start_addr, IPv4Address(cfg_ts['start_addr']))
1046         self.assertEqual(api_ts.end_addr, IPv4Address(cfg_ts['end_addr']))
1047
1048     def verify_responder(self, api_r, cfg_r):
1049         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1050         self.assertEqual(api_r.ip4, IPv4Address(cfg_r['ip4']))
1051
1052     def verify_transforms(self, api_ts, cfg_ts):
1053         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1054         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1055         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1056
1057     def verify_ike_transforms(self, api_ts, cfg_ts):
1058         self.verify_transforms(api_ts, cfg_ts)
1059         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1060
1061     def verify_esp_transforms(self, api_ts, cfg_ts):
1062         self.verify_transforms(api_ts, cfg_ts)
1063
1064     def verify_auth(self, api_auth, cfg_auth):
1065         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1066         self.assertEqual(api_auth.data, cfg_auth['data'])
1067         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1068
1069     def verify_lifetime_data(self, p, ld):
1070         self.assertEqual(p.lifetime, ld['lifetime'])
1071         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1072         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1073         self.assertEqual(p.handover, ld['handover'])
1074
1075     def verify_profile(self, ap, cp):
1076         self.assertEqual(ap.name, cp['name'])
1077         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1078         self.verify_id(ap.loc_id, cp['loc_id'])
1079         self.verify_id(ap.rem_id, cp['rem_id'])
1080         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1081         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1082         self.verify_responder(ap.responder, cp['responder'])
1083         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1084         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1085         self.verify_auth(ap.auth, cp['auth'])
1086         if 'lifetime_data' in cp:
1087             self.verify_lifetime_data(ap, cp['lifetime_data'])
1088         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1089         if 'tun_itf' in cp:
1090             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1091         else:
1092             self.assertEqual(ap.tun_itf, 0xffffffff)
1093
1094
1095 class TestResponderNATT(TemplateResponder, Ikev2Params):
1096     """ test ikev2 responder - nat traversal """
1097     def config_tc(self):
1098         self.config_params(
1099                 {'natt': True})
1100
1101
1102 class TestResponderPsk(TemplateResponder, Ikev2Params):
1103     """ test ikev2 responder - pre shared key auth """
1104     def config_tc(self):
1105         self.config_params()
1106
1107
1108 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1109     """ test ikev2 responder - cert based auth """
1110     def config_tc(self):
1111         self.config_params({
1112             'auth': 'rsa-sig',
1113             'server-key': 'server-key.pem',
1114             'client-key': 'client-key.pem',
1115             'client-cert': 'client-cert.pem',
1116             'server-cert': 'server-cert.pem'})
1117
1118
1119 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1120         (TemplateResponder, Ikev2Params):
1121     """
1122     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1123     """
1124     def config_tc(self):
1125         self.config_params({
1126             'ike-crypto': ('AES-CBC', 16),
1127             'ike-integ': 'SHA2-256-128',
1128             'esp-crypto': ('AES-CBC', 24),
1129             'esp-integ': 'SHA2-384-192',
1130             'ike-dh': '2048MODPgr'})
1131
1132
1133 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1134         (TemplateResponder, Ikev2Params):
1135     """
1136     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1137     """
1138     def config_tc(self):
1139         self.config_params({
1140             'ike-crypto': ('AES-CBC', 32),
1141             'ike-integ': 'SHA2-256-128',
1142             'esp-crypto': ('AES-GCM-16ICV', 32),
1143             'esp-integ': 'NULL',
1144             'ike-dh': '3072MODPgr'})
1145
1146
1147 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1148     """
1149     IKE:AES_GCM_16_256
1150     """
1151     def config_tc(self):
1152         self.config_params({
1153             'ike-crypto': ('AES-GCM-16ICV', 32),
1154             'ike-integ': 'NULL',
1155             'ike-dh': '2048MODPgr'})
1156
1157
1158 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1159     """ malformed packet test """
1160
1161     def tearDown(self):
1162         pass
1163
1164     def config_tc(self):
1165         self.config_params()
1166
1167     def assert_counter(self, count, name):
1168         node_name = '/err/ikev2/' + name
1169         self.assertEqual(count, self.statistics.get_err_counter(node_name))
1170
1171     def create_ike_init_msg(self, length=None, payload=None):
1172         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1173                           flags='Initiator', exch_type='IKE_SA_INIT')
1174         if payload is not None:
1175             msg /= payload
1176         return self.create_packet(self.pg0, msg, self.sa.sport,
1177                                   self.sa.dport)
1178
1179     def verify_bad_packet_length(self):
1180         ike_msg = self.create_ike_init_msg(length=0xdead)
1181         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1182         self.assert_counter(self.pkt_count, 'Bad packet length')
1183
1184     def verify_bad_sa_payload_length(self):
1185         p = ikev2.IKEv2_payload_SA(length=0xdead)
1186         ike_msg = self.create_ike_init_msg(payload=p)
1187         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1188         self.assert_counter(self.pkt_count, 'Malformed packet')
1189
1190     def test_responder(self):
1191         self.pkt_count = 254
1192         self.verify_bad_packet_length()
1193         self.verify_bad_sa_payload_length()
1194
1195
1196 if __name__ == '__main__':
1197     unittest.main(testRunner=VppTestRunner)