ikev2: accept rekey request for IKE SA
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import VppTestCase
23 from asfframework import (
24     tag_fixme_vpp_workers,
25     tag_fixme_ubuntu2204,
26     tag_fixme_debian11,
27     is_distro_ubuntu2204,
28     is_distro_debian11,
29     VppTestRunner,
30 )
31 from vpp_ikev2 import Profile, IDType, AuthMethod
32 from vpp_papi import VppEnum
33
# Python 2/3 compatibility shim: use the builtin `unicode` when it exists,
# otherwise fall back to `str`.
try:
    text_type = unicode
except NameError:
    text_type = str

# Constant message fed to the PRF in the shared-key branch of
# IKEv2SA.auth_init().
KEY_PAD = b"Key Pad for IKEv2"
# Number of trailing key bytes that CryptoAlgo treats as the GCM salt.
SALT_SIZE = 4
# Number of authentication-tag bytes appended to / stripped from GCM output.
GCM_ICV_SIZE = 16
# Length of the explicit IV that precedes GCM ciphertext.
GCM_IV_SIZE = 8
43
44
# MODP Diffie-Hellman groups as defined in RFC 3526.
# Each value is a tuple (p, g, key_len):
#   p       - the group prime, parsed from the hex dump by long_converter()
#   g       - the generator
#   key_len - length in bytes used when serialising the shared secret
#             (see IKEv2SA.compute_secret)
DH = {
    "2048MODPgr": (
        long_converter(
            """
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
        ),
        2,
        256,
    ),
    "3072MODPgr": (
        long_converter(
            """
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
        ),
        2,
        384,
    ),
}
90
91
class CryptoAlgo(object):
    """Cipher wrapper used by the tests to encrypt and decrypt IKE payloads.

    ``cipher`` and ``mode`` are the algorithm and mode classes taken from
    the ``cryptography`` package.  Both may be None (the "NULL" entry); in
    that case no block size or IV length is set on the instance.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is expressed in bits, self.bs in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == "AES-GCM-16ICV":
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` and return ``iv + ciphertext`` (plus tag for AEAD).

        :param data: plaintext bytes (already padded by the caller)
        :param key: encryption key; in the AEAD case the last SALT_SIZE
            bytes are the salt and are not part of the cipher key
        :param aad: additional authenticated data; None selects the
            non-AEAD path
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(
                self.cipher(key), self.mode(iv), default_backend()
            ).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            # nonce = salt (tail of the key) followed by the explicit IV
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(
                self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
            ).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` (``iv + ciphertext``) and return the plaintext.

        :param data: explicit IV followed by the ciphertext (without tag)
        :param key: decryption key, laid out as for :meth:`encrypt`
        :param aad: additional authenticated data; None selects the
            non-AEAD path
        :param icv: authentication tag, required when ``aad`` is given
        """
        if aad is None:
            iv = data[: self.iv_len]
            ct = data[self.iv_len :]
            # use the configured cipher, mirroring encrypt(), instead of a
            # hard-coded AES
            decryptor = Cipher(
                self.cipher(key), self.mode(iv), default_backend()
            ).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + data[:GCM_IV_SIZE]
            ct = data[GCM_IV_SIZE:]
            key = key[:-SALT_SIZE]
            decryptor = Cipher(
                self.cipher(key), self.mode(nonce, icv, len(icv)), default_backend()
            ).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` up to the next multiple of the block size.

        At least one byte is always added: zero bytes followed by a final
        byte holding the number of zero bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b"\x00" * (pad_len - 1)
        return data + bytes([pad_len - 1])
146
147
class AuthAlgo(object):
    """Description of an integrity / PRF algorithm.

    Stores the MAC constructor (``mac``), the hash class (``mod``), the key
    length and the truncated output length.  A missing or zero
    ``trunc_len`` falls back to ``key_len``.
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
155
156
# Encryption algorithms known to the test, keyed by the name passed to
# IKEv2SA.set_ike_props() / set_esp_props().
CRYPTO_ALGOS = {
    "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
    "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
    "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
}

# Integrity algorithms: (name, MAC constructor, hash, key length, truncated
# ICV length).  The SHA2 variants use the hash matching their name
# (SHA-256 / SHA-384 / SHA-512) as specified by RFC 4868.
AUTH_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
    "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
    "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA384, 48, 24),
    "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA512, 64, 32),
}

# Pseudo-random functions usable for IKE key derivation.
PRF_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
}

# Numeric transform id -> CRYPTO_ALGOS key
# (presumably the IANA IKEv2 encryption transform ids - verify against users)
CRYPTO_IDS = {
    12: "AES-CBC",
    20: "AES-GCM-16ICV",
}

# Numeric transform id -> AUTH_ALGOS key
# (presumably the IANA IKEv2 integrity transform ids - verify against users)
INTEG_IDS = {
    2: "HMAC-SHA1-96",
    12: "SHA2-256-128",
    13: "SHA2-384-192",
    14: "SHA2-512-256",
}
187
188
class IKEv2ChildSA(object):
    """State of one child SA: its SPIs and traffic selectors.

    A random 4-byte SPI is generated for our own side (initiator or
    responder); the other side's SPI starts out as None.
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        self.ispi = None
        self.rspi = None
        own_spi = os.urandom(4)
        if is_initiator:
            self.ispi = own_spi
        else:
            self.rspi = own_spi
        self.local_ts = local_ts
        self.remote_ts = remote_ts
200
201
class IKEv2SA(object):
    """Test-side model of one IKEv2 security association.

    Holds SPIs, nonces, DH material and the derived SK_* keys for one peer
    and offers the key-derivation, authentication and encrypt/decrypt
    helpers the test cases use to build and parse IKE messages.
    """

    def __init__(
        self,
        test,
        is_initiator=True,
        i_id=None,
        r_id=None,
        spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
        id_type="fqdn",
        nonce=None,
        auth_data=None,
        local_ts=None,
        remote_ts=None,
        auth_method="shared-key",
        priv_key=None,
        i_natt=False,
        r_natt=False,
        udp_encap=False,
    ):
        """Initialise the SA.

        :param test: test case object; its assertEqual() is used by
            verify_hmac()
        :param is_initiator: True if this side is the IKE initiator
        :param i_id: initiator identity bytes
        :param r_id: responder identity bytes
        :param spi: our own 8-byte IKE SPI; the peer SPI starts as zeros
        :param id_type: identity type, either a name resolved through
            IDType.value() or an already resolved value
        :param nonce: our nonce; 32 random bytes are used when omitted
        :param auth_data: authentication input (pre-shared key for
            "shared-key"); replaced by the AUTH value in auth_init()
        :param local_ts: local traffic selector of the first child SA
        :param remote_ts: remote traffic selector of the first child SA
        :param auth_method: "shared-key" or "rsa-sig"
        :param priv_key: private key used for "rsa-sig"
        :param i_natt: NAT detected on the initiator side
        :param r_natt: NAT detected on the responder side
        :param udp_encap: whether UDP encapsulation is requested
        """
        self.udp_encap = udp_encap
        self.i_natt = i_natt
        self.r_natt = r_natt
        # with NAT traversal both ports move to 4500
        if i_natt or r_natt:
            self.sport = 4500
            self.dport = 4500
        else:
            self.sport = 500
            self.dport = 500
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        if isinstance(id_type, str):
            self.id_type = IDType.value(id_type)
        else:
            self.id_type = id_type
        self.auth_method = auth_method
        if self.is_initiator:
            self.rspi = 8 * b"\x00"
            self.ispi = spi
            self.i_nonce = nonce
        else:
            self.rspi = spi
            self.ispi = 8 * b"\x00"
            self.r_nonce = nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]

    def new_msg_id(self):
        """Increment the message id counter and return the new value."""
        self.msg_id += 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """Our own DH public value."""
        if self.is_initiator:
            return self.i_dh_data
        return self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """The peer's DH public value."""
        if self.is_initiator:
            return self.r_dh_data
        return self.i_dh_data

    @property
    def natt(self):
        """Truthy when NAT was flagged on either side."""
        return self.i_natt or self.r_natt

    def compute_secret(self):
        """Return peer_pub ** priv mod p as big-endian bytes of group length."""
        priv = self.dh_private_key
        peer = self.peer_dh_pub_key
        p, _g, key_len = self.ike_group
        return pow(
            int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
        ).to_bytes(key_len, "big")

    def generate_dh_data(self):
        """Generate a fresh DH key pair for the configured group.

        Stores the private exponent in ``dh_private_key`` and the public
        value in ``i_dh_data`` or ``r_dh_data`` depending on our role.

        :raises NotImplementedError: if the configured group is not in DH
        """
        # generate DH keys
        if self.ike_dh not in DH:
            raise NotImplementedError("%s not in DH group" % self.ike_dh)

        # the parameter object is built once and reused on later calls
        if self.dh_params is None:
            dhg = DH[self.ike_dh]
            pn = dh.DHParameterNumbers(dhg[0], dhg[1])
            self.dh_params = pn.parameters(default_backend())

        priv = self.dh_params.generate_private_key()
        pub = priv.public_key()
        x = priv.private_numbers().x
        self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
        y = pub.public_numbers().y

        if self.is_initiator:
            self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
        else:
            self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self, kex=False):
        """Derive the ESP keys of the first child SA from SK_d.

        The seed is Ni | Nr, prefixed with the DH shared secret when
        ``kex`` is True.  Without an integrity algorithm a 4-byte salt is
        extracted after each encryption key instead of an integrity key.
        """
        prf = self.ike_prf_alg.mod()
        s = self.i_nonce + self.r_nonce
        if kex:
            s = self.dh_shared_secret + s
        c = self.child_sas[0]

        encr_key_len = self.esp_crypto_key_len
        integ_key_len = self.esp_integ_alg.key_len
        salt_len = 0 if integ_key_len else 4

        total_len = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
        keymat = self.calc_prfplus(prf, self.sk_d, s, total_len)

        pos = 0
        c.sk_ei = keymat[pos : pos + encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ai = keymat[pos : pos + integ_key_len]
            pos += integ_key_len
        else:
            c.salt_ei = keymat[pos : pos + salt_len]
            pos += salt_len

        c.sk_er = keymat[pos : pos + encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ar = keymat[pos : pos + integ_key_len]
            pos += integ_key_len
        else:
            c.salt_er = keymat[pos : pos + salt_len]
            pos += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Expand ``key``/``seed`` into keying material (prf+).

        Each block is prf(key, previous_block | seed | counter) with the
        counter starting at 1.  At most 254 blocks are produced.

        :returns: the concatenated blocks (whole blocks, so possibly longer
            than ``length``), or None if 254 blocks are not enough
        """
        r = b""
        t = b""
        x = 1
        while len(r) < length and x < 255:
            t = self.calc_prf(prf, key, t + seed + bytes([x]))
            r += t
            x += 1

        # fail only when the material is really short; reaching the block
        # limit exactly when enough bytes are available is still a success
        if len(r) < length:
            return None
        return r

    def calc_prf(self, prf, key, data):
        """Return the IKE PRF of ``data`` under ``key`` using hash ``prf``."""
        h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        h.update(data)
        return h.finalize()

    def calc_keys(self, sk_d=None):
        """Derive SKEYSEED and the SK_d/ai/ar/ei/er/pi/pr keys.

        :param sk_d: SK_d of the SA being rekeyed; when given, SKEYSEED is
            derived from it instead of from the nonces alone
        """
        prf = self.ike_prf_alg.mod()
        if sk_d is None:
            # SKEYSEED = prf(Ni | Nr, g^ir)
            self.skeyseed = self.calc_prf(
                prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
            )
        else:
            # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
            self.skeyseed = self.calc_prf(
                prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
            )

        # calculate S = Ni | Nr | SPIi SPIr
        s = self.i_nonce + self.r_nonce + self.ispi + self.rspi

        prf_key_trunc = self.ike_prf_alg.trunc_len
        encr_key_len = self.ike_crypto_key_len
        tr_prf_key_len = self.ike_prf_alg.key_len
        integ_key_len = self.ike_integ_alg.key_len
        # without an integrity algorithm each encryption key carries a salt
        if integ_key_len == 0:
            salt_size = 4
        else:
            salt_size = 0

        total_len = (
            prf_key_trunc
            + integ_key_len * 2
            + encr_key_len * 2
            + tr_prf_key_len * 2
            + salt_size * 2
        )
        keymat = self.calc_prfplus(prf, self.skeyseed, s, total_len)

        pos = 0
        self.sk_d = keymat[: pos + prf_key_trunc]
        pos += prf_key_trunc

        self.sk_ai = keymat[pos : pos + integ_key_len]
        pos += integ_key_len
        self.sk_ar = keymat[pos : pos + integ_key_len]
        pos += integ_key_len

        self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
        pos += encr_key_len + salt_size
        self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
        pos += encr_key_len + salt_size

        self.sk_pi = keymat[pos : pos + tr_prf_key_len]
        pos += tr_prf_key_len
        self.sk_pr = keymat[pos : pos + tr_prf_key_len]

    def generate_authmsg(self, prf, packet):
        """Return the octets to be signed: packet | peer nonce | prf(SK_p, ID)."""
        if self.is_initiator:
            id_data = self.i_id
            nonce = self.r_nonce
            key = self.sk_pi
        else:
            id_data = self.r_id
            nonce = self.i_nonce
            key = self.sk_pr
        data = bytes([self.id_type, 0, 0, 0]) + id_data
        id_hash = self.calc_prf(prf, key, data)
        return packet + nonce + id_hash

    def auth_init(self):
        """Compute the AUTH value and store it in ``auth_data``.

        Note that ``auth_data`` is overwritten: for "shared-key" it holds
        the pre-shared key on entry and the AUTH value on return.

        :raises TypeError: for an unknown authentication method
        """
        prf = self.ike_prf_alg.mod()
        if self.is_initiator:
            packet = self.init_req_packet
        else:
            packet = self.init_resp_packet
        authmsg = self.generate_authmsg(prf, raw(packet))
        if self.auth_method == "shared-key":
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif self.auth_method == "rsa-sig":
            self.auth_data = self.priv_key.sign(
                authmsg, padding.PKCS1v15(), hashes.SHA1()
            )
        else:
            raise TypeError("unknown auth method type!")

    def encrypt(self, data, aad=None):
        """Pad ``data`` and encrypt it with our own encryption key."""
        data = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key used by the peer for messages it sends."""
        if self.is_initiator:
            return self.sk_ar
        return self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key used for messages we send."""
        if self.is_initiator:
            return self.sk_ai
        return self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key used for messages we send."""
        if self.is_initiator:
            return self.sk_ei
        return self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key used by the peer for messages it sends."""
        if self.is_initiator:
            return self.sk_er
        return self.sk_ei

    def concat(self, alg, key_len):
        """Return "<alg>-<key length in bits>" for a key length in bytes."""
        return alg + "-" + str(key_len * 8)

    @property
    def vpp_ike_cypto_alg(self):
        """IKE encryption algorithm name including the key size in bits."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP encryption algorithm name including the key size in bits."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of ``ikemsg`` matches our computation."""
        integ_trunc = self.ike_integ_alg.trunc_len
        exp_hmac = ikemsg[-integ_trunc:]
        data = ikemsg[:-integ_trunc]
        computed_hmac = self.compute_hmac(
            self.ike_integ_alg.mod(), self.peer_authkey, data
        )
        self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated IKE integrity MAC of ``data``."""
        h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        h.update(data)
        return h.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt ``data`` with the peer's encryption key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Verify and decrypt the Encrypted payload of ``ike``.

        For AES-GCM the IKE header plus the Encrypted payload header are
        used as AAD and the trailing GCM_ICV_SIZE bytes as tag; otherwise
        the HMAC is verified first and the ICV stripped before decrypting.

        :returns: the plaintext with padding and pad-length byte removed
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto == "AES-GCM-16ICV":
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
        else:
            self.verify_hmac(raw(ike))
            integ_trunc = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            ct = ep.load[:-integ_trunc]
            plain = self.decrypt(ct)
        # remove padding
        pad_len = plain[-1]
        return plain[: -pad_len - 1]

    def build_ts_addr(self, ts, version):
        """Map a traffic selector dict to scapy address fields for IP ``version``."""
        return {
            "starting_address_v" + version: ts["start_addr"],
            "ending_address_v" + version: ts["end_addr"],
        }

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        :returns: a pair of one-element lists; our local selector comes
            first when we are the initiator, the remote one otherwise
        """
        c = self.child_sas[0]
        ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
        if is_ip4:
            ts_data.update(self.build_ts_addr(c.local_ts, "4"))
            ts1 = ikev2.IPv4TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
            ts2 = ikev2.IPv4TrafficSelector(**ts_data)
        else:
            ts_data.update(self.build_ts_addr(c.local_ts, "6"))
            ts1 = ikev2.IPv6TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
            ts2 = ikev2.IPv6TrafficSelector(**ts_data)

        if self.is_initiator:
            return ([ts1], [ts2])
        return ([ts2], [ts1])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the IKE SA algorithms by name.

        ``ike_integ`` is stored as None when ``integ`` is "NULL".

        :raises TypeError: if an algorithm name is not supported
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError("unsupported encryption algo %r" % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError("unsupported auth algo %r" % integ)
        self.ike_integ = None if integ == "NULL" else integ
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError("unsupported prf algo %r" % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the child (ESP) SA algorithms by name.

        ``esp_integ`` is stored as None when ``integ`` is "NULL".

        :raises TypeError: if an algorithm name is not supported
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError("unsupported encryption algo %r" % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError("unsupported auth algo %r" % integ)
        self.esp_integ = None if integ == "NULL" else integ
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len):
        """Return the key-length transform attribute for ``key_len`` bytes.

        The result is a pair whose first element combines 0x800E in the
        upper 16 bits with the key length in bits.
        NOTE(review): the algorithm check always looks at ``ike_crypto``,
        also when called through esp_crypto_attr().
        """
        if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
            return (0x800E << 16 | key_len << 3, 12)
        else:
            raise Exception("unsupported attribute type")

    def ike_crypto_attr(self):
        """Key-length attribute for the IKE encryption transform."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """Key-length attribute for the ESP encryption transform."""
        return self.crypto_attr(self.esp_crypto_key_len)

    def compute_nat_sha1(self, ip, port, rspi=None):
        """Return SHA1(SPIi | SPIr | ip | port) for NAT detection payloads.

        :param rspi: responder SPI to hash; defaults to the SA's own
        """
        if rspi is None:
            rspi = self.rspi
        data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
        digest.update(data)
        return digest.finalize()

    def clone(self, test, **kwargs):
        """Create a new SA that copies this SA's identity and settings.

        ``spi``, ``nonce``, ``local_ts`` and ``remote_ts`` default to the
        values of this SA and may be overridden through ``kwargs``.  For an
        initiator the IKE and ESP algorithm selections are copied as well.
        """
        if "spi" not in kwargs:
            kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
        if "nonce" not in kwargs:
            kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
        if self.child_sas:
            if "local_ts" not in kwargs:
                kwargs["local_ts"] = self.child_sas[0].local_ts
            if "remote_ts" not in kwargs:
                kwargs["remote_ts"] = self.child_sas[0].remote_ts
        sa = type(self)(
            test,
            is_initiator=self.is_initiator,
            i_id=self.i_id,
            r_id=self.r_id,
            id_type=self.id_type,
            auth_data=self.auth_data,
            auth_method=self.auth_method,
            priv_key=self.priv_key,
            i_natt=self.i_natt,
            r_natt=self.r_natt,
            udp_encap=self.udp_encap,
            **kwargs,
        )
        if sa.is_initiator:
            # ike_integ / esp_integ are stored as None for "NULL"; map them
            # back to the name the setters accept
            sa.set_ike_props(
                crypto=self.ike_crypto,
                crypto_key_len=self.ike_crypto_key_len,
                integ=self.ike_integ or "NULL",
                prf=self.ike_prf,
                dh=self.ike_dh,
            )
            sa.set_esp_props(
                crypto=self.esp_crypto,
                crypto_key_len=self.esp_crypto_key_len,
                integ=self.esp_integ or "NULL",
            )
        return sa
633
634
635 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
636 class IkePeer(VppTestCase):
637     """common class for initiator and responder"""
638
639     @classmethod
640     def setUpClass(cls):
641         import scapy.contrib.ikev2 as _ikev2
642
643         globals()["ikev2"] = _ikev2
644         super(IkePeer, cls).setUpClass()
645         cls.create_pg_interfaces(range(2))
646         for i in cls.pg_interfaces:
647             i.admin_up()
648             i.config_ip4()
649             i.resolve_arp()
650             i.config_ip6()
651             i.resolve_ndp()
652
653     @classmethod
654     def tearDownClass(cls):
655         super(IkePeer, cls).tearDownClass()
656
657     def tearDown(self):
658         super(IkePeer, self).tearDown()
659         if self.del_sa_from_responder:
660             self.initiate_del_sa_from_responder()
661         else:
662             self.initiate_del_sa_from_initiator()
663         r = self.vapi.ikev2_sa_dump()
664         self.assertEqual(len(r), 0)
665         sas = self.vapi.ipsec_sa_dump()
666         self.assertEqual(len(sas), 0)
667         self.p.remove_vpp_config()
668         self.assertIsNone(self.p.query_vpp_config())
669
670     def setUp(self):
671         super(IkePeer, self).setUp()
672         self.config_tc()
673         self.p.add_vpp_config()
674         self.assertIsNotNone(self.p.query_vpp_config())
675         if self.sa.is_initiator:
676             self.sa.generate_dh_data()
677         self.vapi.cli("ikev2 set logging level 4")
678         self.vapi.cli("event-lo clear")
679
680     def assert_counter(self, count, name, version="ip4"):
681         node_name = "/err/ikev2-%s/" % version + name
682         self.assertEqual(count, self.statistics.get_err_counter(node_name))
683
684     def create_rekey_request(self, kex=False):
685         sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
686         header = ikev2.IKEv2(
687             init_SPI=self.sa.ispi,
688             resp_SPI=self.sa.rspi,
689             id=self.sa.new_msg_id(),
690             flags="Initiator",
691             exch_type="CREATE_CHILD_SA",
692         )
693
694         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
695         return self.create_packet(
696             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
697         )
698
699     def create_sa_rekey_request(self, **kwargs):
700         sa = self.generate_sa_init_payload(**kwargs)
701         header = ikev2.IKEv2(
702             init_SPI=self.sa.ispi,
703             resp_SPI=self.sa.rspi,
704             id=self.sa.new_msg_id(),
705             flags="Initiator",
706             exch_type="CREATE_CHILD_SA",
707         )
708         ike_msg = self.encrypt_ike_msg(header, sa, "SA")
709         return self.create_packet(
710             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
711         )
712
713     def create_empty_request(self):
714         header = ikev2.IKEv2(
715             init_SPI=self.sa.ispi,
716             resp_SPI=self.sa.rspi,
717             id=self.sa.new_msg_id(),
718             flags="Initiator",
719             exch_type="INFORMATIONAL",
720             next_payload="Encrypted",
721         )
722
723         msg = self.encrypt_ike_msg(header, b"", None)
724         return self.create_packet(
725             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
726         )
727
728     def create_packet(
729         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
730     ):
731         if use_ip6:
732             src_ip = src_if.remote_ip6
733             dst_ip = src_if.local_ip6
734             ip_layer = IPv6
735         else:
736             src_ip = src_if.remote_ip4
737             dst_ip = src_if.local_ip4
738             ip_layer = IP
739         res = (
740             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
741             / ip_layer(src=src_ip, dst=dst_ip)
742             / UDP(sport=sport, dport=dport)
743         )
744         if natt:
745             # insert non ESP marker
746             res = res / Raw(b"\x00" * 4)
747         return res / msg
748
749     def verify_udp(self, udp):
750         self.assertEqual(udp.sport, self.sa.sport)
751         self.assertEqual(udp.dport, self.sa.dport)
752
753     def get_ike_header(self, packet):
754         try:
755             ih = packet[ikev2.IKEv2]
756             ih = self.verify_and_remove_non_esp_marker(ih)
757         except IndexError as e:
758             # this is a workaround for getting IKEv2 layer as both ikev2 and
759             # ipsec register for port 4500
760             esp = packet[ESP]
761             ih = self.verify_and_remove_non_esp_marker(esp)
762         self.assertEqual(ih.version, 0x20)
763         self.assertNotIn("Version", ih.flags)
764         return ih
765
766     def verify_and_remove_non_esp_marker(self, packet):
767         if self.sa.natt:
768             # if we are in nat traversal mode check for non esp marker
769             # and remove it
770             data = raw(packet)
771             self.assertEqual(data[:4], b"\x00" * 4)
772             return ikev2.IKEv2(data[4:])
773         else:
774             return packet
775
776     def encrypt_ike_msg(self, header, plain, first_payload):
777         if self.sa.ike_crypto == "AES-GCM-16ICV":
778             data = self.sa.ike_crypto_alg.pad(raw(plain))
779             plen = (
780                 len(data)
781                 + GCM_IV_SIZE
782                 + GCM_ICV_SIZE
783                 + len(ikev2.IKEv2_payload_Encrypted())
784             )
785             tlen = plen + len(ikev2.IKEv2())
786
787             # prepare aad data
788             sk_p = ikev2.IKEv2_payload_Encrypted(
789                 next_payload=first_payload, length=plen
790             )
791             header.length = tlen
792             res = header / sk_p
793             encr = self.sa.encrypt(raw(plain), raw(res))
794             sk_p = ikev2.IKEv2_payload_Encrypted(
795                 next_payload=first_payload, length=plen, load=encr
796             )
797             res = header / sk_p
798         else:
799             encr = self.sa.encrypt(raw(plain))
800             trunc_len = self.sa.ike_integ_alg.trunc_len
801             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
802             tlen = plen + len(ikev2.IKEv2())
803
804             sk_p = ikev2.IKEv2_payload_Encrypted(
805                 next_payload=first_payload, length=plen, load=encr
806             )
807             header.length = tlen
808             res = header / sk_p
809
810             integ_data = raw(res)
811             hmac_data = self.sa.compute_hmac(
812                 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
813             )
814             res = res / Raw(hmac_data[:trunc_len])
815         assert len(res) == tlen
816         return res
817
818     def verify_udp_encap(self, ipsec_sa):
819         e = VppEnum.vl_api_ipsec_sad_flags_t
820         if self.sa.udp_encap or self.sa.natt:
821             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
822         else:
823             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
824
825     def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
826         sas = self.vapi.ipsec_sa_dump()
827         if sa_count is None:
828             if is_rekey:
829                 # after rekey there is a short period of time in which old
830                 # inbound SA is still present
831                 sa_count = 3
832             else:
833                 sa_count = 2
834         self.assertEqual(len(sas), sa_count)
835         if self.sa.is_initiator:
836             if is_rekey:
837                 sa0 = sas[0].entry
838                 sa1 = sas[2].entry
839             else:
840                 sa0 = sas[0].entry
841                 sa1 = sas[1].entry
842         else:
843             if is_rekey:
844                 sa0 = sas[2].entry
845                 sa1 = sas[0].entry
846             else:
847                 sa1 = sas[0].entry
848                 sa0 = sas[1].entry
849
850         c = self.sa.child_sas[0]
851
852         self.verify_udp_encap(sa0)
853         self.verify_udp_encap(sa1)
854         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
855         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
856         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
857
858         if self.sa.esp_integ is None:
859             vpp_integ_alg = 0
860         else:
861             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
862         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
863         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
864
865         # verify crypto keys
866         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
867         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
868         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
869         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
870
871         # verify integ keys
872         if vpp_integ_alg:
873             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
874             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
875             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
876             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
877         else:
878             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
879             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
880
881     def verify_keymat(self, api_keys, keys, name):
882         km = getattr(keys, name)
883         api_km = getattr(api_keys, name)
884         api_km_len = getattr(api_keys, name + "_len")
885         self.assertEqual(len(km), api_km_len)
886         self.assertEqual(km, api_km[:api_km_len])
887
888     def verify_id(self, api_id, exp_id):
889         self.assertEqual(api_id.type, IDType.value(exp_id.type))
890         self.assertEqual(api_id.data_len, exp_id.data_len)
891         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
892
893     def verify_ike_sas(self, is_rekey=False):
894         r = self.vapi.ikev2_sa_dump()
895         if is_rekey:
896             sa_count = 2
897             sa = r[1].sa
898         else:
899             sa_count = 1
900             sa = r[0].sa
901         self.assertEqual(len(r), sa_count)
902         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
903         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
904         if self.ip6:
905             if self.sa.is_initiator:
906                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
907                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
908             else:
909                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
910                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
911         else:
912             if self.sa.is_initiator:
913                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
914                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
915             else:
916                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
917                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
918         self.verify_keymat(sa.keys, self.sa, "sk_d")
919         self.verify_keymat(sa.keys, self.sa, "sk_ai")
920         self.verify_keymat(sa.keys, self.sa, "sk_ar")
921         self.verify_keymat(sa.keys, self.sa, "sk_ei")
922         self.verify_keymat(sa.keys, self.sa, "sk_er")
923         self.verify_keymat(sa.keys, self.sa, "sk_pi")
924         self.verify_keymat(sa.keys, self.sa, "sk_pr")
925
926         self.assertEqual(sa.i_id.type, self.sa.id_type)
927         self.assertEqual(sa.r_id.type, self.sa.id_type)
928         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
929         self.assertEqual(sa.r_id.data_len, len(self.idr))
930         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
931         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
932
933         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
934         self.verify_nonce(n, self.sa.i_nonce)
935         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
936         self.verify_nonce(n, self.sa.r_nonce)
937
938         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
939         if is_rekey:
940             self.assertEqual(len(r), 0)
941             return
942
943         self.assertEqual(len(r), 1)
944         csa = r[0].child_sa
945         self.assertEqual(csa.sa_index, sa.sa_index)
946         c = self.sa.child_sas[0]
947         if hasattr(c, "sk_ai"):
948             self.verify_keymat(csa.keys, c, "sk_ai")
949             self.verify_keymat(csa.keys, c, "sk_ar")
950         self.verify_keymat(csa.keys, c, "sk_ei")
951         self.verify_keymat(csa.keys, c, "sk_er")
952         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
953         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
954
955         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
956         tsi = tsi[0]
957         tsr = tsr[0]
958         r = self.vapi.ikev2_traffic_selector_dump(
959             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
960         )
961         self.assertEqual(len(r), 1)
962         ts = r[0].ts
963         self.verify_ts(r[0].ts, tsi[0], True)
964
965         r = self.vapi.ikev2_traffic_selector_dump(
966             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
967         )
968         self.assertEqual(len(r), 1)
969         self.verify_ts(r[0].ts, tsr[0], False)
970
971     def verify_nonce(self, api_nonce, nonce):
972         self.assertEqual(api_nonce.data_len, len(nonce))
973         self.assertEqual(api_nonce.nonce, nonce)
974
975     def verify_ts(self, api_ts, ts, is_initiator):
976         if is_initiator:
977             self.assertTrue(api_ts.is_local)
978         else:
979             self.assertFalse(api_ts.is_local)
980
981         if self.p.ts_is_ip4:
982             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
983             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
984         else:
985             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
986             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
987         self.assertEqual(api_ts.start_port, ts.start_port)
988         self.assertEqual(api_ts.end_port, ts.end_port)
989         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
990
991
992 class TemplateInitiator(IkePeer):
993     """initiator test template"""
994
995     def initiate_del_sa_from_initiator(self):
996         ispi = int.from_bytes(self.sa.ispi, "little")
997         self.pg0.enable_capture()
998         self.pg_start()
999         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
1000         capture = self.pg0.get_capture(1)
1001         ih = self.get_ike_header(capture[0])
1002         self.assertNotIn("Response", ih.flags)
1003         self.assertIn("Initiator", ih.flags)
1004         self.assertEqual(ih.init_SPI, self.sa.ispi)
1005         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1006         plain = self.sa.hmac_and_decrypt(ih)
1007         d = ikev2.IKEv2_payload_Delete(plain)
1008         self.assertEqual(d.proto, 1)  # proto=IKEv2
1009         header = ikev2.IKEv2(
1010             init_SPI=self.sa.ispi,
1011             resp_SPI=self.sa.rspi,
1012             flags="Response",
1013             exch_type="INFORMATIONAL",
1014             id=ih.id,
1015             next_payload="Encrypted",
1016         )
1017         resp = self.encrypt_ike_msg(header, b"", None)
1018         self.send_and_assert_no_replies(self.pg0, resp)
1019
1020     def verify_del_sa(self, packet):
1021         ih = self.get_ike_header(packet)
1022         self.assertEqual(ih.id, self.sa.msg_id)
1023         self.assertEqual(ih.exch_type, 37)  # exchange informational
1024         self.assertIn("Response", ih.flags)
1025         self.assertIn("Initiator", ih.flags)
1026         plain = self.sa.hmac_and_decrypt(ih)
1027         self.assertEqual(plain, b"")
1028
1029     def initiate_del_sa_from_responder(self):
1030         header = ikev2.IKEv2(
1031             init_SPI=self.sa.ispi,
1032             resp_SPI=self.sa.rspi,
1033             exch_type="INFORMATIONAL",
1034             id=self.sa.new_msg_id(),
1035         )
1036         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1037         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1038         packet = self.create_packet(
1039             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1040         )
1041         self.pg0.add_stream(packet)
1042         self.pg0.enable_capture()
1043         self.pg_start()
1044         capture = self.pg0.get_capture(1)
1045         self.verify_del_sa(capture[0])
1046
1047     @staticmethod
1048     def find_notify_payload(packet, notify_type):
1049         n = packet[ikev2.IKEv2_payload_Notify]
1050         while n is not None:
1051             if n.type == notify_type:
1052                 return n
1053             n = n.payload
1054         return None
1055
1056     def verify_nat_detection(self, packet):
1057         if self.ip6:
1058             iph = packet[IPv6]
1059         else:
1060             iph = packet[IP]
1061         udp = packet[UDP]
1062
1063         # NAT_DETECTION_SOURCE_IP
1064         s = self.find_notify_payload(packet, 16388)
1065         self.assertIsNotNone(s)
1066         src_sha = self.sa.compute_nat_sha1(
1067             inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
1068         )
1069         self.assertEqual(s.load, src_sha)
1070
1071         # NAT_DETECTION_DESTINATION_IP
1072         s = self.find_notify_payload(packet, 16389)
1073         self.assertIsNotNone(s)
1074         dst_sha = self.sa.compute_nat_sha1(
1075             inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1076         )
1077         self.assertEqual(s.load, dst_sha)
1078
1079     def verify_sa_init_request(self, packet):
1080         udp = packet[UDP]
1081         self.sa.dport = udp.sport
1082         ih = packet[ikev2.IKEv2]
1083         self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1084         self.assertEqual(ih.exch_type, 34)  # SA_INIT
1085         self.sa.ispi = ih.init_SPI
1086         self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1087         self.assertIn("Initiator", ih.flags)
1088         self.assertNotIn("Response", ih.flags)
1089         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1090         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1091
1092         prop = packet[ikev2.IKEv2_payload_Proposal]
1093         self.assertEqual(prop.proto, 1)  # proto = ikev2
1094         self.assertEqual(prop.proposal, 1)
1095         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
1096         self.assertEqual(
1097             prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1098         )
1099         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
1100         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
1101         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
1102         self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1103
1104         self.verify_nat_detection(packet)
1105         self.sa.set_ike_props(
1106             crypto="AES-GCM-16ICV",
1107             crypto_key_len=32,
1108             integ="NULL",
1109             prf="PRF_HMAC_SHA2_256",
1110             dh="3072MODPgr",
1111         )
1112         self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1113         self.sa.generate_dh_data()
1114         self.sa.complete_dh_data()
1115         self.sa.calc_keys()
1116
1117     def update_esp_transforms(self, trans, sa):
1118         while trans:
1119             if trans.transform_type == 1:  # ecryption
1120                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1121             elif trans.transform_type == 3:  # integrity
1122                 sa.esp_integ = INTEG_IDS[trans.transform_id]
1123             trans = trans.payload
1124
1125     def verify_sa_auth_req(self, packet):
1126         udp = packet[UDP]
1127         self.sa.dport = udp.sport
1128         ih = self.get_ike_header(packet)
1129         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1130         self.assertEqual(ih.init_SPI, self.sa.ispi)
1131         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
1132         self.assertIn("Initiator", ih.flags)
1133         self.assertNotIn("Response", ih.flags)
1134
1135         udp = packet[UDP]
1136         self.verify_udp(udp)
1137         self.assertEqual(ih.id, self.sa.msg_id + 1)
1138         self.sa.msg_id += 1
1139         plain = self.sa.hmac_and_decrypt(ih)
1140         idi = ikev2.IKEv2_payload_IDi(plain)
1141         self.assertEqual(idi.load, self.sa.i_id)
1142         if self.no_idr_auth:
1143             self.assertEqual(idi.next_payload, 39)  # AUTH
1144         else:
1145             idr = ikev2.IKEv2_payload_IDr(idi.payload)
1146             self.assertEqual(idr.load, self.sa.r_id)
1147         prop = idi[ikev2.IKEv2_payload_Proposal]
1148         c = self.sa.child_sas[0]
1149         c.ispi = prop.SPI
1150         self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1151
1152     def send_init_response(self):
1153         tr_attr = self.sa.ike_crypto_attr()
1154         trans = (
1155             ikev2.IKEv2_payload_Transform(
1156                 transform_type="Encryption",
1157                 transform_id=self.sa.ike_crypto,
1158                 length=tr_attr[1],
1159                 key_length=tr_attr[0],
1160             )
1161             / ikev2.IKEv2_payload_Transform(
1162                 transform_type="Integrity", transform_id=self.sa.ike_integ
1163             )
1164             / ikev2.IKEv2_payload_Transform(
1165                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1166             )
1167             / ikev2.IKEv2_payload_Transform(
1168                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1169             )
1170         )
1171         props = ikev2.IKEv2_payload_Proposal(
1172             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1173         )
1174
1175         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1176         if self.sa.natt:
1177             dst_address = b"\x0a\x0a\x0a\x0a"
1178         else:
1179             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1180         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1181         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1182
1183         self.sa.init_resp_packet = (
1184             ikev2.IKEv2(
1185                 init_SPI=self.sa.ispi,
1186                 resp_SPI=self.sa.rspi,
1187                 exch_type="IKE_SA_INIT",
1188                 flags="Response",
1189             )
1190             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1191             / ikev2.IKEv2_payload_KE(
1192                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1193             )
1194             / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1195             / ikev2.IKEv2_payload_Notify(
1196                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1197             )
1198             / ikev2.IKEv2_payload_Notify(
1199                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1200             )
1201         )
1202
1203         ike_msg = self.create_packet(
1204             self.pg0,
1205             self.sa.init_resp_packet,
1206             self.sa.sport,
1207             self.sa.dport,
1208             False,
1209             self.ip6,
1210         )
1211         self.pg_send(self.pg0, ike_msg)
1212         capture = self.pg0.get_capture(1)
1213         self.verify_sa_auth_req(capture[0])
1214
1215     def initiate_sa_init(self):
1216         self.pg0.enable_capture()
1217         self.pg_start()
1218         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1219
1220         capture = self.pg0.get_capture(1)
1221         self.verify_sa_init_request(capture[0])
1222         self.send_init_response()
1223
1224     def send_auth_response(self):
1225         tr_attr = self.sa.esp_crypto_attr()
1226         trans = (
1227             ikev2.IKEv2_payload_Transform(
1228                 transform_type="Encryption",
1229                 transform_id=self.sa.esp_crypto,
1230                 length=tr_attr[1],
1231                 key_length=tr_attr[0],
1232             )
1233             / ikev2.IKEv2_payload_Transform(
1234                 transform_type="Integrity", transform_id=self.sa.esp_integ
1235             )
1236             / ikev2.IKEv2_payload_Transform(
1237                 transform_type="Extended Sequence Number", transform_id="No ESN"
1238             )
1239             / ikev2.IKEv2_payload_Transform(
1240                 transform_type="Extended Sequence Number", transform_id="ESN"
1241             )
1242         )
1243
1244         c = self.sa.child_sas[0]
1245         props = ikev2.IKEv2_payload_Proposal(
1246             proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1247         )
1248
1249         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1250         plain = (
1251             ikev2.IKEv2_payload_IDi(
1252                 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1253             )
1254             / ikev2.IKEv2_payload_IDr(
1255                 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1256             )
1257             / ikev2.IKEv2_payload_AUTH(
1258                 next_payload="SA",
1259                 auth_type=AuthMethod.value(self.sa.auth_method),
1260                 load=self.sa.auth_data,
1261             )
1262             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1263             / ikev2.IKEv2_payload_TSi(
1264                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1265             )
1266             / ikev2.IKEv2_payload_TSr(
1267                 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1268             )
1269             / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1270         )
1271
1272         header = ikev2.IKEv2(
1273             init_SPI=self.sa.ispi,
1274             resp_SPI=self.sa.rspi,
1275             id=self.sa.new_msg_id(),
1276             flags="Response",
1277             exch_type="IKE_AUTH",
1278         )
1279
1280         ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1281         packet = self.create_packet(
1282             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1283         )
1284         self.pg_send(self.pg0, packet)
1285
1286     def test_initiator(self):
1287         self.initiate_sa_init()
1288         self.sa.auth_init()
1289         self.sa.calc_child_keys()
1290         self.send_auth_response()
1291         self.verify_ike_sas()
1292
1293
1294 class TemplateResponder(IkePeer):
1295     """responder test template"""
1296
1297     def initiate_del_sa_from_responder(self):
1298         self.pg0.enable_capture()
1299         self.pg_start()
1300         self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1301         capture = self.pg0.get_capture(1)
1302         ih = self.get_ike_header(capture[0])
1303         self.assertNotIn("Response", ih.flags)
1304         self.assertNotIn("Initiator", ih.flags)
1305         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1306         plain = self.sa.hmac_and_decrypt(ih)
1307         d = ikev2.IKEv2_payload_Delete(plain)
1308         self.assertEqual(d.proto, 1)  # proto=IKEv2
1309         self.assertEqual(ih.init_SPI, self.sa.ispi)
1310         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1311         header = ikev2.IKEv2(
1312             init_SPI=self.sa.ispi,
1313             resp_SPI=self.sa.rspi,
1314             flags="Initiator+Response",
1315             exch_type="INFORMATIONAL",
1316             id=ih.id,
1317             next_payload="Encrypted",
1318         )
1319         resp = self.encrypt_ike_msg(header, b"", None)
1320         self.send_and_assert_no_replies(self.pg0, resp)
1321
1322     def verify_del_sa(self, packet):
1323         ih = self.get_ike_header(packet)
1324         self.assertEqual(ih.id, self.sa.msg_id)
1325         self.assertEqual(ih.exch_type, 37)  # exchange informational
1326         self.assertIn("Response", ih.flags)
1327         self.assertNotIn("Initiator", ih.flags)
1328         self.assertEqual(ih.next_payload, 46)  # Encrypted
1329         self.assertEqual(ih.init_SPI, self.sa.ispi)
1330         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1331         plain = self.sa.hmac_and_decrypt(ih)
1332         self.assertEqual(plain, b"")
1333
1334     def initiate_del_sa_from_initiator(self):
1335         header = ikev2.IKEv2(
1336             init_SPI=self.sa.ispi,
1337             resp_SPI=self.sa.rspi,
1338             flags="Initiator",
1339             exch_type="INFORMATIONAL",
1340             id=self.sa.new_msg_id(),
1341         )
1342         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1343         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1344         packet = self.create_packet(
1345             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1346         )
1347         self.pg0.add_stream(packet)
1348         self.pg0.enable_capture()
1349         self.pg_start()
1350         capture = self.pg0.get_capture(1)
1351         self.verify_del_sa(capture[0])
1352
1353     def generate_sa_init_payload(
1354         self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
1355     ):
1356         tr_attr = self.sa.ike_crypto_attr()
1357         trans = (
1358             ikev2.IKEv2_payload_Transform(
1359                 transform_type="Encryption",
1360                 transform_id=self.sa.ike_crypto,
1361                 length=tr_attr[1],
1362                 key_length=tr_attr[0],
1363             )
1364             / ikev2.IKEv2_payload_Transform(
1365                 transform_type="Integrity", transform_id=self.sa.ike_integ
1366             )
1367             / ikev2.IKEv2_payload_Transform(
1368                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1369             )
1370             / ikev2.IKEv2_payload_Transform(
1371                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1372             )
1373         )
1374
1375         if spi is None:
1376             pargs = {}
1377         else:
1378             pargs = {"SPI": spi, "SPIsize": len(spi)}
1379         props = ikev2.IKEv2_payload_Proposal(
1380             proposal=1,
1381             proto="IKEv2",
1382             trans_nb=4,
1383             trans=trans,
1384             **pargs,
1385         )
1386
1387         return (
1388             ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1389             / ikev2.IKEv2_payload_KE(
1390                 next_payload="Nonce",
1391                 group=self.sa.ike_dh,
1392                 load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
1393             )
1394             / ikev2.IKEv2_payload_Nonce(
1395                 next_payload=next_payload,
1396                 load=self.sa.i_nonce if nonce is None else nonce,
1397             )
1398         )
1399
1400     def send_sa_init_req(self):
1401         self.sa.init_req_packet = ikev2.IKEv2(
1402             init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1403         ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
1404
1405         if not self.ip6:
1406             if self.sa.i_natt:
1407                 src_address = b"\x0a\x0a\x0a\x01"
1408             else:
1409                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1410
1411             if self.sa.r_natt:
1412                 dst_address = b"\x0a\x0a\x0a\x0a"
1413             else:
1414                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1415
1416             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1417             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1418             nat_src_detection = ikev2.IKEv2_payload_Notify(
1419                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1420             )
1421             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1422                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1423             )
1424             self.sa.init_req_packet = (
1425                 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1426             )
1427
1428         ike_msg = self.create_packet(
1429             self.pg0,
1430             self.sa.init_req_packet,
1431             self.sa.sport,
1432             self.sa.dport,
1433             self.sa.natt,
1434             self.ip6,
1435         )
1436         self.pg0.add_stream(ike_msg)
1437         self.pg0.enable_capture()
1438         self.pg_start()
1439         capture = self.pg0.get_capture(1)
1440         self.verify_sa_init(capture[0])
1441
1442     def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1443         tr_attr = self.sa.esp_crypto_attr()
1444         last_payload = last_payload or "Notify"
1445         trans_nb = 4
1446         trans = (
1447             ikev2.IKEv2_payload_Transform(
1448                 transform_type="Encryption",
1449                 transform_id=self.sa.esp_crypto,
1450                 length=tr_attr[1],
1451                 key_length=tr_attr[0],
1452             )
1453             / ikev2.IKEv2_payload_Transform(
1454                 transform_type="Integrity", transform_id=self.sa.esp_integ
1455             )
1456             / ikev2.IKEv2_payload_Transform(
1457                 transform_type="Extended Sequence Number", transform_id="No ESN"
1458             )
1459             / ikev2.IKEv2_payload_Transform(
1460                 transform_type="Extended Sequence Number", transform_id="ESN"
1461             )
1462         )
1463
1464         if kex:
1465             trans_nb += 1
1466             trans /= ikev2.IKEv2_payload_Transform(
1467                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1468             )
1469
1470         c = self.sa.child_sas[0]
1471         props = ikev2.IKEv2_payload_Proposal(
1472             proposal=1,
1473             proto="ESP",
1474             SPIsize=4,
1475             SPI=c.ispi,
1476             trans_nb=trans_nb,
1477             trans=trans,
1478         )
1479
1480         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1481         plain = (
1482             ikev2.IKEv2_payload_AUTH(
1483                 next_payload="SA",
1484                 auth_type=AuthMethod.value(self.sa.auth_method),
1485                 load=self.sa.auth_data,
1486             )
1487             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1488             / ikev2.IKEv2_payload_TSi(
1489                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1490             )
1491             / ikev2.IKEv2_payload_TSr(
1492                 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1493             )
1494         )
1495
1496         if is_rekey:
1497             first_payload = "Nonce"
1498             if kex:
1499                 head = ikev2.IKEv2_payload_Nonce(
1500                     load=self.sa.i_nonce, next_payload="KE"
1501                 ) / ikev2.IKEv2_payload_KE(
1502                     group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1503                 )
1504             else:
1505                 head = ikev2.IKEv2_payload_Nonce(
1506                     load=self.sa.i_nonce, next_payload="SA"
1507                 )
1508             plain = (
1509                 head
1510                 / plain
1511                 / ikev2.IKEv2_payload_Notify(
1512                     type="REKEY_SA",
1513                     proto="ESP",
1514                     SPI=c.ispi,
1515                     length=8 + len(c.ispi),
1516                     next_payload="Notify",
1517                 )
1518                 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1519             )
1520         else:
1521             first_payload = "IDi"
1522             if self.no_idr_auth:
1523                 ids = ikev2.IKEv2_payload_IDi(
1524                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1525                 )
1526             else:
1527                 ids = ikev2.IKEv2_payload_IDi(
1528                     next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1529                 ) / ikev2.IKEv2_payload_IDr(
1530                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1531                 )
1532             plain = ids / plain
1533         return plain, first_payload
1534
1535     def send_sa_auth(self):
1536         plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1537         plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1538         header = ikev2.IKEv2(
1539             init_SPI=self.sa.ispi,
1540             resp_SPI=self.sa.rspi,
1541             id=self.sa.new_msg_id(),
1542             flags="Initiator",
1543             exch_type="IKE_AUTH",
1544         )
1545
1546         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1547         packet = self.create_packet(
1548             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1549         )
1550         self.pg0.add_stream(packet)
1551         self.pg0.enable_capture()
1552         self.pg_start()
1553         capture = self.pg0.get_capture(1)
1554         self.verify_sa_auth_resp(capture[0])
1555
1556     def verify_sa_init(self, packet):
1557         ih = self.get_ike_header(packet)
1558
1559         self.assertEqual(ih.id, self.sa.msg_id)
1560         self.assertEqual(ih.exch_type, 34)
1561         self.assertIn("Response", ih.flags)
1562         self.assertEqual(ih.init_SPI, self.sa.ispi)
1563         self.assertNotEqual(ih.resp_SPI, 0)
1564         self.sa.rspi = ih.resp_SPI
1565         try:
1566             sa = ih[ikev2.IKEv2_payload_SA]
1567             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1568             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1569         except IndexError as e:
1570             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1571             self.logger.error(ih.show())
1572             raise
1573         self.sa.complete_dh_data()
1574         self.sa.calc_keys()
1575         self.sa.auth_init()
1576
1577     def verify_sa_auth_resp(self, packet):
1578         ike = self.get_ike_header(packet)
1579         udp = packet[UDP]
1580         self.verify_udp(udp)
1581         self.assertEqual(ike.id, self.sa.msg_id)
1582         plain = self.sa.hmac_and_decrypt(ike)
1583         idr = ikev2.IKEv2_payload_IDr(plain)
1584         prop = idr[ikev2.IKEv2_payload_Proposal]
1585         self.assertEqual(prop.SPIsize, 4)
1586         self.sa.child_sas[0].rspi = prop.SPI
1587         self.sa.calc_child_keys()
1588
1589     IKE_NODE_SUFFIX = "ip4"
1590
1591     def verify_counters(self):
1592         self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1593         self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1594         self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1595
1596         r = self.vapi.ikev2_sa_dump()
1597         s = r[0].sa.stats
1598         self.assertEqual(1, s.n_sa_auth_req)
1599         self.assertEqual(1, s.n_sa_init_req)
1600
1601     def test_responder(self):
1602         self.send_sa_init_req()
1603         self.send_sa_auth()
1604         self.verify_ipsec_sas()
1605         self.verify_ike_sas()
1606         self.verify_counters()
1607
1608
class Ikev2Params(object):
    """Mixin that turns a ``params`` dict into an IKEv2 profile and a peer SA.

    ``config_params`` fills in ``self.p`` (a ``Profile`` named "pr1") and
    ``self.sa`` (the ``IKEv2SA`` object the test case drives).  It relies on
    the host test case for ``self.vapi`` and ``self.pg0``.
    """

    def config_params(self, params=None):
        """Build ``self.p`` and ``self.sa`` from ``params``.

        Recognised keys (all optional unless noted):

        * ``dpd_disabled`` (default True): run "ikev2 dpd disable".
        * ``del_sa_from_responder`` (default False): stored on ``self``.
        * ``i_natt`` / ``r_natt`` (default False): forwarded to ``IKEv2SA``.
        * ``ip6`` (default False): stored as ``self.ip6``.
        * ``auth``: "rsa-sig" selects certificate auth, which additionally
          requires ``server-key``, ``client-key``, ``client-cert`` and
          ``server-cert`` (file names inside the ikev2 test certs directory);
          anything else selects the built-in shared key.
        * ``is_initiator`` (default True): True means the test case is the
          initiator and VPP the responder.
        * ``no_idr_in_auth`` (default False): stored as ``self.no_idr_auth``.
        * ``loc_ts`` / ``rem_ts``: traffic selectors for the profile.
        * ``responder``, ``ike_transforms``, ``esp_transforms``,
          ``responder_hostname``: forwarded to the profile when present.
        * ``udp_encap`` (default False), ``nonce``: forwarded to ``IKEv2SA``.
        * ``ike-crypto``, ``ike-integ``, ``ike-dh``, ``esp-crypto``,
          ``esp-integ``: peer-side proposals; only read when ``is_initiator``
          is true.

        Also sets ``self.vpp_enums``, ``self.idr``, ``self.idi`` and, for
        "rsa-sig", ``self.peer_cert``.
        """
        # None instead of {} so no dict object is shared between calls
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
            "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
            "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
            "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
            "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
            "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
            "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
            "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
            "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
            "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
            "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
        }

        if params.get("dpd_disabled", True):
            self.vapi.cli("ikev2 dpd disable")
        self.del_sa_from_responder = params.get("del_sa_from_responder", False)
        i_natt = params.get("i_natt", False)
        r_natt = params.get("r_natt", False)
        self.p = Profile(self, "pr1")
        self.ip6 = params.get("ip6", False)

        if params.get("auth") == "rsa-sig":
            auth_method = "rsa-sig"
            work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
            self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])

            client_file = work_dir + params["client-cert"]
            # context managers close the PEM files instead of leaking handles
            with open(work_dir + params["server-cert"]) as pem_file:
                server_pem = pem_file.read()
            with open(work_dir + params["client-key"]) as pem_file:
                client_key_pem = pem_file.read()
            client_priv = load_pem_private_key(
                str.encode(client_key_pem), None, default_backend()
            )
            self.peer_cert = x509.load_pem_x509_certificate(
                str.encode(server_pem), default_backend()
            )
            self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b"$3cr3tpa$$w0rd"
            self.p.add_auth(method="shared-key", data=auth_data)
            auth_method = "shared-key"
            client_priv = None

        is_init = params.get("is_initiator", True)
        self.no_idr_auth = params.get("no_idr_in_auth", False)

        idr = {"id_type": "fqdn", "data": b"vpp.home"}
        idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
        r_id = self.idr = idr["data"]
        i_id = self.idi = idi["data"]
        if is_init:
            # scapy is initiator, VPP is responder
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
            if self.no_idr_auth:
                r_id = None
        else:
            # VPP is initiator, scapy is responder
            self.p.add_local_id(**idi)
            if not self.no_idr_auth:
                self.p.add_remote_id(**idr)

        loc_ts = params.get(
            "loc_ts", {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
        )
        rem_ts = params.get(
            "rem_ts", {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
        )
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if "responder" in params:
            self.p.add_responder(params["responder"])
        if "ike_transforms" in params:
            self.p.add_ike_transforms(params["ike_transforms"])
        if "esp_transforms" in params:
            self.p.add_esp_transforms(params["esp_transforms"])

        udp_encap = params.get("udp_encap", False)
        if udp_encap:
            self.p.set_udp_encap(True)

        if "responder_hostname" in params:
            hn = params["responder_hostname"]
            self.p.add_responder_hostname(hn)

            # configure static dns record
            self.vapi.dns_name_server_add_del(
                is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
            )
            self.vapi.dns_enable_disable(enable=1)

            cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
            self.vapi.cli(cmd)

        # the peer's traffic selectors are the profile's, swapped
        self.sa = IKEv2SA(
            self,
            i_id=i_id,
            r_id=r_id,
            is_initiator=is_init,
            id_type=self.p.local_id["id_type"],
            i_natt=i_natt,
            r_natt=r_natt,
            priv_key=client_priv,
            auth_method=auth_method,
            nonce=params.get("nonce"),
            auth_data=auth_data,
            udp_encap=udp_encap,
            local_ts=self.p.remote_ts,
            remote_ts=self.p.local_ts,
        )

        if is_init:
            # proposals are only set on the peer SA when the test case initiates
            ike_crypto = params.get("ike-crypto", ("AES-CBC", 32))
            ike_integ = params.get("ike-integ", "HMAC-SHA1-96")
            ike_dh = params.get("ike-dh", "2048MODPgr")

            esp_crypto = params.get("esp-crypto", ("AES-CBC", 32))
            esp_integ = params.get("esp-integ", "HMAC-SHA1-96")

            self.sa.set_ike_props(
                crypto=ike_crypto[0],
                crypto_key_len=ike_crypto[1],
                integ=ike_integ,
                prf="PRF_HMAC_SHA2_256",
                dh=ike_dh,
            )
            self.sa.set_esp_props(
                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
            )
1759
1760
@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class TestApi(VppTestCase):
    """Test IKEV2 API"""

    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        super(TestApi, self).tearDown()
        # A test that failed early may not have created both profiles; only
        # remove the ones that exist so the cleanup itself does not raise.
        for attr in ("p1", "p2"):
            profile = getattr(self, attr, None)
            if profile is not None:
                profile.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Create a Profile from the dict *cfg*, add it to VPP and return it.

        ``lifetime_data``, ``tun_itf`` and ``natt_disabled`` are optional
        keys; every other key read below is required.
        """
        p = Profile(self, cfg["name"])
        p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
        p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
        p.add_local_ts(**cfg["loc_ts"])
        p.add_remote_ts(**cfg["rem_ts"])
        p.add_responder(cfg["responder"])
        p.add_ike_transforms(cfg["ike_ts"])
        p.add_esp_transforms(cfg["esp_ts"])
        p.add_auth(**cfg["auth"])
        p.set_udp_encap(cfg["udp_encap"])
        p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
        if "lifetime_data" in cfg:
            p.set_lifetime_data(cfg["lifetime_data"])
        if "tun_itf" in cfg:
            p.set_tunnel_interface(cfg["tun_itf"])
        if cfg.get("natt_disabled"):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """test profile dump API"""
        loc_ts4 = {
            "proto": 8,
            "start_port": 1,
            "end_port": 19,
            "start_addr": "3.3.3.2",
            "end_addr": "3.3.3.3",
        }
        rem_ts4 = {
            "proto": 9,
            "start_port": 10,
            "end_port": 119,
            "start_addr": "4.5.76.80",
            "end_addr": "2.3.4.6",
        }

        loc_ts6 = {
            "proto": 8,
            "start_port": 1,
            "end_port": 19,
            "start_addr": "ab::1",
            "end_addr": "ab::4",
        }
        rem_ts6 = {
            "proto": 9,
            "start_port": 10,
            "end_port": 119,
            "start_addr": "cd::12",
            "end_addr": "cd::13",
        }

        conf = {
            "p1": {
                "name": "p1",
                "natt_disabled": True,
                "loc_id": ("fqdn", b"vpp.home"),
                "rem_id": ("fqdn", b"roadwarrior.example.com"),
                "loc_ts": loc_ts4,
                "rem_ts": rem_ts4,
                "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
                "ike_ts": {
                    "crypto_alg": 20,
                    "crypto_key_size": 32,
                    "integ_alg": 0,
                    "dh_group": 1,
                },
                "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
                "auth": {"method": "shared-key", "data": b"sharedkeydata"},
                "udp_encap": True,
                "ipsec_over_udp_port": 4501,
                "lifetime_data": {
                    "lifetime": 123,
                    "lifetime_maxdata": 20192,
                    "lifetime_jitter": 9,
                    "handover": 132,
                },
            },
            "p2": {
                "name": "p2",
                "loc_id": ("ip4-addr", b"192.168.2.1"),
                "rem_id": ("ip6-addr", b"abcd::1"),
                "loc_ts": loc_ts6,
                "rem_ts": rem_ts6,
                "responder": {"sw_if_index": 4, "addr": "def::10"},
                "ike_ts": {
                    "crypto_alg": 12,
                    "crypto_key_size": 16,
                    "integ_alg": 3,
                    "dh_group": 3,
                },
                "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
                "auth": {"method": "shared-key", "data": b"sharedkeydata"},
                "udp_encap": False,
                "ipsec_over_udp_port": 4600,
                "tun_itf": 0,
            },
        }
        self.p1 = self.configure_profile(conf["p1"])
        self.p2 = self.configure_profile(conf["p2"])

        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        self.verify_profile(r[0].profile, conf["p1"])
        self.verify_profile(r[1].profile, conf["p2"])

    def verify_id(self, api_id, cfg_id):
        """Compare a dumped id with its ``(type name, data bytes)`` config tuple."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Compare a dumped traffic selector with its config dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
        self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
        self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
        self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
        self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))

    def verify_responder(self, api_r, cfg_r):
        """Compare the dumped responder interface and address with the config."""
        self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
        self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))

    def verify_transforms(self, api_ts, cfg_ts):
        """Compare the fields shared by IKE and ESP transforms."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
        self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """IKE transforms carry a DH group on top of the shared fields."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """ESP transforms only have the shared fields."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Compare the dumped auth method, data and data length with the config."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
        self.assertEqual(api_auth.data, cfg_auth["data"])
        self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))

    def verify_lifetime_data(self, p, ld):
        """Compare the dumped lifetime fields of profile *p* with dict *ld*."""
        self.assertEqual(p.lifetime, ld["lifetime"])
        self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
        self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
        self.assertEqual(p.handover, ld["handover"])

    def verify_profile(self, ap, cp):
        """Compare the dumped profile *ap* with the config dict *cp*.

        Optional config keys fall back to the values expected from the dump:
        ``natt_disabled`` to False and ``tun_itf`` to 0xFFFFFFFF.
        """
        self.assertEqual(ap.name, cp["name"])
        self.assertEqual(ap.udp_encap, cp["udp_encap"])
        self.verify_id(ap.loc_id, cp["loc_id"])
        self.verify_id(ap.rem_id, cp["rem_id"])
        self.verify_ts(ap.loc_ts, cp["loc_ts"])
        self.verify_ts(ap.rem_ts, cp["rem_ts"])
        self.verify_responder(ap.responder, cp["responder"])
        self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
        self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
        self.verify_auth(ap.auth, cp["auth"])
        # assertEqual (not assertTrue(a == b)) so a failure shows both values
        self.assertEqual(cp.get("natt_disabled", False), ap.natt_disabled)

        if "lifetime_data" in cp:
            self.verify_lifetime_data(ap, cp["lifetime_data"])
        self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
        if "tun_itf" in cp:
            self.assertEqual(ap.tun_itf, cp["tun_itf"])
        else:
            self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1946
1947
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """test responder - responder behind NAT"""

    # counters are checked on the "ip4-natt" node instead of the default "ip4"
    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        # "r_natt" is read by config_params() and forwarded to IKEv2SA as r_natt
        self.config_params({"r_natt": True})
1956
1957
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - NAT traversal (initiator behind NAT)"""

    def config_tc(self):
        # transforms handed to the VPP profile
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            # "hmac-sha2-256-128"
            "integ_alg": 12,
        }
        # VPP initiates towards the address of the peer on pg0
        responder = {
            "sw_if_index": self.pg0.sw_if_index,
            "addr": self.pg0.remote_ip4,
        }
        params = {
            "i_natt": True,
            "is_initiator": False,  # seen from test case perspective
            # thus vpp is initiator
            "responder": responder,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
        }
        self.config_params(params)
1988
1989
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - pre shared key auth"""

    def config_tc(self):
        # transforms handed to the VPP profile
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            # "hmac-sha2-256-128"
            "integ_alg": 12,
        }
        # the responder is given by hostname here, not by address;
        # config_params() adds a DNS cache entry for it
        responder_hostname = {
            "hostname": "vpp.responder.org",
            "sw_if_index": self.pg0.sw_if_index,
        }
        params = {
            "is_initiator": False,  # seen from test case perspective
            # thus vpp is initiator
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
            "responder_hostname": responder_hostname,
        }
        self.config_params(params)
2019
2020
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """test initiator - request window size (1)"""

    def rekey_respond(self, req, update_child_sa_data):
        """Reply to the captured rekey request *req* by echoing its SA payload.

        When *update_child_sa_data* is true the peer-side SA model is first
        updated from the request: the request nonce becomes both nonces, the
        proposal SPI becomes both SPIs of the first child SA, and the child
        keys are recalculated.  No reply to the response is expected.
        """
        ike_hdr = self.get_ike_header(req)
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.i_nonce = nonce
            self.sa.r_nonce = nonce
            child_sa = self.sa.child_sas[0]
            child_sa.ispi = proposal.SPI
            child_sa.rspi = proposal.SPI
            self.sa.calc_child_keys()

        # response header reuses the message id of the request
        resp_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type=36,
            id=ike_hdr.id,
            next_payload="Encrypted",
        )
        encrypted = self.encrypt_ike_msg(resp_hdr, sa_payload, "SA")
        reply = self.create_packet(
            self.pg0, encrypted, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
        # trigger two rekeys of the same child SA back to back
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        first_req, second_req = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(second_req, True)
        self.rekey_respond(first_req, False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
2067
2068
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """test ikev2 initiator - rekey"""

    def rekey_from_initiator(self):
        """Make VPP rekey the first child SA and answer its request.

        The captured request must be a non-response CHILD_SA message with the
        Initiator flag set.  The peer-side SA model is updated from it and the
        request's SA payload is sent back as the response; no further reply
        is expected.
        """
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        request = self.pg0.get_capture(1)[0]

        ike_hdr = self.get_ike_header(request)
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn("Response", ike_hdr.flags)
        self.assertIn("Initiator", ike_hdr.flags)

        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_nonce = nonce
        self.sa.r_nonce = nonce
        # update new responder SPI
        child_sa = self.sa.child_sas[0]
        child_sa.ispi = proposal.SPI
        child_sa.rspi = proposal.SPI
        self.sa.calc_child_keys()

        # response header reuses the message id of the request
        resp_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type=36,
            id=ike_hdr.id,
            next_payload="Encrypted",
        )
        encrypted = self.encrypt_ike_msg(resp_hdr, sa_payload, "SA")
        reply = self.create_packet(
            self.pg0, encrypted, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # establish the SAs first, then rekey and re-verify them
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
2111
2112
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - delete IKE SA from responder"""

    def config_tc(self):
        # transforms handed to the VPP profile
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            # "hmac-sha2-256-128"
            "integ_alg": 12,
        }
        # VPP initiates towards the address of the peer on pg0
        responder = {
            "sw_if_index": self.pg0.sw_if_index,
            "addr": self.pg0.remote_ip4,
        }
        params = {
            "del_sa_from_responder": True,
            "is_initiator": False,  # seen from test case perspective
            # thus vpp is initiator
            "responder": responder,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
            "no_idr_in_auth": True,
        }
        self.config_params(params)
2144
2145
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """test ikev2 responder - initiator behind NAT"""

    # counters are checked on the "ip4-natt" node instead of the default "ip4"
    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        # "i_natt" is read by config_params() and forwarded to IKEv2SA as i_natt
        self.config_params({"i_natt": True})
2154
2155
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """test ikev2 responder - pre shared key auth"""

    def config_tc(self):
        # no overrides: config_params() defaults select shared-key auth with
        # the test case as initiator
        self.config_params()
2162
2163
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """

    def config_tc(self):
        # keep DPD on; config_params() disables it unless told otherwise
        self.config_params(dict(dpd_disabled=False))

    def tearDown(self):
        # Intentionally empty, so the inherited tearDown does not run.
        # NOTE(review): presumably because the test itself already waits
        # until the SAs are gone - confirm.
        pass

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # capture empty request but don't reply
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(probe)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b"")
        # wait for SA expiration
        time.sleep(3)
        self.assertEqual(len(self.vapi.ikev2_sa_dump()), 0)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 0)
2193
2194
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """test ikev2 responder - rekey"""

    # When True, fresh DH data is generated before the rekey request and the
    # request, response handling and child key calculation all use kex.
    WITH_KEX = False

    def send_rekey_from_initiator(self):
        """Send a child SA rekey request on pg0 and return the one-packet capture."""
        if self.WITH_KEX:
            self.sa.generate_dh_data()
        request = self.create_rekey_request(kex=self.WITH_KEX)
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        return self.pg0.get_capture(1)

    def process_rekey_response(self, capture):
        """Update the peer-side SA model from the rekey response in *capture*."""
        ike_hdr = self.get_ike_header(capture[0])
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        if self.WITH_KEX:
            self.sa.r_dh_data = sa_payload[ikev2.IKEv2_payload_KE].load
            self.sa.complete_dh_data()
        self.sa.calc_child_keys(kex=self.WITH_KEX)

    def test_responder(self):
        # establish the SAs, rekey the child SA once, then re-verify
        super(TestResponderRekey, self).test_responder()
        response = self.send_rekey_from_initiator()
        self.process_rekey_response(response)
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
        self.assert_counter(1, "rekey_req", "ip4")
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
2232
2233
@tag_fixme_vpp_workers
class TestResponderRekeyRepeat(TestResponderRekey):
    """test ikev2 responder - rekey repeat"""

    def test_responder(self):
        super(TestResponderRekeyRepeat, self).test_responder()
        # rekey request is not accepted until old IPsec SA is expired
        rejected = self.send_rekey_from_initiator()[0]
        ike_hdr = self.get_ike_header(rejected)
        notify = ikev2.IKEv2_payload_Notify(self.sa.hmac_and_decrypt(ike_hdr))
        # NOTE(review): 43 is presumably TEMPORARY_FAILURE (RFC 7296) - confirm
        self.assertEqual(notify.type, 43)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)

        # rekey request is accepted after old IPsec SA was expired;
        # poll up to 50 times, 0.2 s apart, for the SA count to change
        old_sa_expired = False
        for _ in range(50):
            if len(self.vapi.ipsec_sa_dump()) != 3:
                old_sa_expired = True
                break
            time.sleep(0.2)
        if not old_sa_expired:
            self.fail("old IPsec SA not expired")

        accepted = self.send_rekey_from_initiator()
        self.process_rekey_response(accepted)
        self.verify_ike_sas()
        self.verify_ipsec_sas(sa_count=3)
2257
2258
@tag_fixme_vpp_workers
class TestResponderRekeyKEX(TestResponderRekey):
    """test ikev2 responder - rekey with key exchange"""

    # Same scenario as TestResponderRekey; the flag makes the inherited
    # helpers generate DH data and pass kex=True.
    WITH_KEX = True
2264
2265
@tag_fixme_vpp_workers
class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
    """test ikev2 responder - rekey repeat with key exchange"""

    # Same scenario as TestResponderRekeyRepeat; the flag makes the inherited
    # helpers generate DH data and pass kex=True.
    WITH_KEX = True
2271
2272
@tag_fixme_vpp_workers
class TestResponderRekeySA(TestResponderPsk):
    """test ikev2 responder - rekey IKE SA"""

    def send_rekey_from_initiator(self, newsa):
        """Send an IKE SA rekey request built from *newsa*; return the capture."""
        request = self.create_sa_rekey_request(
            spi=newsa.ispi,
            dh_pub_key=newsa.my_dh_pub_key,
            nonce=newsa.i_nonce,
        )
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        return self.pg0.get_capture(1)

    def process_rekey_response(self, newsa, capture):
        """Complete *newsa* from the rekey response and move the child SAs to it.

        The response is decrypted with the current SA (``self.sa``); the new
        SA's keys are calculated with the current SA's ``sk_d``.
        """
        ike_hdr = self.get_ike_header(capture[0])
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        newsa.rspi = sa_payload[ikev2.IKEv2_payload_Proposal].SPI
        newsa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        newsa.r_dh_data = sa_payload[ikev2.IKEv2_payload_KE].load
        newsa.complete_dh_data()
        newsa.calc_keys(sk_d=self.sa.sk_d)
        # hand the child SAs over to the new IKE SA
        newsa.child_sas, self.sa.child_sas = self.sa.child_sas, []

    def test_responder(self):
        super(TestResponderRekeySA, self).test_responder()
        # clone the current SA under a fresh random 8-byte SPI
        newsa = self.sa.clone(self, spi=os.urandom(8))
        newsa.generate_dh_data()
        response = self.send_rekey_from_initiator(newsa)
        self.process_rekey_response(newsa, response)
        self.verify_ike_sas(is_rekey=True)
        self.assert_counter(1, "rekey_req", "ip4")
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[1].sa.stats.n_rekey_req, 1)
        # delete the old SA, then continue with the new one
        self.initiate_del_sa_from_initiator()
        self.sa = newsa
        self.verify_ike_sas()
2315
2316
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """test ikev2 responder - non-default table id"""

    @classmethod
    def setUpClass(cls):
        # publish scapy's ikev2 module under the module-level name "ikev2"
        import scapy.contrib.ikev2 as _ikev2

        globals()["ikev2"] = _ikev2
        # start the lookup after IkePeer, so IkePeer.setUpClass itself is
        # skipped and the interface setup below is used instead
        super(IkePeer, cls).setUpClass()

        on_fixme_distro = is_distro_ubuntu2204 == True or is_distro_debian11 == True
        if on_fixme_distro and not hasattr(cls, "vpp"):
            # nothing to configure when the class has no "vpp" attribute
            return

        cls.create_pg_interfaces(range(1))
        # put pg0 into table 1 before it gets its addresses
        cls.vapi.cli("ip table add 1")
        cls.vapi.cli("set interface ip table pg0 1")
        for pg_if in cls.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()
            pg_if.config_ip6()
            pg_if.resolve_ndp()

    def config_tc(self):
        # keep DPD on; config_params() disables it unless told otherwise
        self.config_params(dict(dpd_disabled=False))

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderVrf, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # expect one empty INFORMATIONAL message from VPP on pg0
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(probe)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b"")
2354         self.assertEqual(plain, b"")
2355
2356
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """test ikev2 responder - cert based auth"""

    def config_tc(self):
        # file names that config_params() looks up in the ikev2 certs directory
        pem_files = {
            "server-key": "server-key.pem",
            "client-key": "client-key.pem",
            "client-cert": "client-cert.pem",
            "server-cert": "server-cert.pem",
        }
        self.config_params({"udp_encap": True, "auth": "rsa-sig", **pem_files})
2372
2373
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
    TemplateResponder, Ikev2Params
):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """

    def config_tc(self):
        # peer-side IKE and ESP proposals, as (name, key length) / name
        proposals = {
            "ike-crypto": ("AES-CBC", 16),
            "ike-integ": "SHA2-256-128",
            "esp-crypto": ("AES-CBC", 24),
            "esp-integ": "SHA2-384-192",
            "ike-dh": "2048MODPgr",
        }
        params = dict(proposals)
        # 256 random bytes are used as the nonce
        params["nonce"] = os.urandom(256)
        params["no_idr_in_auth"] = True
        self.config_params(params)
2394
2395
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
    TemplateResponder, Ikev2Params
):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """

    def config_tc(self):
        # peer-side proposals; the ESP side uses AES-GCM with "NULL" integrity
        ike_side = {
            "ike-crypto": ("AES-CBC", 32),
            "ike-integ": "SHA2-256-128",
        }
        esp_side = {
            "esp-crypto": ("AES-GCM-16ICV", 32),
            "esp-integ": "NULL",
        }
        self.config_params({**ike_side, **esp_side, "ike-dh": "3072MODPgr"})
2415
2416
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    # NOTE(review): presumably selects the IPv6 variant of the IKE graph node
    # when counters are checked - confirm against where the suffix is read.
    IKE_NODE_SUFFIX = "ip6"

    def config_tc(self):
        """Configure an IPv6, NAT-T run using AES-GCM-16ICV for IKE."""
        local_selector = {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"}
        remote_selector = {"start_addr": "11::0", "end_addr": "11::100"}
        settings = {
            "del_sa_from_responder": True,
            "ip6": True,
            "natt": True,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            # No separate integrity transform is configured for IKE here.
            "ike-integ": "NULL",
            "ike-dh": "2048MODPgr",
            "loc_ts": local_selector,
            "rem_ts": remote_selector,
        }
        self.config_params(settings)
2438
2439
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request on pg0 and verify the reply and counters."""
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()

        replies = self.pg0.get_capture(1)
        ike_hdr = self.get_ike_header(replies[0])

        # The reply must carry the SA's message id and an empty body.
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b"")

        # One keepalive is expected both in the "ip4" counter and in the
        # statistics of the first SA returned by the dump.
        self.assert_counter(1, "keepalive", "ip4")
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        # Run the inherited initiator scenario first, then exercise the
        # empty-request path on top of it.
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
2463
2464
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """malformed packet test"""

    def tearDown(self):
        """Do nothing; this override does not call the inherited tearDown."""
        # NOTE(review): presumably deliberate, since these tests only send
        # packets that are expected to be dropped - confirm.

    def config_tc(self):
        """Use the default parameters; no overrides are passed."""
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet for pg0 with an optional payload.

        :param length: value given to the IKEv2 header ``length`` field;
            None is passed through unchanged.
        :param payload: optional layer appended after the IKEv2 header.
        :returns: whatever ``create_packet`` returns for pg0 and the SA's
            source and destination ports.
        """
        header_fields = {
            "length": length,
            "init_SPI": "\x11" * 8,
            "flags": "Initiator",
            "exch_type": "IKE_SA_INIT",
        }
        msg = ikev2.IKEv2(**header_fields)
        if payload is not None:
            msg /= payload
        return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)

    def verify_bad_packet_length(self):
        """Send headers with length 0xDEAD; expect no replies, "bad_length" hits."""
        burst = self.create_ike_init_msg(length=0xDEAD) * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, "bad_length")

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with length 0xDEAD; expect "malformed_packet" hits."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xDEAD)
        burst = self.create_ike_init_msg(payload=bad_sa) * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, "malformed_packet")

    def test_responder(self):
        # Each malformed variant is sent this many times; the same number is
        # then expected in the matching error counter.
        self.pkt_count = 254
        checks = (
            self.verify_bad_packet_length,
            self.verify_bad_sa_payload_length,
        )
        for check in checks:
            check()
2500
2501
if __name__ == "__main__":
    # Executed as a script: run this module's tests with the VPP test runner.
    runner_cls = VppTestRunner
    unittest.main(testRunner=runner_cls)