ikev2: accept key exchange on CREATE_CHILD_SA
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
26
27 try:
28     text_type = unicode
29 except NameError:
30     text_type = str
31
32 KEY_PAD = b"Key Pad for IKEv2"
33 SALT_SIZE = 4
34 GCM_ICV_SIZE = 16
35 GCM_IV_SIZE = 8
36
37
38 # defined in rfc3526
39 # tuple structure is (p, g, key_len)
40 DH = {
41     "2048MODPgr": (
42         long_converter(
43             """
44     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
45     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
46     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
47     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
48     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
49     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
50     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
51     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
52     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
53     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
54     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
55         ),
56         2,
57         256,
58     ),
59     "3072MODPgr": (
60         long_converter(
61             """
62     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
63     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
64     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
65     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
66     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
67     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
68     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
69     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
70     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
71     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
72     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
73     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
74     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
75     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
76     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
77     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
78         ),
79         2,
80         384,
81     ),
82 }
83
84
85 class CryptoAlgo(object):
86     def __init__(self, name, cipher, mode):
87         self.name = name
88         self.cipher = cipher
89         self.mode = mode
90         if self.cipher is not None:
91             self.bs = self.cipher.block_size // 8
92
93             if self.name == "AES-GCM-16ICV":
94                 self.iv_len = GCM_IV_SIZE
95             else:
96                 self.iv_len = self.bs
97
98     def encrypt(self, data, key, aad=None):
99         iv = os.urandom(self.iv_len)
100         if aad is None:
101             encryptor = Cipher(
102                 self.cipher(key), self.mode(iv), default_backend()
103             ).encryptor()
104             return iv + encryptor.update(data) + encryptor.finalize()
105         else:
106             salt = key[-SALT_SIZE:]
107             nonce = salt + iv
108             encryptor = Cipher(
109                 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
110             ).encryptor()
111             encryptor.authenticate_additional_data(aad)
112             data = encryptor.update(data) + encryptor.finalize()
113             data += encryptor.tag[:GCM_ICV_SIZE]
114             return iv + data
115
116     def decrypt(self, data, key, aad=None, icv=None):
117         if aad is None:
118             iv = data[: self.iv_len]
119             ct = data[self.iv_len :]
120             decryptor = Cipher(
121                 algorithms.AES(key), self.mode(iv), default_backend()
122             ).decryptor()
123             return decryptor.update(ct) + decryptor.finalize()
124         else:
125             salt = key[-SALT_SIZE:]
126             nonce = salt + data[:GCM_IV_SIZE]
127             ct = data[GCM_IV_SIZE:]
128             key = key[:-SALT_SIZE]
129             decryptor = Cipher(
130                 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
131             ).decryptor()
132             decryptor.authenticate_additional_data(aad)
133             return decryptor.update(ct) + decryptor.finalize()
134
135     def pad(self, data):
136         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
137         data = data + b"\x00" * (pad_len - 1)
138         return data + bytes([pad_len - 1])
139
140
141 class AuthAlgo(object):
142     def __init__(self, name, mac, mod, key_len, trunc_len=None):
143         self.name = name
144         self.mac = mac
145         self.mod = mod
146         self.key_len = key_len
147         self.trunc_len = trunc_len or key_len
148
149
150 CRYPTO_ALGOS = {
151     "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
152     "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
153     "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
154 }
155
156 AUTH_ALGOS = {
157     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
158     "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
159     "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
160     "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
161     "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
162 }
163
164 PRF_ALGOS = {
165     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166     "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
167 }
168
169 CRYPTO_IDS = {
170     12: "AES-CBC",
171     20: "AES-GCM-16ICV",
172 }
173
174 INTEG_IDS = {
175     2: "HMAC-SHA1-96",
176     12: "SHA2-256-128",
177     13: "SHA2-384-192",
178     14: "SHA2-512-256",
179 }
180
181
182 class IKEv2ChildSA(object):
183     def __init__(self, local_ts, remote_ts, is_initiator):
184         spi = os.urandom(4)
185         if is_initiator:
186             self.ispi = spi
187             self.rspi = None
188         else:
189             self.rspi = spi
190             self.ispi = None
191         self.local_ts = local_ts
192         self.remote_ts = remote_ts
193
194
195 class IKEv2SA(object):
196     def __init__(
197         self,
198         test,
199         is_initiator=True,
200         i_id=None,
201         r_id=None,
202         spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
203         id_type="fqdn",
204         nonce=None,
205         auth_data=None,
206         local_ts=None,
207         remote_ts=None,
208         auth_method="shared-key",
209         priv_key=None,
210         i_natt=False,
211         r_natt=False,
212         udp_encap=False,
213     ):
214         self.udp_encap = udp_encap
215         self.i_natt = i_natt
216         self.r_natt = r_natt
217         if i_natt or r_natt:
218             self.sport = 4500
219             self.dport = 4500
220         else:
221             self.sport = 500
222             self.dport = 500
223         self.msg_id = 0
224         self.dh_params = None
225         self.test = test
226         self.priv_key = priv_key
227         self.is_initiator = is_initiator
228         nonce = nonce or os.urandom(32)
229         self.auth_data = auth_data
230         self.i_id = i_id
231         self.r_id = r_id
232         if isinstance(id_type, str):
233             self.id_type = IDType.value(id_type)
234         else:
235             self.id_type = id_type
236         self.auth_method = auth_method
237         if self.is_initiator:
238             self.rspi = 8 * b"\x00"
239             self.ispi = spi
240             self.i_nonce = nonce
241         else:
242             self.rspi = spi
243             self.ispi = 8 * b"\x00"
244             self.r_nonce = nonce
245         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
246
247     def new_msg_id(self):
248         self.msg_id += 1
249         return self.msg_id
250
251     @property
252     def my_dh_pub_key(self):
253         if self.is_initiator:
254             return self.i_dh_data
255         return self.r_dh_data
256
257     @property
258     def peer_dh_pub_key(self):
259         if self.is_initiator:
260             return self.r_dh_data
261         return self.i_dh_data
262
263     @property
264     def natt(self):
265         return self.i_natt or self.r_natt
266
267     def compute_secret(self):
268         priv = self.dh_private_key
269         peer = self.peer_dh_pub_key
270         p, g, l = self.ike_group
271         return pow(
272             int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
273         ).to_bytes(l, "big")
274
275     def generate_dh_data(self):
276         # generate DH keys
277         if self.ike_dh not in DH:
278             raise NotImplementedError("%s not in DH group" % self.ike_dh)
279
280         if self.dh_params is None:
281             dhg = DH[self.ike_dh]
282             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
283             self.dh_params = pn.parameters(default_backend())
284
285         priv = self.dh_params.generate_private_key()
286         pub = priv.public_key()
287         x = priv.private_numbers().x
288         self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
289         y = pub.public_numbers().y
290
291         if self.is_initiator:
292             self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
293         else:
294             self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
295
296     def complete_dh_data(self):
297         self.dh_shared_secret = self.compute_secret()
298
299     def calc_child_keys(self, kex=False):
300         prf = self.ike_prf_alg.mod()
301         s = self.i_nonce + self.r_nonce
302         if kex:
303             s = self.dh_shared_secret + s
304         c = self.child_sas[0]
305
306         encr_key_len = self.esp_crypto_key_len
307         integ_key_len = self.esp_integ_alg.key_len
308         salt_len = 0 if integ_key_len else 4
309
310         l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
311         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
312
313         pos = 0
314         c.sk_ei = keymat[pos : pos + encr_key_len]
315         pos += encr_key_len
316
317         if integ_key_len:
318             c.sk_ai = keymat[pos : pos + integ_key_len]
319             pos += integ_key_len
320         else:
321             c.salt_ei = keymat[pos : pos + salt_len]
322             pos += salt_len
323
324         c.sk_er = keymat[pos : pos + encr_key_len]
325         pos += encr_key_len
326
327         if integ_key_len:
328             c.sk_ar = keymat[pos : pos + integ_key_len]
329             pos += integ_key_len
330         else:
331             c.salt_er = keymat[pos : pos + salt_len]
332             pos += salt_len
333
334     def calc_prfplus(self, prf, key, seed, length):
335         r = b""
336         t = None
337         x = 1
338         while len(r) < length and x < 255:
339             if t is not None:
340                 s = t
341             else:
342                 s = b""
343             s = s + seed + bytes([x])
344             t = self.calc_prf(prf, key, s)
345             r = r + t
346             x = x + 1
347
348         if x == 255:
349             return None
350         return r
351
352     def calc_prf(self, prf, key, data):
353         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
354         h.update(data)
355         return h.finalize()
356
357     def calc_keys(self):
358         prf = self.ike_prf_alg.mod()
359         # SKEYSEED = prf(Ni | Nr, g^ir)
360         s = self.i_nonce + self.r_nonce
361         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
362
363         # calculate S = Ni | Nr | SPIi SPIr
364         s = s + self.ispi + self.rspi
365
366         prf_key_trunc = self.ike_prf_alg.trunc_len
367         encr_key_len = self.ike_crypto_key_len
368         tr_prf_key_len = self.ike_prf_alg.key_len
369         integ_key_len = self.ike_integ_alg.key_len
370         if integ_key_len == 0:
371             salt_size = 4
372         else:
373             salt_size = 0
374
375         l = (
376             prf_key_trunc
377             + integ_key_len * 2
378             + encr_key_len * 2
379             + tr_prf_key_len * 2
380             + salt_size * 2
381         )
382         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
383
384         pos = 0
385         self.sk_d = keymat[: pos + prf_key_trunc]
386         pos += prf_key_trunc
387
388         self.sk_ai = keymat[pos : pos + integ_key_len]
389         pos += integ_key_len
390         self.sk_ar = keymat[pos : pos + integ_key_len]
391         pos += integ_key_len
392
393         self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
394         pos += encr_key_len + salt_size
395         self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
396         pos += encr_key_len + salt_size
397
398         self.sk_pi = keymat[pos : pos + tr_prf_key_len]
399         pos += tr_prf_key_len
400         self.sk_pr = keymat[pos : pos + tr_prf_key_len]
401
402     def generate_authmsg(self, prf, packet):
403         if self.is_initiator:
404             id = self.i_id
405             nonce = self.r_nonce
406             key = self.sk_pi
407         else:
408             id = self.r_id
409             nonce = self.i_nonce
410             key = self.sk_pr
411         data = bytes([self.id_type, 0, 0, 0]) + id
412         id_hash = self.calc_prf(prf, key, data)
413         return packet + nonce + id_hash
414
415     def auth_init(self):
416         prf = self.ike_prf_alg.mod()
417         if self.is_initiator:
418             packet = self.init_req_packet
419         else:
420             packet = self.init_resp_packet
421         authmsg = self.generate_authmsg(prf, raw(packet))
422         if self.auth_method == "shared-key":
423             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
424             self.auth_data = self.calc_prf(prf, psk, authmsg)
425         elif self.auth_method == "rsa-sig":
426             self.auth_data = self.priv_key.sign(
427                 authmsg, padding.PKCS1v15(), hashes.SHA1()
428             )
429         else:
430             raise TypeError("unknown auth method type!")
431
432     def encrypt(self, data, aad=None):
433         data = self.ike_crypto_alg.pad(data)
434         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
435
436     @property
437     def peer_authkey(self):
438         if self.is_initiator:
439             return self.sk_ar
440         return self.sk_ai
441
442     @property
443     def my_authkey(self):
444         if self.is_initiator:
445             return self.sk_ai
446         return self.sk_ar
447
448     @property
449     def my_cryptokey(self):
450         if self.is_initiator:
451             return self.sk_ei
452         return self.sk_er
453
454     @property
455     def peer_cryptokey(self):
456         if self.is_initiator:
457             return self.sk_er
458         return self.sk_ei
459
460     def concat(self, alg, key_len):
461         return alg + "-" + str(key_len * 8)
462
463     @property
464     def vpp_ike_cypto_alg(self):
465         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
466
467     @property
468     def vpp_esp_cypto_alg(self):
469         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
470
471     def verify_hmac(self, ikemsg):
472         integ_trunc = self.ike_integ_alg.trunc_len
473         exp_hmac = ikemsg[-integ_trunc:]
474         data = ikemsg[:-integ_trunc]
475         computed_hmac = self.compute_hmac(
476             self.ike_integ_alg.mod(), self.peer_authkey, data
477         )
478         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
479
480     def compute_hmac(self, integ, key, data):
481         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
482         h.update(data)
483         return h.finalize()
484
485     def decrypt(self, data, aad=None, icv=None):
486         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
487
488     def hmac_and_decrypt(self, ike):
489         ep = ike[ikev2.IKEv2_payload_Encrypted]
490         if self.ike_crypto == "AES-GCM-16ICV":
491             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
492             ct = ep.load[:-GCM_ICV_SIZE]
493             tag = ep.load[-GCM_ICV_SIZE:]
494             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
495         else:
496             self.verify_hmac(raw(ike))
497             integ_trunc = self.ike_integ_alg.trunc_len
498
499             # remove ICV and decrypt payload
500             ct = ep.load[:-integ_trunc]
501             plain = self.decrypt(ct)
502         # remove padding
503         pad_len = plain[-1]
504         return plain[: -pad_len - 1]
505
506     def build_ts_addr(self, ts, version):
507         return {
508             "starting_address_v" + version: ts["start_addr"],
509             "ending_address_v" + version: ts["end_addr"],
510         }
511
512     def generate_ts(self, is_ip4):
513         c = self.child_sas[0]
514         ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
515         if is_ip4:
516             ts_data.update(self.build_ts_addr(c.local_ts, "4"))
517             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
518             ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
519             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
520         else:
521             ts_data.update(self.build_ts_addr(c.local_ts, "6"))
522             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
523             ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
524             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
525
526         if self.is_initiator:
527             return ([ts1], [ts2])
528         return ([ts2], [ts1])
529
530     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
531         if crypto not in CRYPTO_ALGOS:
532             raise TypeError("unsupported encryption algo %r" % crypto)
533         self.ike_crypto = crypto
534         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
535         self.ike_crypto_key_len = crypto_key_len
536
537         if integ not in AUTH_ALGOS:
538             raise TypeError("unsupported auth algo %r" % integ)
539         self.ike_integ = None if integ == "NULL" else integ
540         self.ike_integ_alg = AUTH_ALGOS[integ]
541
542         if prf not in PRF_ALGOS:
543             raise TypeError("unsupported prf algo %r" % prf)
544         self.ike_prf = prf
545         self.ike_prf_alg = PRF_ALGOS[prf]
546         self.ike_dh = dh
547         self.ike_group = DH[self.ike_dh]
548
549     def set_esp_props(self, crypto, crypto_key_len, integ):
550         self.esp_crypto_key_len = crypto_key_len
551         if crypto not in CRYPTO_ALGOS:
552             raise TypeError("unsupported encryption algo %r" % crypto)
553         self.esp_crypto = crypto
554         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
555
556         if integ not in AUTH_ALGOS:
557             raise TypeError("unsupported auth algo %r" % integ)
558         self.esp_integ = None if integ == "NULL" else integ
559         self.esp_integ_alg = AUTH_ALGOS[integ]
560
561     def crypto_attr(self, key_len):
562         if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
563             return (0x800E << 16 | key_len << 3, 12)
564         else:
565             raise Exception("unsupported attribute type")
566
567     def ike_crypto_attr(self):
568         return self.crypto_attr(self.ike_crypto_key_len)
569
570     def esp_crypto_attr(self):
571         return self.crypto_attr(self.esp_crypto_key_len)
572
573     def compute_nat_sha1(self, ip, port, rspi=None):
574         if rspi is None:
575             rspi = self.rspi
576         data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
577         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
578         digest.update(data)
579         return digest.finalize()
580
581
582 class IkePeer(VppTestCase):
583     """common class for initiator and responder"""
584
585     @classmethod
586     def setUpClass(cls):
587         import scapy.contrib.ikev2 as _ikev2
588
589         globals()["ikev2"] = _ikev2
590         super(IkePeer, cls).setUpClass()
591         cls.create_pg_interfaces(range(2))
592         for i in cls.pg_interfaces:
593             i.admin_up()
594             i.config_ip4()
595             i.resolve_arp()
596             i.config_ip6()
597             i.resolve_ndp()
598
599     @classmethod
600     def tearDownClass(cls):
601         super(IkePeer, cls).tearDownClass()
602
603     def tearDown(self):
604         super(IkePeer, self).tearDown()
605         if self.del_sa_from_responder:
606             self.initiate_del_sa_from_responder()
607         else:
608             self.initiate_del_sa_from_initiator()
609         r = self.vapi.ikev2_sa_dump()
610         self.assertEqual(len(r), 0)
611         sas = self.vapi.ipsec_sa_dump()
612         self.assertEqual(len(sas), 0)
613         self.p.remove_vpp_config()
614         self.assertIsNone(self.p.query_vpp_config())
615
616     def setUp(self):
617         super(IkePeer, self).setUp()
618         self.config_tc()
619         self.p.add_vpp_config()
620         self.assertIsNotNone(self.p.query_vpp_config())
621         if self.sa.is_initiator:
622             self.sa.generate_dh_data()
623         self.vapi.cli("ikev2 set logging level 4")
624         self.vapi.cli("event-lo clear")
625
626     def assert_counter(self, count, name, version="ip4"):
627         node_name = "/err/ikev2-%s/" % version + name
628         self.assertEqual(count, self.statistics.get_err_counter(node_name))
629
630     def create_rekey_request(self, kex=False):
631         sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
632         header = ikev2.IKEv2(
633             init_SPI=self.sa.ispi,
634             resp_SPI=self.sa.rspi,
635             id=self.sa.new_msg_id(),
636             flags="Initiator",
637             exch_type="CREATE_CHILD_SA",
638         )
639
640         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
641         return self.create_packet(
642             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
643         )
644
645     def create_empty_request(self):
646         header = ikev2.IKEv2(
647             init_SPI=self.sa.ispi,
648             resp_SPI=self.sa.rspi,
649             id=self.sa.new_msg_id(),
650             flags="Initiator",
651             exch_type="INFORMATIONAL",
652             next_payload="Encrypted",
653         )
654
655         msg = self.encrypt_ike_msg(header, b"", None)
656         return self.create_packet(
657             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
658         )
659
660     def create_packet(
661         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
662     ):
663         if use_ip6:
664             src_ip = src_if.remote_ip6
665             dst_ip = src_if.local_ip6
666             ip_layer = IPv6
667         else:
668             src_ip = src_if.remote_ip4
669             dst_ip = src_if.local_ip4
670             ip_layer = IP
671         res = (
672             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
673             / ip_layer(src=src_ip, dst=dst_ip)
674             / UDP(sport=sport, dport=dport)
675         )
676         if natt:
677             # insert non ESP marker
678             res = res / Raw(b"\x00" * 4)
679         return res / msg
680
681     def verify_udp(self, udp):
682         self.assertEqual(udp.sport, self.sa.sport)
683         self.assertEqual(udp.dport, self.sa.dport)
684
685     def get_ike_header(self, packet):
686         try:
687             ih = packet[ikev2.IKEv2]
688             ih = self.verify_and_remove_non_esp_marker(ih)
689         except IndexError as e:
690             # this is a workaround for getting IKEv2 layer as both ikev2 and
691             # ipsec register for port 4500
692             esp = packet[ESP]
693             ih = self.verify_and_remove_non_esp_marker(esp)
694         self.assertEqual(ih.version, 0x20)
695         self.assertNotIn("Version", ih.flags)
696         return ih
697
698     def verify_and_remove_non_esp_marker(self, packet):
699         if self.sa.natt:
700             # if we are in nat traversal mode check for non esp marker
701             # and remove it
702             data = raw(packet)
703             self.assertEqual(data[:4], b"\x00" * 4)
704             return ikev2.IKEv2(data[4:])
705         else:
706             return packet
707
708     def encrypt_ike_msg(self, header, plain, first_payload):
709         if self.sa.ike_crypto == "AES-GCM-16ICV":
710             data = self.sa.ike_crypto_alg.pad(raw(plain))
711             plen = (
712                 len(data)
713                 + GCM_IV_SIZE
714                 + GCM_ICV_SIZE
715                 + len(ikev2.IKEv2_payload_Encrypted())
716             )
717             tlen = plen + len(ikev2.IKEv2())
718
719             # prepare aad data
720             sk_p = ikev2.IKEv2_payload_Encrypted(
721                 next_payload=first_payload, length=plen
722             )
723             header.length = tlen
724             res = header / sk_p
725             encr = self.sa.encrypt(raw(plain), raw(res))
726             sk_p = ikev2.IKEv2_payload_Encrypted(
727                 next_payload=first_payload, length=plen, load=encr
728             )
729             res = header / sk_p
730         else:
731             encr = self.sa.encrypt(raw(plain))
732             trunc_len = self.sa.ike_integ_alg.trunc_len
733             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
734             tlen = plen + len(ikev2.IKEv2())
735
736             sk_p = ikev2.IKEv2_payload_Encrypted(
737                 next_payload=first_payload, length=plen, load=encr
738             )
739             header.length = tlen
740             res = header / sk_p
741
742             integ_data = raw(res)
743             hmac_data = self.sa.compute_hmac(
744                 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
745             )
746             res = res / Raw(hmac_data[:trunc_len])
747         assert len(res) == tlen
748         return res
749
750     def verify_udp_encap(self, ipsec_sa):
751         e = VppEnum.vl_api_ipsec_sad_flags_t
752         if self.sa.udp_encap or self.sa.natt:
753             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
754         else:
755             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
756
757     def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
758         sas = self.vapi.ipsec_sa_dump()
759         if sa_count is None:
760             if is_rekey:
761                 # after rekey there is a short period of time in which old
762                 # inbound SA is still present
763                 sa_count = 3
764             else:
765                 sa_count = 2
766         self.assertEqual(len(sas), sa_count)
767         if self.sa.is_initiator:
768             if is_rekey:
769                 sa0 = sas[0].entry
770                 sa1 = sas[2].entry
771             else:
772                 sa0 = sas[0].entry
773                 sa1 = sas[1].entry
774         else:
775             if is_rekey:
776                 sa0 = sas[2].entry
777                 sa1 = sas[0].entry
778             else:
779                 sa1 = sas[0].entry
780                 sa0 = sas[1].entry
781
782         c = self.sa.child_sas[0]
783
784         self.verify_udp_encap(sa0)
785         self.verify_udp_encap(sa1)
786         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
787         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
788         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
789
790         if self.sa.esp_integ is None:
791             vpp_integ_alg = 0
792         else:
793             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
794         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
795         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
796
797         # verify crypto keys
798         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
799         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
800         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
801         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
802
803         # verify integ keys
804         if vpp_integ_alg:
805             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
806             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
807             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
808             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
809         else:
810             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
811             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
812
813     def verify_keymat(self, api_keys, keys, name):
814         km = getattr(keys, name)
815         api_km = getattr(api_keys, name)
816         api_km_len = getattr(api_keys, name + "_len")
817         self.assertEqual(len(km), api_km_len)
818         self.assertEqual(km, api_km[:api_km_len])
819
820     def verify_id(self, api_id, exp_id):
821         self.assertEqual(api_id.type, IDType.value(exp_id.type))
822         self.assertEqual(api_id.data_len, exp_id.data_len)
823         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
824
825     def verify_ike_sas(self):
826         r = self.vapi.ikev2_sa_dump()
827         self.assertEqual(len(r), 1)
828         sa = r[0].sa
829         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
830         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
831         if self.ip6:
832             if self.sa.is_initiator:
833                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
834                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
835             else:
836                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
837                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
838         else:
839             if self.sa.is_initiator:
840                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
841                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
842             else:
843                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
844                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
845         self.verify_keymat(sa.keys, self.sa, "sk_d")
846         self.verify_keymat(sa.keys, self.sa, "sk_ai")
847         self.verify_keymat(sa.keys, self.sa, "sk_ar")
848         self.verify_keymat(sa.keys, self.sa, "sk_ei")
849         self.verify_keymat(sa.keys, self.sa, "sk_er")
850         self.verify_keymat(sa.keys, self.sa, "sk_pi")
851         self.verify_keymat(sa.keys, self.sa, "sk_pr")
852
853         self.assertEqual(sa.i_id.type, self.sa.id_type)
854         self.assertEqual(sa.r_id.type, self.sa.id_type)
855         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
856         self.assertEqual(sa.r_id.data_len, len(self.idr))
857         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
858         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
859
860         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
861         self.assertEqual(len(r), 1)
862         csa = r[0].child_sa
863         self.assertEqual(csa.sa_index, sa.sa_index)
864         c = self.sa.child_sas[0]
865         if hasattr(c, "sk_ai"):
866             self.verify_keymat(csa.keys, c, "sk_ai")
867             self.verify_keymat(csa.keys, c, "sk_ar")
868         self.verify_keymat(csa.keys, c, "sk_ei")
869         self.verify_keymat(csa.keys, c, "sk_er")
870         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
871         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
872
873         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
874         tsi = tsi[0]
875         tsr = tsr[0]
876         r = self.vapi.ikev2_traffic_selector_dump(
877             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
878         )
879         self.assertEqual(len(r), 1)
880         ts = r[0].ts
881         self.verify_ts(r[0].ts, tsi[0], True)
882
883         r = self.vapi.ikev2_traffic_selector_dump(
884             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
885         )
886         self.assertEqual(len(r), 1)
887         self.verify_ts(r[0].ts, tsr[0], False)
888
889         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
890         self.verify_nonce(n, self.sa.i_nonce)
891         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
892         self.verify_nonce(n, self.sa.r_nonce)
893
894     def verify_nonce(self, api_nonce, nonce):
895         self.assertEqual(api_nonce.data_len, len(nonce))
896         self.assertEqual(api_nonce.nonce, nonce)
897
898     def verify_ts(self, api_ts, ts, is_initiator):
899         if is_initiator:
900             self.assertTrue(api_ts.is_local)
901         else:
902             self.assertFalse(api_ts.is_local)
903
904         if self.p.ts_is_ip4:
905             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
906             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
907         else:
908             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
909             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
910         self.assertEqual(api_ts.start_port, ts.start_port)
911         self.assertEqual(api_ts.end_port, ts.end_port)
912         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
913
914
915 class TemplateInitiator(IkePeer):
916     """initiator test template"""
917
918     def initiate_del_sa_from_initiator(self):
919         ispi = int.from_bytes(self.sa.ispi, "little")
920         self.pg0.enable_capture()
921         self.pg_start()
922         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
923         capture = self.pg0.get_capture(1)
924         ih = self.get_ike_header(capture[0])
925         self.assertNotIn("Response", ih.flags)
926         self.assertIn("Initiator", ih.flags)
927         self.assertEqual(ih.init_SPI, self.sa.ispi)
928         self.assertEqual(ih.resp_SPI, self.sa.rspi)
929         plain = self.sa.hmac_and_decrypt(ih)
930         d = ikev2.IKEv2_payload_Delete(plain)
931         self.assertEqual(d.proto, 1)  # proto=IKEv2
932         header = ikev2.IKEv2(
933             init_SPI=self.sa.ispi,
934             resp_SPI=self.sa.rspi,
935             flags="Response",
936             exch_type="INFORMATIONAL",
937             id=ih.id,
938             next_payload="Encrypted",
939         )
940         resp = self.encrypt_ike_msg(header, b"", None)
941         self.send_and_assert_no_replies(self.pg0, resp)
942
943     def verify_del_sa(self, packet):
944         ih = self.get_ike_header(packet)
945         self.assertEqual(ih.id, self.sa.msg_id)
946         self.assertEqual(ih.exch_type, 37)  # exchange informational
947         self.assertIn("Response", ih.flags)
948         self.assertIn("Initiator", ih.flags)
949         plain = self.sa.hmac_and_decrypt(ih)
950         self.assertEqual(plain, b"")
951
952     def initiate_del_sa_from_responder(self):
953         header = ikev2.IKEv2(
954             init_SPI=self.sa.ispi,
955             resp_SPI=self.sa.rspi,
956             exch_type="INFORMATIONAL",
957             id=self.sa.new_msg_id(),
958         )
959         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
960         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
961         packet = self.create_packet(
962             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
963         )
964         self.pg0.add_stream(packet)
965         self.pg0.enable_capture()
966         self.pg_start()
967         capture = self.pg0.get_capture(1)
968         self.verify_del_sa(capture[0])
969
970     @staticmethod
971     def find_notify_payload(packet, notify_type):
972         n = packet[ikev2.IKEv2_payload_Notify]
973         while n is not None:
974             if n.type == notify_type:
975                 return n
976             n = n.payload
977         return None
978
979     def verify_nat_detection(self, packet):
980         if self.ip6:
981             iph = packet[IPv6]
982         else:
983             iph = packet[IP]
984         udp = packet[UDP]
985
986         # NAT_DETECTION_SOURCE_IP
987         s = self.find_notify_payload(packet, 16388)
988         self.assertIsNotNone(s)
989         src_sha = self.sa.compute_nat_sha1(
990             inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
991         )
992         self.assertEqual(s.load, src_sha)
993
994         # NAT_DETECTION_DESTINATION_IP
995         s = self.find_notify_payload(packet, 16389)
996         self.assertIsNotNone(s)
997         dst_sha = self.sa.compute_nat_sha1(
998             inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
999         )
1000         self.assertEqual(s.load, dst_sha)
1001
1002     def verify_sa_init_request(self, packet):
1003         udp = packet[UDP]
1004         self.sa.dport = udp.sport
1005         ih = packet[ikev2.IKEv2]
1006         self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1007         self.assertEqual(ih.exch_type, 34)  # SA_INIT
1008         self.sa.ispi = ih.init_SPI
1009         self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1010         self.assertIn("Initiator", ih.flags)
1011         self.assertNotIn("Response", ih.flags)
1012         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1013         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1014
1015         prop = packet[ikev2.IKEv2_payload_Proposal]
1016         self.assertEqual(prop.proto, 1)  # proto = ikev2
1017         self.assertEqual(prop.proposal, 1)
1018         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
1019         self.assertEqual(
1020             prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1021         )
1022         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
1023         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
1024         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
1025         self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1026
1027         self.verify_nat_detection(packet)
1028         self.sa.set_ike_props(
1029             crypto="AES-GCM-16ICV",
1030             crypto_key_len=32,
1031             integ="NULL",
1032             prf="PRF_HMAC_SHA2_256",
1033             dh="3072MODPgr",
1034         )
1035         self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1036         self.sa.generate_dh_data()
1037         self.sa.complete_dh_data()
1038         self.sa.calc_keys()
1039
1040     def update_esp_transforms(self, trans, sa):
1041         while trans:
1042             if trans.transform_type == 1:  # ecryption
1043                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1044             elif trans.transform_type == 3:  # integrity
1045                 sa.esp_integ = INTEG_IDS[trans.transform_id]
1046             trans = trans.payload
1047
1048     def verify_sa_auth_req(self, packet):
1049         udp = packet[UDP]
1050         self.sa.dport = udp.sport
1051         ih = self.get_ike_header(packet)
1052         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1053         self.assertEqual(ih.init_SPI, self.sa.ispi)
1054         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
1055         self.assertIn("Initiator", ih.flags)
1056         self.assertNotIn("Response", ih.flags)
1057
1058         udp = packet[UDP]
1059         self.verify_udp(udp)
1060         self.assertEqual(ih.id, self.sa.msg_id + 1)
1061         self.sa.msg_id += 1
1062         plain = self.sa.hmac_and_decrypt(ih)
1063         idi = ikev2.IKEv2_payload_IDi(plain)
1064         self.assertEqual(idi.load, self.sa.i_id)
1065         if self.no_idr_auth:
1066             self.assertEqual(idi.next_payload, 39)  # AUTH
1067         else:
1068             idr = ikev2.IKEv2_payload_IDr(idi.payload)
1069             self.assertEqual(idr.load, self.sa.r_id)
1070         prop = idi[ikev2.IKEv2_payload_Proposal]
1071         c = self.sa.child_sas[0]
1072         c.ispi = prop.SPI
1073         self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1074
1075     def send_init_response(self):
1076         tr_attr = self.sa.ike_crypto_attr()
1077         trans = (
1078             ikev2.IKEv2_payload_Transform(
1079                 transform_type="Encryption",
1080                 transform_id=self.sa.ike_crypto,
1081                 length=tr_attr[1],
1082                 key_length=tr_attr[0],
1083             )
1084             / ikev2.IKEv2_payload_Transform(
1085                 transform_type="Integrity", transform_id=self.sa.ike_integ
1086             )
1087             / ikev2.IKEv2_payload_Transform(
1088                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1089             )
1090             / ikev2.IKEv2_payload_Transform(
1091                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1092             )
1093         )
1094         props = ikev2.IKEv2_payload_Proposal(
1095             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1096         )
1097
1098         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1099         if self.sa.natt:
1100             dst_address = b"\x0a\x0a\x0a\x0a"
1101         else:
1102             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1103         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1104         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1105
1106         self.sa.init_resp_packet = (
1107             ikev2.IKEv2(
1108                 init_SPI=self.sa.ispi,
1109                 resp_SPI=self.sa.rspi,
1110                 exch_type="IKE_SA_INIT",
1111                 flags="Response",
1112             )
1113             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1114             / ikev2.IKEv2_payload_KE(
1115                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1116             )
1117             / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1118             / ikev2.IKEv2_payload_Notify(
1119                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1120             )
1121             / ikev2.IKEv2_payload_Notify(
1122                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1123             )
1124         )
1125
1126         ike_msg = self.create_packet(
1127             self.pg0,
1128             self.sa.init_resp_packet,
1129             self.sa.sport,
1130             self.sa.dport,
1131             False,
1132             self.ip6,
1133         )
1134         self.pg_send(self.pg0, ike_msg)
1135         capture = self.pg0.get_capture(1)
1136         self.verify_sa_auth_req(capture[0])
1137
1138     def initiate_sa_init(self):
1139         self.pg0.enable_capture()
1140         self.pg_start()
1141         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1142
1143         capture = self.pg0.get_capture(1)
1144         self.verify_sa_init_request(capture[0])
1145         self.send_init_response()
1146
1147     def send_auth_response(self):
1148         tr_attr = self.sa.esp_crypto_attr()
1149         trans = (
1150             ikev2.IKEv2_payload_Transform(
1151                 transform_type="Encryption",
1152                 transform_id=self.sa.esp_crypto,
1153                 length=tr_attr[1],
1154                 key_length=tr_attr[0],
1155             )
1156             / ikev2.IKEv2_payload_Transform(
1157                 transform_type="Integrity", transform_id=self.sa.esp_integ
1158             )
1159             / ikev2.IKEv2_payload_Transform(
1160                 transform_type="Extended Sequence Number", transform_id="No ESN"
1161             )
1162             / ikev2.IKEv2_payload_Transform(
1163                 transform_type="Extended Sequence Number", transform_id="ESN"
1164             )
1165         )
1166
1167         c = self.sa.child_sas[0]
1168         props = ikev2.IKEv2_payload_Proposal(
1169             proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1170         )
1171
1172         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1173         plain = (
1174             ikev2.IKEv2_payload_IDi(
1175                 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1176             )
1177             / ikev2.IKEv2_payload_IDr(
1178                 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1179             )
1180             / ikev2.IKEv2_payload_AUTH(
1181                 next_payload="SA",
1182                 auth_type=AuthMethod.value(self.sa.auth_method),
1183                 load=self.sa.auth_data,
1184             )
1185             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1186             / ikev2.IKEv2_payload_TSi(
1187                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1188             )
1189             / ikev2.IKEv2_payload_TSr(
1190                 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1191             )
1192             / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1193         )
1194
1195         header = ikev2.IKEv2(
1196             init_SPI=self.sa.ispi,
1197             resp_SPI=self.sa.rspi,
1198             id=self.sa.new_msg_id(),
1199             flags="Response",
1200             exch_type="IKE_AUTH",
1201         )
1202
1203         ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1204         packet = self.create_packet(
1205             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1206         )
1207         self.pg_send(self.pg0, packet)
1208
1209     def test_initiator(self):
1210         self.initiate_sa_init()
1211         self.sa.auth_init()
1212         self.sa.calc_child_keys()
1213         self.send_auth_response()
1214         self.verify_ike_sas()
1215
1216
1217 class TemplateResponder(IkePeer):
1218     """responder test template"""
1219
1220     def initiate_del_sa_from_responder(self):
1221         self.pg0.enable_capture()
1222         self.pg_start()
1223         self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1224         capture = self.pg0.get_capture(1)
1225         ih = self.get_ike_header(capture[0])
1226         self.assertNotIn("Response", ih.flags)
1227         self.assertNotIn("Initiator", ih.flags)
1228         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1229         plain = self.sa.hmac_and_decrypt(ih)
1230         d = ikev2.IKEv2_payload_Delete(plain)
1231         self.assertEqual(d.proto, 1)  # proto=IKEv2
1232         self.assertEqual(ih.init_SPI, self.sa.ispi)
1233         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1234         header = ikev2.IKEv2(
1235             init_SPI=self.sa.ispi,
1236             resp_SPI=self.sa.rspi,
1237             flags="Initiator+Response",
1238             exch_type="INFORMATIONAL",
1239             id=ih.id,
1240             next_payload="Encrypted",
1241         )
1242         resp = self.encrypt_ike_msg(header, b"", None)
1243         self.send_and_assert_no_replies(self.pg0, resp)
1244
1245     def verify_del_sa(self, packet):
1246         ih = self.get_ike_header(packet)
1247         self.assertEqual(ih.id, self.sa.msg_id)
1248         self.assertEqual(ih.exch_type, 37)  # exchange informational
1249         self.assertIn("Response", ih.flags)
1250         self.assertNotIn("Initiator", ih.flags)
1251         self.assertEqual(ih.next_payload, 46)  # Encrypted
1252         self.assertEqual(ih.init_SPI, self.sa.ispi)
1253         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1254         plain = self.sa.hmac_and_decrypt(ih)
1255         self.assertEqual(plain, b"")
1256
1257     def initiate_del_sa_from_initiator(self):
1258         header = ikev2.IKEv2(
1259             init_SPI=self.sa.ispi,
1260             resp_SPI=self.sa.rspi,
1261             flags="Initiator",
1262             exch_type="INFORMATIONAL",
1263             id=self.sa.new_msg_id(),
1264         )
1265         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1266         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1267         packet = self.create_packet(
1268             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1269         )
1270         self.pg0.add_stream(packet)
1271         self.pg0.enable_capture()
1272         self.pg_start()
1273         capture = self.pg0.get_capture(1)
1274         self.verify_del_sa(capture[0])
1275
1276     def send_sa_init_req(self):
1277         tr_attr = self.sa.ike_crypto_attr()
1278         trans = (
1279             ikev2.IKEv2_payload_Transform(
1280                 transform_type="Encryption",
1281                 transform_id=self.sa.ike_crypto,
1282                 length=tr_attr[1],
1283                 key_length=tr_attr[0],
1284             )
1285             / ikev2.IKEv2_payload_Transform(
1286                 transform_type="Integrity", transform_id=self.sa.ike_integ
1287             )
1288             / ikev2.IKEv2_payload_Transform(
1289                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1290             )
1291             / ikev2.IKEv2_payload_Transform(
1292                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1293             )
1294         )
1295
1296         props = ikev2.IKEv2_payload_Proposal(
1297             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1298         )
1299
1300         next_payload = None if self.ip6 else "Notify"
1301
1302         self.sa.init_req_packet = (
1303             ikev2.IKEv2(
1304                 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1305             )
1306             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1307             / ikev2.IKEv2_payload_KE(
1308                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1309             )
1310             / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1311         )
1312
1313         if not self.ip6:
1314             if self.sa.i_natt:
1315                 src_address = b"\x0a\x0a\x0a\x01"
1316             else:
1317                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1318
1319             if self.sa.r_natt:
1320                 dst_address = b"\x0a\x0a\x0a\x0a"
1321             else:
1322                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1323
1324             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1325             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1326             nat_src_detection = ikev2.IKEv2_payload_Notify(
1327                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1328             )
1329             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1330                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1331             )
1332             self.sa.init_req_packet = (
1333                 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1334             )
1335
1336         ike_msg = self.create_packet(
1337             self.pg0,
1338             self.sa.init_req_packet,
1339             self.sa.sport,
1340             self.sa.dport,
1341             self.sa.natt,
1342             self.ip6,
1343         )
1344         self.pg0.add_stream(ike_msg)
1345         self.pg0.enable_capture()
1346         self.pg_start()
1347         capture = self.pg0.get_capture(1)
1348         self.verify_sa_init(capture[0])
1349
1350     def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1351         tr_attr = self.sa.esp_crypto_attr()
1352         last_payload = last_payload or "Notify"
1353         trans_nb = 4
1354         trans = (
1355             ikev2.IKEv2_payload_Transform(
1356                 transform_type="Encryption",
1357                 transform_id=self.sa.esp_crypto,
1358                 length=tr_attr[1],
1359                 key_length=tr_attr[0],
1360             )
1361             / ikev2.IKEv2_payload_Transform(
1362                 transform_type="Integrity", transform_id=self.sa.esp_integ
1363             )
1364             / ikev2.IKEv2_payload_Transform(
1365                 transform_type="Extended Sequence Number", transform_id="No ESN"
1366             )
1367             / ikev2.IKEv2_payload_Transform(
1368                 transform_type="Extended Sequence Number", transform_id="ESN"
1369             )
1370         )
1371
1372         if kex:
1373             trans_nb += 1
1374             trans /= ikev2.IKEv2_payload_Transform(
1375                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1376             )
1377
1378         c = self.sa.child_sas[0]
1379         props = ikev2.IKEv2_payload_Proposal(
1380             proposal=1,
1381             proto="ESP",
1382             SPIsize=4,
1383             SPI=c.ispi,
1384             trans_nb=trans_nb,
1385             trans=trans,
1386         )
1387
1388         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1389         plain = (
1390             ikev2.IKEv2_payload_AUTH(
1391                 next_payload="SA",
1392                 auth_type=AuthMethod.value(self.sa.auth_method),
1393                 load=self.sa.auth_data,
1394             )
1395             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1396             / ikev2.IKEv2_payload_TSi(
1397                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1398             )
1399             / ikev2.IKEv2_payload_TSr(
1400                 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1401             )
1402         )
1403
1404         if is_rekey:
1405             first_payload = "Nonce"
1406             if kex:
1407                 head = ikev2.IKEv2_payload_Nonce(
1408                     load=self.sa.i_nonce, next_payload="KE"
1409                 ) / ikev2.IKEv2_payload_KE(
1410                     group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1411                 )
1412             else:
1413                 head = ikev2.IKEv2_payload_Nonce(
1414                     load=self.sa.i_nonce, next_payload="SA"
1415                 )
1416             plain = (
1417                 head
1418                 / plain
1419                 / ikev2.IKEv2_payload_Notify(
1420                     type="REKEY_SA",
1421                     proto="ESP",
1422                     SPI=c.ispi,
1423                     length=8 + len(c.ispi),
1424                     next_payload="Notify",
1425                 )
1426                 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1427             )
1428         else:
1429             first_payload = "IDi"
1430             if self.no_idr_auth:
1431                 ids = ikev2.IKEv2_payload_IDi(
1432                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1433                 )
1434             else:
1435                 ids = ikev2.IKEv2_payload_IDi(
1436                     next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1437                 ) / ikev2.IKEv2_payload_IDr(
1438                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1439                 )
1440             plain = ids / plain
1441         return plain, first_payload
1442
1443     def send_sa_auth(self):
1444         plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1445         plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1446         header = ikev2.IKEv2(
1447             init_SPI=self.sa.ispi,
1448             resp_SPI=self.sa.rspi,
1449             id=self.sa.new_msg_id(),
1450             flags="Initiator",
1451             exch_type="IKE_AUTH",
1452         )
1453
1454         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1455         packet = self.create_packet(
1456             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1457         )
1458         self.pg0.add_stream(packet)
1459         self.pg0.enable_capture()
1460         self.pg_start()
1461         capture = self.pg0.get_capture(1)
1462         self.verify_sa_auth_resp(capture[0])
1463
1464     def verify_sa_init(self, packet):
1465         ih = self.get_ike_header(packet)
1466
1467         self.assertEqual(ih.id, self.sa.msg_id)
1468         self.assertEqual(ih.exch_type, 34)
1469         self.assertIn("Response", ih.flags)
1470         self.assertEqual(ih.init_SPI, self.sa.ispi)
1471         self.assertNotEqual(ih.resp_SPI, 0)
1472         self.sa.rspi = ih.resp_SPI
1473         try:
1474             sa = ih[ikev2.IKEv2_payload_SA]
1475             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1476             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1477         except IndexError as e:
1478             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1479             self.logger.error(ih.show())
1480             raise
1481         self.sa.complete_dh_data()
1482         self.sa.calc_keys()
1483         self.sa.auth_init()
1484
1485     def verify_sa_auth_resp(self, packet):
1486         ike = self.get_ike_header(packet)
1487         udp = packet[UDP]
1488         self.verify_udp(udp)
1489         self.assertEqual(ike.id, self.sa.msg_id)
1490         plain = self.sa.hmac_and_decrypt(ike)
1491         idr = ikev2.IKEv2_payload_IDr(plain)
1492         prop = idr[ikev2.IKEv2_payload_Proposal]
1493         self.assertEqual(prop.SPIsize, 4)
1494         self.sa.child_sas[0].rspi = prop.SPI
1495         self.sa.calc_child_keys()
1496
1497     IKE_NODE_SUFFIX = "ip4"
1498
1499     def verify_counters(self):
1500         self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1501         self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1502         self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1503
1504         r = self.vapi.ikev2_sa_dump()
1505         s = r[0].sa.stats
1506         self.assertEqual(1, s.n_sa_auth_req)
1507         self.assertEqual(1, s.n_sa_init_req)
1508
1509     def test_responder(self):
1510         self.send_sa_init_req()
1511         self.send_sa_auth()
1512         self.verify_ipsec_sas()
1513         self.verify_ike_sas()
1514         self.verify_counters()
1515
1516
1517 class Ikev2Params(object):
1518     def config_params(self, params={}):
1519         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1520         ei = VppEnum.vl_api_ipsec_integ_alg_t
1521         self.vpp_enums = {
1522             "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1523             "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1524             "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1525             "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1526             "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1527             "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1528             "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1529             "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1530             "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1531             "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1532         }
1533
1534         dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1535         if dpd_disabled:
1536             self.vapi.cli("ikev2 dpd disable")
1537         self.del_sa_from_responder = (
1538             False
1539             if "del_sa_from_responder" not in params
1540             else params["del_sa_from_responder"]
1541         )
1542         i_natt = False if "i_natt" not in params else params["i_natt"]
1543         r_natt = False if "r_natt" not in params else params["r_natt"]
1544         self.p = Profile(self, "pr1")
1545         self.ip6 = False if "ip6" not in params else params["ip6"]
1546
1547         if "auth" in params and params["auth"] == "rsa-sig":
1548             auth_method = "rsa-sig"
1549             work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1550             self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1551
1552             client_file = work_dir + params["client-cert"]
1553             server_pem = open(work_dir + params["server-cert"]).read()
1554             client_priv = open(work_dir + params["client-key"]).read()
1555             client_priv = load_pem_private_key(
1556                 str.encode(client_priv), None, default_backend()
1557             )
1558             self.peer_cert = x509.load_pem_x509_certificate(
1559                 str.encode(server_pem), default_backend()
1560             )
1561             self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1562             auth_data = None
1563         else:
1564             auth_data = b"$3cr3tpa$$w0rd"
1565             self.p.add_auth(method="shared-key", data=auth_data)
1566             auth_method = "shared-key"
1567             client_priv = None
1568
1569         is_init = True if "is_initiator" not in params else params["is_initiator"]
1570         self.no_idr_auth = params.get("no_idr_in_auth", False)
1571
1572         idr = {"id_type": "fqdn", "data": b"vpp.home"}
1573         idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1574         r_id = self.idr = idr["data"]
1575         i_id = self.idi = idi["data"]
1576         if is_init:
1577             # scapy is initiator, VPP is responder
1578             self.p.add_local_id(**idr)
1579             self.p.add_remote_id(**idi)
1580             if self.no_idr_auth:
1581                 r_id = None
1582         else:
1583             # VPP is initiator, scapy is responder
1584             self.p.add_local_id(**idi)
1585             if not self.no_idr_auth:
1586                 self.p.add_remote_id(**idr)
1587
1588         loc_ts = (
1589             {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1590             if "loc_ts" not in params
1591             else params["loc_ts"]
1592         )
1593         rem_ts = (
1594             {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1595             if "rem_ts" not in params
1596             else params["rem_ts"]
1597         )
1598         self.p.add_local_ts(**loc_ts)
1599         self.p.add_remote_ts(**rem_ts)
1600         if "responder" in params:
1601             self.p.add_responder(params["responder"])
1602         if "ike_transforms" in params:
1603             self.p.add_ike_transforms(params["ike_transforms"])
1604         if "esp_transforms" in params:
1605             self.p.add_esp_transforms(params["esp_transforms"])
1606
1607         udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1608         if udp_encap:
1609             self.p.set_udp_encap(True)
1610
1611         if "responder_hostname" in params:
1612             hn = params["responder_hostname"]
1613             self.p.add_responder_hostname(hn)
1614
1615             # configure static dns record
1616             self.vapi.dns_name_server_add_del(
1617                 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1618             )
1619             self.vapi.dns_enable_disable(enable=1)
1620
1621             cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1622             self.vapi.cli(cmd)
1623
1624         self.sa = IKEv2SA(
1625             self,
1626             i_id=i_id,
1627             r_id=r_id,
1628             is_initiator=is_init,
1629             id_type=self.p.local_id["id_type"],
1630             i_natt=i_natt,
1631             r_natt=r_natt,
1632             priv_key=client_priv,
1633             auth_method=auth_method,
1634             nonce=params.get("nonce"),
1635             auth_data=auth_data,
1636             udp_encap=udp_encap,
1637             local_ts=self.p.remote_ts,
1638             remote_ts=self.p.local_ts,
1639         )
1640
1641         if is_init:
1642             ike_crypto = (
1643                 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1644             )
1645             ike_integ = (
1646                 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1647             )
1648             ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1649
1650             esp_crypto = (
1651                 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1652             )
1653             esp_integ = (
1654                 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1655             )
1656
1657             self.sa.set_ike_props(
1658                 crypto=ike_crypto[0],
1659                 crypto_key_len=ike_crypto[1],
1660                 integ=ike_integ,
1661                 prf="PRF_HMAC_SHA2_256",
1662                 dh=ike_dh,
1663             )
1664             self.sa.set_esp_props(
1665                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1666             )
1667
1668
1669 class TestApi(VppTestCase):
1670     """Test IKEV2 API"""
1671
1672     @classmethod
1673     def setUpClass(cls):
1674         super(TestApi, cls).setUpClass()
1675
1676     @classmethod
1677     def tearDownClass(cls):
1678         super(TestApi, cls).tearDownClass()
1679
1680     def tearDown(self):
1681         super(TestApi, self).tearDown()
1682         self.p1.remove_vpp_config()
1683         self.p2.remove_vpp_config()
1684         r = self.vapi.ikev2_profile_dump()
1685         self.assertEqual(len(r), 0)
1686
1687     def configure_profile(self, cfg):
1688         p = Profile(self, cfg["name"])
1689         p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1690         p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1691         p.add_local_ts(**cfg["loc_ts"])
1692         p.add_remote_ts(**cfg["rem_ts"])
1693         p.add_responder(cfg["responder"])
1694         p.add_ike_transforms(cfg["ike_ts"])
1695         p.add_esp_transforms(cfg["esp_ts"])
1696         p.add_auth(**cfg["auth"])
1697         p.set_udp_encap(cfg["udp_encap"])
1698         p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1699         if "lifetime_data" in cfg:
1700             p.set_lifetime_data(cfg["lifetime_data"])
1701         if "tun_itf" in cfg:
1702             p.set_tunnel_interface(cfg["tun_itf"])
1703         if "natt_disabled" in cfg and cfg["natt_disabled"]:
1704             p.disable_natt()
1705         p.add_vpp_config()
1706         return p
1707
1708     def test_profile_api(self):
1709         """test profile dump API"""
1710         loc_ts4 = {
1711             "proto": 8,
1712             "start_port": 1,
1713             "end_port": 19,
1714             "start_addr": "3.3.3.2",
1715             "end_addr": "3.3.3.3",
1716         }
1717         rem_ts4 = {
1718             "proto": 9,
1719             "start_port": 10,
1720             "end_port": 119,
1721             "start_addr": "4.5.76.80",
1722             "end_addr": "2.3.4.6",
1723         }
1724
1725         loc_ts6 = {
1726             "proto": 8,
1727             "start_port": 1,
1728             "end_port": 19,
1729             "start_addr": "ab::1",
1730             "end_addr": "ab::4",
1731         }
1732         rem_ts6 = {
1733             "proto": 9,
1734             "start_port": 10,
1735             "end_port": 119,
1736             "start_addr": "cd::12",
1737             "end_addr": "cd::13",
1738         }
1739
1740         conf = {
1741             "p1": {
1742                 "name": "p1",
1743                 "natt_disabled": True,
1744                 "loc_id": ("fqdn", b"vpp.home"),
1745                 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1746                 "loc_ts": loc_ts4,
1747                 "rem_ts": rem_ts4,
1748                 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1749                 "ike_ts": {
1750                     "crypto_alg": 20,
1751                     "crypto_key_size": 32,
1752                     "integ_alg": 0,
1753                     "dh_group": 1,
1754                 },
1755                 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1756                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1757                 "udp_encap": True,
1758                 "ipsec_over_udp_port": 4501,
1759                 "lifetime_data": {
1760                     "lifetime": 123,
1761                     "lifetime_maxdata": 20192,
1762                     "lifetime_jitter": 9,
1763                     "handover": 132,
1764                 },
1765             },
1766             "p2": {
1767                 "name": "p2",
1768                 "loc_id": ("ip4-addr", b"192.168.2.1"),
1769                 "rem_id": ("ip6-addr", b"abcd::1"),
1770                 "loc_ts": loc_ts6,
1771                 "rem_ts": rem_ts6,
1772                 "responder": {"sw_if_index": 4, "addr": "def::10"},
1773                 "ike_ts": {
1774                     "crypto_alg": 12,
1775                     "crypto_key_size": 16,
1776                     "integ_alg": 3,
1777                     "dh_group": 3,
1778                 },
1779                 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1780                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1781                 "udp_encap": False,
1782                 "ipsec_over_udp_port": 4600,
1783                 "tun_itf": 0,
1784             },
1785         }
1786         self.p1 = self.configure_profile(conf["p1"])
1787         self.p2 = self.configure_profile(conf["p2"])
1788
1789         r = self.vapi.ikev2_profile_dump()
1790         self.assertEqual(len(r), 2)
1791         self.verify_profile(r[0].profile, conf["p1"])
1792         self.verify_profile(r[1].profile, conf["p2"])
1793
1794     def verify_id(self, api_id, cfg_id):
1795         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1796         self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1797
1798     def verify_ts(self, api_ts, cfg_ts):
1799         self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1800         self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1801         self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1802         self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1803         self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1804
1805     def verify_responder(self, api_r, cfg_r):
1806         self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1807         self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1808
1809     def verify_transforms(self, api_ts, cfg_ts):
1810         self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1811         self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1812         self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1813
1814     def verify_ike_transforms(self, api_ts, cfg_ts):
1815         self.verify_transforms(api_ts, cfg_ts)
1816         self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1817
1818     def verify_esp_transforms(self, api_ts, cfg_ts):
1819         self.verify_transforms(api_ts, cfg_ts)
1820
1821     def verify_auth(self, api_auth, cfg_auth):
1822         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1823         self.assertEqual(api_auth.data, cfg_auth["data"])
1824         self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1825
1826     def verify_lifetime_data(self, p, ld):
1827         self.assertEqual(p.lifetime, ld["lifetime"])
1828         self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1829         self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1830         self.assertEqual(p.handover, ld["handover"])
1831
1832     def verify_profile(self, ap, cp):
1833         self.assertEqual(ap.name, cp["name"])
1834         self.assertEqual(ap.udp_encap, cp["udp_encap"])
1835         self.verify_id(ap.loc_id, cp["loc_id"])
1836         self.verify_id(ap.rem_id, cp["rem_id"])
1837         self.verify_ts(ap.loc_ts, cp["loc_ts"])
1838         self.verify_ts(ap.rem_ts, cp["rem_ts"])
1839         self.verify_responder(ap.responder, cp["responder"])
1840         self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1841         self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1842         self.verify_auth(ap.auth, cp["auth"])
1843         natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1844         self.assertTrue(natt_dis == ap.natt_disabled)
1845
1846         if "lifetime_data" in cp:
1847             self.verify_lifetime_data(ap, cp["lifetime_data"])
1848         self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1849         if "tun_itf" in cp:
1850             self.assertEqual(ap.tun_itf, cp["tun_itf"])
1851         else:
1852             self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1853
1854
1855 @tag_fixme_vpp_workers
1856 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1857     """test responder - responder behind NAT"""
1858
1859     IKE_NODE_SUFFIX = "ip4-natt"
1860
1861     def config_tc(self):
1862         self.config_params({"r_natt": True})
1863
1864
1865 @tag_fixme_vpp_workers
1866 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1867     """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1868
1869     def config_tc(self):
1870         self.config_params(
1871             {
1872                 "i_natt": True,
1873                 "is_initiator": False,  # seen from test case perspective
1874                 # thus vpp is initiator
1875                 "responder": {
1876                     "sw_if_index": self.pg0.sw_if_index,
1877                     "addr": self.pg0.remote_ip4,
1878                 },
1879                 "ike-crypto": ("AES-GCM-16ICV", 32),
1880                 "ike-integ": "NULL",
1881                 "ike-dh": "3072MODPgr",
1882                 "ike_transforms": {
1883                     "crypto_alg": 20,  # "aes-gcm-16"
1884                     "crypto_key_size": 256,
1885                     "dh_group": 15,  # "modp-3072"
1886                 },
1887                 "esp_transforms": {
1888                     "crypto_alg": 12,  # "aes-cbc"
1889                     "crypto_key_size": 256,
1890                     # "hmac-sha2-256-128"
1891                     "integ_alg": 12,
1892                 },
1893             }
1894         )
1895
1896
1897 @tag_fixme_vpp_workers
1898 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1899     """test ikev2 initiator - pre shared key auth"""
1900
1901     def config_tc(self):
1902         self.config_params(
1903             {
1904                 "is_initiator": False,  # seen from test case perspective
1905                 # thus vpp is initiator
1906                 "ike-crypto": ("AES-GCM-16ICV", 32),
1907                 "ike-integ": "NULL",
1908                 "ike-dh": "3072MODPgr",
1909                 "ike_transforms": {
1910                     "crypto_alg": 20,  # "aes-gcm-16"
1911                     "crypto_key_size": 256,
1912                     "dh_group": 15,  # "modp-3072"
1913                 },
1914                 "esp_transforms": {
1915                     "crypto_alg": 12,  # "aes-cbc"
1916                     "crypto_key_size": 256,
1917                     # "hmac-sha2-256-128"
1918                     "integ_alg": 12,
1919                 },
1920                 "responder_hostname": {
1921                     "hostname": "vpp.responder.org",
1922                     "sw_if_index": self.pg0.sw_if_index,
1923                 },
1924             }
1925         )
1926
1927
1928 @tag_fixme_vpp_workers
1929 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1930     """test initiator - request window size (1)"""
1931
1932     def rekey_respond(self, req, update_child_sa_data):
1933         ih = self.get_ike_header(req)
1934         plain = self.sa.hmac_and_decrypt(ih)
1935         sa = ikev2.IKEv2_payload_SA(plain)
1936         if update_child_sa_data:
1937             prop = sa[ikev2.IKEv2_payload_Proposal]
1938             self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1939             self.sa.r_nonce = self.sa.i_nonce
1940             self.sa.child_sas[0].ispi = prop.SPI
1941             self.sa.child_sas[0].rspi = prop.SPI
1942             self.sa.calc_child_keys()
1943
1944         header = ikev2.IKEv2(
1945             init_SPI=self.sa.ispi,
1946             resp_SPI=self.sa.rspi,
1947             flags="Response",
1948             exch_type=36,
1949             id=ih.id,
1950             next_payload="Encrypted",
1951         )
1952         resp = self.encrypt_ike_msg(header, sa, "SA")
1953         packet = self.create_packet(
1954             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1955         )
1956         self.send_and_assert_no_replies(self.pg0, packet)
1957
1958     def test_initiator(self):
1959         super(TestInitiatorRequestWindowSize, self).test_initiator()
1960         self.pg0.enable_capture()
1961         self.pg_start()
1962         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1963         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1964         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1965         capture = self.pg0.get_capture(2)
1966
1967         # reply in reverse order
1968         self.rekey_respond(capture[1], True)
1969         self.rekey_respond(capture[0], False)
1970
1971         # verify that only the second request was accepted
1972         self.verify_ike_sas()
1973         self.verify_ipsec_sas(is_rekey=True)
1974
1975
1976 @tag_fixme_vpp_workers
1977 class TestInitiatorRekey(TestInitiatorPsk):
1978     """test ikev2 initiator - rekey"""
1979
1980     def rekey_from_initiator(self):
1981         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1982         self.pg0.enable_capture()
1983         self.pg_start()
1984         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1985         capture = self.pg0.get_capture(1)
1986         ih = self.get_ike_header(capture[0])
1987         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1988         self.assertNotIn("Response", ih.flags)
1989         self.assertIn("Initiator", ih.flags)
1990         plain = self.sa.hmac_and_decrypt(ih)
1991         sa = ikev2.IKEv2_payload_SA(plain)
1992         prop = sa[ikev2.IKEv2_payload_Proposal]
1993         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1994         self.sa.r_nonce = self.sa.i_nonce
1995         # update new responder SPI
1996         self.sa.child_sas[0].ispi = prop.SPI
1997         self.sa.child_sas[0].rspi = prop.SPI
1998         self.sa.calc_child_keys()
1999         header = ikev2.IKEv2(
2000             init_SPI=self.sa.ispi,
2001             resp_SPI=self.sa.rspi,
2002             flags="Response",
2003             exch_type=36,
2004             id=ih.id,
2005             next_payload="Encrypted",
2006         )
2007         resp = self.encrypt_ike_msg(header, sa, "SA")
2008         packet = self.create_packet(
2009             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2010         )
2011         self.send_and_assert_no_replies(self.pg0, packet)
2012
2013     def test_initiator(self):
2014         super(TestInitiatorRekey, self).test_initiator()
2015         self.rekey_from_initiator()
2016         self.verify_ike_sas()
2017         self.verify_ipsec_sas(is_rekey=True)
2018
2019
2020 @tag_fixme_vpp_workers
2021 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2022     """test ikev2 initiator - delete IKE SA from responder"""
2023
2024     def config_tc(self):
2025         self.config_params(
2026             {
2027                 "del_sa_from_responder": True,
2028                 "is_initiator": False,  # seen from test case perspective
2029                 # thus vpp is initiator
2030                 "responder": {
2031                     "sw_if_index": self.pg0.sw_if_index,
2032                     "addr": self.pg0.remote_ip4,
2033                 },
2034                 "ike-crypto": ("AES-GCM-16ICV", 32),
2035                 "ike-integ": "NULL",
2036                 "ike-dh": "3072MODPgr",
2037                 "ike_transforms": {
2038                     "crypto_alg": 20,  # "aes-gcm-16"
2039                     "crypto_key_size": 256,
2040                     "dh_group": 15,  # "modp-3072"
2041                 },
2042                 "esp_transforms": {
2043                     "crypto_alg": 12,  # "aes-cbc"
2044                     "crypto_key_size": 256,
2045                     # "hmac-sha2-256-128"
2046                     "integ_alg": 12,
2047                 },
2048                 "no_idr_in_auth": True,
2049             }
2050         )
2051
2052
2053 @tag_fixme_vpp_workers
2054 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2055     """test ikev2 responder - initiator behind NAT"""
2056
2057     IKE_NODE_SUFFIX = "ip4-natt"
2058
2059     def config_tc(self):
2060         self.config_params({"i_natt": True})
2061
2062
2063 @tag_fixme_vpp_workers
2064 class TestResponderPsk(TemplateResponder, Ikev2Params):
2065     """test ikev2 responder - pre shared key auth"""
2066
2067     def config_tc(self):
2068         self.config_params()
2069
2070
2071 @tag_fixme_vpp_workers
2072 class TestResponderDpd(TestResponderPsk):
2073     """
2074     Dead peer detection test
2075     """
2076
2077     def config_tc(self):
2078         self.config_params({"dpd_disabled": False})
2079
2080     def tearDown(self):
2081         pass
2082
2083     def test_responder(self):
2084         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2085         super(TestResponderDpd, self).test_responder()
2086         self.pg0.enable_capture()
2087         self.pg_start()
2088         # capture empty request but don't reply
2089         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2090         ih = self.get_ike_header(capture[0])
2091         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2092         plain = self.sa.hmac_and_decrypt(ih)
2093         self.assertEqual(plain, b"")
2094         # wait for SA expiration
2095         time.sleep(3)
2096         ike_sas = self.vapi.ikev2_sa_dump()
2097         self.assertEqual(len(ike_sas), 0)
2098         ipsec_sas = self.vapi.ipsec_sa_dump()
2099         self.assertEqual(len(ipsec_sas), 0)
2100
2101
2102 @tag_fixme_vpp_workers
2103 class TestResponderRekey(TestResponderPsk):
2104     """test ikev2 responder - rekey"""
2105
2106     WITH_KEX = False
2107
2108     def send_rekey_from_initiator(self):
2109         if self.WITH_KEX:
2110             self.sa.generate_dh_data()
2111         packet = self.create_rekey_request(kex=self.WITH_KEX)
2112         self.pg0.add_stream(packet)
2113         self.pg0.enable_capture()
2114         self.pg_start()
2115         capture = self.pg0.get_capture(1)
2116         return capture
2117
2118     def process_rekey_response(self, capture):
2119         ih = self.get_ike_header(capture[0])
2120         plain = self.sa.hmac_and_decrypt(ih)
2121         sa = ikev2.IKEv2_payload_SA(plain)
2122         prop = sa[ikev2.IKEv2_payload_Proposal]
2123         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2124         # update new responder SPI
2125         self.sa.child_sas[0].rspi = prop.SPI
2126         if self.WITH_KEX:
2127             self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2128             self.sa.complete_dh_data()
2129         self.sa.calc_child_keys(kex=self.WITH_KEX)
2130
2131     def test_responder(self):
2132         super(TestResponderRekey, self).test_responder()
2133         self.process_rekey_response(self.send_rekey_from_initiator())
2134         self.verify_ike_sas()
2135         self.verify_ipsec_sas(is_rekey=True)
2136         self.assert_counter(1, "rekey_req", "ip4")
2137         r = self.vapi.ikev2_sa_dump()
2138         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2139
2140
2141 @tag_fixme_vpp_workers
2142 class TestResponderRekeyRepeat(TestResponderRekey):
2143     """test ikev2 responder - rekey repeat"""
2144
2145     def test_responder(self):
2146         super(TestResponderRekeyRepeat, self).test_responder()
2147         # rekey request is not accepted until old IPsec SA is expired
2148         capture = self.send_rekey_from_initiator()
2149         ih = self.get_ike_header(capture[0])
2150         plain = self.sa.hmac_and_decrypt(ih)
2151         notify = ikev2.IKEv2_payload_Notify(plain)
2152         self.assertEqual(notify.type, 43)
2153         self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2154         # rekey request is accepted after old IPsec SA was expired
2155         for _ in range(50):
2156             if len(self.vapi.ipsec_sa_dump()) != 3:
2157                 break
2158             time.sleep(0.2)
2159         else:
2160             self.fail("old IPsec SA not expired")
2161         self.process_rekey_response(self.send_rekey_from_initiator())
2162         self.verify_ike_sas()
2163         self.verify_ipsec_sas(sa_count=3)
2164
2165
2166 @tag_fixme_vpp_workers
2167 class TestResponderRekeyKEX(TestResponderRekey):
2168     """test ikev2 responder - rekey with key exchange"""
2169
2170     WITH_KEX = True
2171
2172
2173 @tag_fixme_vpp_workers
2174 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2175     """test ikev2 responder - rekey repeat with key exchange"""
2176
2177     WITH_KEX = True
2178
2179
2180 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2181     """test ikev2 responder - non-default table id"""
2182
2183     @classmethod
2184     def setUpClass(cls):
2185         import scapy.contrib.ikev2 as _ikev2
2186
2187         globals()["ikev2"] = _ikev2
2188         super(IkePeer, cls).setUpClass()
2189         cls.create_pg_interfaces(range(1))
2190         cls.vapi.cli("ip table add 1")
2191         cls.vapi.cli("set interface ip table pg0 1")
2192         for i in cls.pg_interfaces:
2193             i.admin_up()
2194             i.config_ip4()
2195             i.resolve_arp()
2196             i.config_ip6()
2197             i.resolve_ndp()
2198
2199     def config_tc(self):
2200         self.config_params({"dpd_disabled": False})
2201
2202     def test_responder(self):
2203         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2204         super(TestResponderVrf, self).test_responder()
2205         self.pg0.enable_capture()
2206         self.pg_start()
2207         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2208         ih = self.get_ike_header(capture[0])
2209         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2210         plain = self.sa.hmac_and_decrypt(ih)
2211         self.assertEqual(plain, b"")
2212
2213
2214 @tag_fixme_vpp_workers
2215 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2216     """test ikev2 responder - cert based auth"""
2217
2218     def config_tc(self):
2219         self.config_params(
2220             {
2221                 "udp_encap": True,
2222                 "auth": "rsa-sig",
2223                 "server-key": "server-key.pem",
2224                 "client-key": "client-key.pem",
2225                 "client-cert": "client-cert.pem",
2226                 "server-cert": "server-cert.pem",
2227             }
2228         )
2229
2230
2231 @tag_fixme_vpp_workers
2232 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2233     TemplateResponder, Ikev2Params
2234 ):
2235     """
2236     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2237     """
2238
2239     def config_tc(self):
2240         self.config_params(
2241             {
2242                 "ike-crypto": ("AES-CBC", 16),
2243                 "ike-integ": "SHA2-256-128",
2244                 "esp-crypto": ("AES-CBC", 24),
2245                 "esp-integ": "SHA2-384-192",
2246                 "ike-dh": "2048MODPgr",
2247                 "nonce": os.urandom(256),
2248                 "no_idr_in_auth": True,
2249             }
2250         )
2251
2252
2253 @tag_fixme_vpp_workers
2254 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2255     TemplateResponder, Ikev2Params
2256 ):
2257
2258     """
2259     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2260     """
2261
2262     def config_tc(self):
2263         self.config_params(
2264             {
2265                 "ike-crypto": ("AES-CBC", 32),
2266                 "ike-integ": "SHA2-256-128",
2267                 "esp-crypto": ("AES-GCM-16ICV", 32),
2268                 "esp-integ": "NULL",
2269                 "ike-dh": "3072MODPgr",
2270             }
2271         )
2272
2273
2274 @tag_fixme_vpp_workers
2275 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2276     """
2277     IKE:AES_GCM_16_256
2278     """
2279
2280     IKE_NODE_SUFFIX = "ip6"
2281
2282     def config_tc(self):
2283         self.config_params(
2284             {
2285                 "del_sa_from_responder": True,
2286                 "ip6": True,
2287                 "natt": True,
2288                 "ike-crypto": ("AES-GCM-16ICV", 32),
2289                 "ike-integ": "NULL",
2290                 "ike-dh": "2048MODPgr",
2291                 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2292                 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2293             }
2294         )
2295
2296
2297 @tag_fixme_vpp_workers
2298 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2299     """
2300     Test for keep alive messages
2301     """
2302
2303     def send_empty_req_from_responder(self):
2304         packet = self.create_empty_request()
2305         self.pg0.add_stream(packet)
2306         self.pg0.enable_capture()
2307         self.pg_start()
2308         capture = self.pg0.get_capture(1)
2309         ih = self.get_ike_header(capture[0])
2310         self.assertEqual(ih.id, self.sa.msg_id)
2311         plain = self.sa.hmac_and_decrypt(ih)
2312         self.assertEqual(plain, b"")
2313         self.assert_counter(1, "keepalive", "ip4")
2314         r = self.vapi.ikev2_sa_dump()
2315         self.assertEqual(1, r[0].sa.stats.n_keepalives)
2316
2317     def test_initiator(self):
2318         super(TestInitiatorKeepaliveMsg, self).test_initiator()
2319         self.send_empty_req_from_responder()
2320
2321
2322 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2323     """malformed packet test"""
2324
2325     def tearDown(self):
2326         pass
2327
2328     def config_tc(self):
2329         self.config_params()
2330
2331     def create_ike_init_msg(self, length=None, payload=None):
2332         msg = ikev2.IKEv2(
2333             length=length,
2334             init_SPI="\x11" * 8,
2335             flags="Initiator",
2336             exch_type="IKE_SA_INIT",
2337         )
2338         if payload is not None:
2339             msg /= payload
2340         return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2341
2342     def verify_bad_packet_length(self):
2343         ike_msg = self.create_ike_init_msg(length=0xDEAD)
2344         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2345         self.assert_counter(self.pkt_count, "bad_length")
2346
2347     def verify_bad_sa_payload_length(self):
2348         p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2349         ike_msg = self.create_ike_init_msg(payload=p)
2350         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2351         self.assert_counter(self.pkt_count, "malformed_packet")
2352
2353     def test_responder(self):
2354         self.pkt_count = 254
2355         self.verify_bad_packet_length()
2356         self.verify_bad_sa_payload_length()
2357
2358
2359 if __name__ == "__main__":
2360     unittest.main(testRunner=VppTestRunner)