tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import VppTestCase
23 from asfframework import (
24     tag_fixme_vpp_workers,
25     tag_fixme_ubuntu2204,
26     tag_fixme_debian11,
27     is_distro_ubuntu2204,
28     is_distro_debian11,
29     VppTestRunner,
30 )
31 from vpp_ikev2 import Profile, IDType, AuthMethod
32 from vpp_papi import VppEnum
33
# Python 2/3 compatibility shim: the name ``unicode`` only resolves on
# Python 2; when the lookup raises NameError, ``str`` is used instead.
try:
    text_type = unicode
except NameError:
    text_type = str

# constant fed to the PRF (keyed with the shared secret) in
# IKEv2SA.auth_init for the "shared-key" authentication method
KEY_PAD = b"Key Pad for IKEv2"
# number of trailing key bytes CryptoAlgo treats as the AEAD salt
SALT_SIZE = 4
# number of tag bytes appended to the ciphertext in AEAD mode
GCM_ICV_SIZE = 16
# number of IV bytes carried in front of the ciphertext for AES-GCM-16ICV
GCM_IV_SIZE = 8
43
44
# Diffie-Hellman MODP groups, defined in rfc3526.
# Each value is a tuple (p, g, key_len):
#   p       - the group prime, parsed from the hex dump by long_converter
#   g       - the generator
#   key_len - length in bytes used when serializing the shared secret
#             (see IKEv2SA.compute_secret)
DH = {
    "2048MODPgr": (
        long_converter(
            """
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
        ),
        2,
        256,
    ),
    "3072MODPgr": (
        long_converter(
            """
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
        ),
        2,
        384,
    ),
}
90
91
class CryptoAlgo(object):
    """Symmetric cipher wrapper used to encrypt and decrypt IKEv2 payloads.

    ``cipher`` and ``mode`` are the algorithm and mode constructors (for
    example ``algorithms.AES`` and ``modes.CBC``).  When ``cipher`` is None
    (the "NULL" entry) no block size or IV length is recorded, so the
    encrypt/decrypt/pad helpers are not usable on such an instance.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is given in bits, bs is kept in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == "AES-GCM-16ICV":
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` with a freshly generated random IV.

        Without ``aad`` the result is ``iv + ciphertext``.  With ``aad``
        the last SALT_SIZE bytes of ``key`` are used as the salt, the
        nonce is ``salt + iv`` and the result is
        ``iv + ciphertext + tag[:GCM_ICV_SIZE]``.
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(
                self.cipher(key), self.mode(iv), default_backend()
            ).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(
                self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
            ).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` (IV followed by ciphertext) and return the plaintext.

        Without ``aad`` the first ``iv_len`` bytes of ``data`` are the IV.
        With ``aad`` the first GCM_IV_SIZE bytes are the IV, the last
        SALT_SIZE bytes of ``key`` are the salt and ``icv`` is the
        authentication tag handed to the mode constructor.
        """
        if aad is None:
            iv = data[: self.iv_len]
            ct = data[self.iv_len :]
            # use the configured cipher, exactly like encrypt() does
            decryptor = Cipher(
                self.cipher(key), self.mode(iv), default_backend()
            ).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + data[:GCM_IV_SIZE]
            ct = data[GCM_IV_SIZE:]
            key = key[:-SALT_SIZE]
            decryptor = Cipher(
                self.cipher(key), self.mode(nonce, icv, len(icv)), default_backend()
            ).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` to a multiple of the block size.

        Zero bytes are appended, followed by one final byte holding the
        number of zero bytes added.  At least one byte is always appended,
        so block-aligned input grows by a whole block.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b"\x00" * (pad_len - 1)
        return data + bytes([pad_len - 1])
146
147
class AuthAlgo(object):
    """Parameters of an integrity or PRF algorithm.

    ``mac`` is the MAC constructor and ``mod`` the hash constructor passed
    to it; ``key_len`` is the key length and ``trunc_len`` the length the
    MAC output is truncated to.  A missing (or zero) ``trunc_len`` falls
    back to ``key_len``.
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        # any falsy trunc_len (None or 0) selects key_len
        self.trunc_len = trunc_len if trunc_len else key_len
155
156
# encryption algorithms, keyed by the name used in the test configuration
CRYPTO_ALGOS = {
    "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
    "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
    "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
}

# integrity algorithms: (name, MAC constructor, hash, key length,
# truncated output length), lengths in bytes.  Each HMAC-SHA2 variant is
# bound to the hash of matching size (RFC 4868): the 384 and 512 variants
# previously used SHA256, which disagrees with their 48/64 byte keys.
AUTH_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
    "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
    "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA384, 48, 24),
    "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA512, 64, 32),
}

# pseudo-random functions; trunc_len is omitted so it defaults to key_len
PRF_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
}

# numeric encryption transform id -> CRYPTO_ALGOS name
CRYPTO_IDS = {
    12: "AES-CBC",
    20: "AES-GCM-16ICV",
}

# numeric integrity transform id -> AUTH_ALGOS name
INTEG_IDS = {
    2: "HMAC-SHA1-96",
    12: "SHA2-256-128",
    13: "SHA2-384-192",
    14: "SHA2-512-256",
}
187
188
class IKEv2ChildSA(object):
    """State of one child SA: traffic selectors and the two SPIs.

    A random 4 byte SPI is generated for the local side; the SPI of the
    other side is left as None.
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        own_spi = os.urandom(4)
        # only the SPI of our own role is known at creation time
        self.ispi, self.rspi = (own_spi, None) if is_initiator else (None, own_spi)
        self.local_ts = local_ts
        self.remote_ts = remote_ts
200
201
202 class IKEv2SA(object):
203     def __init__(
204         self,
205         test,
206         is_initiator=True,
207         i_id=None,
208         r_id=None,
209         spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
210         id_type="fqdn",
211         nonce=None,
212         auth_data=None,
213         local_ts=None,
214         remote_ts=None,
215         auth_method="shared-key",
216         priv_key=None,
217         i_natt=False,
218         r_natt=False,
219         udp_encap=False,
220     ):
221         self.udp_encap = udp_encap
222         self.i_natt = i_natt
223         self.r_natt = r_natt
224         if i_natt or r_natt:
225             self.sport = 4500
226             self.dport = 4500
227         else:
228             self.sport = 500
229             self.dport = 500
230         self.msg_id = 0
231         self.dh_params = None
232         self.test = test
233         self.priv_key = priv_key
234         self.is_initiator = is_initiator
235         nonce = nonce or os.urandom(32)
236         self.auth_data = auth_data
237         self.i_id = i_id
238         self.r_id = r_id
239         if isinstance(id_type, str):
240             self.id_type = IDType.value(id_type)
241         else:
242             self.id_type = id_type
243         self.auth_method = auth_method
244         if self.is_initiator:
245             self.rspi = 8 * b"\x00"
246             self.ispi = spi
247             self.i_nonce = nonce
248         else:
249             self.rspi = spi
250             self.ispi = 8 * b"\x00"
251             self.r_nonce = nonce
252         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
253
254     def new_msg_id(self):
255         self.msg_id += 1
256         return self.msg_id
257
258     @property
259     def my_dh_pub_key(self):
260         if self.is_initiator:
261             return self.i_dh_data
262         return self.r_dh_data
263
264     @property
265     def peer_dh_pub_key(self):
266         if self.is_initiator:
267             return self.r_dh_data
268         return self.i_dh_data
269
270     @property
271     def natt(self):
272         return self.i_natt or self.r_natt
273
274     def compute_secret(self):
275         priv = self.dh_private_key
276         peer = self.peer_dh_pub_key
277         p, g, l = self.ike_group
278         return pow(
279             int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
280         ).to_bytes(l, "big")
281
282     def generate_dh_data(self):
283         # generate DH keys
284         if self.ike_dh not in DH:
285             raise NotImplementedError("%s not in DH group" % self.ike_dh)
286
287         if self.dh_params is None:
288             dhg = DH[self.ike_dh]
289             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
290             self.dh_params = pn.parameters(default_backend())
291
292         priv = self.dh_params.generate_private_key()
293         pub = priv.public_key()
294         x = priv.private_numbers().x
295         self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
296         y = pub.public_numbers().y
297
298         if self.is_initiator:
299             self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
300         else:
301             self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
302
303     def complete_dh_data(self):
304         self.dh_shared_secret = self.compute_secret()
305
306     def calc_child_keys(self, kex=False):
307         prf = self.ike_prf_alg.mod()
308         s = self.i_nonce + self.r_nonce
309         if kex:
310             s = self.dh_shared_secret + s
311         c = self.child_sas[0]
312
313         encr_key_len = self.esp_crypto_key_len
314         integ_key_len = self.esp_integ_alg.key_len
315         salt_len = 0 if integ_key_len else 4
316
317         l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
318         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
319
320         pos = 0
321         c.sk_ei = keymat[pos : pos + encr_key_len]
322         pos += encr_key_len
323
324         if integ_key_len:
325             c.sk_ai = keymat[pos : pos + integ_key_len]
326             pos += integ_key_len
327         else:
328             c.salt_ei = keymat[pos : pos + salt_len]
329             pos += salt_len
330
331         c.sk_er = keymat[pos : pos + encr_key_len]
332         pos += encr_key_len
333
334         if integ_key_len:
335             c.sk_ar = keymat[pos : pos + integ_key_len]
336             pos += integ_key_len
337         else:
338             c.salt_er = keymat[pos : pos + salt_len]
339             pos += salt_len
340
341     def calc_prfplus(self, prf, key, seed, length):
342         r = b""
343         t = None
344         x = 1
345         while len(r) < length and x < 255:
346             if t is not None:
347                 s = t
348             else:
349                 s = b""
350             s = s + seed + bytes([x])
351             t = self.calc_prf(prf, key, s)
352             r = r + t
353             x = x + 1
354
355         if x == 255:
356             return None
357         return r
358
359     def calc_prf(self, prf, key, data):
360         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
361         h.update(data)
362         return h.finalize()
363
364     def calc_keys(self, sk_d=None):
365         prf = self.ike_prf_alg.mod()
366         if sk_d is None:
367             # SKEYSEED = prf(Ni | Nr, g^ir)
368             self.skeyseed = self.calc_prf(
369                 prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
370             )
371         else:
372             # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
373             self.skeyseed = self.calc_prf(
374                 prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
375             )
376
377         # calculate S = Ni | Nr | SPIi SPIr
378         s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
379
380         prf_key_trunc = self.ike_prf_alg.trunc_len
381         encr_key_len = self.ike_crypto_key_len
382         tr_prf_key_len = self.ike_prf_alg.key_len
383         integ_key_len = self.ike_integ_alg.key_len
384         if integ_key_len == 0:
385             salt_size = 4
386         else:
387             salt_size = 0
388
389         l = (
390             prf_key_trunc
391             + integ_key_len * 2
392             + encr_key_len * 2
393             + tr_prf_key_len * 2
394             + salt_size * 2
395         )
396         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
397
398         pos = 0
399         self.sk_d = keymat[: pos + prf_key_trunc]
400         pos += prf_key_trunc
401
402         self.sk_ai = keymat[pos : pos + integ_key_len]
403         pos += integ_key_len
404         self.sk_ar = keymat[pos : pos + integ_key_len]
405         pos += integ_key_len
406
407         self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
408         pos += encr_key_len + salt_size
409         self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
410         pos += encr_key_len + salt_size
411
412         self.sk_pi = keymat[pos : pos + tr_prf_key_len]
413         pos += tr_prf_key_len
414         self.sk_pr = keymat[pos : pos + tr_prf_key_len]
415
416     def generate_authmsg(self, prf, packet):
417         if self.is_initiator:
418             id = self.i_id
419             nonce = self.r_nonce
420             key = self.sk_pi
421         else:
422             id = self.r_id
423             nonce = self.i_nonce
424             key = self.sk_pr
425         data = bytes([self.id_type, 0, 0, 0]) + id
426         id_hash = self.calc_prf(prf, key, data)
427         return packet + nonce + id_hash
428
429     def auth_init(self):
430         prf = self.ike_prf_alg.mod()
431         if self.is_initiator:
432             packet = self.init_req_packet
433         else:
434             packet = self.init_resp_packet
435         authmsg = self.generate_authmsg(prf, raw(packet))
436         if self.auth_method == "shared-key":
437             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
438             self.auth_data = self.calc_prf(prf, psk, authmsg)
439         elif self.auth_method == "rsa-sig":
440             self.auth_data = self.priv_key.sign(
441                 authmsg, padding.PKCS1v15(), hashes.SHA1()
442             )
443         else:
444             raise TypeError("unknown auth method type!")
445
446     def encrypt(self, data, aad=None):
447         data = self.ike_crypto_alg.pad(data)
448         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
449
450     @property
451     def peer_authkey(self):
452         if self.is_initiator:
453             return self.sk_ar
454         return self.sk_ai
455
456     @property
457     def my_authkey(self):
458         if self.is_initiator:
459             return self.sk_ai
460         return self.sk_ar
461
462     @property
463     def my_cryptokey(self):
464         if self.is_initiator:
465             return self.sk_ei
466         return self.sk_er
467
468     @property
469     def peer_cryptokey(self):
470         if self.is_initiator:
471             return self.sk_er
472         return self.sk_ei
473
474     def concat(self, alg, key_len):
475         return alg + "-" + str(key_len * 8)
476
477     @property
478     def vpp_ike_cypto_alg(self):
479         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
480
481     @property
482     def vpp_esp_cypto_alg(self):
483         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
484
485     def verify_hmac(self, ikemsg):
486         integ_trunc = self.ike_integ_alg.trunc_len
487         exp_hmac = ikemsg[-integ_trunc:]
488         data = ikemsg[:-integ_trunc]
489         computed_hmac = self.compute_hmac(
490             self.ike_integ_alg.mod(), self.peer_authkey, data
491         )
492         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
493
494     def compute_hmac(self, integ, key, data):
495         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
496         h.update(data)
497         return h.finalize()
498
499     def decrypt(self, data, aad=None, icv=None):
500         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
501
502     def hmac_and_decrypt(self, ike):
503         ep = ike[ikev2.IKEv2_payload_Encrypted]
504         if self.ike_crypto == "AES-GCM-16ICV":
505             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
506             ct = ep.load[:-GCM_ICV_SIZE]
507             tag = ep.load[-GCM_ICV_SIZE:]
508             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
509         else:
510             self.verify_hmac(raw(ike))
511             integ_trunc = self.ike_integ_alg.trunc_len
512
513             # remove ICV and decrypt payload
514             ct = ep.load[:-integ_trunc]
515             plain = self.decrypt(ct)
516         # remove padding
517         pad_len = plain[-1]
518         return plain[: -pad_len - 1]
519
520     def build_ts_addr(self, ts, version):
521         return {
522             "starting_address_v" + version: ts["start_addr"],
523             "ending_address_v" + version: ts["end_addr"],
524         }
525
526     def generate_ts(self, is_ip4):
527         c = self.child_sas[0]
528         ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
529         if is_ip4:
530             ts_data.update(self.build_ts_addr(c.local_ts, "4"))
531             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
532             ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
533             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
534         else:
535             ts_data.update(self.build_ts_addr(c.local_ts, "6"))
536             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
537             ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
538             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
539
540         if self.is_initiator:
541             return ([ts1], [ts2])
542         return ([ts2], [ts1])
543
544     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
545         if crypto not in CRYPTO_ALGOS:
546             raise TypeError("unsupported encryption algo %r" % crypto)
547         self.ike_crypto = crypto
548         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
549         self.ike_crypto_key_len = crypto_key_len
550
551         if integ not in AUTH_ALGOS:
552             raise TypeError("unsupported auth algo %r" % integ)
553         self.ike_integ = None if integ == "NULL" else integ
554         self.ike_integ_alg = AUTH_ALGOS[integ]
555
556         if prf not in PRF_ALGOS:
557             raise TypeError("unsupported prf algo %r" % prf)
558         self.ike_prf = prf
559         self.ike_prf_alg = PRF_ALGOS[prf]
560         self.ike_dh = dh
561         self.ike_group = DH[self.ike_dh]
562
563     def set_esp_props(self, crypto, crypto_key_len, integ):
564         self.esp_crypto_key_len = crypto_key_len
565         if crypto not in CRYPTO_ALGOS:
566             raise TypeError("unsupported encryption algo %r" % crypto)
567         self.esp_crypto = crypto
568         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
569
570         if integ not in AUTH_ALGOS:
571             raise TypeError("unsupported auth algo %r" % integ)
572         self.esp_integ = None if integ == "NULL" else integ
573         self.esp_integ_alg = AUTH_ALGOS[integ]
574
575     def crypto_attr(self, key_len):
576         if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
577             return (0x800E << 16 | key_len << 3, 12)
578         else:
579             raise Exception("unsupported attribute type")
580
581     def ike_crypto_attr(self):
582         return self.crypto_attr(self.ike_crypto_key_len)
583
584     def esp_crypto_attr(self):
585         return self.crypto_attr(self.esp_crypto_key_len)
586
587     def compute_nat_sha1(self, ip, port, rspi=None):
588         if rspi is None:
589             rspi = self.rspi
590         data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
591         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
592         digest.update(data)
593         return digest.finalize()
594
595     def clone(self, test, **kwargs):
596         if "spi" not in kwargs:
597             kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
598         if "nonce" not in kwargs:
599             kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
600         if self.child_sas:
601             if "local_ts" not in kwargs:
602                 kwargs["local_ts"] = self.child_sas[0].local_ts
603             if "remote_ts" not in kwargs:
604                 kwargs["remote_ts"] = self.child_sas[0].remote_ts
605         sa = type(self)(
606             test,
607             is_initiator=self.is_initiator,
608             i_id=self.i_id,
609             r_id=self.r_id,
610             id_type=self.id_type,
611             auth_data=self.auth_data,
612             auth_method=self.auth_method,
613             priv_key=self.priv_key,
614             i_natt=self.i_natt,
615             r_natt=self.r_natt,
616             udp_encap=self.udp_encap,
617             **kwargs,
618         )
619         if sa.is_initiator:
620             sa.set_ike_props(
621                 crypto=self.ike_crypto,
622                 crypto_key_len=self.ike_crypto_key_len,
623                 integ=self.ike_integ,
624                 prf=self.ike_prf,
625                 dh=self.ike_dh,
626             )
627             sa.set_esp_props(
628                 crypto=self.esp_crypto,
629                 crypto_key_len=self.esp_crypto_key_len,
630                 integ=self.esp_integ,
631             )
632         return sa
633
634
635 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
636 class IkePeer(VppTestCase):
637     """common class for initiator and responder"""
638
639     @classmethod
640     def setUpClass(cls):
641         import scapy.contrib.ikev2 as _ikev2
642
643         globals()["ikev2"] = _ikev2
644         super(IkePeer, cls).setUpClass()
645         cls.create_pg_interfaces(range(2))
646         for i in cls.pg_interfaces:
647             i.admin_up()
648             i.config_ip4()
649             i.resolve_arp()
650             i.config_ip6()
651             i.resolve_ndp()
652
653     @classmethod
654     def tearDownClass(cls):
655         super(IkePeer, cls).tearDownClass()
656
657     def tearDown(self):
658         super(IkePeer, self).tearDown()
659         if self.del_sa_from_responder:
660             self.initiate_del_sa_from_responder()
661         else:
662             self.initiate_del_sa_from_initiator()
663         r = self.vapi.ikev2_sa_dump()
664         self.assertEqual(len(r), 0)
665         r = self.vapi.ikev2_sa_v2_dump()
666         self.assertEqual(len(r), 0)
667         sas = self.vapi.ipsec_sa_dump()
668         self.assertEqual(len(sas), 0)
669         self.p.remove_vpp_config()
670         self.assertIsNone(self.p.query_vpp_config())
671
672     def setUp(self):
673         super(IkePeer, self).setUp()
674         self.config_tc()
675         self.p.add_vpp_config()
676         self.assertIsNotNone(self.p.query_vpp_config())
677         if self.sa.is_initiator:
678             self.sa.generate_dh_data()
679         self.vapi.cli("ikev2 set logging level 4")
680         self.vapi.cli("event-lo clear")
681
682     def assert_counter(self, count, name, version="ip4"):
683         node_name = "/err/ikev2-%s/" % version + name
684         self.assertEqual(count, self.statistics.get_err_counter(node_name))
685
686     def create_rekey_request(self, kex=False):
687         sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
688         header = ikev2.IKEv2(
689             init_SPI=self.sa.ispi,
690             resp_SPI=self.sa.rspi,
691             id=self.sa.new_msg_id(),
692             flags="Initiator",
693             exch_type="CREATE_CHILD_SA",
694         )
695
696         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
697         return self.create_packet(
698             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
699         )
700
701     def create_sa_rekey_request(self, **kwargs):
702         sa = self.generate_sa_init_payload(**kwargs)
703         header = ikev2.IKEv2(
704             init_SPI=self.sa.ispi,
705             resp_SPI=self.sa.rspi,
706             id=self.sa.new_msg_id(),
707             flags="Initiator",
708             exch_type="CREATE_CHILD_SA",
709         )
710         ike_msg = self.encrypt_ike_msg(header, sa, "SA")
711         return self.create_packet(
712             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
713         )
714
715     def create_empty_request(self):
716         header = ikev2.IKEv2(
717             init_SPI=self.sa.ispi,
718             resp_SPI=self.sa.rspi,
719             id=self.sa.new_msg_id(),
720             flags="Initiator",
721             exch_type="INFORMATIONAL",
722             next_payload="Encrypted",
723         )
724
725         msg = self.encrypt_ike_msg(header, b"", None)
726         return self.create_packet(
727             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
728         )
729
730     def create_packet(
731         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
732     ):
733         if use_ip6:
734             src_ip = src_if.remote_ip6
735             dst_ip = src_if.local_ip6
736             ip_layer = IPv6
737         else:
738             src_ip = src_if.remote_ip4
739             dst_ip = src_if.local_ip4
740             ip_layer = IP
741         res = (
742             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
743             / ip_layer(src=src_ip, dst=dst_ip)
744             / UDP(sport=sport, dport=dport)
745         )
746         if natt:
747             # insert non ESP marker
748             res = res / Raw(b"\x00" * 4)
749         return res / msg
750
751     def verify_udp(self, udp):
752         self.assertEqual(udp.sport, self.sa.sport)
753         self.assertEqual(udp.dport, self.sa.dport)
754
755     def get_ike_header(self, packet):
756         try:
757             ih = packet[ikev2.IKEv2]
758             ih = self.verify_and_remove_non_esp_marker(ih)
759         except IndexError as e:
760             # this is a workaround for getting IKEv2 layer as both ikev2 and
761             # ipsec register for port 4500
762             esp = packet[ESP]
763             ih = self.verify_and_remove_non_esp_marker(esp)
764         self.assertEqual(ih.version, 0x20)
765         self.assertNotIn("Version", ih.flags)
766         return ih
767
768     def verify_and_remove_non_esp_marker(self, packet):
769         if self.sa.natt:
770             # if we are in nat traversal mode check for non esp marker
771             # and remove it
772             data = raw(packet)
773             self.assertEqual(data[:4], b"\x00" * 4)
774             return ikev2.IKEv2(data[4:])
775         else:
776             return packet
777
    def encrypt_ike_msg(self, header, plain, first_payload):
        """Encrypt ``plain`` and return ``header`` / Encrypted payload.

        ``header.length`` is set to the total message length as a side
        effect.  ``first_payload`` becomes the next_payload field of the
        Encrypted payload.  For AES-GCM-16ICV the header and the Encrypted
        payload header are authenticated as additional data; for other
        ciphers a truncated HMAC over the message is appended.
        """
        if self.sa.ike_crypto == "AES-GCM-16ICV":
            # pad here only to learn the final length; self.sa.encrypt
            # below pads the plaintext again itself
            data = self.sa.ike_crypto_alg.pad(raw(plain))
            plen = (
                len(data)
                + GCM_IV_SIZE
                + GCM_ICV_SIZE
                + len(ikev2.IKEv2_payload_Encrypted())
            )
            tlen = plen + len(ikev2.IKEv2())

            # prepare aad data
            sk_p = ikev2.IKEv2_payload_Encrypted(
                next_payload=first_payload, length=plen
            )
            # lengths must be final before the headers are serialized as
            # aad
            header.length = tlen
            res = header / sk_p
            encr = self.sa.encrypt(raw(plain), raw(res))
            # rebuild the payload, this time carrying the ciphertext
            sk_p = ikev2.IKEv2_payload_Encrypted(
                next_payload=first_payload, length=plen, load=encr
            )
            res = header / sk_p
        else:
            encr = self.sa.encrypt(raw(plain))
            trunc_len = self.sa.ike_integ_alg.trunc_len
            # the payload length includes the ICV appended further down
            plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
            tlen = plen + len(ikev2.IKEv2())

            sk_p = ikev2.IKEv2_payload_Encrypted(
                next_payload=first_payload, length=plen, load=encr
            )
            header.length = tlen
            res = header / sk_p

            # the HMAC covers the message built so far, without the ICV
            integ_data = raw(res)
            hmac_data = self.sa.compute_hmac(
                self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
            )
            res = res / Raw(hmac_data[:trunc_len])
        # sanity check of the length computed above
        assert len(res) == tlen
        return res
819
820     def verify_udp_encap(self, ipsec_sa):
821         e = VppEnum.vl_api_ipsec_sad_flags_t
822         if self.sa.udp_encap or self.sa.natt:
823             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
824         else:
825             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
826
827     def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
828         sas = self.vapi.ipsec_sa_dump()
829         if sa_count is None:
830             if is_rekey:
831                 # after rekey there is a short period of time in which old
832                 # inbound SA is still present
833                 sa_count = 3
834             else:
835                 sa_count = 2
836         self.assertEqual(len(sas), sa_count)
837         if self.sa.is_initiator:
838             if is_rekey:
839                 sa0 = sas[0].entry
840                 sa1 = sas[2].entry
841             else:
842                 sa0 = sas[0].entry
843                 sa1 = sas[1].entry
844         else:
845             if is_rekey:
846                 sa0 = sas[2].entry
847                 sa1 = sas[0].entry
848             else:
849                 sa1 = sas[0].entry
850                 sa0 = sas[1].entry
851
852         c = self.sa.child_sas[0]
853
854         self.verify_udp_encap(sa0)
855         self.verify_udp_encap(sa1)
856         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
857         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
858         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
859
860         if self.sa.esp_integ is None:
861             vpp_integ_alg = 0
862         else:
863             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
864         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
865         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
866
867         # verify crypto keys
868         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
869         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
870         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
871         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
872
873         # verify integ keys
874         if vpp_integ_alg:
875             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
876             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
877             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
878             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
879         else:
880             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
881             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
882
883     def verify_keymat(self, api_keys, keys, name):
884         km = getattr(keys, name)
885         api_km = getattr(api_keys, name)
886         api_km_len = getattr(api_keys, name + "_len")
887         self.assertEqual(len(km), api_km_len)
888         self.assertEqual(km, api_km[:api_km_len])
889
890     def verify_id(self, api_id, exp_id):
891         self.assertEqual(api_id.type, IDType.value(exp_id.type))
892         self.assertEqual(api_id.data_len, exp_id.data_len)
893         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
894
895     def verify_ike_sas(self, is_rekey=False):
896         r = self.vapi.ikev2_sa_dump()
897         if is_rekey:
898             sa_count = 2
899             sa = r[1].sa
900         else:
901             sa_count = 1
902             sa = r[0].sa
903         self.assertEqual(len(r), sa_count)
904         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
905         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
906         if self.ip6:
907             if self.sa.is_initiator:
908                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
909                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
910             else:
911                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
912                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
913         else:
914             if self.sa.is_initiator:
915                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
916                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
917             else:
918                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
919                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
920         self.verify_keymat(sa.keys, self.sa, "sk_d")
921         self.verify_keymat(sa.keys, self.sa, "sk_ai")
922         self.verify_keymat(sa.keys, self.sa, "sk_ar")
923         self.verify_keymat(sa.keys, self.sa, "sk_ei")
924         self.verify_keymat(sa.keys, self.sa, "sk_er")
925         self.verify_keymat(sa.keys, self.sa, "sk_pi")
926         self.verify_keymat(sa.keys, self.sa, "sk_pr")
927
928         self.assertEqual(sa.i_id.type, self.sa.id_type)
929         self.assertEqual(sa.r_id.type, self.sa.id_type)
930         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
931         self.assertEqual(sa.r_id.data_len, len(self.idr))
932         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
933         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
934
935         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
936         self.verify_nonce(n, self.sa.i_nonce)
937         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
938         self.verify_nonce(n, self.sa.r_nonce)
939
940         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
941         if is_rekey:
942             self.assertEqual(len(r), 0)
943             return
944
945         self.assertEqual(len(r), 1)
946         csa = r[0].child_sa
947         self.assertEqual(csa.sa_index, sa.sa_index)
948         c = self.sa.child_sas[0]
949         if hasattr(c, "sk_ai"):
950             self.verify_keymat(csa.keys, c, "sk_ai")
951             self.verify_keymat(csa.keys, c, "sk_ar")
952         self.verify_keymat(csa.keys, c, "sk_ei")
953         self.verify_keymat(csa.keys, c, "sk_er")
954         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
955         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
956
957         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
958         tsi = tsi[0]
959         tsr = tsr[0]
960         r = self.vapi.ikev2_traffic_selector_dump(
961             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
962         )
963         self.assertEqual(len(r), 1)
964         ts = r[0].ts
965         self.verify_ts(r[0].ts, tsi[0], True)
966
967         r = self.vapi.ikev2_traffic_selector_dump(
968             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
969         )
970         self.assertEqual(len(r), 1)
971         self.verify_ts(r[0].ts, tsr[0], False)
972
973     def verify_ike_sas_v2(self):
974         r = self.vapi.ikev2_sa_v2_dump()
975         self.assertEqual(len(r), 1)
976         sa = r[0].sa
977         self.assertEqual(self.p.profile_name, sa.profile_name)
978         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
979         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
980         if self.ip6:
981             if self.sa.is_initiator:
982                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
983                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
984             else:
985                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
986                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
987         else:
988             if self.sa.is_initiator:
989                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
990                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
991             else:
992                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
993                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
994         self.verify_keymat(sa.keys, self.sa, "sk_d")
995         self.verify_keymat(sa.keys, self.sa, "sk_ai")
996         self.verify_keymat(sa.keys, self.sa, "sk_ar")
997         self.verify_keymat(sa.keys, self.sa, "sk_ei")
998         self.verify_keymat(sa.keys, self.sa, "sk_er")
999         self.verify_keymat(sa.keys, self.sa, "sk_pi")
1000         self.verify_keymat(sa.keys, self.sa, "sk_pr")
1001
1002         self.assertEqual(sa.i_id.type, self.sa.id_type)
1003         self.assertEqual(sa.r_id.type, self.sa.id_type)
1004         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
1005         self.assertEqual(sa.r_id.data_len, len(self.idr))
1006         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
1007         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
1008
1009         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
1010         self.assertEqual(len(r), 1)
1011         csa = r[0].child_sa
1012         self.assertEqual(csa.sa_index, sa.sa_index)
1013         c = self.sa.child_sas[0]
1014         if hasattr(c, "sk_ai"):
1015             self.verify_keymat(csa.keys, c, "sk_ai")
1016             self.verify_keymat(csa.keys, c, "sk_ar")
1017         self.verify_keymat(csa.keys, c, "sk_ei")
1018         self.verify_keymat(csa.keys, c, "sk_er")
1019         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
1020         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
1021
1022         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1023         tsi = tsi[0]
1024         tsr = tsr[0]
1025         r = self.vapi.ikev2_traffic_selector_dump(
1026             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
1027         )
1028         self.assertEqual(len(r), 1)
1029         ts = r[0].ts
1030         self.verify_ts(r[0].ts, tsi[0], True)
1031
1032         r = self.vapi.ikev2_traffic_selector_dump(
1033             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
1034         )
1035         self.assertEqual(len(r), 1)
1036         self.verify_ts(r[0].ts, tsr[0], False)
1037
1038         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
1039         self.verify_nonce(n, self.sa.i_nonce)
1040         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
1041         self.verify_nonce(n, self.sa.r_nonce)
1042
1043     def verify_nonce(self, api_nonce, nonce):
1044         self.assertEqual(api_nonce.data_len, len(nonce))
1045         self.assertEqual(api_nonce.nonce, nonce)
1046
1047     def verify_ts(self, api_ts, ts, is_initiator):
1048         if is_initiator:
1049             self.assertTrue(api_ts.is_local)
1050         else:
1051             self.assertFalse(api_ts.is_local)
1052
1053         if self.p.ts_is_ip4:
1054             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
1055             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
1056         else:
1057             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
1058             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
1059         self.assertEqual(api_ts.start_port, ts.start_port)
1060         self.assertEqual(api_ts.end_port, ts.end_port)
1061         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
1062
1063
class TemplateInitiator(IkePeer):
    """initiator test template"""

    # The helpers below capture and verify the requests VPP emits on pg0
    # after the initiate API calls and build the replies that are sent back
    # with the "Response" flag set.

    def initiate_del_sa_from_initiator(self):
        """Trigger an IKE SA delete from VPP and acknowledge it.

        The captured request must have the Initiator flag set, must not be
        a response and must carry a Delete payload for the IKE protocol.
        An empty encrypted INFORMATIONAL response is sent back and no
        further packets are expected.
        """
        ispi = int.from_bytes(self.sa.ispi, "little")
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn("Response", ih.flags)
        self.assertIn("Initiator", ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type="INFORMATIONAL",
            id=ih.id,
            next_payload="Encrypted",
        )
        resp = self.encrypt_ike_msg(header, b"", None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Verify that *packet* is VPP's empty response to our delete request.

        The message id must equal ``self.sa.msg_id``, the exchange must be
        INFORMATIONAL with both the Response and Initiator flags set, and
        the decrypted payload must be empty.
        """
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn("Response", ih.flags)
        self.assertIn("Initiator", ih.flags)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b"")

    def initiate_del_sa_from_responder(self):
        """Send an IKE SA delete request to VPP and verify its response."""
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            exch_type="INFORMATIONAL",
            id=self.sa.new_msg_id(),
        )
        del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
        ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    @staticmethod
    def find_notify_payload(packet, notify_type):
        """Return the first Notify payload of *packet* with type *notify_type*.

        The search starts at the first Notify layer and follows the
        ``payload`` links; None is returned when a ``payload`` link is None
        before a match is found.
        """
        n = packet[ikev2.IKEv2_payload_Notify]
        while n is not None:
            if n.type == notify_type:
                return n
            n = n.payload
        return None

    def verify_nat_detection(self, packet):
        """Verify both NAT detection notifications of an IKE_SA_INIT request.

        The hashes carried by NAT_DETECTION_SOURCE_IP (16388) and
        NAT_DETECTION_DESTINATION_IP (16389) must equal the SHA-1 computed
        locally from the packet's own IP addresses and UDP ports.
        """
        # The address family has to follow the IP header that is inspected,
        # otherwise inet_pton() rejects IPv6 address strings.
        if self.ip6:
            iph = packet[IPv6]
            family = socket.AF_INET6
        else:
            iph = packet[IP]
            family = socket.AF_INET
        udp = packet[UDP]

        # NAT_DETECTION_SOURCE_IP
        s = self.find_notify_payload(packet, 16388)
        self.assertIsNotNone(s)
        # Eight zero bytes are passed as the last argument (presumably the
        # responder SPI, which is still all-zero in this request).
        src_sha = self.sa.compute_nat_sha1(
            inet_pton(family, iph.src), udp.sport, b"\x00" * 8
        )
        self.assertEqual(s.load, src_sha)

        # NAT_DETECTION_DESTINATION_IP
        s = self.find_notify_payload(packet, 16389)
        self.assertIsNotNone(s)
        dst_sha = self.sa.compute_nat_sha1(
            inet_pton(family, iph.dst), udp.dport, b"\x00" * 8
        )
        self.assertEqual(s.load, dst_sha)

    def verify_sa_init_request(self, packet):
        """Verify VPP's IKE_SA_INIT request and derive the IKE keys from it.

        Records the peer's UDP source port, initiator SPI, nonce and DH
        data in ``self.sa``, checks the proposed transforms and the NAT
        detection payloads, then configures the IKE/ESP properties and
        computes the DH secret and the IKE keys.
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = packet[ikev2.IKEv2]
        self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
        self.assertEqual(ih.exch_type, 34)  # SA_INIT
        self.sa.ispi = ih.init_SPI
        self.assertEqual(ih.resp_SPI, 8 * b"\x00")
        self.assertIn("Initiator", ih.flags)
        self.assertNotIn("Response", ih.flags)
        self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load

        prop = packet[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.proto, 1)  # proto = ikev2
        self.assertEqual(prop.proposal, 1)
        self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
        self.assertEqual(
            prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
        )
        self.assertEqual(prop.trans[1].transform_type, 2)  # prf
        self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
        self.assertEqual(prop.trans[2].transform_type, 4)  # dh
        self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])

        self.verify_nat_detection(packet)
        self.sa.set_ike_props(
            crypto="AES-GCM-16ICV",
            crypto_key_len=32,
            integ="NULL",
            prf="PRF_HMAC_SHA2_256",
            dh="3072MODPgr",
        )
        self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
        self.sa.generate_dh_data()
        self.sa.complete_dh_data()
        self.sa.calc_keys()

    def update_esp_transforms(self, trans, sa):
        """Copy the ESP encryption/integrity choice from *trans* into *sa*.

        Walks the transform chain through its ``payload`` links and maps
        the transform ids through ``CRYPTO_IDS`` / ``INTEG_IDS``.
        """
        while trans:
            if trans.transform_type == 1:  # encryption
                sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
            elif trans.transform_type == 3:  # integrity
                sa.esp_integ = INTEG_IDS[trans.transform_id]
            trans = trans.payload

    def verify_sa_auth_req(self, packet):
        """Verify VPP's IKE_AUTH request and pick up the child SA parameters.

        Checks the header, the UDP layer and the identities, advances
        ``self.sa.msg_id``, stores the proposed SPI as the child SA's
        initiator SPI and updates the ESP transforms of ``self.sa``.
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
        self.assertIn("Initiator", ih.flags)
        self.assertNotIn("Response", ih.flags)

        self.verify_udp(udp)
        self.assertEqual(ih.id, self.sa.msg_id + 1)
        self.sa.msg_id += 1
        plain = self.sa.hmac_and_decrypt(ih)
        idi = ikev2.IKEv2_payload_IDi(plain)
        self.assertEqual(idi.load, self.sa.i_id)
        if self.no_idr_auth:
            # Without IDr the AUTH payload directly follows IDi.
            self.assertEqual(idi.next_payload, 39)  # AUTH
        else:
            idr = ikev2.IKEv2_payload_IDr(idi.payload)
            self.assertEqual(idr.load, self.sa.r_id)
        prop = idi[ikev2.IKEv2_payload_Proposal]
        c = self.sa.child_sas[0]
        c.ispi = prop.SPI
        self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)

    def send_init_response(self):
        """Send the IKE_SA_INIT response and verify the following IKE_AUTH.

        The response is stored in ``self.sa.init_resp_packet``.  When
        ``self.sa.natt`` is set, 10.10.10.10 is hashed into the destination
        NAT detection payload instead of pg0's local IPv4 address.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.ike_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.ike_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="GroupDesc", transform_id=self.sa.ike_dh
            )
        )
        props = ikev2.IKEv2_payload_Proposal(
            proposal=1, proto="IKEv2", trans_nb=4, trans=trans
        )

        src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
        if self.sa.natt:
            dst_address = b"\x0a\x0a\x0a\x0a"
        else:
            dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
        dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)

        self.sa.init_resp_packet = (
            ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi,
                exch_type="IKE_SA_INIT",
                flags="Response",
            )
            / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
            / ikev2.IKEv2_payload_KE(
                next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
            )
            / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
            / ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
            )
            / ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
            )
        )

        ike_msg = self.create_packet(
            self.pg0,
            self.sa.init_resp_packet,
            self.sa.sport,
            self.sa.dport,
            False,
            self.ip6,
        )
        self.pg_send(self.pg0, ike_msg)
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_req(capture[0])

    def initiate_sa_init(self):
        """Make VPP start the SA_INIT exchange for the profile and answer it."""
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)

        capture = self.pg0.get_capture(1)
        self.verify_sa_init_request(capture[0])
        self.send_init_response()

    def send_auth_response(self):
        """Build, encrypt and send the IKE_AUTH response to VPP.

        The response carries IDi, IDr, AUTH, the ESP proposal with the
        child SA's responder SPI, both traffic selector payloads and an
        INITIAL_CONTACT notification.
        """
        tr_attr = self.sa.esp_crypto_attr()
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.esp_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.esp_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="No ESN"
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="ESN"
            )
        )

        c = self.sa.child_sas[0]
        props = ikev2.IKEv2_payload_Proposal(
            proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
        )

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (
            ikev2.IKEv2_payload_IDi(
                next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
            )
            / ikev2.IKEv2_payload_IDr(
                next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
            )
            / ikev2.IKEv2_payload_AUTH(
                next_payload="SA",
                auth_type=AuthMethod.value(self.sa.auth_method),
                load=self.sa.auth_data,
            )
            / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
            / ikev2.IKEv2_payload_TSi(
                next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
            )
            / ikev2.IKEv2_payload_TSr(
                next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
            )
            / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
        )

        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            id=self.sa.new_msg_id(),
            flags="Response",
            exch_type="IKE_AUTH",
        )

        ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg_send(self.pg0, packet)

    def test_initiator(self):
        # Full exchange driven by VPP: SA_INIT, authentication data and
        # child keys, AUTH response, then both IKE SA dump checks.
        self.initiate_sa_init()
        self.sa.auth_init()
        self.sa.calc_child_keys()
        self.send_auth_response()
        self.verify_ike_sas()
        self.verify_ike_sas_v2()
1365
1366
1367 class TemplateResponder(IkePeer):
1368     """responder test template"""
1369
1370     def initiate_del_sa_from_responder(self):
1371         self.pg0.enable_capture()
1372         self.pg_start()
1373         self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1374         capture = self.pg0.get_capture(1)
1375         ih = self.get_ike_header(capture[0])
1376         self.assertNotIn("Response", ih.flags)
1377         self.assertNotIn("Initiator", ih.flags)
1378         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1379         plain = self.sa.hmac_and_decrypt(ih)
1380         d = ikev2.IKEv2_payload_Delete(plain)
1381         self.assertEqual(d.proto, 1)  # proto=IKEv2
1382         self.assertEqual(ih.init_SPI, self.sa.ispi)
1383         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1384         header = ikev2.IKEv2(
1385             init_SPI=self.sa.ispi,
1386             resp_SPI=self.sa.rspi,
1387             flags="Initiator+Response",
1388             exch_type="INFORMATIONAL",
1389             id=ih.id,
1390             next_payload="Encrypted",
1391         )
1392         resp = self.encrypt_ike_msg(header, b"", None)
1393         self.send_and_assert_no_replies(self.pg0, resp)
1394
1395     def verify_del_sa(self, packet):
1396         ih = self.get_ike_header(packet)
1397         self.assertEqual(ih.id, self.sa.msg_id)
1398         self.assertEqual(ih.exch_type, 37)  # exchange informational
1399         self.assertIn("Response", ih.flags)
1400         self.assertNotIn("Initiator", ih.flags)
1401         self.assertEqual(ih.next_payload, 46)  # Encrypted
1402         self.assertEqual(ih.init_SPI, self.sa.ispi)
1403         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1404         plain = self.sa.hmac_and_decrypt(ih)
1405         self.assertEqual(plain, b"")
1406
1407     def initiate_del_sa_from_initiator(self):
1408         header = ikev2.IKEv2(
1409             init_SPI=self.sa.ispi,
1410             resp_SPI=self.sa.rspi,
1411             flags="Initiator",
1412             exch_type="INFORMATIONAL",
1413             id=self.sa.new_msg_id(),
1414         )
1415         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1416         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1417         packet = self.create_packet(
1418             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1419         )
1420         self.pg0.add_stream(packet)
1421         self.pg0.enable_capture()
1422         self.pg_start()
1423         capture = self.pg0.get_capture(1)
1424         self.verify_del_sa(capture[0])
1425
1426     def generate_sa_init_payload(
1427         self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
1428     ):
1429         tr_attr = self.sa.ike_crypto_attr()
1430         trans = (
1431             ikev2.IKEv2_payload_Transform(
1432                 transform_type="Encryption",
1433                 transform_id=self.sa.ike_crypto,
1434                 length=tr_attr[1],
1435                 key_length=tr_attr[0],
1436             )
1437             / ikev2.IKEv2_payload_Transform(
1438                 transform_type="Integrity", transform_id=self.sa.ike_integ
1439             )
1440             / ikev2.IKEv2_payload_Transform(
1441                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1442             )
1443             / ikev2.IKEv2_payload_Transform(
1444                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1445             )
1446         )
1447
1448         if spi is None:
1449             pargs = {}
1450         else:
1451             pargs = {"SPI": spi, "SPIsize": len(spi)}
1452         props = ikev2.IKEv2_payload_Proposal(
1453             proposal=1,
1454             proto="IKEv2",
1455             trans_nb=4,
1456             trans=trans,
1457             **pargs,
1458         )
1459
1460         return (
1461             ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1462             / ikev2.IKEv2_payload_KE(
1463                 next_payload="Nonce",
1464                 group=self.sa.ike_dh,
1465                 load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
1466             )
1467             / ikev2.IKEv2_payload_Nonce(
1468                 next_payload=next_payload,
1469                 load=self.sa.i_nonce if nonce is None else nonce,
1470             )
1471         )
1472
1473     def send_sa_init_req(self):
1474         self.sa.init_req_packet = ikev2.IKEv2(
1475             init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1476         ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
1477
1478         if not self.ip6:
1479             if self.sa.i_natt:
1480                 src_address = b"\x0a\x0a\x0a\x01"
1481             else:
1482                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1483
1484             if self.sa.r_natt:
1485                 dst_address = b"\x0a\x0a\x0a\x0a"
1486             else:
1487                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1488
1489             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1490             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1491             nat_src_detection = ikev2.IKEv2_payload_Notify(
1492                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1493             )
1494             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1495                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1496             )
1497             self.sa.init_req_packet = (
1498                 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1499             )
1500
1501         ike_msg = self.create_packet(
1502             self.pg0,
1503             self.sa.init_req_packet,
1504             self.sa.sport,
1505             self.sa.dport,
1506             self.sa.natt,
1507             self.ip6,
1508         )
1509         self.pg0.add_stream(ike_msg)
1510         self.pg0.enable_capture()
1511         self.pg_start()
1512         capture = self.pg0.get_capture(1)
1513         self.verify_sa_init(capture[0])
1514
1515     def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1516         tr_attr = self.sa.esp_crypto_attr()
1517         last_payload = last_payload or "Notify"
1518         trans_nb = 4
1519         trans = (
1520             ikev2.IKEv2_payload_Transform(
1521                 transform_type="Encryption",
1522                 transform_id=self.sa.esp_crypto,
1523                 length=tr_attr[1],
1524                 key_length=tr_attr[0],
1525             )
1526             / ikev2.IKEv2_payload_Transform(
1527                 transform_type="Integrity", transform_id=self.sa.esp_integ
1528             )
1529             / ikev2.IKEv2_payload_Transform(
1530                 transform_type="Extended Sequence Number", transform_id="No ESN"
1531             )
1532             / ikev2.IKEv2_payload_Transform(
1533                 transform_type="Extended Sequence Number", transform_id="ESN"
1534             )
1535         )
1536
1537         if kex:
1538             trans_nb += 1
1539             trans /= ikev2.IKEv2_payload_Transform(
1540                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1541             )
1542
1543         c = self.sa.child_sas[0]
1544         props = ikev2.IKEv2_payload_Proposal(
1545             proposal=1,
1546             proto="ESP",
1547             SPIsize=4,
1548             SPI=c.ispi,
1549             trans_nb=trans_nb,
1550             trans=trans,
1551         )
1552
1553         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1554         plain = (
1555             ikev2.IKEv2_payload_AUTH(
1556                 next_payload="SA",
1557                 auth_type=AuthMethod.value(self.sa.auth_method),
1558                 load=self.sa.auth_data,
1559             )
1560             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1561             / ikev2.IKEv2_payload_TSi(
1562                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1563             )
1564             / ikev2.IKEv2_payload_TSr(
1565                 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1566             )
1567         )
1568
1569         if is_rekey:
1570             first_payload = "Nonce"
1571             if kex:
1572                 head = ikev2.IKEv2_payload_Nonce(
1573                     load=self.sa.i_nonce, next_payload="KE"
1574                 ) / ikev2.IKEv2_payload_KE(
1575                     group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1576                 )
1577             else:
1578                 head = ikev2.IKEv2_payload_Nonce(
1579                     load=self.sa.i_nonce, next_payload="SA"
1580                 )
1581             plain = (
1582                 head
1583                 / plain
1584                 / ikev2.IKEv2_payload_Notify(
1585                     type="REKEY_SA",
1586                     proto="ESP",
1587                     SPI=c.ispi,
1588                     length=8 + len(c.ispi),
1589                     next_payload="Notify",
1590                 )
1591                 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1592             )
1593         else:
1594             first_payload = "IDi"
1595             if self.no_idr_auth:
1596                 ids = ikev2.IKEv2_payload_IDi(
1597                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1598                 )
1599             else:
1600                 ids = ikev2.IKEv2_payload_IDi(
1601                     next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1602                 ) / ikev2.IKEv2_payload_IDr(
1603                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1604                 )
1605             plain = ids / plain
1606         return plain, first_payload
1607
1608     def send_sa_auth(self):
1609         plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1610         plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1611         header = ikev2.IKEv2(
1612             init_SPI=self.sa.ispi,
1613             resp_SPI=self.sa.rspi,
1614             id=self.sa.new_msg_id(),
1615             flags="Initiator",
1616             exch_type="IKE_AUTH",
1617         )
1618
1619         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1620         packet = self.create_packet(
1621             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1622         )
1623         self.pg0.add_stream(packet)
1624         self.pg0.enable_capture()
1625         self.pg_start()
1626         capture = self.pg0.get_capture(1)
1627         self.verify_sa_auth_resp(capture[0])
1628
1629     def verify_sa_init(self, packet):
1630         ih = self.get_ike_header(packet)
1631
1632         self.assertEqual(ih.id, self.sa.msg_id)
1633         self.assertEqual(ih.exch_type, 34)
1634         self.assertIn("Response", ih.flags)
1635         self.assertEqual(ih.init_SPI, self.sa.ispi)
1636         self.assertNotEqual(ih.resp_SPI, 0)
1637         self.sa.rspi = ih.resp_SPI
1638         try:
1639             sa = ih[ikev2.IKEv2_payload_SA]
1640             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1641             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1642         except IndexError as e:
1643             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1644             self.logger.error(ih.show())
1645             raise
1646         self.sa.complete_dh_data()
1647         self.sa.calc_keys()
1648         self.sa.auth_init()
1649
1650     def verify_sa_auth_resp(self, packet):
1651         ike = self.get_ike_header(packet)
1652         udp = packet[UDP]
1653         self.verify_udp(udp)
1654         self.assertEqual(ike.id, self.sa.msg_id)
1655         plain = self.sa.hmac_and_decrypt(ike)
1656         idr = ikev2.IKEv2_payload_IDr(plain)
1657         prop = idr[ikev2.IKEv2_payload_Proposal]
1658         self.assertEqual(prop.SPIsize, 4)
1659         self.sa.child_sas[0].rspi = prop.SPI
1660         self.sa.calc_child_keys()
1661
    # suffix handed to assert_counter() by verify_counters(); subclasses
    # override it (presumably it selects the IKEv2 graph node - TODO confirm)
    IKE_NODE_SUFFIX = "ip4"
1663
1664     def verify_counters(self):
1665         self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1666         self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1667         self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1668
1669         r = self.vapi.ikev2_sa_dump()
1670         s = r[0].sa.stats
1671         self.assertEqual(1, s.n_sa_auth_req)
1672         self.assertEqual(1, s.n_sa_init_req)
1673
1674         r = self.vapi.ikev2_sa_v2_dump()
1675         s = r[0].sa.stats
1676         self.assertEqual(1, s.n_sa_auth_req)
1677         self.assertEqual(1, s.n_sa_init_req)
1678
1679     def test_responder(self):
1680         self.send_sa_init_req()
1681         self.send_sa_auth()
1682         self.verify_ipsec_sas()
1683         self.verify_ike_sas()
1684         self.verify_ike_sas_v2()
1685         self.verify_counters()
1686
1687
class Ikev2Params(object):
    """Mixin building the IKEv2 profile and peer state used by the tests.

    Relies on attributes supplied by the class it is mixed into
    (``self.vapi`` and ``self.pg0`` are used but not defined here).
    """

    def config_params(self, params=None):
        """Create profile ``pr1`` on VPP and the matching ``IKEv2SA``.

        Sets ``self.vpp_enums``, ``self.del_sa_from_responder``, ``self.p``,
        ``self.ip6``, ``self.no_idr_auth``, ``self.idr``, ``self.idi`` and
        ``self.sa`` (plus ``self.peer_cert`` for rsa-sig authentication).

        :param params: optional dict of overrides; a missing key selects
            the default. Recognised keys: ``dpd_disabled``,
            ``del_sa_from_responder``, ``i_natt``, ``r_natt``, ``ip6``,
            ``auth`` (with ``server-key``, ``server-cert``, ``client-key``,
            ``client-cert`` when it is ``"rsa-sig"``), ``is_initiator``,
            ``no_idr_in_auth``, ``loc_ts``, ``rem_ts``, ``responder``,
            ``ike_transforms``, ``esp_transforms``, ``udp_encap``,
            ``responder_hostname``, ``nonce``, ``ike-crypto``,
            ``ike-integ``, ``ike-dh``, ``esp-crypto``, ``esp-integ``.
        """
        # None instead of a shared mutable default dict
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
            "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
            "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
            "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
            "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
            "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
            "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
            "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
            "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
            "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
            "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
        }

        # dead peer detection is switched off unless explicitly requested
        if params.get("dpd_disabled", True):
            self.vapi.cli("ikev2 dpd disable")
        self.del_sa_from_responder = params.get("del_sa_from_responder", False)
        i_natt = params.get("i_natt", False)
        r_natt = params.get("r_natt", False)
        self.p = Profile(self, "pr1")
        self.ip6 = params.get("ip6", False)

        if params.get("auth") == "rsa-sig":
            auth_method = "rsa-sig"
            work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
            self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])

            client_file = work_dir + params["client-cert"]
            # context managers make sure the PEM files are closed again
            with open(work_dir + params["server-cert"]) as pem_file:
                server_pem = pem_file.read()
            with open(work_dir + params["client-key"]) as pem_file:
                client_key_pem = pem_file.read()
            client_priv = load_pem_private_key(
                str.encode(client_key_pem), None, default_backend()
            )
            self.peer_cert = x509.load_pem_x509_certificate(
                str.encode(server_pem), default_backend()
            )
            self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b"$3cr3tpa$$w0rd"
            self.p.add_auth(method="shared-key", data=auth_data)
            auth_method = "shared-key"
            client_priv = None

        is_init = params.get("is_initiator", True)
        self.no_idr_auth = params.get("no_idr_in_auth", False)

        idr = {"id_type": "fqdn", "data": b"vpp.home"}
        idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
        r_id = self.idr = idr["data"]
        i_id = self.idi = idi["data"]
        if is_init:
            # scapy is initiator, VPP is responder
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
            if self.no_idr_auth:
                r_id = None
        else:
            # VPP is initiator, scapy is responder
            self.p.add_local_id(**idi)
            if not self.no_idr_auth:
                self.p.add_remote_id(**idr)

        loc_ts = params.get(
            "loc_ts", {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
        )
        rem_ts = params.get(
            "rem_ts", {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
        )
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if "responder" in params:
            self.p.add_responder(params["responder"])
        if "ike_transforms" in params:
            self.p.add_ike_transforms(params["ike_transforms"])
        if "esp_transforms" in params:
            self.p.add_esp_transforms(params["esp_transforms"])

        udp_encap = params.get("udp_encap", False)
        if udp_encap:
            self.p.set_udp_encap(True)

        if "responder_hostname" in params:
            hn = params["responder_hostname"]
            self.p.add_responder_hostname(hn)

            # configure static dns record
            self.vapi.dns_name_server_add_del(
                is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
            )
            self.vapi.dns_enable_disable(enable=1)

            cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
            self.vapi.cli(cmd)

        # the scapy peer mirrors the profile: its local traffic selector is
        # the profile's remote one and vice versa
        self.sa = IKEv2SA(
            self,
            i_id=i_id,
            r_id=r_id,
            is_initiator=is_init,
            id_type=self.p.local_id["id_type"],
            i_natt=i_natt,
            r_natt=r_natt,
            priv_key=client_priv,
            auth_method=auth_method,
            nonce=params.get("nonce"),
            auth_data=auth_data,
            udp_encap=udp_encap,
            local_ts=self.p.remote_ts,
            remote_ts=self.p.local_ts,
        )

        # transforms are only chosen here when scapy is the initiator
        if is_init:
            ike_crypto = params.get("ike-crypto", ("AES-CBC", 32))
            ike_integ = params.get("ike-integ", "HMAC-SHA1-96")
            ike_dh = params.get("ike-dh", "2048MODPgr")

            esp_crypto = params.get("esp-crypto", ("AES-CBC", 32))
            esp_integ = params.get("esp-integ", "HMAC-SHA1-96")

            self.sa.set_ike_props(
                crypto=ike_crypto[0],
                crypto_key_len=ike_crypto[1],
                integ=ike_integ,
                prf="PRF_HMAC_SHA2_256",
                dh=ike_dh,
            )
            self.sa.set_esp_props(
                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
            )
1838
1839
@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class TestApi(VppTestCase):
    """Test IKEV2 API

    Configures two profiles and compares the output of the profile dump
    API with the values that were configured.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def tearDown(self):
        # remove both profiles and make sure VPP reports none afterwards
        super().tearDown()
        self.p1.remove_vpp_config()
        self.p2.remove_vpp_config()
        leftover = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(leftover), 0)

    def configure_profile(self, cfg):
        """Build a Profile from ``cfg``, push it to VPP and return it.

        ``lifetime_data``, ``tun_itf`` and ``natt_disabled`` are optional
        keys; every other key read below is mandatory.
        """
        profile = Profile(self, cfg["name"])

        loc_id = cfg["loc_id"]
        profile.add_local_id(id_type=loc_id[0], data=loc_id[1])
        rem_id = cfg["rem_id"]
        profile.add_remote_id(id_type=rem_id[0], data=rem_id[1])

        profile.add_local_ts(**cfg["loc_ts"])
        profile.add_remote_ts(**cfg["rem_ts"])
        profile.add_responder(cfg["responder"])
        profile.add_ike_transforms(cfg["ike_ts"])
        profile.add_esp_transforms(cfg["esp_ts"])
        profile.add_auth(**cfg["auth"])
        profile.set_udp_encap(cfg["udp_encap"])
        profile.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])

        if "lifetime_data" in cfg:
            profile.set_lifetime_data(cfg["lifetime_data"])
        if "tun_itf" in cfg:
            profile.set_tunnel_interface(cfg["tun_itf"])
        if cfg.get("natt_disabled"):
            profile.disable_natt()

        profile.add_vpp_config()
        return profile

    def test_profile_api(self):
        """test profile dump API"""
        # IPv4 traffic selectors (used by profile p1)
        loc_ts4 = {
            "proto": 8,
            "start_port": 1,
            "end_port": 19,
            "start_addr": "3.3.3.2",
            "end_addr": "3.3.3.3",
        }
        rem_ts4 = {
            "proto": 9,
            "start_port": 10,
            "end_port": 119,
            "start_addr": "4.5.76.80",
            "end_addr": "2.3.4.6",
        }

        # IPv6 traffic selectors (used by profile p2)
        loc_ts6 = {
            "proto": 8,
            "start_port": 1,
            "end_port": 19,
            "start_addr": "ab::1",
            "end_addr": "ab::4",
        }
        rem_ts6 = {
            "proto": 9,
            "start_port": 10,
            "end_port": 119,
            "start_addr": "cd::12",
            "end_addr": "cd::13",
        }

        p1_cfg = {
            "name": "p1",
            "natt_disabled": True,
            "loc_id": ("fqdn", b"vpp.home"),
            "rem_id": ("fqdn", b"roadwarrior.example.com"),
            "loc_ts": loc_ts4,
            "rem_ts": rem_ts4,
            "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
            "ike_ts": {
                "crypto_alg": 20,
                "crypto_key_size": 32,
                "integ_alg": 0,
                "dh_group": 1,
            },
            "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
            "auth": {"method": "shared-key", "data": b"sharedkeydata"},
            "udp_encap": True,
            "ipsec_over_udp_port": 4501,
            "lifetime_data": {
                "lifetime": 123,
                "lifetime_maxdata": 20192,
                "lifetime_jitter": 9,
                "handover": 132,
            },
        }
        p2_cfg = {
            "name": "p2",
            "loc_id": ("ip4-addr", b"192.168.2.1"),
            "rem_id": ("ip6-addr", b"abcd::1"),
            "loc_ts": loc_ts6,
            "rem_ts": rem_ts6,
            "responder": {"sw_if_index": 4, "addr": "def::10"},
            "ike_ts": {
                "crypto_alg": 12,
                "crypto_key_size": 16,
                "integ_alg": 3,
                "dh_group": 3,
            },
            "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
            "auth": {"method": "shared-key", "data": b"sharedkeydata"},
            "udp_encap": False,
            "ipsec_over_udp_port": 4600,
            "tun_itf": 0,
        }

        self.p1 = self.configure_profile(p1_cfg)
        self.p2 = self.configure_profile(p2_cfg)

        dumped = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(dumped), 2)
        # profiles are expected back in the order they were configured
        for entry, expected in zip(dumped, (p1_cfg, p2_cfg)):
            self.verify_profile(entry.profile, expected)

    def verify_id(self, api_id, cfg_id):
        """Compare a dumped identity with its ``(type, data)`` config tuple."""
        id_type, id_data = cfg_id[0], cfg_id[1]
        self.assertEqual(api_id.type, IDType.value(id_type))
        self.assertEqual(bytes(api_id.data, "ascii"), id_data)

    def verify_ts(self, api_ts, cfg_ts):
        """Compare a dumped traffic selector with its config dict."""
        scalar_fields = (
            ("protocol_id", "proto"),
            ("start_port", "start_port"),
            ("end_port", "end_port"),
        )
        for api_field, cfg_key in scalar_fields:
            self.assertEqual(getattr(api_ts, api_field), cfg_ts[cfg_key])
        # addresses are configured as strings, so parse them before comparing
        for addr_key in ("start_addr", "end_addr"):
            self.assertEqual(
                getattr(api_ts, addr_key), ip_address(text_type(cfg_ts[addr_key]))
            )

    def verify_responder(self, api_r, cfg_r):
        """Compare the dumped responder with its config dict."""
        self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
        expected_addr = ip_address(cfg_r["addr"])
        self.assertEqual(api_r.addr, expected_addr)

    def verify_transforms(self, api_ts, cfg_ts):
        """Compare the transform fields shared by IKE and ESP."""
        for field in ("crypto_alg", "crypto_key_size", "integ_alg"):
            self.assertEqual(getattr(api_ts, field), cfg_ts[field])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """IKE transforms carry a DH group on top of the common fields."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """ESP transforms consist of the common fields only."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Compare dumped authentication method, data and data length."""
        secret = cfg_auth["data"]
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
        self.assertEqual(api_auth.data, secret)
        self.assertEqual(api_auth.data_len, len(secret))

    def verify_lifetime_data(self, p, ld):
        """Compare the dumped lifetime settings with their config dict."""
        for field in ("lifetime", "lifetime_maxdata", "lifetime_jitter", "handover"):
            self.assertEqual(getattr(p, field), ld[field])

    def verify_profile(self, ap, cp):
        """Compare one dumped profile ``ap`` with its config dict ``cp``."""
        self.assertEqual(ap.name, cp["name"])
        self.assertEqual(ap.udp_encap, cp["udp_encap"])
        self.verify_id(ap.loc_id, cp["loc_id"])
        self.verify_id(ap.rem_id, cp["rem_id"])
        self.verify_ts(ap.loc_ts, cp["loc_ts"])
        self.verify_ts(ap.rem_ts, cp["rem_ts"])
        self.verify_responder(ap.responder, cp["responder"])
        self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
        self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
        self.verify_auth(ap.auth, cp["auth"])

        expect_natt_disabled = cp.get("natt_disabled", False)
        self.assertTrue(expect_natt_disabled == ap.natt_disabled)

        if "lifetime_data" in cp:
            self.verify_lifetime_data(ap, cp["lifetime_data"])
        self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])

        # 0xFFFFFFFF is what the dump is expected to report when no tunnel
        # interface was configured
        expected_tun_itf = cp.get("tun_itf", 0xFFFFFFFF)
        self.assertEqual(ap.tun_itf, expected_tun_itf)
2025
2026
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """test responder - responder behind NAT

    Runs the responder template with ``r_natt`` enabled and checks the
    counters under the ``ip4-natt`` suffix.
    """

    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        """Configure the test case with only the responder-side NAT flag set."""
        overrides = {"r_natt": True}
        self.config_params(overrides)
2035
2036
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - NAT traversal (initiator behind NAT)"""

    def config_tc(self):
        """Configure VPP as initiator with ``i_natt`` set."""
        responder = {
            "sw_if_index": self.pg0.sw_if_index,
            "addr": self.pg0.remote_ip4,
        }
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        params = {
            "i_natt": True,
            # seen from test case perspective, thus vpp is initiator
            "is_initiator": False,
            "responder": responder,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
        }
        self.config_params(params)
2067
2068
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - pre shared key auth"""

    def config_tc(self):
        """Configure VPP as initiator, reaching the responder by hostname."""
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        responder_hostname = {
            "hostname": "vpp.responder.org",
            "sw_if_index": self.pg0.sw_if_index,
        }
        params = {
            # seen from test case perspective, thus vpp is initiator
            "is_initiator": False,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
            "responder_hostname": responder_hostname,
        }
        self.config_params(params)
2098
2099
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """test initiator - request window size (1)

    Triggers two child SA rekey requests and answers them in reverse order.
    """

    def rekey_respond(self, req, update_child_sa_data):
        """Answer a captured rekey request by echoing its decrypted payload.

        :param req: captured request packet.
        :param update_child_sa_data: when true, take nonce and SPI from the
            request, store them on ``self.sa`` and recompute the child keys
            before replying.
        """
        req_hdr = self.get_ike_header(req)
        decrypted = self.sa.hmac_and_decrypt(req_hdr)
        payload = ikev2.IKEv2_payload_SA(decrypted)

        if update_child_sa_data:
            proposal = payload[ikev2.IKEv2_payload_Proposal]
            nonce = payload[ikev2.IKEv2_payload_Nonce].load
            # the same nonce and SPI are used for both directions
            self.sa.i_nonce = self.sa.r_nonce = nonce
            child = self.sa.child_sas[0]
            child.ispi = child.rspi = proposal.SPI
            self.sa.calc_child_keys()

        resp_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type=36,  # CHILD_SA
            id=req_hdr.id,
            next_payload="Encrypted",
        )
        encrypted = self.encrypt_ike_msg(resp_hdr, payload, "SA")
        reply = self.create_packet(
            self.pg0,
            encrypted,
            self.sa.sport,
            self.sa.dport,
            self.sa.natt,
            self.ip6,
        )
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # Documented with comments only so the reported test description
        # stays unchanged.
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()

        # ask VPP twice to rekey the same child SA
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        requests = self.pg0.get_capture(2)
        first_req, second_req = requests[0], requests[1]

        # reply in reverse order
        self.rekey_respond(second_req, True)
        self.rekey_respond(first_req, False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ike_sas_v2()
        self.verify_ipsec_sas(is_rekey=True)
2147
2148
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """test ikev2 initiator - rekey"""

    def rekey_from_initiator(self):
        """Make VPP start a child SA rekey and answer its request.

        The captured request is checked, its nonce and SPI are stored on
        ``self.sa``, the child keys are recomputed and the decrypted
        payload is sent back as the response.
        """
        child = self.sa.child_sas[0]
        old_ispi = int.from_bytes(child.ispi, "little")

        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=old_ispi)
        request = self.pg0.get_capture(1)[0]

        req_hdr = self.get_ike_header(request)
        self.assertEqual(req_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn("Response", req_hdr.flags)
        self.assertIn("Initiator", req_hdr.flags)

        decrypted = self.sa.hmac_and_decrypt(req_hdr)
        payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = payload[ikev2.IKEv2_payload_Proposal]
        nonce = payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_nonce = self.sa.r_nonce = nonce
        # update new responder SPI
        child.ispi = child.rspi = proposal.SPI
        self.sa.calc_child_keys()

        resp_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type=36,
            id=req_hdr.id,
            next_payload="Encrypted",
        )
        encrypted = self.encrypt_ike_msg(resp_hdr, payload, "SA")
        reply = self.create_packet(
            self.pg0,
            encrypted,
            self.sa.sport,
            self.sa.dport,
            self.sa.natt,
            self.ip6,
        )
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # Inherited initiator scenario followed by a rekey started by VPP.
        # Documented with comments only so the reported test description
        # stays unchanged.
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        for verify in (self.verify_ike_sas, self.verify_ike_sas_v2):
            verify()
        self.verify_ipsec_sas(is_rekey=True)
2192
2193
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - delete IKE SA from responder"""

    def config_tc(self):
        """Configure VPP as initiator with ``del_sa_from_responder`` set."""
        responder = {
            "sw_if_index": self.pg0.sw_if_index,
            "addr": self.pg0.remote_ip4,
        }
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        params = {
            "del_sa_from_responder": True,
            # seen from test case perspective, thus vpp is initiator
            "is_initiator": False,
            "responder": responder,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
            "no_idr_in_auth": True,
        }
        self.config_params(params)
2225
2226
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """test ikev2 responder - initiator behind NAT

    Runs the responder template with ``i_natt`` enabled and checks the
    counters under the ``ip4-natt`` suffix.
    """

    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        """Configure the test case with only the initiator-side NAT flag set."""
        overrides = {"i_natt": True}
        self.config_params(overrides)
2235
2236
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """test ikev2 responder - pre shared key auth"""

    def config_tc(self):
        # no overrides: config_params() falls back to its defaults, which
        # select shared-key authentication
        self.config_params()
2243
2244
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """Dead peer detection test

    Leaves VPP's liveness request unanswered and expects the IKE and IPsec
    SAs to be gone afterwards.
    """

    def config_tc(self):
        """Run with dead peer detection left enabled."""
        overrides = {"dpd_disabled": False}
        self.config_params(overrides)

    def tearDown(self):
        # deliberately does nothing, so the inherited tearDown is skipped
        # NOTE(review): presumably because the SAs are already gone when
        # the test ends - confirm
        pass

    def test_responder(self):
        # Documented with comments only so the reported test description
        # stays unchanged.
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()

        # capture empty request but don't reply
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        probe_hdr = self.get_ike_header(probe)
        self.assertEqual(probe_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(probe_hdr), b"")

        # wait for SA expiration
        time.sleep(3)
        for dump_api in ("ikev2_sa_dump", "ikev2_sa_v2_dump", "ipsec_sa_dump"):
            entries = getattr(self.vapi, dump_api)()
            self.assertEqual(len(entries), 0)
2276
2277
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """test ikev2 responder - rekey"""

    # subclasses set this to True to rekey with a key exchange
    WITH_KEX = False

    def send_rekey_from_initiator(self):
        """Send a child SA rekey request and return the single-packet capture."""
        with_kex = self.WITH_KEX
        if with_kex:
            self.sa.generate_dh_data()
        request = self.create_rekey_request(kex=with_kex)
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        return self.pg0.get_capture(1)

    def process_rekey_response(self, capture):
        """Take nonce, SPI and (with KEX) DH data from the rekey response.

        :param capture: capture whose first packet is the response.
        """
        resp_hdr = self.get_ike_header(capture[0])
        decrypted = self.sa.hmac_and_decrypt(resp_hdr)
        payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = payload[ikev2.IKEv2_payload_Proposal]

        self.sa.r_nonce = payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        if self.WITH_KEX:
            self.sa.r_dh_data = payload[ikev2.IKEv2_payload_KE].load
            self.sa.complete_dh_data()
        self.sa.calc_child_keys(kex=self.WITH_KEX)

    def test_responder(self):
        # Inherited responder scenario followed by one child SA rekey.
        # Documented with comments only so the reported test description
        # stays unchanged.
        super(TestResponderRekey, self).test_responder()
        response = self.send_rekey_from_initiator()
        self.process_rekey_response(response)

        self.verify_ike_sas()
        self.verify_ike_sas_v2()
        self.verify_ipsec_sas(is_rekey=True)
        self.assert_counter(1, "rekey_req", "ip4")

        # both dump API versions must report exactly one rekey request
        for dump_api in ("ikev2_sa_dump", "ikev2_sa_v2_dump"):
            dumped = getattr(self.vapi, dump_api)()
            self.assertEqual(dumped[0].sa.stats.n_rekey_req, 1)
2318
2319
@tag_fixme_vpp_workers
class TestResponderRekeyRepeat(TestResponderRekey):
    """test ikev2 responder - rekey repeat"""

    def test_responder(self):
        # Documented with comments only so the reported test description
        # stays unchanged.
        super(TestResponderRekeyRepeat, self).test_responder()

        # rekey request is not accepted until old IPsec SA is expired
        rejected = self.send_rekey_from_initiator()
        rejected_hdr = self.get_ike_header(rejected[0])
        decrypted = self.sa.hmac_and_decrypt(rejected_hdr)
        notify = ikev2.IKEv2_payload_Notify(decrypted)
        self.assertEqual(notify.type, 43)  # TEMPORARY_FAILURE
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)

        # rekey request is accepted after old IPsec SA was expired;
        # poll up to 50 times, 0.2 s apart, for the SA count to change
        polls = 0
        while len(self.vapi.ipsec_sa_dump()) == 3:
            time.sleep(0.2)
            polls += 1
            if polls == 50:
                self.fail("old IPsec SA not expired")

        accepted = self.send_rekey_from_initiator()
        self.process_rekey_response(accepted)
        self.verify_ike_sas()
        self.verify_ike_sas_v2()
        self.verify_ipsec_sas(sa_count=3)
2344
2345
@tag_fixme_vpp_workers
class TestResponderRekeyKEX(TestResponderRekey):
    """test ikev2 responder - rekey with key exchange"""

    # read by the inherited rekey helpers: generate DH data, request the
    # rekey with kex=True and process the KE payload of the response
    WITH_KEX = True
2351
2352
@tag_fixme_vpp_workers
class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
    """test ikev2 responder - rekey repeat with key exchange"""

    # read by the inherited rekey helpers: generate DH data, request the
    # rekey with kex=True and process the KE payload of the response
    WITH_KEX = True
2358
2359
@tag_fixme_vpp_workers
class TestResponderRekeySA(TestResponderPsk):
    """test ikev2 responder - rekey IKE SA"""

    def send_rekey_from_initiator(self, newsa):
        """Send an IKE SA rekey request built from ``newsa``.

        :param newsa: SA object providing the SPI, DH public key and nonce
            for the request.
        :returns: the single-packet capture holding the response.
        """
        request = self.create_sa_rekey_request(
            spi=newsa.ispi,
            dh_pub_key=newsa.my_dh_pub_key,
            nonce=newsa.i_nonce,
        )
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        return self.pg0.get_capture(1)

    def process_rekey_response(self, newsa, capture):
        """Complete ``newsa`` from the rekey response and hand over child SAs.

        The response is decrypted with the current SA; the new SA's keys
        are calculated with the current SA's ``sk_d``.
        """
        resp_hdr = self.get_ike_header(capture[0])
        decrypted = self.sa.hmac_and_decrypt(resp_hdr)
        payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = payload[ikev2.IKEv2_payload_Proposal]

        newsa.rspi = proposal.SPI
        newsa.r_nonce = payload[ikev2.IKEv2_payload_Nonce].load
        newsa.r_dh_data = payload[ikev2.IKEv2_payload_KE].load
        newsa.complete_dh_data()
        newsa.calc_keys(sk_d=self.sa.sk_d)

        # the child SAs move to the new IKE SA; the old one keeps none
        newsa.child_sas = self.sa.child_sas
        self.sa.child_sas = []

    def test_responder(self):
        # Documented with comments only so the reported test description
        # stays unchanged.
        super(TestResponderRekeySA, self).test_responder()

        rekeyed_sa = self.sa.clone(self, spi=os.urandom(8))
        rekeyed_sa.generate_dh_data()
        response = self.send_rekey_from_initiator(rekeyed_sa)
        self.process_rekey_response(rekeyed_sa, response)

        self.verify_ike_sas(is_rekey=True)
        self.assert_counter(1, "rekey_req", "ip4")
        dumped = self.vapi.ikev2_sa_dump()
        self.assertEqual(dumped[1].sa.stats.n_rekey_req, 1)

        # delete the old IKE SA, then continue with the new one
        self.initiate_del_sa_from_initiator()
        self.sa = rekeyed_sa
        self.verify_ike_sas()
2402
2403
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """test ikev2 responder - non-default table id"""

    @classmethod
    def setUpClass(cls):
        """Set up pg0 inside IP table 1 instead of the default table.

        Publishes scapy's IKEv2 contrib module as the module-level ``ikev2``
        name, runs the base-class setup, then creates a single pg interface,
        moves it to table 1 and configures/resolves IPv4 and IPv6 on it.
        """
        import scapy.contrib.ikev2 as _ikev2

        globals()["ikev2"] = _ikev2
        # NOTE(review): the MRO lookup deliberately starts *after* IkePeer,
        # so IkePeer.setUpClass itself is not run here - presumably because
        # this class does its own interface setup below; confirm.
        super(IkePeer, cls).setUpClass()
        # NOTE(review): on the tagged distros the class may end up without a
        # "vpp" attribute (presumably VPP was not started); in that case
        # there is nothing to configure - confirm against the framework.
        if (is_distro_ubuntu2204 or is_distro_debian11) and not hasattr(cls, "vpp"):
            return
        cls.create_pg_interfaces(range(1))
        cls.vapi.cli("ip table add 1")
        cls.vapi.cli("set interface ip table pg0 1")
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def config_tc(self):
        """Configure the test case with ``dpd_disabled`` set to False."""
        self.config_params({"dpd_disabled": False})

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderVrf, self).test_responder()
        # After the inherited scenario, wait (up to 5 s) for one more packet
        # on pg0: it must be an INFORMATIONAL exchange whose decrypted
        # payload is empty.
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(expected_count=1, timeout=5)
        ih = self.get_ike_header(capture[0])
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b"")
2442
2443
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """test ikev2 responder - cert based auth"""

    def config_tc(self):
        """Configure rsa-sig authentication with UDP encapsulation enabled."""
        # Key and certificate file names for both peers.
        rsa_params = {
            "udp_encap": True,
            "auth": "rsa-sig",
            "server-key": "server-key.pem",
            "client-key": "client-key.pem",
            "client-cert": "client-cert.pem",
            "server-cert": "server-cert.pem",
        }
        self.config_params(rsa_params)
2459
2460
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
    TemplateResponder, Ikev2Params
):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """

    def config_tc(self):
        """Select the IKE/ESP transforms named in the class docstring."""
        transforms = {
            "ike-crypto": ("AES-CBC", 16),
            "ike-integ": "SHA2-256-128",
            "esp-crypto": ("AES-CBC", 24),
            "esp-integ": "SHA2-384-192",
            "ike-dh": "2048MODPgr",
            # 256 random bytes, generated anew on every call
            "nonce": os.urandom(256),
            "no_idr_in_auth": True,
        }
        self.config_params(transforms)
2481
2482
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
    TemplateResponder, Ikev2Params
):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """

    def config_tc(self):
        """Select AES-CBC/SHA2-256-128 for IKE and AES-GCM-16ICV for ESP."""
        # NOTE(review): the class name and docstring say AES_CBC_128, but the
        # "ike-crypto" size below is 32, whereas the MODP2048 sibling test
        # uses 16 for AES_CBC_128 - confirm which one is intended.
        transforms = {
            "ike-crypto": ("AES-CBC", 32),
            "ike-integ": "SHA2-256-128",
            "esp-crypto": ("AES-GCM-16ICV", 32),
            "esp-integ": "NULL",
            "ike-dh": "3072MODPgr",
        }
        self.config_params(transforms)
2502
2503
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    IKE_NODE_SUFFIX = "ip6"

    def config_tc(self):
        """Configure an IPv6, NAT-T scenario using AES-GCM-16ICV for IKE."""
        # Local and remote traffic selector address ranges.
        local_ts = {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"}
        remote_ts = {"start_addr": "11::0", "end_addr": "11::100"}
        params = {
            "del_sa_from_responder": True,
            "ip6": True,
            "natt": True,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "2048MODPgr",
            "loc_ts": local_ts,
            "rem_ts": remote_ts,
        }
        self.config_params(params)
2525
2526
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request on pg0 and verify the reply and counters.

        The single captured reply must carry the SA's current message id and
        decrypt to an empty payload; afterwards the "keepalive" counter and
        the ``n_keepalives`` stat in both SA dump variants must equal 1.
        """
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply = self.pg0.get_capture(1)

        ike_hdr = self.get_ike_header(reply[0])
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b"")

        self.assert_counter(1, "keepalive", "ip4")
        # Check the stat through the original dump API first, then the v2 one.
        for dump_sas in (self.vapi.ikev2_sa_dump, self.vapi.ikev2_sa_v2_dump):
            entries = dump_sas()
            self.assertEqual(1, entries[0].sa.stats.n_keepalives)

    def test_initiator(self):
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
2552
2553
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """malformed packet test"""

    def tearDown(self):
        # Deliberately a no-op: overrides the inherited tearDown.
        pass

    def config_tc(self):
        """Configure the test case with default parameters."""
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet from pg0 with an optional payload.

        ``length`` is passed straight to the IKEv2 header's length field;
        ``payload``, when given, is appended after the header.
        """
        ike_hdr = ikev2.IKEv2(
            length=length,
            init_SPI="\x11" * 8,
            flags="Initiator",
            exch_type="IKE_SA_INIT",
        )
        msg = ike_hdr if payload is None else ike_hdr / payload
        return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)

    def verify_bad_packet_length(self):
        """Send headers with a bogus length; expect no replies."""
        burst = self.create_ike_init_msg(length=0xDEAD) * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, "bad_length")

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with a bogus length; expect no replies."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xDEAD)
        burst = self.create_ike_init_msg(payload=bad_sa) * self.pkt_count
        self.send_and_assert_no_replies(self.pg0, burst)
        self.assert_counter(self.pkt_count, "malformed_packet")

    def test_responder(self):
        # Number of malformed packets sent by each check below.
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
2589
2590
# Allow running this test module directly, using VppTestRunner as the runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)