ikev2: support sending requests from responder
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
1 import os
2 from socket import inet_pton
3 from cryptography import x509
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes, hmac
6 from cryptography.hazmat.primitives.asymmetric import dh, padding
7 from cryptography.hazmat.primitives.serialization import load_pem_private_key
8 from cryptography.hazmat.primitives.ciphers import (
9     Cipher,
10     algorithms,
11     modes,
12 )
13 from ipaddress import IPv4Address, IPv6Address, ip_address
14 from scapy.layers.ipsec import ESP
15 from scapy.layers.inet import IP, UDP, Ether
16 from scapy.layers.inet6 import IPv6
17 from scapy.packet import raw, Raw
18 from scapy.utils import long_converter
19 from framework import VppTestCase, VppTestRunner
20 from vpp_ikev2 import Profile, IDType, AuthMethod
21 from vpp_papi import VppEnum
22
23 try:
24     text_type = unicode
25 except NameError:
26     text_type = str
27
28 KEY_PAD = b"Key Pad for IKEv2"
29 SALT_SIZE = 4
30 GCM_ICV_SIZE = 16
31 GCM_IV_SIZE = 8
32
33
34 # defined in rfc3526
35 # tuple structure is (p, g, key_len)
36 DH = {
37     '2048MODPgr': (long_converter("""
38     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
39     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
40     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
41     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
42     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
43     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
44     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
45     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
46     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
47     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
48     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
49
50     '3072MODPgr': (long_converter("""
51     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
52     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
53     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
54     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
55     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
56     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
57     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
58     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
59     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
60     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
61     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
62     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
63     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
64     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
65     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
66     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
67 }
68
69
70 class CryptoAlgo(object):
71     def __init__(self, name, cipher, mode):
72         self.name = name
73         self.cipher = cipher
74         self.mode = mode
75         if self.cipher is not None:
76             self.bs = self.cipher.block_size // 8
77
78             if self.name == 'AES-GCM-16ICV':
79                 self.iv_len = GCM_IV_SIZE
80             else:
81                 self.iv_len = self.bs
82
83     def encrypt(self, data, key, aad=None):
84         iv = os.urandom(self.iv_len)
85         if aad is None:
86             encryptor = Cipher(self.cipher(key), self.mode(iv),
87                                default_backend()).encryptor()
88             return iv + encryptor.update(data) + encryptor.finalize()
89         else:
90             salt = key[-SALT_SIZE:]
91             nonce = salt + iv
92             encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
93                                default_backend()).encryptor()
94             encryptor.authenticate_additional_data(aad)
95             data = encryptor.update(data) + encryptor.finalize()
96             data += encryptor.tag[:GCM_ICV_SIZE]
97             return iv + data
98
99     def decrypt(self, data, key, aad=None, icv=None):
100         if aad is None:
101             iv = data[:self.iv_len]
102             ct = data[self.iv_len:]
103             decryptor = Cipher(algorithms.AES(key),
104                                self.mode(iv),
105                                default_backend()).decryptor()
106             return decryptor.update(ct) + decryptor.finalize()
107         else:
108             salt = key[-SALT_SIZE:]
109             nonce = salt + data[:GCM_IV_SIZE]
110             ct = data[GCM_IV_SIZE:]
111             key = key[:-SALT_SIZE]
112             decryptor = Cipher(algorithms.AES(key),
113                                self.mode(nonce, icv, len(icv)),
114                                default_backend()).decryptor()
115             decryptor.authenticate_additional_data(aad)
116             return decryptor.update(ct) + decryptor.finalize()
117
118     def pad(self, data):
119         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
120         data = data + b'\x00' * (pad_len - 1)
121         return data + bytes([pad_len - 1])
122
123
124 class AuthAlgo(object):
125     def __init__(self, name, mac, mod, key_len, trunc_len=None):
126         self.name = name
127         self.mac = mac
128         self.mod = mod
129         self.key_len = key_len
130         self.trunc_len = trunc_len or key_len
131
132
133 CRYPTO_ALGOS = {
134     'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
135     'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
136     'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
137                                 mode=modes.GCM),
138 }
139
140 AUTH_ALGOS = {
141     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
142     'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
143     'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
144     'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
145     'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
146 }
147
148 PRF_ALGOS = {
149     'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
150     'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
151                                   hashes.SHA256, 32),
152 }
153
154
155 class IKEv2ChildSA(object):
156     def __init__(self, local_ts, remote_ts, spi=None):
157         self.spi = spi or os.urandom(4)
158         self.local_ts = local_ts
159         self.remote_ts = remote_ts
160
161
162 class IKEv2SA(object):
163     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
164                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
165                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
166                  auth_method='shared-key', priv_key=None, natt=False):
167         self.natt = natt
168         if natt:
169             self.sport = 4500
170             self.dport = 4500
171         else:
172             self.sport = 500
173             self.dport = 500
174         self.msg_id = 0
175         self.dh_params = None
176         self.test = test
177         self.priv_key = priv_key
178         self.is_initiator = is_initiator
179         nonce = nonce or os.urandom(32)
180         self.auth_data = auth_data
181         self.i_id = i_id
182         self.r_id = r_id
183         if isinstance(id_type, str):
184             self.id_type = IDType.value(id_type)
185         else:
186             self.id_type = id_type
187         self.auth_method = auth_method
188         if self.is_initiator:
189             self.rspi = 8 * b'\x00'
190             self.ispi = spi
191             self.i_nonce = nonce
192         else:
193             self.rspi = spi
194             self.ispi = 8 * b'\x00'
195             self.r_nonce = nonce
196         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
197
198     def new_msg_id(self):
199         self.msg_id += 1
200         return self.msg_id
201
202     @property
203     def my_dh_pub_key(self):
204         if self.is_initiator:
205             return self.i_dh_data
206         return self.r_dh_data
207
208     @property
209     def peer_dh_pub_key(self):
210         if self.is_initiator:
211             return self.r_dh_data
212         return self.i_dh_data
213
214     def compute_secret(self):
215         priv = self.dh_private_key
216         peer = self.peer_dh_pub_key
217         p, g, l = self.ike_group
218         return pow(int.from_bytes(peer, 'big'),
219                    int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
220
221     def generate_dh_data(self):
222         # generate DH keys
223         if self.ike_dh not in DH:
224             raise NotImplementedError('%s not in DH group' % self.ike_dh)
225
226         if self.dh_params is None:
227             dhg = DH[self.ike_dh]
228             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
229             self.dh_params = pn.parameters(default_backend())
230
231         priv = self.dh_params.generate_private_key()
232         pub = priv.public_key()
233         x = priv.private_numbers().x
234         self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
235         y = pub.public_numbers().y
236
237         if self.is_initiator:
238             self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
239         else:
240             self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
241
242     def complete_dh_data(self):
243         self.dh_shared_secret = self.compute_secret()
244
245     def calc_child_keys(self):
246         prf = self.ike_prf_alg.mod()
247         s = self.i_nonce + self.r_nonce
248         c = self.child_sas[0]
249
250         encr_key_len = self.esp_crypto_key_len
251         integ_key_len = self.esp_integ_alg.key_len
252         salt_len = 0 if integ_key_len else 4
253
254         l = (integ_key_len * 2 +
255              encr_key_len * 2 +
256              salt_len * 2)
257         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
258
259         pos = 0
260         c.sk_ei = keymat[pos:pos+encr_key_len]
261         pos += encr_key_len
262
263         if integ_key_len:
264             c.sk_ai = keymat[pos:pos+integ_key_len]
265             pos += integ_key_len
266         else:
267             c.salt_ei = keymat[pos:pos+salt_len]
268             pos += salt_len
269
270         c.sk_er = keymat[pos:pos+encr_key_len]
271         pos += encr_key_len
272
273         if integ_key_len:
274             c.sk_ar = keymat[pos:pos+integ_key_len]
275             pos += integ_key_len
276         else:
277             c.salt_er = keymat[pos:pos+salt_len]
278             pos += salt_len
279
280     def calc_prfplus(self, prf, key, seed, length):
281         r = b''
282         t = None
283         x = 1
284         while len(r) < length and x < 255:
285             if t is not None:
286                 s = t
287             else:
288                 s = b''
289             s = s + seed + bytes([x])
290             t = self.calc_prf(prf, key, s)
291             r = r + t
292             x = x + 1
293
294         if x == 255:
295             return None
296         return r
297
298     def calc_prf(self, prf, key, data):
299         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
300         h.update(data)
301         return h.finalize()
302
303     def calc_keys(self):
304         prf = self.ike_prf_alg.mod()
305         # SKEYSEED = prf(Ni | Nr, g^ir)
306         s = self.i_nonce + self.r_nonce
307         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
308
309         # calculate S = Ni | Nr | SPIi SPIr
310         s = s + self.ispi + self.rspi
311
312         prf_key_trunc = self.ike_prf_alg.trunc_len
313         encr_key_len = self.ike_crypto_key_len
314         tr_prf_key_len = self.ike_prf_alg.key_len
315         integ_key_len = self.ike_integ_alg.key_len
316         if integ_key_len == 0:
317             salt_size = 4
318         else:
319             salt_size = 0
320
321         l = (prf_key_trunc +
322              integ_key_len * 2 +
323              encr_key_len * 2 +
324              tr_prf_key_len * 2 +
325              salt_size * 2)
326         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
327
328         pos = 0
329         self.sk_d = keymat[:pos+prf_key_trunc]
330         pos += prf_key_trunc
331
332         self.sk_ai = keymat[pos:pos+integ_key_len]
333         pos += integ_key_len
334         self.sk_ar = keymat[pos:pos+integ_key_len]
335         pos += integ_key_len
336
337         self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
338         pos += encr_key_len + salt_size
339         self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
340         pos += encr_key_len + salt_size
341
342         self.sk_pi = keymat[pos:pos+tr_prf_key_len]
343         pos += tr_prf_key_len
344         self.sk_pr = keymat[pos:pos+tr_prf_key_len]
345
346     def generate_authmsg(self, prf, packet):
347         if self.is_initiator:
348             id = self.i_id
349             nonce = self.r_nonce
350             key = self.sk_pi
351         else:
352             id = self.r_id
353             nonce = self.i_nonce
354             key = self.sk_pr
355         data = bytes([self.id_type, 0, 0, 0]) + id
356         id_hash = self.calc_prf(prf, key, data)
357         return packet + nonce + id_hash
358
359     def auth_init(self):
360         prf = self.ike_prf_alg.mod()
361         if self.is_initiator:
362             packet = self.init_req_packet
363         else:
364             packet = self.init_resp_packet
365         authmsg = self.generate_authmsg(prf, raw(packet))
366         if self.auth_method == 'shared-key':
367             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
368             self.auth_data = self.calc_prf(prf, psk, authmsg)
369         elif self.auth_method == 'rsa-sig':
370             self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
371                                                 hashes.SHA1())
372         else:
373             raise TypeError('unknown auth method type!')
374
375     def encrypt(self, data, aad=None):
376         data = self.ike_crypto_alg.pad(data)
377         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
378
379     @property
380     def peer_authkey(self):
381         if self.is_initiator:
382             return self.sk_ar
383         return self.sk_ai
384
385     @property
386     def my_authkey(self):
387         if self.is_initiator:
388             return self.sk_ai
389         return self.sk_ar
390
391     @property
392     def my_cryptokey(self):
393         if self.is_initiator:
394             return self.sk_ei
395         return self.sk_er
396
397     @property
398     def peer_cryptokey(self):
399         if self.is_initiator:
400             return self.sk_er
401         return self.sk_ei
402
403     def concat(self, alg, key_len):
404         return alg + '-' + str(key_len * 8)
405
406     @property
407     def vpp_ike_cypto_alg(self):
408         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
409
410     @property
411     def vpp_esp_cypto_alg(self):
412         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
413
414     def verify_hmac(self, ikemsg):
415         integ_trunc = self.ike_integ_alg.trunc_len
416         exp_hmac = ikemsg[-integ_trunc:]
417         data = ikemsg[:-integ_trunc]
418         computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
419                                           self.peer_authkey, data)
420         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
421
422     def compute_hmac(self, integ, key, data):
423         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
424         h.update(data)
425         return h.finalize()
426
427     def decrypt(self, data, aad=None, icv=None):
428         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
429
430     def hmac_and_decrypt(self, ike):
431         ep = ike[ikev2.IKEv2_payload_Encrypted]
432         if self.ike_crypto == 'AES-GCM-16ICV':
433             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
434             ct = ep.load[:-GCM_ICV_SIZE]
435             tag = ep.load[-GCM_ICV_SIZE:]
436             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
437         else:
438             self.verify_hmac(raw(ike))
439             integ_trunc = self.ike_integ_alg.trunc_len
440
441             # remove ICV and decrypt payload
442             ct = ep.load[:-integ_trunc]
443             plain = self.decrypt(ct)
444         # remove padding
445         pad_len = plain[-1]
446         return plain[:-pad_len - 1]
447
448     def build_ts_addr(self, ts, version):
449         return {'starting_address_v' + version: ts['start_addr'],
450                 'ending_address_v' + version: ts['end_addr']}
451
452     def generate_ts(self, is_ip4):
453         c = self.child_sas[0]
454         ts_data = {'IP_protocol_ID': 0,
455                    'start_port': 0,
456                    'end_port': 0xffff}
457         if is_ip4:
458             ts_data.update(self.build_ts_addr(c.local_ts, '4'))
459             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
460             ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
461             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
462         else:
463             ts_data.update(self.build_ts_addr(c.local_ts, '6'))
464             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
465             ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
466             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
467
468         if self.is_initiator:
469             return ([ts1], [ts2])
470         return ([ts2], [ts1])
471
472     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
473         if crypto not in CRYPTO_ALGOS:
474             raise TypeError('unsupported encryption algo %r' % crypto)
475         self.ike_crypto = crypto
476         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
477         self.ike_crypto_key_len = crypto_key_len
478
479         if integ not in AUTH_ALGOS:
480             raise TypeError('unsupported auth algo %r' % integ)
481         self.ike_integ = None if integ == 'NULL' else integ
482         self.ike_integ_alg = AUTH_ALGOS[integ]
483
484         if prf not in PRF_ALGOS:
485             raise TypeError('unsupported prf algo %r' % prf)
486         self.ike_prf = prf
487         self.ike_prf_alg = PRF_ALGOS[prf]
488         self.ike_dh = dh
489         self.ike_group = DH[self.ike_dh]
490
491     def set_esp_props(self, crypto, crypto_key_len, integ):
492         self.esp_crypto_key_len = crypto_key_len
493         if crypto not in CRYPTO_ALGOS:
494             raise TypeError('unsupported encryption algo %r' % crypto)
495         self.esp_crypto = crypto
496         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
497
498         if integ not in AUTH_ALGOS:
499             raise TypeError('unsupported auth algo %r' % integ)
500         self.esp_integ = None if integ == 'NULL' else integ
501         self.esp_integ_alg = AUTH_ALGOS[integ]
502
503     def crypto_attr(self, key_len):
504         if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
505             return (0x800e << 16 | key_len << 3, 12)
506         else:
507             raise Exception('unsupported attribute type')
508
509     def ike_crypto_attr(self):
510         return self.crypto_attr(self.ike_crypto_key_len)
511
512     def esp_crypto_attr(self):
513         return self.crypto_attr(self.esp_crypto_key_len)
514
515     def compute_nat_sha1(self, ip, port, rspi=None):
516         if rspi is None:
517             rspi = self.rspi
518         data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
519         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
520         digest.update(data)
521         return digest.finalize()
522
523
524 class IkePeer(VppTestCase):
525     """ common class for initiator and responder """
526
527     @classmethod
528     def setUpClass(cls):
529         import scapy.contrib.ikev2 as _ikev2
530         globals()['ikev2'] = _ikev2
531         super(IkePeer, cls).setUpClass()
532         cls.create_pg_interfaces(range(2))
533         for i in cls.pg_interfaces:
534             i.admin_up()
535             i.config_ip4()
536             i.resolve_arp()
537             i.config_ip6()
538             i.resolve_ndp()
539
540     @classmethod
541     def tearDownClass(cls):
542         super(IkePeer, cls).tearDownClass()
543
544     def tearDown(self):
545         super(IkePeer, self).tearDown()
546         if self.del_sa_from_responder:
547             self.initiate_del_sa_from_responder()
548         else:
549             self.initiate_del_sa_from_initiator()
550         r = self.vapi.ikev2_sa_dump()
551         self.assertEqual(len(r), 0)
552         sas = self.vapi.ipsec_sa_dump()
553         self.assertEqual(len(sas), 0)
554         self.p.remove_vpp_config()
555         self.assertIsNone(self.p.query_vpp_config())
556
557     def setUp(self):
558         super(IkePeer, self).setUp()
559         self.config_tc()
560         self.p.add_vpp_config()
561         self.assertIsNotNone(self.p.query_vpp_config())
562         self.sa.generate_dh_data()
563         self.vapi.cli('ikev2 set logging level 4')
564         self.vapi.cli('event-lo clear')
565
566     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
567                       use_ip6=False):
568         if use_ip6:
569             src_ip = src_if.remote_ip6
570             dst_ip = src_if.local_ip6
571             ip_layer = IPv6
572         else:
573             src_ip = src_if.remote_ip4
574             dst_ip = src_if.local_ip4
575             ip_layer = IP
576         res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
577                ip_layer(src=src_ip, dst=dst_ip) /
578                UDP(sport=sport, dport=dport))
579         if natt:
580             # insert non ESP marker
581             res = res / Raw(b'\x00' * 4)
582         return res / msg
583
584     def verify_udp(self, udp):
585         self.assertEqual(udp.sport, self.sa.sport)
586         self.assertEqual(udp.dport, self.sa.dport)
587
588     def get_ike_header(self, packet):
589         try:
590             ih = packet[ikev2.IKEv2]
591         except IndexError as e:
592             # this is a workaround for getting IKEv2 layer as both ikev2 and
593             # ipsec register for port 4500
594             esp = packet[ESP]
595             ih = self.verify_and_remove_non_esp_marker(esp)
596         self.assertEqual(ih.version, 0x20)
597         self.assertNotIn('Version', ih.flags)
598         return ih
599
600     def verify_and_remove_non_esp_marker(self, packet):
601         if self.sa.natt:
602             # if we are in nat traversal mode check for non esp marker
603             # and remove it
604             data = raw(packet)
605             self.assertEqual(data[:4], b'\x00' * 4)
606             return ikev2.IKEv2(data[4:])
607         else:
608             return packet
609
610     def encrypt_ike_msg(self, header, plain, first_payload):
611         if self.sa.ike_crypto == 'AES-GCM-16ICV':
612             data = self.sa.ike_crypto_alg.pad(raw(plain))
613             plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
614                 len(ikev2.IKEv2_payload_Encrypted())
615             tlen = plen + len(ikev2.IKEv2())
616
617             # prepare aad data
618             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
619                                                  length=plen)
620             header.length = tlen
621             res = header / sk_p
622             encr = self.sa.encrypt(raw(plain), raw(res))
623             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
624                                                  length=plen, load=encr)
625             res = header / sk_p
626         else:
627             encr = self.sa.encrypt(raw(plain))
628             trunc_len = self.sa.ike_integ_alg.trunc_len
629             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
630             tlen = plen + len(ikev2.IKEv2())
631
632             sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
633                                                  length=plen, load=encr)
634             header.length = tlen
635             res = header / sk_p
636
637             integ_data = raw(res)
638             hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
639                                              self.sa.my_authkey, integ_data)
640             res = res / Raw(hmac_data[:trunc_len])
641         assert(len(res) == tlen)
642         return res
643
644     def verify_ipsec_sas(self):
645         sas = self.vapi.ipsec_sa_dump()
646         self.assertEqual(len(sas), 2)
647         e = VppEnum.vl_api_ipsec_sad_flags_t
648         if self.sa.is_initiator:
649             sa0 = sas[0].entry
650             sa1 = sas[1].entry
651         else:
652             sa1 = sas[0].entry
653             sa0 = sas[1].entry
654
655         c = self.sa.child_sas[0]
656
657         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
658         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
659         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
660
661         if self.sa.esp_integ is None:
662             vpp_integ_alg = 0
663         else:
664             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
665         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
666         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
667
668         # verify crypto keys
669         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
670         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
671         self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
672         self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
673
674         # verify integ keys
675         if vpp_integ_alg:
676             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
677             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
678             self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
679             self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
680         else:
681             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
682             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
683
684     def verify_keymat(self, api_keys, keys, name):
685         km = getattr(keys, name)
686         api_km = getattr(api_keys, name)
687         api_km_len = getattr(api_keys, name + '_len')
688         self.assertEqual(len(km), api_km_len)
689         self.assertEqual(km, api_km[:api_km_len])
690
691     def verify_id(self, api_id, exp_id):
692         self.assertEqual(api_id.type, IDType.value(exp_id.type))
693         self.assertEqual(api_id.data_len, exp_id.data_len)
694         self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
695
696     def verify_ike_sas(self):
697         r = self.vapi.ikev2_sa_dump()
698         self.assertEqual(len(r), 1)
699         sa = r[0].sa
700         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
701         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
702         if self.ip6:
703             if self.sa.is_initiator:
704                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
705                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
706             else:
707                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
708                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
709         else:
710             if self.sa.is_initiator:
711                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
712                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
713             else:
714                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
715                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
716         self.verify_keymat(sa.keys, self.sa, 'sk_d')
717         self.verify_keymat(sa.keys, self.sa, 'sk_ai')
718         self.verify_keymat(sa.keys, self.sa, 'sk_ar')
719         self.verify_keymat(sa.keys, self.sa, 'sk_ei')
720         self.verify_keymat(sa.keys, self.sa, 'sk_er')
721         self.verify_keymat(sa.keys, self.sa, 'sk_pi')
722         self.verify_keymat(sa.keys, self.sa, 'sk_pr')
723
724         self.assertEqual(sa.i_id.type, self.sa.id_type)
725         self.assertEqual(sa.r_id.type, self.sa.id_type)
726         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
727         self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
728         self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
729         self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
730
731         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
732         self.assertEqual(len(r), 1)
733         csa = r[0].child_sa
734         self.assertEqual(csa.sa_index, sa.sa_index)
735         c = self.sa.child_sas[0]
736         if hasattr(c, 'sk_ai'):
737             self.verify_keymat(csa.keys, c, 'sk_ai')
738             self.verify_keymat(csa.keys, c, 'sk_ar')
739         self.verify_keymat(csa.keys, c, 'sk_ei')
740         self.verify_keymat(csa.keys, c, 'sk_er')
741
742         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
743         tsi = tsi[0]
744         tsr = tsr[0]
745         r = self.vapi.ikev2_traffic_selector_dump(
746                 is_initiator=True, sa_index=sa.sa_index,
747                 child_sa_index=csa.child_sa_index)
748         self.assertEqual(len(r), 1)
749         ts = r[0].ts
750         self.verify_ts(r[0].ts, tsi[0], True)
751
752         r = self.vapi.ikev2_traffic_selector_dump(
753                 is_initiator=False, sa_index=sa.sa_index,
754                 child_sa_index=csa.child_sa_index)
755         self.assertEqual(len(r), 1)
756         self.verify_ts(r[0].ts, tsr[0], False)
757
758         n = self.vapi.ikev2_nonce_get(is_initiator=True,
759                                       sa_index=sa.sa_index)
760         self.verify_nonce(n, self.sa.i_nonce)
761         n = self.vapi.ikev2_nonce_get(is_initiator=False,
762                                       sa_index=sa.sa_index)
763         self.verify_nonce(n, self.sa.r_nonce)
764
765     def verify_nonce(self, api_nonce, nonce):
766         self.assertEqual(api_nonce.data_len, len(nonce))
767         self.assertEqual(api_nonce.nonce, nonce)
768
769     def verify_ts(self, api_ts, ts, is_initiator):
770         if is_initiator:
771             self.assertTrue(api_ts.is_local)
772         else:
773             self.assertFalse(api_ts.is_local)
774
775         if self.p.ts_is_ip4:
776             self.assertEqual(api_ts.start_addr,
777                              IPv4Address(ts.starting_address_v4))
778             self.assertEqual(api_ts.end_addr,
779                              IPv4Address(ts.ending_address_v4))
780         else:
781             self.assertEqual(api_ts.start_addr,
782                              IPv6Address(ts.starting_address_v6))
783             self.assertEqual(api_ts.end_addr,
784                              IPv6Address(ts.ending_address_v6))
785         self.assertEqual(api_ts.start_port, ts.start_port)
786         self.assertEqual(api_ts.end_port, ts.end_port)
787         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
788
789
790 class TemplateInitiator(IkePeer):
791     """ initiator test template """
792
793     def initiate_del_sa_from_initiator(self):
794         ispi = int.from_bytes(self.sa.ispi, 'little')
795         self.pg0.enable_capture()
796         self.pg_start()
797         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
798         capture = self.pg0.get_capture(1)
799         ih = self.get_ike_header(capture[0])
800         self.assertNotIn('Response', ih.flags)
801         self.assertIn('Initiator', ih.flags)
802         self.assertEqual(ih.init_SPI, self.sa.ispi)
803         self.assertEqual(ih.resp_SPI, self.sa.rspi)
804         plain = self.sa.hmac_and_decrypt(ih)
805         d = ikev2.IKEv2_payload_Delete(plain)
806         self.assertEqual(d.proto, 1)  # proto=IKEv2
807         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
808                              flags='Response', exch_type='INFORMATIONAL',
809                              id=ih.id, next_payload='Encrypted')
810         resp = self.encrypt_ike_msg(header, b'', None)
811         self.send_and_assert_no_replies(self.pg0, resp)
812
813     def verify_del_sa(self, packet):
814         ih = self.get_ike_header(packet)
815         self.assertEqual(ih.id, self.sa.msg_id)
816         self.assertEqual(ih.exch_type, 37)  # exchange informational
817         self.assertIn('Response', ih.flags)
818         self.assertIn('Initiator', ih.flags)
819         plain = self.sa.hmac_and_decrypt(ih)
820         self.assertEqual(plain, b'')
821
822     def initiate_del_sa_from_responder(self):
823         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
824                              exch_type='INFORMATIONAL',
825                              id=self.sa.new_msg_id())
826         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
827         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
828         packet = self.create_packet(self.pg0, ike_msg,
829                                     self.sa.sport, self.sa.dport,
830                                     self.sa.natt, self.ip6)
831         self.pg0.add_stream(packet)
832         self.pg0.enable_capture()
833         self.pg_start()
834         capture = self.pg0.get_capture(1)
835         self.verify_del_sa(capture[0])
836
837     @staticmethod
838     def find_notify_payload(packet, notify_type):
839         n = packet[ikev2.IKEv2_payload_Notify]
840         while n is not None:
841             if n.type == notify_type:
842                 return n
843             n = n.payload
844         return None
845
846     def verify_nat_detection(self, packet):
847         if self.ip6:
848             iph = packet[IPv6]
849         else:
850             iph = packet[IP]
851         udp = packet[UDP]
852
853         # NAT_DETECTION_SOURCE_IP
854         s = self.find_notify_payload(packet, 16388)
855         self.assertIsNotNone(s)
856         src_sha = self.sa.compute_nat_sha1(
857                 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
858         self.assertEqual(s.load, src_sha)
859
860         # NAT_DETECTION_DESTINATION_IP
861         s = self.find_notify_payload(packet, 16389)
862         self.assertIsNotNone(s)
863         dst_sha = self.sa.compute_nat_sha1(
864                 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
865         self.assertEqual(s.load, dst_sha)
866
867     def verify_sa_init_request(self, packet):
868         ih = packet[ikev2.IKEv2]
869         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
870         self.assertEqual(ih.exch_type, 34)  # SA_INIT
871         self.sa.ispi = ih.init_SPI
872         self.assertEqual(ih.resp_SPI, 8 * b'\x00')
873         self.assertIn('Initiator', ih.flags)
874         self.assertNotIn('Response', ih.flags)
875         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
876         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
877
878         prop = packet[ikev2.IKEv2_payload_Proposal]
879         self.assertEqual(prop.proto, 1)  # proto = ikev2
880         self.assertEqual(prop.proposal, 1)
881         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
882         self.assertEqual(prop.trans[0].transform_id,
883                          self.p.ike_transforms['crypto_alg'])
884         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
885         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
886         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
887         self.assertEqual(prop.trans[2].transform_id,
888                          self.p.ike_transforms['dh_group'])
889
890         self.verify_nat_detection(packet)
891         self.sa.complete_dh_data()
892         self.sa.calc_keys()
893
894     def verify_sa_auth_req(self, packet):
895         ih = self.get_ike_header(packet)
896         self.assertEqual(ih.resp_SPI, self.sa.rspi)
897         self.assertEqual(ih.init_SPI, self.sa.ispi)
898         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
899         self.assertIn('Initiator', ih.flags)
900         self.assertNotIn('Response', ih.flags)
901
902         udp = packet[UDP]
903         self.verify_udp(udp)
904         self.assertEqual(ih.id, self.sa.msg_id + 1)
905         self.sa.msg_id += 1
906         plain = self.sa.hmac_and_decrypt(ih)
907         idi = ikev2.IKEv2_payload_IDi(plain)
908         idr = ikev2.IKEv2_payload_IDr(idi.payload)
909         self.assertEqual(idi.load, self.sa.i_id)
910         self.assertEqual(idr.load, self.sa.r_id)
911
912     def send_init_response(self):
913         tr_attr = self.sa.ike_crypto_attr()
914         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
915                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
916                  key_length=tr_attr[0]) /
917                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
918                  transform_id=self.sa.ike_integ) /
919                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
920                  transform_id=self.sa.ike_prf_alg.name) /
921                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
922                  transform_id=self.sa.ike_dh))
923         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
924                  trans_nb=4, trans=trans))
925         self.sa.init_resp_packet = (
926             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
927                         exch_type='IKE_SA_INIT', flags='Response') /
928             ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
929             ikev2.IKEv2_payload_KE(next_payload='Nonce',
930                                    group=self.sa.ike_dh,
931                                    load=self.sa.my_dh_pub_key) /
932             ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
933
934         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
935                                      self.sa.sport, self.sa.dport,
936                                      self.sa.natt, self.ip6)
937         self.pg_send(self.pg0, ike_msg)
938         capture = self.pg0.get_capture(1)
939         self.verify_sa_auth_req(capture[0])
940
941     def initiate_sa_init(self):
942         self.pg0.enable_capture()
943         self.pg_start()
944         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
945
946         capture = self.pg0.get_capture(1)
947         self.verify_sa_init_request(capture[0])
948         self.send_init_response()
949
950     def send_auth_response(self):
951         tr_attr = self.sa.esp_crypto_attr()
952         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
953                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
954                  key_length=tr_attr[0]) /
955                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
956                  transform_id=self.sa.esp_integ) /
957                  ikev2.IKEv2_payload_Transform(
958                  transform_type='Extended Sequence Number',
959                  transform_id='No ESN') /
960                  ikev2.IKEv2_payload_Transform(
961                  transform_type='Extended Sequence Number',
962                  transform_id='ESN'))
963
964         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
965                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
966
967         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
968         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
969                  IDtype=self.sa.id_type, load=self.sa.i_id) /
970                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
971                  IDtype=self.sa.id_type, load=self.sa.r_id) /
972                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
973                  auth_type=AuthMethod.value(self.sa.auth_method),
974                  load=self.sa.auth_data) /
975                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
976                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
977                  number_of_TSs=len(tsi),
978                  traffic_selector=tsi) /
979                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
980                  number_of_TSs=len(tsr),
981                  traffic_selector=tsr) /
982                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
983
984         header = ikev2.IKEv2(
985                 init_SPI=self.sa.ispi,
986                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
987                 flags='Response', exch_type='IKE_AUTH')
988
989         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
990         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
991                                     self.sa.dport, self.sa.natt, self.ip6)
992         self.pg_send(self.pg0, packet)
993
994     def test_initiator(self):
995         self.initiate_sa_init()
996         self.sa.auth_init()
997         self.sa.calc_child_keys()
998         self.send_auth_response()
999         self.verify_ike_sas()
1000
1001
1002 class TemplateResponder(IkePeer):
1003     """ responder test template """
1004
1005     def initiate_del_sa_from_responder(self):
1006         self.pg0.enable_capture()
1007         self.pg_start()
1008         self.vapi.ikev2_initiate_del_ike_sa(
1009                 ispi=int.from_bytes(self.sa.ispi, 'little'))
1010         capture = self.pg0.get_capture(1)
1011         ih = self.get_ike_header(capture[0])
1012         self.assertNotIn('Response', ih.flags)
1013         self.assertNotIn('Initiator', ih.flags)
1014         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1015         plain = self.sa.hmac_and_decrypt(ih)
1016         d = ikev2.IKEv2_payload_Delete(plain)
1017         self.assertEqual(d.proto, 1)  # proto=IKEv2
1018         self.assertEqual(ih.init_SPI, self.sa.ispi)
1019         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1020         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1021                              flags='Initiator+Response',
1022                              exch_type='INFORMATIONAL',
1023                              id=ih.id, next_payload='Encrypted')
1024         resp = self.encrypt_ike_msg(header, b'', None)
1025         self.send_and_assert_no_replies(self.pg0, resp)
1026
1027     def verify_del_sa(self, packet):
1028         ih = self.get_ike_header(packet)
1029         self.assertEqual(ih.id, self.sa.msg_id)
1030         self.assertEqual(ih.exch_type, 37)  # exchange informational
1031         self.assertIn('Response', ih.flags)
1032         self.assertNotIn('Initiator', ih.flags)
1033         self.assertEqual(ih.next_payload, 46)  # Encrypted
1034         self.assertEqual(ih.init_SPI, self.sa.ispi)
1035         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1036         plain = self.sa.hmac_and_decrypt(ih)
1037         self.assertEqual(plain, b'')
1038
1039     def initiate_del_sa_from_initiator(self):
1040         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1041                              flags='Initiator', exch_type='INFORMATIONAL',
1042                              id=self.sa.new_msg_id())
1043         del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1044         ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1045         packet = self.create_packet(self.pg0, ike_msg,
1046                                     self.sa.sport, self.sa.dport,
1047                                     self.sa.natt, self.ip6)
1048         self.pg0.add_stream(packet)
1049         self.pg0.enable_capture()
1050         self.pg_start()
1051         capture = self.pg0.get_capture(1)
1052         self.verify_del_sa(capture[0])
1053
1054     def send_sa_init_req(self, behind_nat=False):
1055         tr_attr = self.sa.ike_crypto_attr()
1056         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1057                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
1058                  key_length=tr_attr[0]) /
1059                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1060                  transform_id=self.sa.ike_integ) /
1061                  ikev2.IKEv2_payload_Transform(transform_type='PRF',
1062                  transform_id=self.sa.ike_prf_alg.name) /
1063                  ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1064                  transform_id=self.sa.ike_dh))
1065
1066         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1067                  trans_nb=4, trans=trans))
1068
1069         self.sa.init_req_packet = (
1070                 ikev2.IKEv2(init_SPI=self.sa.ispi,
1071                             flags='Initiator', exch_type='IKE_SA_INIT') /
1072                 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1073                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1074                                        group=self.sa.ike_dh,
1075                                        load=self.sa.my_dh_pub_key) /
1076                 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1077                                           load=self.sa.i_nonce))
1078
1079         if behind_nat:
1080             src_address = b'\x0a\x0a\x0a\x01'
1081         else:
1082             src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1083
1084         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1085         dst_nat = self.sa.compute_nat_sha1(
1086                 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1087                 self.sa.dport)
1088         nat_src_detection = ikev2.IKEv2_payload_Notify(
1089                 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1090                 next_payload='Notify')
1091         nat_dst_detection = ikev2.IKEv2_payload_Notify(
1092                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1093         self.sa.init_req_packet = (self.sa.init_req_packet /
1094                                    nat_src_detection /
1095                                    nat_dst_detection)
1096
1097         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1098                                      self.sa.sport, self.sa.dport,
1099                                      self.sa.natt, self.ip6)
1100         self.pg0.add_stream(ike_msg)
1101         self.pg0.enable_capture()
1102         self.pg_start()
1103         capture = self.pg0.get_capture(1)
1104         self.verify_sa_init(capture[0])
1105
1106     def send_sa_auth(self):
1107         tr_attr = self.sa.esp_crypto_attr()
1108         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1109                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
1110                  key_length=tr_attr[0]) /
1111                  ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1112                  transform_id=self.sa.esp_integ) /
1113                  ikev2.IKEv2_payload_Transform(
1114                  transform_type='Extended Sequence Number',
1115                  transform_id='No ESN') /
1116                  ikev2.IKEv2_payload_Transform(
1117                  transform_type='Extended Sequence Number',
1118                  transform_id='ESN'))
1119
1120         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1121                  SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
1122
1123         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1124         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1125                  IDtype=self.sa.id_type, load=self.sa.i_id) /
1126                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1127                  IDtype=self.sa.id_type, load=self.sa.r_id) /
1128                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
1129                  auth_type=AuthMethod.value(self.sa.auth_method),
1130                  load=self.sa.auth_data) /
1131                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1132                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
1133                  number_of_TSs=len(tsi),
1134                  traffic_selector=tsi) /
1135                  ikev2.IKEv2_payload_TSr(next_payload='Notify',
1136                  number_of_TSs=len(tsr),
1137                  traffic_selector=tsr) /
1138                  ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1139
1140         header = ikev2.IKEv2(
1141                 init_SPI=self.sa.ispi,
1142                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1143                 flags='Initiator', exch_type='IKE_AUTH')
1144
1145         ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1146         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1147                                     self.sa.dport, self.sa.natt, self.ip6)
1148         self.pg0.add_stream(packet)
1149         self.pg0.enable_capture()
1150         self.pg_start()
1151         capture = self.pg0.get_capture(1)
1152         self.verify_sa_auth_resp(capture[0])
1153
1154     def verify_sa_init(self, packet):
1155         ih = self.get_ike_header(packet)
1156
1157         self.assertEqual(ih.id, self.sa.msg_id)
1158         self.assertEqual(ih.exch_type, 34)
1159         self.assertIn('Response', ih.flags)
1160         self.assertEqual(ih.init_SPI, self.sa.ispi)
1161         self.assertNotEqual(ih.resp_SPI, 0)
1162         self.sa.rspi = ih.resp_SPI
1163         try:
1164             sa = ih[ikev2.IKEv2_payload_SA]
1165             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1166             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1167         except IndexError as e:
1168             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1169             self.logger.error(ih.show())
1170             raise
1171         self.sa.complete_dh_data()
1172         self.sa.calc_keys()
1173         self.sa.auth_init()
1174
1175     def verify_sa_auth_resp(self, packet):
1176         ike = self.get_ike_header(packet)
1177         udp = packet[UDP]
1178         self.verify_udp(udp)
1179         self.assertEqual(ike.id, self.sa.msg_id)
1180         plain = self.sa.hmac_and_decrypt(ike)
1181         self.sa.calc_child_keys()
1182
1183     def test_responder(self):
1184         self.send_sa_init_req(self.sa.natt)
1185         self.send_sa_auth()
1186         self.verify_ipsec_sas()
1187         self.verify_ike_sas()
1188
1189
1190 class Ikev2Params(object):
1191     def config_params(self, params={}):
1192         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1193         ei = VppEnum.vl_api_ipsec_integ_alg_t
1194         self.vpp_enums = {
1195                 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1196                 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1197                 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1198                 'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1199                 'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1200                 'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1201
1202                 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1203                 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1204                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1205                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1206
1207         self.del_sa_from_responder = False if 'del_sa_from_responder'\
1208             not in params else params['del_sa_from_responder']
1209         is_natt = 'natt' in params and params['natt'] or False
1210         self.p = Profile(self, 'pr1')
1211         self.ip6 = False if 'ip6' not in params else params['ip6']
1212
1213         if 'auth' in params and params['auth'] == 'rsa-sig':
1214             auth_method = 'rsa-sig'
1215             work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1216             self.vapi.ikev2_set_local_key(
1217                     key_file=work_dir + params['server-key'])
1218
1219             client_file = work_dir + params['client-cert']
1220             server_pem = open(work_dir + params['server-cert']).read()
1221             client_priv = open(work_dir + params['client-key']).read()
1222             client_priv = load_pem_private_key(str.encode(client_priv), None,
1223                                                default_backend())
1224             self.peer_cert = x509.load_pem_x509_certificate(
1225                     str.encode(server_pem),
1226                     default_backend())
1227             self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1228             auth_data = None
1229         else:
1230             auth_data = b'$3cr3tpa$$w0rd'
1231             self.p.add_auth(method='shared-key', data=auth_data)
1232             auth_method = 'shared-key'
1233             client_priv = None
1234
1235         is_init = True if 'is_initiator' not in params else\
1236             params['is_initiator']
1237
1238         idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1239         idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1240         if is_init:
1241             self.p.add_local_id(**idr)
1242             self.p.add_remote_id(**idi)
1243         else:
1244             self.p.add_local_id(**idi)
1245             self.p.add_remote_id(**idr)
1246
1247         loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1248             'loc_ts' not in params else params['loc_ts']
1249         rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1250             'rem_ts' not in params else params['rem_ts']
1251         self.p.add_local_ts(**loc_ts)
1252         self.p.add_remote_ts(**rem_ts)
1253         if 'responder' in params:
1254             self.p.add_responder(params['responder'])
1255         if 'ike_transforms' in params:
1256             self.p.add_ike_transforms(params['ike_transforms'])
1257         if 'esp_transforms' in params:
1258             self.p.add_esp_transforms(params['esp_transforms'])
1259
1260         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1261                           is_initiator=is_init,
1262                           id_type=self.p.local_id['id_type'], natt=is_natt,
1263                           priv_key=client_priv, auth_method=auth_method,
1264                           auth_data=auth_data,
1265                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1266
1267         ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1268             params['ike-crypto']
1269         ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1270             params['ike-integ']
1271         ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1272             params['ike-dh']
1273
1274         esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1275             params['esp-crypto']
1276         esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1277             params['esp-integ']
1278
1279         self.sa.set_ike_props(
1280                 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1281                 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1282         self.sa.set_esp_props(
1283                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1284                 integ=esp_integ)
1285
1286
1287 class TestApi(VppTestCase):
1288     """ Test IKEV2 API """
1289     @classmethod
1290     def setUpClass(cls):
1291         super(TestApi, cls).setUpClass()
1292
1293     @classmethod
1294     def tearDownClass(cls):
1295         super(TestApi, cls).tearDownClass()
1296
1297     def tearDown(self):
1298         super(TestApi, self).tearDown()
1299         self.p1.remove_vpp_config()
1300         self.p2.remove_vpp_config()
1301         r = self.vapi.ikev2_profile_dump()
1302         self.assertEqual(len(r), 0)
1303
1304     def configure_profile(self, cfg):
1305         p = Profile(self, cfg['name'])
1306         p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1307         p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1308         p.add_local_ts(**cfg['loc_ts'])
1309         p.add_remote_ts(**cfg['rem_ts'])
1310         p.add_responder(cfg['responder'])
1311         p.add_ike_transforms(cfg['ike_ts'])
1312         p.add_esp_transforms(cfg['esp_ts'])
1313         p.add_auth(**cfg['auth'])
1314         p.set_udp_encap(cfg['udp_encap'])
1315         p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1316         if 'lifetime_data' in cfg:
1317             p.set_lifetime_data(cfg['lifetime_data'])
1318         if 'tun_itf' in cfg:
1319             p.set_tunnel_interface(cfg['tun_itf'])
1320         p.add_vpp_config()
1321         return p
1322
1323     def test_profile_api(self):
1324         """ test profile dump API """
1325         loc_ts4 = {
1326                     'proto': 8,
1327                     'start_port': 1,
1328                     'end_port': 19,
1329                     'start_addr': '3.3.3.2',
1330                     'end_addr': '3.3.3.3',
1331                 }
1332         rem_ts4 = {
1333                     'proto': 9,
1334                     'start_port': 10,
1335                     'end_port': 119,
1336                     'start_addr': '4.5.76.80',
1337                     'end_addr': '2.3.4.6',
1338                 }
1339
1340         loc_ts6 = {
1341                     'proto': 8,
1342                     'start_port': 1,
1343                     'end_port': 19,
1344                     'start_addr': 'ab::1',
1345                     'end_addr': 'ab::4',
1346                 }
1347         rem_ts6 = {
1348                     'proto': 9,
1349                     'start_port': 10,
1350                     'end_port': 119,
1351                     'start_addr': 'cd::12',
1352                     'end_addr': 'cd::13',
1353                 }
1354
1355         conf = {
1356             'p1': {
1357                 'name': 'p1',
1358                 'loc_id': ('fqdn', b'vpp.home'),
1359                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1360                 'loc_ts': loc_ts4,
1361                 'rem_ts': rem_ts4,
1362                 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1363                 'ike_ts': {
1364                         'crypto_alg': 20,
1365                         'crypto_key_size': 32,
1366                         'integ_alg': 1,
1367                         'dh_group': 1},
1368                 'esp_ts': {
1369                         'crypto_alg': 13,
1370                         'crypto_key_size': 24,
1371                         'integ_alg': 2},
1372                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1373                 'udp_encap': True,
1374                 'ipsec_over_udp_port': 4501,
1375                 'lifetime_data': {
1376                     'lifetime': 123,
1377                     'lifetime_maxdata': 20192,
1378                     'lifetime_jitter': 9,
1379                     'handover': 132},
1380             },
1381             'p2': {
1382                 'name': 'p2',
1383                 'loc_id': ('ip4-addr', b'192.168.2.1'),
1384                 'rem_id': ('ip6-addr', b'abcd::1'),
1385                 'loc_ts': loc_ts6,
1386                 'rem_ts': rem_ts6,
1387                 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1388                 'ike_ts': {
1389                         'crypto_alg': 12,
1390                         'crypto_key_size': 16,
1391                         'integ_alg': 3,
1392                         'dh_group': 3},
1393                 'esp_ts': {
1394                         'crypto_alg': 9,
1395                         'crypto_key_size': 24,
1396                         'integ_alg': 4},
1397                 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1398                 'udp_encap': False,
1399                 'ipsec_over_udp_port': 4600,
1400                 'tun_itf': 0}
1401         }
1402         self.p1 = self.configure_profile(conf['p1'])
1403         self.p2 = self.configure_profile(conf['p2'])
1404
1405         r = self.vapi.ikev2_profile_dump()
1406         self.assertEqual(len(r), 2)
1407         self.verify_profile(r[0].profile, conf['p1'])
1408         self.verify_profile(r[1].profile, conf['p2'])
1409
1410     def verify_id(self, api_id, cfg_id):
1411         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1412         self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1413
1414     def verify_ts(self, api_ts, cfg_ts):
1415         self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1416         self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1417         self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1418         self.assertEqual(api_ts.start_addr,
1419                          ip_address(text_type(cfg_ts['start_addr'])))
1420         self.assertEqual(api_ts.end_addr,
1421                          ip_address(text_type(cfg_ts['end_addr'])))
1422
1423     def verify_responder(self, api_r, cfg_r):
1424         self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1425         self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1426
1427     def verify_transforms(self, api_ts, cfg_ts):
1428         self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1429         self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1430         self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1431
1432     def verify_ike_transforms(self, api_ts, cfg_ts):
1433         self.verify_transforms(api_ts, cfg_ts)
1434         self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1435
1436     def verify_esp_transforms(self, api_ts, cfg_ts):
1437         self.verify_transforms(api_ts, cfg_ts)
1438
1439     def verify_auth(self, api_auth, cfg_auth):
1440         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1441         self.assertEqual(api_auth.data, cfg_auth['data'])
1442         self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1443
1444     def verify_lifetime_data(self, p, ld):
1445         self.assertEqual(p.lifetime, ld['lifetime'])
1446         self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1447         self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1448         self.assertEqual(p.handover, ld['handover'])
1449
1450     def verify_profile(self, ap, cp):
1451         self.assertEqual(ap.name, cp['name'])
1452         self.assertEqual(ap.udp_encap, cp['udp_encap'])
1453         self.verify_id(ap.loc_id, cp['loc_id'])
1454         self.verify_id(ap.rem_id, cp['rem_id'])
1455         self.verify_ts(ap.loc_ts, cp['loc_ts'])
1456         self.verify_ts(ap.rem_ts, cp['rem_ts'])
1457         self.verify_responder(ap.responder, cp['responder'])
1458         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1459         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1460         self.verify_auth(ap.auth, cp['auth'])
1461         if 'lifetime_data' in cp:
1462             self.verify_lifetime_data(ap, cp['lifetime_data'])
1463         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1464         if 'tun_itf' in cp:
1465             self.assertEqual(ap.tun_itf, cp['tun_itf'])
1466         else:
1467             self.assertEqual(ap.tun_itf, 0xffffffff)
1468
1469
1470 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1471     """ test ikev2 initiator - pre shared key auth """
1472
1473     def config_tc(self):
1474         self.config_params({
1475             'is_initiator': False,  # seen from test case perspective
1476                                     # thus vpp is initiator
1477             'responder': {'sw_if_index': self.pg0.sw_if_index,
1478                            'addr': self.pg0.remote_ip4},
1479             'ike-crypto': ('AES-GCM-16ICV', 32),
1480             'ike-integ': 'NULL',
1481             'ike-dh': '3072MODPgr',
1482             'ike_transforms': {
1483                 'crypto_alg': 20,  # "aes-gcm-16"
1484                 'crypto_key_size': 256,
1485                 'dh_group': 15,  # "modp-3072"
1486             },
1487             'esp_transforms': {
1488                 'crypto_alg': 12,  # "aes-cbc"
1489                 'crypto_key_size': 256,
1490                 # "hmac-sha2-256-128"
1491                 'integ_alg': 12}})
1492
1493
1494 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1495     """ test ikev2 initiator - delete IKE SA from responder """
1496
1497     def config_tc(self):
1498         self.config_params({
1499             'del_sa_from_responder': True,
1500             'is_initiator': False,  # seen from test case perspective
1501                                     # thus vpp is initiator
1502             'responder': {'sw_if_index': self.pg0.sw_if_index,
1503                            'addr': self.pg0.remote_ip4},
1504             'ike-crypto': ('AES-GCM-16ICV', 32),
1505             'ike-integ': 'NULL',
1506             'ike-dh': '3072MODPgr',
1507             'ike_transforms': {
1508                 'crypto_alg': 20,  # "aes-gcm-16"
1509                 'crypto_key_size': 256,
1510                 'dh_group': 15,  # "modp-3072"
1511             },
1512             'esp_transforms': {
1513                 'crypto_alg': 12,  # "aes-cbc"
1514                 'crypto_key_size': 256,
1515                 # "hmac-sha2-256-128"
1516                 'integ_alg': 12}})
1517
1518
1519 class TestResponderNATT(TemplateResponder, Ikev2Params):
1520     """ test ikev2 responder - nat traversal """
1521     def config_tc(self):
1522         self.config_params(
1523                 {'natt': True})
1524
1525
1526 class TestResponderPsk(TemplateResponder, Ikev2Params):
1527     """ test ikev2 responder - pre shared key auth """
1528     def config_tc(self):
1529         self.config_params()
1530
1531
1532 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1533     """ test ikev2 responder - cert based auth """
1534     def config_tc(self):
1535         self.config_params({
1536             'auth': 'rsa-sig',
1537             'server-key': 'server-key.pem',
1538             'client-key': 'client-key.pem',
1539             'client-cert': 'client-cert.pem',
1540             'server-cert': 'server-cert.pem'})
1541
1542
1543 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1544         (TemplateResponder, Ikev2Params):
1545     """
1546     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1547     """
1548     def config_tc(self):
1549         self.config_params({
1550             'ike-crypto': ('AES-CBC', 16),
1551             'ike-integ': 'SHA2-256-128',
1552             'esp-crypto': ('AES-CBC', 24),
1553             'esp-integ': 'SHA2-384-192',
1554             'ike-dh': '2048MODPgr'})
1555
1556
1557 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1558         (TemplateResponder, Ikev2Params):
1559     """
1560     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1561     """
1562     def config_tc(self):
1563         self.config_params({
1564             'ike-crypto': ('AES-CBC', 32),
1565             'ike-integ': 'SHA2-256-128',
1566             'esp-crypto': ('AES-GCM-16ICV', 32),
1567             'esp-integ': 'NULL',
1568             'ike-dh': '3072MODPgr'})
1569
1570
1571 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1572     """
1573     IKE:AES_GCM_16_256
1574     """
1575     def config_tc(self):
1576         self.config_params({
1577             'del_sa_from_responder': True,
1578             'ip6': True,
1579             'natt': True,
1580             'ike-crypto': ('AES-GCM-16ICV', 32),
1581             'ike-integ': 'NULL',
1582             'ike-dh': '2048MODPgr',
1583             'loc_ts': {'start_addr': 'ab:cd::0',
1584                        'end_addr': 'ab:cd::10'},
1585             'rem_ts': {'start_addr': '11::0',
1586                        'end_addr': '11::100'}})
1587
1588
1589 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1590     """ malformed packet test """
1591
1592     def tearDown(self):
1593         pass
1594
1595     def config_tc(self):
1596         self.config_params()
1597
1598     def assert_counter(self, count, name, version='ip4'):
1599         node_name = '/err/ikev2-%s/' % version + name
1600         self.assertEqual(count, self.statistics.get_err_counter(node_name))
1601
1602     def create_ike_init_msg(self, length=None, payload=None):
1603         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1604                           flags='Initiator', exch_type='IKE_SA_INIT')
1605         if payload is not None:
1606             msg /= payload
1607         return self.create_packet(self.pg0, msg, self.sa.sport,
1608                                   self.sa.dport)
1609
1610     def verify_bad_packet_length(self):
1611         ike_msg = self.create_ike_init_msg(length=0xdead)
1612         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1613         self.assert_counter(self.pkt_count, 'Bad packet length')
1614
1615     def verify_bad_sa_payload_length(self):
1616         p = ikev2.IKEv2_payload_SA(length=0xdead)
1617         ike_msg = self.create_ike_init_msg(payload=p)
1618         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1619         self.assert_counter(self.pkt_count, 'Malformed packet')
1620
1621     def test_responder(self):
1622         self.pkt_count = 254
1623         self.verify_bad_packet_length()
1624         self.verify_bad_sa_payload_length()
1625
1626
1627 if __name__ == '__main__':
1628     unittest.main(testRunner=VppTestRunner)