vppinfra: add support for precomputed SHA2 HMAC key and chained buffers
[vpp.git] / test / test_ikev2.py
1 import os
2 import socket
3 import time
4 from socket import inet_pton
5 from cryptography import x509
6 from cryptography.hazmat.backends import default_backend
7 from cryptography.hazmat.primitives import hashes, hmac
8 from cryptography.hazmat.primitives.asymmetric import dh, padding
9 from cryptography.hazmat.primitives.serialization import load_pem_private_key
10 from cryptography.hazmat.primitives.ciphers import (
11     Cipher,
12     algorithms,
13     modes,
14 )
15 from ipaddress import IPv4Address, IPv6Address, ip_address
16 import unittest
17 from config import config
18 from scapy.layers.ipsec import ESP
19 from scapy.layers.inet import IP, UDP, Ether
20 from scapy.layers.inet6 import IPv6
21 from scapy.packet import raw, Raw
22 from scapy.utils import long_converter
23 from framework import VppTestCase
24 from asfframework import (
25     tag_fixme_vpp_workers,
26     tag_fixme_ubuntu2204,
27     tag_fixme_debian11,
28     is_distro_ubuntu2204,
29     is_distro_debian11,
30     VppTestRunner,
31 )
32 from vpp_ikev2 import Profile, IDType, AuthMethod
33 from vpp_papi import VppEnum
34
35 try:
36     text_type = unicode
37 except NameError:
38     text_type = str
39
40 KEY_PAD = b"Key Pad for IKEv2"
41 SALT_SIZE = 4
42 GCM_ICV_SIZE = 16
43 GCM_IV_SIZE = 8
44
45
46 # defined in rfc3526
47 # tuple structure is (p, g, key_len)
48 DH = {
49     "2048MODPgr": (
50         long_converter(
51             """
52     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
53     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
54     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
55     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
56     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
57     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
58     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
59     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
60     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
61     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
62     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
63         ),
64         2,
65         256,
66     ),
67     "3072MODPgr": (
68         long_converter(
69             """
70     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
71     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
72     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
73     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
74     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
75     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
76     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
77     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
78     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
79     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
80     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
81     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
82     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
83     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
84     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
85     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
86         ),
87         2,
88         384,
89     ),
90 }
91
92
93 class CryptoAlgo(object):
94     def __init__(self, name, cipher, mode):
95         self.name = name
96         self.cipher = cipher
97         self.mode = mode
98         if self.cipher is not None:
99             self.bs = self.cipher.block_size // 8
100
101             if self.name == "AES-GCM-16ICV":
102                 self.iv_len = GCM_IV_SIZE
103             else:
104                 self.iv_len = self.bs
105
106     def encrypt(self, data, key, aad=None):
107         iv = os.urandom(self.iv_len)
108         if aad is None:
109             encryptor = Cipher(
110                 self.cipher(key), self.mode(iv), default_backend()
111             ).encryptor()
112             return iv + encryptor.update(data) + encryptor.finalize()
113         else:
114             salt = key[-SALT_SIZE:]
115             nonce = salt + iv
116             encryptor = Cipher(
117                 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
118             ).encryptor()
119             encryptor.authenticate_additional_data(aad)
120             data = encryptor.update(data) + encryptor.finalize()
121             data += encryptor.tag[:GCM_ICV_SIZE]
122             return iv + data
123
124     def decrypt(self, data, key, aad=None, icv=None):
125         if aad is None:
126             iv = data[: self.iv_len]
127             ct = data[self.iv_len :]
128             decryptor = Cipher(
129                 algorithms.AES(key), self.mode(iv), default_backend()
130             ).decryptor()
131             return decryptor.update(ct) + decryptor.finalize()
132         else:
133             salt = key[-SALT_SIZE:]
134             nonce = salt + data[:GCM_IV_SIZE]
135             ct = data[GCM_IV_SIZE:]
136             key = key[:-SALT_SIZE]
137             decryptor = Cipher(
138                 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
139             ).decryptor()
140             decryptor.authenticate_additional_data(aad)
141             return decryptor.update(ct) + decryptor.finalize()
142
143     def pad(self, data):
144         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
145         data = data + b"\x00" * (pad_len - 1)
146         return data + bytes([pad_len - 1])
147
148
149 class AuthAlgo(object):
150     def __init__(self, name, mac, mod, key_len, trunc_len=None):
151         self.name = name
152         self.mac = mac
153         self.mod = mod
154         self.key_len = key_len
155         self.trunc_len = trunc_len or key_len
156
157
158 CRYPTO_ALGOS = {
159     "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
160     "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
161     "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
162 }
163
164 AUTH_ALGOS = {
165     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166     "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
167     "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
168     "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
169     "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
170 }
171
172 PRF_ALGOS = {
173     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
174     "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
175 }
176
177 CRYPTO_IDS = {
178     12: "AES-CBC",
179     20: "AES-GCM-16ICV",
180 }
181
182 INTEG_IDS = {
183     2: "HMAC-SHA1-96",
184     12: "SHA2-256-128",
185     13: "SHA2-384-192",
186     14: "SHA2-512-256",
187 }
188
189
190 class IKEv2ChildSA(object):
191     def __init__(self, local_ts, remote_ts, is_initiator):
192         spi = os.urandom(4)
193         if is_initiator:
194             self.ispi = spi
195             self.rspi = None
196         else:
197             self.rspi = spi
198             self.ispi = None
199         self.local_ts = local_ts
200         self.remote_ts = remote_ts
201
202
203 class IKEv2SA(object):
204     def __init__(
205         self,
206         test,
207         is_initiator=True,
208         i_id=None,
209         r_id=None,
210         spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
211         id_type="fqdn",
212         nonce=None,
213         auth_data=None,
214         local_ts=None,
215         remote_ts=None,
216         auth_method="shared-key",
217         priv_key=None,
218         i_natt=False,
219         r_natt=False,
220         udp_encap=False,
221     ):
222         self.udp_encap = udp_encap
223         self.i_natt = i_natt
224         self.r_natt = r_natt
225         if i_natt or r_natt:
226             self.sport = 4500
227             self.dport = 4500
228         else:
229             self.sport = 500
230             self.dport = 500
231         self.msg_id = 0
232         self.dh_params = None
233         self.test = test
234         self.priv_key = priv_key
235         self.is_initiator = is_initiator
236         nonce = nonce or os.urandom(32)
237         self.auth_data = auth_data
238         self.i_id = i_id
239         self.r_id = r_id
240         if isinstance(id_type, str):
241             self.id_type = IDType.value(id_type)
242         else:
243             self.id_type = id_type
244         self.auth_method = auth_method
245         if self.is_initiator:
246             self.rspi = 8 * b"\x00"
247             self.ispi = spi
248             self.i_nonce = nonce
249         else:
250             self.rspi = spi
251             self.ispi = 8 * b"\x00"
252             self.r_nonce = nonce
253         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
254
255     def new_msg_id(self):
256         self.msg_id += 1
257         return self.msg_id
258
259     @property
260     def my_dh_pub_key(self):
261         if self.is_initiator:
262             return self.i_dh_data
263         return self.r_dh_data
264
265     @property
266     def peer_dh_pub_key(self):
267         if self.is_initiator:
268             return self.r_dh_data
269         return self.i_dh_data
270
271     @property
272     def natt(self):
273         return self.i_natt or self.r_natt
274
275     def compute_secret(self):
276         priv = self.dh_private_key
277         peer = self.peer_dh_pub_key
278         p, g, l = self.ike_group
279         return pow(
280             int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
281         ).to_bytes(l, "big")
282
283     def generate_dh_data(self):
284         # generate DH keys
285         if self.ike_dh not in DH:
286             raise NotImplementedError("%s not in DH group" % self.ike_dh)
287
288         if self.dh_params is None:
289             dhg = DH[self.ike_dh]
290             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
291             self.dh_params = pn.parameters(default_backend())
292
293         priv = self.dh_params.generate_private_key()
294         pub = priv.public_key()
295         x = priv.private_numbers().x
296         self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
297         y = pub.public_numbers().y
298
299         if self.is_initiator:
300             self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
301         else:
302             self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
303
304     def complete_dh_data(self):
305         self.dh_shared_secret = self.compute_secret()
306
307     def calc_child_keys(self, kex=False):
308         prf = self.ike_prf_alg.mod()
309         s = self.i_nonce + self.r_nonce
310         if kex:
311             s = self.dh_shared_secret + s
312         c = self.child_sas[0]
313
314         encr_key_len = self.esp_crypto_key_len
315         integ_key_len = self.esp_integ_alg.key_len
316         salt_len = 0 if integ_key_len else 4
317
318         l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
319         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
320
321         pos = 0
322         c.sk_ei = keymat[pos : pos + encr_key_len]
323         pos += encr_key_len
324
325         if integ_key_len:
326             c.sk_ai = keymat[pos : pos + integ_key_len]
327             pos += integ_key_len
328         else:
329             c.salt_ei = keymat[pos : pos + salt_len]
330             pos += salt_len
331
332         c.sk_er = keymat[pos : pos + encr_key_len]
333         pos += encr_key_len
334
335         if integ_key_len:
336             c.sk_ar = keymat[pos : pos + integ_key_len]
337             pos += integ_key_len
338         else:
339             c.salt_er = keymat[pos : pos + salt_len]
340             pos += salt_len
341
342     def calc_prfplus(self, prf, key, seed, length):
343         r = b""
344         t = None
345         x = 1
346         while len(r) < length and x < 255:
347             if t is not None:
348                 s = t
349             else:
350                 s = b""
351             s = s + seed + bytes([x])
352             t = self.calc_prf(prf, key, s)
353             r = r + t
354             x = x + 1
355
356         if x == 255:
357             return None
358         return r
359
360     def calc_prf(self, prf, key, data):
361         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
362         h.update(data)
363         return h.finalize()
364
365     def calc_keys(self, sk_d=None):
366         prf = self.ike_prf_alg.mod()
367         if sk_d is None:
368             # SKEYSEED = prf(Ni | Nr, g^ir)
369             self.skeyseed = self.calc_prf(
370                 prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
371             )
372         else:
373             # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
374             self.skeyseed = self.calc_prf(
375                 prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
376             )
377
378         # calculate S = Ni | Nr | SPIi SPIr
379         s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
380
381         prf_key_trunc = self.ike_prf_alg.trunc_len
382         encr_key_len = self.ike_crypto_key_len
383         tr_prf_key_len = self.ike_prf_alg.key_len
384         integ_key_len = self.ike_integ_alg.key_len
385         if integ_key_len == 0:
386             salt_size = 4
387         else:
388             salt_size = 0
389
390         l = (
391             prf_key_trunc
392             + integ_key_len * 2
393             + encr_key_len * 2
394             + tr_prf_key_len * 2
395             + salt_size * 2
396         )
397         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
398
399         pos = 0
400         self.sk_d = keymat[: pos + prf_key_trunc]
401         pos += prf_key_trunc
402
403         self.sk_ai = keymat[pos : pos + integ_key_len]
404         pos += integ_key_len
405         self.sk_ar = keymat[pos : pos + integ_key_len]
406         pos += integ_key_len
407
408         self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
409         pos += encr_key_len + salt_size
410         self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
411         pos += encr_key_len + salt_size
412
413         self.sk_pi = keymat[pos : pos + tr_prf_key_len]
414         pos += tr_prf_key_len
415         self.sk_pr = keymat[pos : pos + tr_prf_key_len]
416
417     def generate_authmsg(self, prf, packet):
418         if self.is_initiator:
419             id = self.i_id
420             nonce = self.r_nonce
421             key = self.sk_pi
422         else:
423             id = self.r_id
424             nonce = self.i_nonce
425             key = self.sk_pr
426         data = bytes([self.id_type, 0, 0, 0]) + id
427         id_hash = self.calc_prf(prf, key, data)
428         return packet + nonce + id_hash
429
430     def auth_init(self):
431         prf = self.ike_prf_alg.mod()
432         if self.is_initiator:
433             packet = self.init_req_packet
434         else:
435             packet = self.init_resp_packet
436         authmsg = self.generate_authmsg(prf, raw(packet))
437         if self.auth_method == "shared-key":
438             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
439             self.auth_data = self.calc_prf(prf, psk, authmsg)
440         elif self.auth_method == "rsa-sig":
441             self.auth_data = self.priv_key.sign(
442                 authmsg, padding.PKCS1v15(), hashes.SHA1()
443             )
444         else:
445             raise TypeError("unknown auth method type!")
446
447     def encrypt(self, data, aad=None):
448         data = self.ike_crypto_alg.pad(data)
449         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
450
451     @property
452     def peer_authkey(self):
453         if self.is_initiator:
454             return self.sk_ar
455         return self.sk_ai
456
457     @property
458     def my_authkey(self):
459         if self.is_initiator:
460             return self.sk_ai
461         return self.sk_ar
462
463     @property
464     def my_cryptokey(self):
465         if self.is_initiator:
466             return self.sk_ei
467         return self.sk_er
468
469     @property
470     def peer_cryptokey(self):
471         if self.is_initiator:
472             return self.sk_er
473         return self.sk_ei
474
475     def concat(self, alg, key_len):
476         return alg + "-" + str(key_len * 8)
477
478     @property
479     def vpp_ike_cypto_alg(self):
480         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
481
482     @property
483     def vpp_esp_cypto_alg(self):
484         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
485
486     def verify_hmac(self, ikemsg):
487         integ_trunc = self.ike_integ_alg.trunc_len
488         exp_hmac = ikemsg[-integ_trunc:]
489         data = ikemsg[:-integ_trunc]
490         computed_hmac = self.compute_hmac(
491             self.ike_integ_alg.mod(), self.peer_authkey, data
492         )
493         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
494
495     def compute_hmac(self, integ, key, data):
496         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
497         h.update(data)
498         return h.finalize()
499
500     def decrypt(self, data, aad=None, icv=None):
501         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
502
503     def hmac_and_decrypt(self, ike):
504         ep = ike[ikev2.IKEv2_payload_Encrypted]
505         if self.ike_crypto == "AES-GCM-16ICV":
506             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
507             ct = ep.load[:-GCM_ICV_SIZE]
508             tag = ep.load[-GCM_ICV_SIZE:]
509             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
510         else:
511             self.verify_hmac(raw(ike))
512             integ_trunc = self.ike_integ_alg.trunc_len
513
514             # remove ICV and decrypt payload
515             ct = ep.load[:-integ_trunc]
516             plain = self.decrypt(ct)
517         # remove padding
518         pad_len = plain[-1]
519         return plain[: -pad_len - 1]
520
521     def build_ts_addr(self, ts, version):
522         return {
523             "starting_address_v" + version: ts["start_addr"],
524             "ending_address_v" + version: ts["end_addr"],
525         }
526
527     def generate_ts(self, is_ip4):
528         c = self.child_sas[0]
529         ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
530         if is_ip4:
531             ts_data.update(self.build_ts_addr(c.local_ts, "4"))
532             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
533             ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
534             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
535         else:
536             ts_data.update(self.build_ts_addr(c.local_ts, "6"))
537             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
538             ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
539             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
540
541         if self.is_initiator:
542             return ([ts1], [ts2])
543         return ([ts2], [ts1])
544
545     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
546         if crypto not in CRYPTO_ALGOS:
547             raise TypeError("unsupported encryption algo %r" % crypto)
548         self.ike_crypto = crypto
549         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
550         self.ike_crypto_key_len = crypto_key_len
551
552         if integ not in AUTH_ALGOS:
553             raise TypeError("unsupported auth algo %r" % integ)
554         self.ike_integ = None if integ == "NULL" else integ
555         self.ike_integ_alg = AUTH_ALGOS[integ]
556
557         if prf not in PRF_ALGOS:
558             raise TypeError("unsupported prf algo %r" % prf)
559         self.ike_prf = prf
560         self.ike_prf_alg = PRF_ALGOS[prf]
561         self.ike_dh = dh
562         self.ike_group = DH[self.ike_dh]
563
564     def set_esp_props(self, crypto, crypto_key_len, integ):
565         self.esp_crypto_key_len = crypto_key_len
566         if crypto not in CRYPTO_ALGOS:
567             raise TypeError("unsupported encryption algo %r" % crypto)
568         self.esp_crypto = crypto
569         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
570
571         if integ not in AUTH_ALGOS:
572             raise TypeError("unsupported auth algo %r" % integ)
573         self.esp_integ = None if integ == "NULL" else integ
574         self.esp_integ_alg = AUTH_ALGOS[integ]
575
576     def crypto_attr(self, key_len):
577         if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
578             return (0x800E << 16 | key_len << 3, 12)
579         else:
580             raise Exception("unsupported attribute type")
581
582     def ike_crypto_attr(self):
583         return self.crypto_attr(self.ike_crypto_key_len)
584
585     def esp_crypto_attr(self):
586         return self.crypto_attr(self.esp_crypto_key_len)
587
588     def compute_nat_sha1(self, ip, port, rspi=None):
589         if rspi is None:
590             rspi = self.rspi
591         data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
592         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
593         digest.update(data)
594         return digest.finalize()
595
596     def clone(self, test, **kwargs):
597         if "spi" not in kwargs:
598             kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
599         if "nonce" not in kwargs:
600             kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
601         if self.child_sas:
602             if "local_ts" not in kwargs:
603                 kwargs["local_ts"] = self.child_sas[0].local_ts
604             if "remote_ts" not in kwargs:
605                 kwargs["remote_ts"] = self.child_sas[0].remote_ts
606         sa = type(self)(
607             test,
608             is_initiator=self.is_initiator,
609             i_id=self.i_id,
610             r_id=self.r_id,
611             id_type=self.id_type,
612             auth_data=self.auth_data,
613             auth_method=self.auth_method,
614             priv_key=self.priv_key,
615             i_natt=self.i_natt,
616             r_natt=self.r_natt,
617             udp_encap=self.udp_encap,
618             **kwargs,
619         )
620         if sa.is_initiator:
621             sa.set_ike_props(
622                 crypto=self.ike_crypto,
623                 crypto_key_len=self.ike_crypto_key_len,
624                 integ=self.ike_integ,
625                 prf=self.ike_prf,
626                 dh=self.ike_dh,
627             )
628             sa.set_esp_props(
629                 crypto=self.esp_crypto,
630                 crypto_key_len=self.esp_crypto_key_len,
631                 integ=self.esp_integ,
632             )
633         return sa
634
635
636 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
637 class IkePeer(VppTestCase):
638     """common class for initiator and responder"""
639
640     @classmethod
641     def setUpClass(cls):
642         import scapy.contrib.ikev2 as _ikev2
643
644         globals()["ikev2"] = _ikev2
645         super(IkePeer, cls).setUpClass()
646         cls.create_pg_interfaces(range(2))
647         for i in cls.pg_interfaces:
648             i.admin_up()
649             i.config_ip4()
650             i.resolve_arp()
651             i.config_ip6()
652             i.resolve_ndp()
653
654     @classmethod
655     def tearDownClass(cls):
656         super(IkePeer, cls).tearDownClass()
657
658     def tearDown(self):
659         super(IkePeer, self).tearDown()
660         if self.del_sa_from_responder:
661             self.initiate_del_sa_from_responder()
662         else:
663             self.initiate_del_sa_from_initiator()
664         r = self.vapi.ikev2_sa_dump()
665         self.assertEqual(len(r), 0)
666         r = self.vapi.ikev2_sa_v2_dump()
667         self.assertEqual(len(r), 0)
668         sas = self.vapi.ipsec_sa_dump()
669         self.assertEqual(len(sas), 0)
670         self.p.remove_vpp_config()
671         self.assertIsNone(self.p.query_vpp_config())
672
673     def setUp(self):
674         super(IkePeer, self).setUp()
675         self.config_tc()
676         self.p.add_vpp_config()
677         self.assertIsNotNone(self.p.query_vpp_config())
678         if self.sa.is_initiator:
679             self.sa.generate_dh_data()
680         self.vapi.cli("ikev2 set logging level 4")
681         self.vapi.cli("event-lo clear")
682
683     def assert_counter(self, count, name, version="ip4"):
684         node_name = "/err/ikev2-%s/" % version + name
685         self.assertEqual(count, self.statistics.get_err_counter(node_name))
686
687     def create_rekey_request(self, kex=False):
688         sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
689         header = ikev2.IKEv2(
690             init_SPI=self.sa.ispi,
691             resp_SPI=self.sa.rspi,
692             id=self.sa.new_msg_id(),
693             flags="Initiator",
694             exch_type="CREATE_CHILD_SA",
695         )
696
697         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
698         return self.create_packet(
699             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
700         )
701
702     def create_sa_rekey_request(self, **kwargs):
703         sa = self.generate_sa_init_payload(**kwargs)
704         header = ikev2.IKEv2(
705             init_SPI=self.sa.ispi,
706             resp_SPI=self.sa.rspi,
707             id=self.sa.new_msg_id(),
708             flags="Initiator",
709             exch_type="CREATE_CHILD_SA",
710         )
711         ike_msg = self.encrypt_ike_msg(header, sa, "SA")
712         return self.create_packet(
713             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
714         )
715
716     def create_empty_request(self):
717         header = ikev2.IKEv2(
718             init_SPI=self.sa.ispi,
719             resp_SPI=self.sa.rspi,
720             id=self.sa.new_msg_id(),
721             flags="Initiator",
722             exch_type="INFORMATIONAL",
723             next_payload="Encrypted",
724         )
725
726         msg = self.encrypt_ike_msg(header, b"", None)
727         return self.create_packet(
728             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
729         )
730
731     def create_packet(
732         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
733     ):
734         if use_ip6:
735             src_ip = src_if.remote_ip6
736             dst_ip = src_if.local_ip6
737             ip_layer = IPv6
738         else:
739             src_ip = src_if.remote_ip4
740             dst_ip = src_if.local_ip4
741             ip_layer = IP
742         res = (
743             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
744             / ip_layer(src=src_ip, dst=dst_ip)
745             / UDP(sport=sport, dport=dport)
746         )
747         if natt:
748             # insert non ESP marker
749             res = res / Raw(b"\x00" * 4)
750         return res / msg
751
752     def verify_udp(self, udp):
753         self.assertEqual(udp.sport, self.sa.sport)
754         self.assertEqual(udp.dport, self.sa.dport)
755
756     def get_ike_header(self, packet):
757         try:
758             ih = packet[ikev2.IKEv2]
759             ih = self.verify_and_remove_non_esp_marker(ih)
760         except IndexError as e:
761             # this is a workaround for getting IKEv2 layer as both ikev2 and
762             # ipsec register for port 4500
763             esp = packet[ESP]
764             ih = self.verify_and_remove_non_esp_marker(esp)
765         self.assertEqual(ih.version, 0x20)
766         self.assertNotIn("Version", ih.flags)
767         return ih
768
769     def verify_and_remove_non_esp_marker(self, packet):
770         if self.sa.natt:
771             # if we are in nat traversal mode check for non esp marker
772             # and remove it
773             data = raw(packet)
774             self.assertEqual(data[:4], b"\x00" * 4)
775             return ikev2.IKEv2(data[4:])
776         else:
777             return packet
778
779     def encrypt_ike_msg(self, header, plain, first_payload):
780         if self.sa.ike_crypto == "AES-GCM-16ICV":
781             data = self.sa.ike_crypto_alg.pad(raw(plain))
782             plen = (
783                 len(data)
784                 + GCM_IV_SIZE
785                 + GCM_ICV_SIZE
786                 + len(ikev2.IKEv2_payload_Encrypted())
787             )
788             tlen = plen + len(ikev2.IKEv2())
789
790             # prepare aad data
791             sk_p = ikev2.IKEv2_payload_Encrypted(
792                 next_payload=first_payload, length=plen
793             )
794             header.length = tlen
795             res = header / sk_p
796             encr = self.sa.encrypt(raw(plain), raw(res))
797             sk_p = ikev2.IKEv2_payload_Encrypted(
798                 next_payload=first_payload, length=plen, load=encr
799             )
800             res = header / sk_p
801         else:
802             encr = self.sa.encrypt(raw(plain))
803             trunc_len = self.sa.ike_integ_alg.trunc_len
804             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
805             tlen = plen + len(ikev2.IKEv2())
806
807             sk_p = ikev2.IKEv2_payload_Encrypted(
808                 next_payload=first_payload, length=plen, load=encr
809             )
810             header.length = tlen
811             res = header / sk_p
812
813             integ_data = raw(res)
814             hmac_data = self.sa.compute_hmac(
815                 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
816             )
817             res = res / Raw(hmac_data[:trunc_len])
818         assert len(res) == tlen
819         return res
820
821     def verify_udp_encap(self, ipsec_sa):
822         e = VppEnum.vl_api_ipsec_sad_flags_t
823         if self.sa.udp_encap or self.sa.natt:
824             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
825         else:
826             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
827
828     def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
829         sas = self.vapi.ipsec_sa_dump()
830         if sa_count is None:
831             if is_rekey:
832                 # after rekey there is a short period of time in which old
833                 # inbound SA is still present
834                 sa_count = 3
835             else:
836                 sa_count = 2
837         self.assertEqual(len(sas), sa_count)
838         if self.sa.is_initiator:
839             if is_rekey:
840                 sa0 = sas[0].entry
841                 sa1 = sas[2].entry
842             else:
843                 sa0 = sas[0].entry
844                 sa1 = sas[1].entry
845         else:
846             if is_rekey:
847                 sa0 = sas[2].entry
848                 sa1 = sas[0].entry
849             else:
850                 sa1 = sas[0].entry
851                 sa0 = sas[1].entry
852
853         c = self.sa.child_sas[0]
854
855         self.verify_udp_encap(sa0)
856         self.verify_udp_encap(sa1)
857         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
858         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
859         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
860
861         if self.sa.esp_integ is None:
862             vpp_integ_alg = 0
863         else:
864             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
865         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
866         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
867
868         # verify crypto keys
869         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
870         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
871         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
872         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
873
874         # verify integ keys
875         if vpp_integ_alg:
876             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
877             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
878             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
879             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
880         else:
881             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
882             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
883
884     def verify_keymat(self, api_keys, keys, name):
885         km = getattr(keys, name)
886         api_km = getattr(api_keys, name)
887         api_km_len = getattr(api_keys, name + "_len")
888         self.assertEqual(len(km), api_km_len)
889         self.assertEqual(km, api_km[:api_km_len])
890
891     def verify_id(self, api_id, exp_id):
892         self.assertEqual(api_id.type, IDType.value(exp_id.type))
893         self.assertEqual(api_id.data_len, exp_id.data_len)
894         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
895
896     def verify_ike_sas(self, is_rekey=False):
897         r = self.vapi.ikev2_sa_dump()
898         if is_rekey:
899             sa_count = 2
900             sa = r[1].sa
901         else:
902             sa_count = 1
903             sa = r[0].sa
904         self.assertEqual(len(r), sa_count)
905         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
906         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
907         if self.ip6:
908             if self.sa.is_initiator:
909                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
910                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
911             else:
912                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
913                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
914         else:
915             if self.sa.is_initiator:
916                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
917                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
918             else:
919                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
920                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
921         self.verify_keymat(sa.keys, self.sa, "sk_d")
922         self.verify_keymat(sa.keys, self.sa, "sk_ai")
923         self.verify_keymat(sa.keys, self.sa, "sk_ar")
924         self.verify_keymat(sa.keys, self.sa, "sk_ei")
925         self.verify_keymat(sa.keys, self.sa, "sk_er")
926         self.verify_keymat(sa.keys, self.sa, "sk_pi")
927         self.verify_keymat(sa.keys, self.sa, "sk_pr")
928
929         self.assertEqual(sa.i_id.type, self.sa.id_type)
930         self.assertEqual(sa.r_id.type, self.sa.id_type)
931         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
932         self.assertEqual(sa.r_id.data_len, len(self.idr))
933         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
934         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
935
936         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
937         self.verify_nonce(n, self.sa.i_nonce)
938         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
939         self.verify_nonce(n, self.sa.r_nonce)
940
941         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
942         if is_rekey:
943             self.assertEqual(len(r), 0)
944             return
945
946         self.assertEqual(len(r), 1)
947         csa = r[0].child_sa
948         self.assertEqual(csa.sa_index, sa.sa_index)
949         c = self.sa.child_sas[0]
950         if hasattr(c, "sk_ai"):
951             self.verify_keymat(csa.keys, c, "sk_ai")
952             self.verify_keymat(csa.keys, c, "sk_ar")
953         self.verify_keymat(csa.keys, c, "sk_ei")
954         self.verify_keymat(csa.keys, c, "sk_er")
955         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
956         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
957
958         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
959         tsi = tsi[0]
960         tsr = tsr[0]
961         r = self.vapi.ikev2_traffic_selector_dump(
962             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
963         )
964         self.assertEqual(len(r), 1)
965         ts = r[0].ts
966         self.verify_ts(r[0].ts, tsi[0], True)
967
968         r = self.vapi.ikev2_traffic_selector_dump(
969             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
970         )
971         self.assertEqual(len(r), 1)
972         self.verify_ts(r[0].ts, tsr[0], False)
973
974     def verify_ike_sas_v2(self):
975         r = self.vapi.ikev2_sa_v2_dump()
976         self.assertEqual(len(r), 1)
977         sa = r[0].sa
978         self.assertEqual(self.p.profile_name, sa.profile_name)
979         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
980         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
981         if self.ip6:
982             if self.sa.is_initiator:
983                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
984                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
985             else:
986                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
987                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
988         else:
989             if self.sa.is_initiator:
990                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
991                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
992             else:
993                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
994                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
995         self.verify_keymat(sa.keys, self.sa, "sk_d")
996         self.verify_keymat(sa.keys, self.sa, "sk_ai")
997         self.verify_keymat(sa.keys, self.sa, "sk_ar")
998         self.verify_keymat(sa.keys, self.sa, "sk_ei")
999         self.verify_keymat(sa.keys, self.sa, "sk_er")
1000         self.verify_keymat(sa.keys, self.sa, "sk_pi")
1001         self.verify_keymat(sa.keys, self.sa, "sk_pr")
1002
1003         self.assertEqual(sa.i_id.type, self.sa.id_type)
1004         self.assertEqual(sa.r_id.type, self.sa.id_type)
1005         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
1006         self.assertEqual(sa.r_id.data_len, len(self.idr))
1007         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
1008         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
1009
1010         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
1011         self.assertEqual(len(r), 1)
1012         csa = r[0].child_sa
1013         self.assertEqual(csa.sa_index, sa.sa_index)
1014         c = self.sa.child_sas[0]
1015         if hasattr(c, "sk_ai"):
1016             self.verify_keymat(csa.keys, c, "sk_ai")
1017             self.verify_keymat(csa.keys, c, "sk_ar")
1018         self.verify_keymat(csa.keys, c, "sk_ei")
1019         self.verify_keymat(csa.keys, c, "sk_er")
1020         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
1021         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
1022
1023         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1024         tsi = tsi[0]
1025         tsr = tsr[0]
1026         r = self.vapi.ikev2_traffic_selector_dump(
1027             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
1028         )
1029         self.assertEqual(len(r), 1)
1030         ts = r[0].ts
1031         self.verify_ts(r[0].ts, tsi[0], True)
1032
1033         r = self.vapi.ikev2_traffic_selector_dump(
1034             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
1035         )
1036         self.assertEqual(len(r), 1)
1037         self.verify_ts(r[0].ts, tsr[0], False)
1038
1039         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
1040         self.verify_nonce(n, self.sa.i_nonce)
1041         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
1042         self.verify_nonce(n, self.sa.r_nonce)
1043
1044     def verify_nonce(self, api_nonce, nonce):
1045         self.assertEqual(api_nonce.data_len, len(nonce))
1046         self.assertEqual(api_nonce.nonce, nonce)
1047
1048     def verify_ts(self, api_ts, ts, is_initiator):
1049         if is_initiator:
1050             self.assertTrue(api_ts.is_local)
1051         else:
1052             self.assertFalse(api_ts.is_local)
1053
1054         if self.p.ts_is_ip4:
1055             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
1056             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
1057         else:
1058             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
1059             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
1060         self.assertEqual(api_ts.start_port, ts.start_port)
1061         self.assertEqual(api_ts.end_port, ts.end_port)
1062         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
1063
1064
1065 class TemplateInitiator(IkePeer):
1066     """initiator test template"""
1067
1068     def initiate_del_sa_from_initiator(self):
1069         ispi = int.from_bytes(self.sa.ispi, "little")
1070         self.pg0.enable_capture()
1071         self.pg_start()
1072         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
1073         capture = self.pg0.get_capture(1)
1074         ih = self.get_ike_header(capture[0])
1075         self.assertNotIn("Response", ih.flags)
1076         self.assertIn("Initiator", ih.flags)
1077         self.assertEqual(ih.init_SPI, self.sa.ispi)
1078         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1079         plain = self.sa.hmac_and_decrypt(ih)
1080         d = ikev2.IKEv2_payload_Delete(plain)
1081         self.assertEqual(d.proto, 1)  # proto=IKEv2
1082         header = ikev2.IKEv2(
1083             init_SPI=self.sa.ispi,
1084             resp_SPI=self.sa.rspi,
1085             flags="Response",
1086             exch_type="INFORMATIONAL",
1087             id=ih.id,
1088             next_payload="Encrypted",
1089         )
1090         resp = self.encrypt_ike_msg(header, b"", None)
1091         self.send_and_assert_no_replies(self.pg0, resp)
1092
1093     def verify_del_sa(self, packet):
1094         ih = self.get_ike_header(packet)
1095         self.assertEqual(ih.id, self.sa.msg_id)
1096         self.assertEqual(ih.exch_type, 37)  # exchange informational
1097         self.assertIn("Response", ih.flags)
1098         self.assertIn("Initiator", ih.flags)
1099         plain = self.sa.hmac_and_decrypt(ih)
1100         self.assertEqual(plain, b"")
1101
1102     def initiate_del_sa_from_responder(self):
1103         header = ikev2.IKEv2(
1104             init_SPI=self.sa.ispi,
1105             resp_SPI=self.sa.rspi,
1106             exch_type="INFORMATIONAL",
1107             id=self.sa.new_msg_id(),
1108         )
1109         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1110         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1111         packet = self.create_packet(
1112             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1113         )
1114         self.pg0.add_stream(packet)
1115         self.pg0.enable_capture()
1116         self.pg_start()
1117         capture = self.pg0.get_capture(1)
1118         self.verify_del_sa(capture[0])
1119
1120     @staticmethod
1121     def find_notify_payload(packet, notify_type):
1122         n = packet[ikev2.IKEv2_payload_Notify]
1123         while n is not None:
1124             if n.type == notify_type:
1125                 return n
1126             n = n.payload
1127         return None
1128
1129     def verify_nat_detection(self, packet):
1130         if self.ip6:
1131             iph = packet[IPv6]
1132         else:
1133             iph = packet[IP]
1134         udp = packet[UDP]
1135
1136         # NAT_DETECTION_SOURCE_IP
1137         s = self.find_notify_payload(packet, 16388)
1138         self.assertIsNotNone(s)
1139         src_sha = self.sa.compute_nat_sha1(
1140             inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
1141         )
1142         self.assertEqual(s.load, src_sha)
1143
1144         # NAT_DETECTION_DESTINATION_IP
1145         s = self.find_notify_payload(packet, 16389)
1146         self.assertIsNotNone(s)
1147         dst_sha = self.sa.compute_nat_sha1(
1148             inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1149         )
1150         self.assertEqual(s.load, dst_sha)
1151
1152     def verify_sa_init_request(self, packet):
1153         udp = packet[UDP]
1154         self.sa.dport = udp.sport
1155         ih = packet[ikev2.IKEv2]
1156         self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1157         self.assertEqual(ih.exch_type, 34)  # SA_INIT
1158         self.sa.ispi = ih.init_SPI
1159         self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1160         self.assertIn("Initiator", ih.flags)
1161         self.assertNotIn("Response", ih.flags)
1162         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1163         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1164
1165         prop = packet[ikev2.IKEv2_payload_Proposal]
1166         self.assertEqual(prop.proto, 1)  # proto = ikev2
1167         self.assertEqual(prop.proposal, 1)
1168         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
1169         self.assertEqual(
1170             prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1171         )
1172         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
1173         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
1174         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
1175         self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1176
1177         self.verify_nat_detection(packet)
1178         self.sa.set_ike_props(
1179             crypto="AES-GCM-16ICV",
1180             crypto_key_len=32,
1181             integ="NULL",
1182             prf="PRF_HMAC_SHA2_256",
1183             dh="3072MODPgr",
1184         )
1185         self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1186         self.sa.generate_dh_data()
1187         self.sa.complete_dh_data()
1188         self.sa.calc_keys()
1189
1190     def update_esp_transforms(self, trans, sa):
1191         while trans:
1192             if trans.transform_type == 1:  # ecryption
1193                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1194             elif trans.transform_type == 3:  # integrity
1195                 sa.esp_integ = INTEG_IDS[trans.transform_id]
1196             trans = trans.payload
1197
1198     def verify_sa_auth_req(self, packet):
1199         udp = packet[UDP]
1200         self.sa.dport = udp.sport
1201         ih = self.get_ike_header(packet)
1202         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1203         self.assertEqual(ih.init_SPI, self.sa.ispi)
1204         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
1205         self.assertIn("Initiator", ih.flags)
1206         self.assertNotIn("Response", ih.flags)
1207
1208         udp = packet[UDP]
1209         self.verify_udp(udp)
1210         self.assertEqual(ih.id, self.sa.msg_id + 1)
1211         self.sa.msg_id += 1
1212         plain = self.sa.hmac_and_decrypt(ih)
1213         idi = ikev2.IKEv2_payload_IDi(plain)
1214         self.assertEqual(idi.load, self.sa.i_id)
1215         if self.no_idr_auth:
1216             self.assertEqual(idi.next_payload, 39)  # AUTH
1217         else:
1218             idr = ikev2.IKEv2_payload_IDr(idi.payload)
1219             self.assertEqual(idr.load, self.sa.r_id)
1220         prop = idi[ikev2.IKEv2_payload_Proposal]
1221         c = self.sa.child_sas[0]
1222         c.ispi = prop.SPI
1223         self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1224
1225     def send_init_response(self):
1226         tr_attr = self.sa.ike_crypto_attr()
1227         trans = (
1228             ikev2.IKEv2_payload_Transform(
1229                 transform_type="Encryption",
1230                 transform_id=self.sa.ike_crypto,
1231                 length=tr_attr[1],
1232                 key_length=tr_attr[0],
1233             )
1234             / ikev2.IKEv2_payload_Transform(
1235                 transform_type="Integrity", transform_id=self.sa.ike_integ
1236             )
1237             / ikev2.IKEv2_payload_Transform(
1238                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1239             )
1240             / ikev2.IKEv2_payload_Transform(
1241                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1242             )
1243         )
1244         props = ikev2.IKEv2_payload_Proposal(
1245             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1246         )
1247
1248         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1249         if self.sa.natt:
1250             dst_address = b"\x0a\x0a\x0a\x0a"
1251         else:
1252             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1253         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1254         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1255
1256         self.sa.init_resp_packet = (
1257             ikev2.IKEv2(
1258                 init_SPI=self.sa.ispi,
1259                 resp_SPI=self.sa.rspi,
1260                 exch_type="IKE_SA_INIT",
1261                 flags="Response",
1262             )
1263             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1264             / ikev2.IKEv2_payload_KE(
1265                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1266             )
1267             / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1268             / ikev2.IKEv2_payload_Notify(
1269                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1270             )
1271             / ikev2.IKEv2_payload_Notify(
1272                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1273             )
1274         )
1275
1276         ike_msg = self.create_packet(
1277             self.pg0,
1278             self.sa.init_resp_packet,
1279             self.sa.sport,
1280             self.sa.dport,
1281             False,
1282             self.ip6,
1283         )
1284         self.pg_send(self.pg0, ike_msg)
1285         capture = self.pg0.get_capture(1)
1286         self.verify_sa_auth_req(capture[0])
1287
1288     def initiate_sa_init(self):
1289         self.pg0.enable_capture()
1290         self.pg_start()
1291         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1292
1293         capture = self.pg0.get_capture(1)
1294         self.verify_sa_init_request(capture[0])
1295         self.send_init_response()
1296
1297     def send_auth_response(self):
1298         tr_attr = self.sa.esp_crypto_attr()
1299         trans = (
1300             ikev2.IKEv2_payload_Transform(
1301                 transform_type="Encryption",
1302                 transform_id=self.sa.esp_crypto,
1303                 length=tr_attr[1],
1304                 key_length=tr_attr[0],
1305             )
1306             / ikev2.IKEv2_payload_Transform(
1307                 transform_type="Integrity", transform_id=self.sa.esp_integ
1308             )
1309             / ikev2.IKEv2_payload_Transform(
1310                 transform_type="Extended Sequence Number", transform_id="No ESN"
1311             )
1312             / ikev2.IKEv2_payload_Transform(
1313                 transform_type="Extended Sequence Number", transform_id="ESN"
1314             )
1315         )
1316
1317         c = self.sa.child_sas[0]
1318         props = ikev2.IKEv2_payload_Proposal(
1319             proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1320         )
1321
1322         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1323         plain = (
1324             ikev2.IKEv2_payload_IDi(
1325                 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1326             )
1327             / ikev2.IKEv2_payload_IDr(
1328                 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1329             )
1330             / ikev2.IKEv2_payload_AUTH(
1331                 next_payload="SA",
1332                 auth_type=AuthMethod.value(self.sa.auth_method),
1333                 load=self.sa.auth_data,
1334             )
1335             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1336             / ikev2.IKEv2_payload_TSi(
1337                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1338             )
1339             / ikev2.IKEv2_payload_TSr(
1340                 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1341             )
1342             / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1343         )
1344
1345         header = ikev2.IKEv2(
1346             init_SPI=self.sa.ispi,
1347             resp_SPI=self.sa.rspi,
1348             id=self.sa.new_msg_id(),
1349             flags="Response",
1350             exch_type="IKE_AUTH",
1351         )
1352
1353         ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1354         packet = self.create_packet(
1355             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1356         )
1357         self.pg_send(self.pg0, packet)
1358
1359     def test_initiator(self):
1360         self.initiate_sa_init()
1361         self.sa.auth_init()
1362         self.sa.calc_child_keys()
1363         self.send_auth_response()
1364         self.verify_ike_sas()
1365         self.verify_ike_sas_v2()
1366
1367
1368 class TemplateResponder(IkePeer):
1369     """responder test template"""
1370
1371     def initiate_del_sa_from_responder(self):
1372         self.pg0.enable_capture()
1373         self.pg_start()
1374         self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1375         capture = self.pg0.get_capture(1)
1376         ih = self.get_ike_header(capture[0])
1377         self.assertNotIn("Response", ih.flags)
1378         self.assertNotIn("Initiator", ih.flags)
1379         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1380         plain = self.sa.hmac_and_decrypt(ih)
1381         d = ikev2.IKEv2_payload_Delete(plain)
1382         self.assertEqual(d.proto, 1)  # proto=IKEv2
1383         self.assertEqual(ih.init_SPI, self.sa.ispi)
1384         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1385         header = ikev2.IKEv2(
1386             init_SPI=self.sa.ispi,
1387             resp_SPI=self.sa.rspi,
1388             flags="Initiator+Response",
1389             exch_type="INFORMATIONAL",
1390             id=ih.id,
1391             next_payload="Encrypted",
1392         )
1393         resp = self.encrypt_ike_msg(header, b"", None)
1394         self.send_and_assert_no_replies(self.pg0, resp)
1395
1396     def verify_del_sa(self, packet):
1397         ih = self.get_ike_header(packet)
1398         self.assertEqual(ih.id, self.sa.msg_id)
1399         self.assertEqual(ih.exch_type, 37)  # exchange informational
1400         self.assertIn("Response", ih.flags)
1401         self.assertNotIn("Initiator", ih.flags)
1402         self.assertEqual(ih.next_payload, 46)  # Encrypted
1403         self.assertEqual(ih.init_SPI, self.sa.ispi)
1404         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1405         plain = self.sa.hmac_and_decrypt(ih)
1406         self.assertEqual(plain, b"")
1407
1408     def initiate_del_sa_from_initiator(self):
1409         header = ikev2.IKEv2(
1410             init_SPI=self.sa.ispi,
1411             resp_SPI=self.sa.rspi,
1412             flags="Initiator",
1413             exch_type="INFORMATIONAL",
1414             id=self.sa.new_msg_id(),
1415         )
1416         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1417         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1418         packet = self.create_packet(
1419             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1420         )
1421         self.pg0.add_stream(packet)
1422         self.pg0.enable_capture()
1423         self.pg_start()
1424         capture = self.pg0.get_capture(1)
1425         self.verify_del_sa(capture[0])
1426
1427     def generate_sa_init_payload(
1428         self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
1429     ):
1430         tr_attr = self.sa.ike_crypto_attr()
1431         trans = (
1432             ikev2.IKEv2_payload_Transform(
1433                 transform_type="Encryption",
1434                 transform_id=self.sa.ike_crypto,
1435                 length=tr_attr[1],
1436                 key_length=tr_attr[0],
1437             )
1438             / ikev2.IKEv2_payload_Transform(
1439                 transform_type="Integrity", transform_id=self.sa.ike_integ
1440             )
1441             / ikev2.IKEv2_payload_Transform(
1442                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1443             )
1444             / ikev2.IKEv2_payload_Transform(
1445                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1446             )
1447         )
1448
1449         if spi is None:
1450             pargs = {}
1451         else:
1452             pargs = {"SPI": spi, "SPIsize": len(spi)}
1453         props = ikev2.IKEv2_payload_Proposal(
1454             proposal=1,
1455             proto="IKEv2",
1456             trans_nb=4,
1457             trans=trans,
1458             **pargs,
1459         )
1460
1461         return (
1462             ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1463             / ikev2.IKEv2_payload_KE(
1464                 next_payload="Nonce",
1465                 group=self.sa.ike_dh,
1466                 load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
1467             )
1468             / ikev2.IKEv2_payload_Nonce(
1469                 next_payload=next_payload,
1470                 load=self.sa.i_nonce if nonce is None else nonce,
1471             )
1472         )
1473
1474     def send_sa_init_req(self):
1475         self.sa.init_req_packet = ikev2.IKEv2(
1476             init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1477         ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
1478
1479         if not self.ip6:
1480             if self.sa.i_natt:
1481                 src_address = b"\x0a\x0a\x0a\x01"
1482             else:
1483                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1484
1485             if self.sa.r_natt:
1486                 dst_address = b"\x0a\x0a\x0a\x0a"
1487             else:
1488                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1489
1490             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1491             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1492             nat_src_detection = ikev2.IKEv2_payload_Notify(
1493                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1494             )
1495             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1496                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1497             )
1498             self.sa.init_req_packet = (
1499                 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1500             )
1501
1502         ike_msg = self.create_packet(
1503             self.pg0,
1504             self.sa.init_req_packet,
1505             self.sa.sport,
1506             self.sa.dport,
1507             self.sa.natt,
1508             self.ip6,
1509         )
1510         self.pg0.add_stream(ike_msg)
1511         self.pg0.enable_capture()
1512         self.pg_start()
1513         capture = self.pg0.get_capture(1)
1514         self.verify_sa_init(capture[0])
1515
1516     def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1517         tr_attr = self.sa.esp_crypto_attr()
1518         last_payload = last_payload or "Notify"
1519         trans_nb = 4
1520         trans = (
1521             ikev2.IKEv2_payload_Transform(
1522                 transform_type="Encryption",
1523                 transform_id=self.sa.esp_crypto,
1524                 length=tr_attr[1],
1525                 key_length=tr_attr[0],
1526             )
1527             / ikev2.IKEv2_payload_Transform(
1528                 transform_type="Integrity", transform_id=self.sa.esp_integ
1529             )
1530             / ikev2.IKEv2_payload_Transform(
1531                 transform_type="Extended Sequence Number", transform_id="No ESN"
1532             )
1533             / ikev2.IKEv2_payload_Transform(
1534                 transform_type="Extended Sequence Number", transform_id="ESN"
1535             )
1536         )
1537
1538         if kex:
1539             trans_nb += 1
1540             trans /= ikev2.IKEv2_payload_Transform(
1541                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1542             )
1543
1544         c = self.sa.child_sas[0]
1545         props = ikev2.IKEv2_payload_Proposal(
1546             proposal=1,
1547             proto="ESP",
1548             SPIsize=4,
1549             SPI=c.ispi,
1550             trans_nb=trans_nb,
1551             trans=trans,
1552         )
1553
1554         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1555         plain = (
1556             ikev2.IKEv2_payload_AUTH(
1557                 next_payload="SA",
1558                 auth_type=AuthMethod.value(self.sa.auth_method),
1559                 load=self.sa.auth_data,
1560             )
1561             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1562             / ikev2.IKEv2_payload_TSi(
1563                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1564             )
1565             / ikev2.IKEv2_payload_TSr(
1566                 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1567             )
1568         )
1569
1570         if is_rekey:
1571             first_payload = "Nonce"
1572             if kex:
1573                 head = ikev2.IKEv2_payload_Nonce(
1574                     load=self.sa.i_nonce, next_payload="KE"
1575                 ) / ikev2.IKEv2_payload_KE(
1576                     group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1577                 )
1578             else:
1579                 head = ikev2.IKEv2_payload_Nonce(
1580                     load=self.sa.i_nonce, next_payload="SA"
1581                 )
1582             plain = (
1583                 head
1584                 / plain
1585                 / ikev2.IKEv2_payload_Notify(
1586                     type="REKEY_SA",
1587                     proto="ESP",
1588                     SPI=c.ispi,
1589                     length=8 + len(c.ispi),
1590                     next_payload="Notify",
1591                 )
1592                 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1593             )
1594         else:
1595             first_payload = "IDi"
1596             if self.no_idr_auth:
1597                 ids = ikev2.IKEv2_payload_IDi(
1598                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1599                 )
1600             else:
1601                 ids = ikev2.IKEv2_payload_IDi(
1602                     next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1603                 ) / ikev2.IKEv2_payload_IDr(
1604                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1605                 )
1606             plain = ids / plain
1607         return plain, first_payload
1608
1609     def send_sa_auth(self):
1610         plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1611         plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1612         header = ikev2.IKEv2(
1613             init_SPI=self.sa.ispi,
1614             resp_SPI=self.sa.rspi,
1615             id=self.sa.new_msg_id(),
1616             flags="Initiator",
1617             exch_type="IKE_AUTH",
1618         )
1619
1620         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1621         packet = self.create_packet(
1622             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1623         )
1624         self.pg0.add_stream(packet)
1625         self.pg0.enable_capture()
1626         self.pg_start()
1627         capture = self.pg0.get_capture(1)
1628         self.verify_sa_auth_resp(capture[0])
1629
1630     def verify_sa_init(self, packet):
1631         ih = self.get_ike_header(packet)
1632
1633         self.assertEqual(ih.id, self.sa.msg_id)
1634         self.assertEqual(ih.exch_type, 34)
1635         self.assertIn("Response", ih.flags)
1636         self.assertEqual(ih.init_SPI, self.sa.ispi)
1637         self.assertNotEqual(ih.resp_SPI, 0)
1638         self.sa.rspi = ih.resp_SPI
1639         try:
1640             sa = ih[ikev2.IKEv2_payload_SA]
1641             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1642             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1643         except IndexError as e:
1644             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1645             self.logger.error(ih.show())
1646             raise
1647         self.sa.complete_dh_data()
1648         self.sa.calc_keys()
1649         self.sa.auth_init()
1650
1651     def verify_sa_auth_resp(self, packet):
1652         ike = self.get_ike_header(packet)
1653         udp = packet[UDP]
1654         self.verify_udp(udp)
1655         self.assertEqual(ike.id, self.sa.msg_id)
1656         plain = self.sa.hmac_and_decrypt(ike)
1657         idr = ikev2.IKEv2_payload_IDr(plain)
1658         prop = idr[ikev2.IKEv2_payload_Proposal]
1659         self.assertEqual(prop.SPIsize, 4)
1660         self.sa.child_sas[0].rspi = prop.SPI
1661         self.sa.calc_child_keys()
1662
1663     IKE_NODE_SUFFIX = "ip4"
1664
1665     def verify_counters(self):
1666         self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1667         self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1668         self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1669
1670         r = self.vapi.ikev2_sa_dump()
1671         s = r[0].sa.stats
1672         self.assertEqual(1, s.n_sa_auth_req)
1673         self.assertEqual(1, s.n_sa_init_req)
1674
1675         r = self.vapi.ikev2_sa_v2_dump()
1676         s = r[0].sa.stats
1677         self.assertEqual(1, s.n_sa_auth_req)
1678         self.assertEqual(1, s.n_sa_init_req)
1679
1680     def test_responder(self):
1681         self.send_sa_init_req()
1682         self.send_sa_auth()
1683         self.verify_ipsec_sas()
1684         self.verify_ike_sas()
1685         self.verify_ike_sas_v2()
1686         self.verify_counters()
1687
1688
1689 class Ikev2Params(object):
1690     def config_params(self, params={}):
1691         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1692         ei = VppEnum.vl_api_ipsec_integ_alg_t
1693         self.vpp_enums = {
1694             "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1695             "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1696             "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1697             "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1698             "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1699             "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1700             "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1701             "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1702             "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1703             "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1704         }
1705
1706         dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1707         if dpd_disabled:
1708             self.vapi.cli("ikev2 dpd disable")
1709         self.del_sa_from_responder = (
1710             False
1711             if "del_sa_from_responder" not in params
1712             else params["del_sa_from_responder"]
1713         )
1714         i_natt = False if "i_natt" not in params else params["i_natt"]
1715         r_natt = False if "r_natt" not in params else params["r_natt"]
1716         self.p = Profile(self, "pr1")
1717         self.ip6 = False if "ip6" not in params else params["ip6"]
1718
1719         if "auth" in params and params["auth"] == "rsa-sig":
1720             auth_method = "rsa-sig"
1721             work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1722             self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1723
1724             client_file = work_dir + params["client-cert"]
1725             server_pem = open(work_dir + params["server-cert"]).read()
1726             client_priv = open(work_dir + params["client-key"]).read()
1727             client_priv = load_pem_private_key(
1728                 str.encode(client_priv), None, default_backend()
1729             )
1730             self.peer_cert = x509.load_pem_x509_certificate(
1731                 str.encode(server_pem), default_backend()
1732             )
1733             self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1734             auth_data = None
1735         else:
1736             auth_data = b"$3cr3tpa$$w0rd"
1737             self.p.add_auth(method="shared-key", data=auth_data)
1738             auth_method = "shared-key"
1739             client_priv = None
1740
1741         is_init = True if "is_initiator" not in params else params["is_initiator"]
1742         self.no_idr_auth = params.get("no_idr_in_auth", False)
1743
1744         idr = {"id_type": "fqdn", "data": b"vpp.home"}
1745         idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1746         r_id = self.idr = idr["data"]
1747         i_id = self.idi = idi["data"]
1748         if is_init:
1749             # scapy is initiator, VPP is responder
1750             self.p.add_local_id(**idr)
1751             self.p.add_remote_id(**idi)
1752             if self.no_idr_auth:
1753                 r_id = None
1754         else:
1755             # VPP is initiator, scapy is responder
1756             self.p.add_local_id(**idi)
1757             if not self.no_idr_auth:
1758                 self.p.add_remote_id(**idr)
1759
1760         loc_ts = (
1761             {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1762             if "loc_ts" not in params
1763             else params["loc_ts"]
1764         )
1765         rem_ts = (
1766             {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1767             if "rem_ts" not in params
1768             else params["rem_ts"]
1769         )
1770         self.p.add_local_ts(**loc_ts)
1771         self.p.add_remote_ts(**rem_ts)
1772         if "responder" in params:
1773             self.p.add_responder(params["responder"])
1774         if "ike_transforms" in params:
1775             self.p.add_ike_transforms(params["ike_transforms"])
1776         if "esp_transforms" in params:
1777             self.p.add_esp_transforms(params["esp_transforms"])
1778
1779         udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1780         if udp_encap:
1781             self.p.set_udp_encap(True)
1782
1783         if "responder_hostname" in params:
1784             hn = params["responder_hostname"]
1785             self.p.add_responder_hostname(hn)
1786
1787             # configure static dns record
1788             self.vapi.dns_name_server_add_del(
1789                 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1790             )
1791             self.vapi.dns_enable_disable(enable=1)
1792
1793             cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1794             self.vapi.cli(cmd)
1795
1796         self.sa = IKEv2SA(
1797             self,
1798             i_id=i_id,
1799             r_id=r_id,
1800             is_initiator=is_init,
1801             id_type=self.p.local_id["id_type"],
1802             i_natt=i_natt,
1803             r_natt=r_natt,
1804             priv_key=client_priv,
1805             auth_method=auth_method,
1806             nonce=params.get("nonce"),
1807             auth_data=auth_data,
1808             udp_encap=udp_encap,
1809             local_ts=self.p.remote_ts,
1810             remote_ts=self.p.local_ts,
1811         )
1812
1813         if is_init:
1814             ike_crypto = (
1815                 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1816             )
1817             ike_integ = (
1818                 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1819             )
1820             ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1821
1822             esp_crypto = (
1823                 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1824             )
1825             esp_integ = (
1826                 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1827             )
1828
1829             self.sa.set_ike_props(
1830                 crypto=ike_crypto[0],
1831                 crypto_key_len=ike_crypto[1],
1832                 integ=ike_integ,
1833                 prf="PRF_HMAC_SHA2_256",
1834                 dh=ike_dh,
1835             )
1836             self.sa.set_esp_props(
1837                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1838             )
1839
1840
1841 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
1842 class TestApi(VppTestCase):
1843     """Test IKEV2 API"""
1844
1845     @classmethod
1846     def setUpClass(cls):
1847         super(TestApi, cls).setUpClass()
1848
1849     @classmethod
1850     def tearDownClass(cls):
1851         super(TestApi, cls).tearDownClass()
1852
1853     def tearDown(self):
1854         super(TestApi, self).tearDown()
1855         self.p1.remove_vpp_config()
1856         self.p2.remove_vpp_config()
1857         r = self.vapi.ikev2_profile_dump()
1858         self.assertEqual(len(r), 0)
1859
1860     def configure_profile(self, cfg):
1861         p = Profile(self, cfg["name"])
1862         p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1863         p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1864         p.add_local_ts(**cfg["loc_ts"])
1865         p.add_remote_ts(**cfg["rem_ts"])
1866         p.add_responder(cfg["responder"])
1867         p.add_ike_transforms(cfg["ike_ts"])
1868         p.add_esp_transforms(cfg["esp_ts"])
1869         p.add_auth(**cfg["auth"])
1870         p.set_udp_encap(cfg["udp_encap"])
1871         p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1872         if "lifetime_data" in cfg:
1873             p.set_lifetime_data(cfg["lifetime_data"])
1874         if "tun_itf" in cfg:
1875             p.set_tunnel_interface(cfg["tun_itf"])
1876         if "natt_disabled" in cfg and cfg["natt_disabled"]:
1877             p.disable_natt()
1878         p.add_vpp_config()
1879         return p
1880
1881     def test_profile_api(self):
1882         """test profile dump API"""
1883         loc_ts4 = {
1884             "proto": 8,
1885             "start_port": 1,
1886             "end_port": 19,
1887             "start_addr": "3.3.3.2",
1888             "end_addr": "3.3.3.3",
1889         }
1890         rem_ts4 = {
1891             "proto": 9,
1892             "start_port": 10,
1893             "end_port": 119,
1894             "start_addr": "4.5.76.80",
1895             "end_addr": "2.3.4.6",
1896         }
1897
1898         loc_ts6 = {
1899             "proto": 8,
1900             "start_port": 1,
1901             "end_port": 19,
1902             "start_addr": "ab::1",
1903             "end_addr": "ab::4",
1904         }
1905         rem_ts6 = {
1906             "proto": 9,
1907             "start_port": 10,
1908             "end_port": 119,
1909             "start_addr": "cd::12",
1910             "end_addr": "cd::13",
1911         }
1912
1913         conf = {
1914             "p1": {
1915                 "name": "p1",
1916                 "natt_disabled": True,
1917                 "loc_id": ("fqdn", b"vpp.home"),
1918                 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1919                 "loc_ts": loc_ts4,
1920                 "rem_ts": rem_ts4,
1921                 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1922                 "ike_ts": {
1923                     "crypto_alg": 20,
1924                     "crypto_key_size": 32,
1925                     "integ_alg": 0,
1926                     "dh_group": 1,
1927                 },
1928                 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1929                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1930                 "udp_encap": True,
1931                 "ipsec_over_udp_port": 4501,
1932                 "lifetime_data": {
1933                     "lifetime": 123,
1934                     "lifetime_maxdata": 20192,
1935                     "lifetime_jitter": 9,
1936                     "handover": 132,
1937                 },
1938             },
1939             "p2": {
1940                 "name": "p2",
1941                 "loc_id": ("ip4-addr", b"192.168.2.1"),
1942                 "rem_id": ("ip6-addr", b"abcd::1"),
1943                 "loc_ts": loc_ts6,
1944                 "rem_ts": rem_ts6,
1945                 "responder": {"sw_if_index": 4, "addr": "def::10"},
1946                 "ike_ts": {
1947                     "crypto_alg": 12,
1948                     "crypto_key_size": 16,
1949                     "integ_alg": 3,
1950                     "dh_group": 3,
1951                 },
1952                 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1953                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1954                 "udp_encap": False,
1955                 "ipsec_over_udp_port": 4600,
1956                 "tun_itf": 0,
1957             },
1958         }
1959         self.p1 = self.configure_profile(conf["p1"])
1960         self.p2 = self.configure_profile(conf["p2"])
1961
1962         r = self.vapi.ikev2_profile_dump()
1963         self.assertEqual(len(r), 2)
1964         self.verify_profile(r[0].profile, conf["p1"])
1965         self.verify_profile(r[1].profile, conf["p2"])
1966
1967     def verify_id(self, api_id, cfg_id):
1968         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1969         self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1970
1971     def verify_ts(self, api_ts, cfg_ts):
1972         self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1973         self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1974         self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1975         self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1976         self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1977
1978     def verify_responder(self, api_r, cfg_r):
1979         self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1980         self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1981
1982     def verify_transforms(self, api_ts, cfg_ts):
1983         self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1984         self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1985         self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1986
1987     def verify_ike_transforms(self, api_ts, cfg_ts):
1988         self.verify_transforms(api_ts, cfg_ts)
1989         self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1990
1991     def verify_esp_transforms(self, api_ts, cfg_ts):
1992         self.verify_transforms(api_ts, cfg_ts)
1993
1994     def verify_auth(self, api_auth, cfg_auth):
1995         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1996         self.assertEqual(api_auth.data, cfg_auth["data"])
1997         self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1998
1999     def verify_lifetime_data(self, p, ld):
2000         self.assertEqual(p.lifetime, ld["lifetime"])
2001         self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
2002         self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
2003         self.assertEqual(p.handover, ld["handover"])
2004
2005     def verify_profile(self, ap, cp):
2006         self.assertEqual(ap.name, cp["name"])
2007         self.assertEqual(ap.udp_encap, cp["udp_encap"])
2008         self.verify_id(ap.loc_id, cp["loc_id"])
2009         self.verify_id(ap.rem_id, cp["rem_id"])
2010         self.verify_ts(ap.loc_ts, cp["loc_ts"])
2011         self.verify_ts(ap.rem_ts, cp["rem_ts"])
2012         self.verify_responder(ap.responder, cp["responder"])
2013         self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
2014         self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
2015         self.verify_auth(ap.auth, cp["auth"])
2016         natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
2017         self.assertTrue(natt_dis == ap.natt_disabled)
2018
2019         if "lifetime_data" in cp:
2020             self.verify_lifetime_data(ap, cp["lifetime_data"])
2021         self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
2022         if "tun_itf" in cp:
2023             self.assertEqual(ap.tun_itf, cp["tun_itf"])
2024         else:
2025             self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
2026
2027
2028 @tag_fixme_vpp_workers
2029 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
2030     """test responder - responder behind NAT"""
2031
2032     IKE_NODE_SUFFIX = "ip4-natt"
2033
2034     def config_tc(self):
2035         self.config_params({"r_natt": True})
2036
2037
2038 @tag_fixme_vpp_workers
2039 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
2040     """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
2041
2042     def config_tc(self):
2043         self.config_params(
2044             {
2045                 "i_natt": True,
2046                 "is_initiator": False,  # seen from test case perspective
2047                 # thus vpp is initiator
2048                 "responder": {
2049                     "sw_if_index": self.pg0.sw_if_index,
2050                     "addr": self.pg0.remote_ip4,
2051                 },
2052                 "ike-crypto": ("AES-GCM-16ICV", 32),
2053                 "ike-integ": "NULL",
2054                 "ike-dh": "3072MODPgr",
2055                 "ike_transforms": {
2056                     "crypto_alg": 20,  # "aes-gcm-16"
2057                     "crypto_key_size": 256,
2058                     "dh_group": 15,  # "modp-3072"
2059                 },
2060                 "esp_transforms": {
2061                     "crypto_alg": 12,  # "aes-cbc"
2062                     "crypto_key_size": 256,
2063                     # "hmac-sha2-256-128"
2064                     "integ_alg": 12,
2065                 },
2066             }
2067         )
2068
2069
2070 @tag_fixme_vpp_workers
2071 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
2072     """test ikev2 initiator - pre shared key auth"""
2073
2074     def config_tc(self):
2075         self.config_params(
2076             {
2077                 "is_initiator": False,  # seen from test case perspective
2078                 # thus vpp is initiator
2079                 "ike-crypto": ("AES-GCM-16ICV", 32),
2080                 "ike-integ": "NULL",
2081                 "ike-dh": "3072MODPgr",
2082                 "ike_transforms": {
2083                     "crypto_alg": 20,  # "aes-gcm-16"
2084                     "crypto_key_size": 256,
2085                     "dh_group": 15,  # "modp-3072"
2086                 },
2087                 "esp_transforms": {
2088                     "crypto_alg": 12,  # "aes-cbc"
2089                     "crypto_key_size": 256,
2090                     # "hmac-sha2-256-128"
2091                     "integ_alg": 12,
2092                 },
2093                 "responder_hostname": {
2094                     "hostname": "vpp.responder.org",
2095                     "sw_if_index": self.pg0.sw_if_index,
2096                 },
2097             }
2098         )
2099
2100
2101 @tag_fixme_vpp_workers
2102 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
2103     """test initiator - request window size (1)"""
2104
2105     def rekey_respond(self, req, update_child_sa_data):
2106         ih = self.get_ike_header(req)
2107         plain = self.sa.hmac_and_decrypt(ih)
2108         sa = ikev2.IKEv2_payload_SA(plain)
2109         if update_child_sa_data:
2110             prop = sa[ikev2.IKEv2_payload_Proposal]
2111             self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2112             self.sa.r_nonce = self.sa.i_nonce
2113             self.sa.child_sas[0].ispi = prop.SPI
2114             self.sa.child_sas[0].rspi = prop.SPI
2115             self.sa.calc_child_keys()
2116
2117         header = ikev2.IKEv2(
2118             init_SPI=self.sa.ispi,
2119             resp_SPI=self.sa.rspi,
2120             flags="Response",
2121             exch_type=36,
2122             id=ih.id,
2123             next_payload="Encrypted",
2124         )
2125         resp = self.encrypt_ike_msg(header, sa, "SA")
2126         packet = self.create_packet(
2127             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2128         )
2129         self.send_and_assert_no_replies(self.pg0, packet)
2130
2131     def test_initiator(self):
2132         super(TestInitiatorRequestWindowSize, self).test_initiator()
2133         self.pg0.enable_capture()
2134         self.pg_start()
2135         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
2136         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2137         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2138         capture = self.pg0.get_capture(2)
2139
2140         # reply in reverse order
2141         self.rekey_respond(capture[1], True)
2142         self.rekey_respond(capture[0], False)
2143
2144         # verify that only the second request was accepted
2145         self.verify_ike_sas()
2146         self.verify_ike_sas_v2()
2147         self.verify_ipsec_sas(is_rekey=True)
2148
2149
2150 @tag_fixme_vpp_workers
2151 class TestInitiatorRekey(TestInitiatorPsk):
2152     """test ikev2 initiator - rekey"""
2153
2154     def rekey_from_initiator(self):
2155         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
2156         self.pg0.enable_capture()
2157         self.pg_start()
2158         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
2159         capture = self.pg0.get_capture(1)
2160         ih = self.get_ike_header(capture[0])
2161         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
2162         self.assertNotIn("Response", ih.flags)
2163         self.assertIn("Initiator", ih.flags)
2164         plain = self.sa.hmac_and_decrypt(ih)
2165         sa = ikev2.IKEv2_payload_SA(plain)
2166         prop = sa[ikev2.IKEv2_payload_Proposal]
2167         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2168         self.sa.r_nonce = self.sa.i_nonce
2169         # update new responder SPI
2170         self.sa.child_sas[0].ispi = prop.SPI
2171         self.sa.child_sas[0].rspi = prop.SPI
2172         self.sa.calc_child_keys()
2173         header = ikev2.IKEv2(
2174             init_SPI=self.sa.ispi,
2175             resp_SPI=self.sa.rspi,
2176             flags="Response",
2177             exch_type=36,
2178             id=ih.id,
2179             next_payload="Encrypted",
2180         )
2181         resp = self.encrypt_ike_msg(header, sa, "SA")
2182         packet = self.create_packet(
2183             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2184         )
2185         self.send_and_assert_no_replies(self.pg0, packet)
2186
2187     def test_initiator(self):
2188         super(TestInitiatorRekey, self).test_initiator()
2189         self.rekey_from_initiator()
2190         self.verify_ike_sas()
2191         self.verify_ike_sas_v2()
2192         self.verify_ipsec_sas(is_rekey=True)
2193
2194
2195 @tag_fixme_vpp_workers
2196 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2197     """test ikev2 initiator - delete IKE SA from responder"""
2198
2199     def config_tc(self):
2200         self.config_params(
2201             {
2202                 "del_sa_from_responder": True,
2203                 "is_initiator": False,  # seen from test case perspective
2204                 # thus vpp is initiator
2205                 "responder": {
2206                     "sw_if_index": self.pg0.sw_if_index,
2207                     "addr": self.pg0.remote_ip4,
2208                 },
2209                 "ike-crypto": ("AES-GCM-16ICV", 32),
2210                 "ike-integ": "NULL",
2211                 "ike-dh": "3072MODPgr",
2212                 "ike_transforms": {
2213                     "crypto_alg": 20,  # "aes-gcm-16"
2214                     "crypto_key_size": 256,
2215                     "dh_group": 15,  # "modp-3072"
2216                 },
2217                 "esp_transforms": {
2218                     "crypto_alg": 12,  # "aes-cbc"
2219                     "crypto_key_size": 256,
2220                     # "hmac-sha2-256-128"
2221                     "integ_alg": 12,
2222                 },
2223                 "no_idr_in_auth": True,
2224             }
2225         )
2226
2227
2228 @tag_fixme_vpp_workers
2229 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2230     """test ikev2 responder - initiator behind NAT"""
2231
2232     IKE_NODE_SUFFIX = "ip4-natt"
2233
2234     def config_tc(self):
2235         self.config_params({"i_natt": True})
2236
2237
2238 @tag_fixme_vpp_workers
2239 class TestResponderPsk(TemplateResponder, Ikev2Params):
2240     """test ikev2 responder - pre shared key auth"""
2241
2242     def config_tc(self):
2243         self.config_params()
2244
2245
2246 @tag_fixme_vpp_workers
2247 class TestResponderDpd(TestResponderPsk):
2248     """
2249     Dead peer detection test
2250     """
2251
2252     def config_tc(self):
2253         self.config_params({"dpd_disabled": False})
2254
2255     def tearDown(self):
2256         pass
2257
2258     def test_responder(self):
2259         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2260         super(TestResponderDpd, self).test_responder()
2261         self.pg0.enable_capture()
2262         self.pg_start()
2263         # capture empty request but don't reply
2264         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2265         ih = self.get_ike_header(capture[0])
2266         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2267         plain = self.sa.hmac_and_decrypt(ih)
2268         self.assertEqual(plain, b"")
2269         # wait for SA expiration
2270         time.sleep(3)
2271         ike_sas = self.vapi.ikev2_sa_dump()
2272         self.assertEqual(len(ike_sas), 0)
2273         ike_sas = self.vapi.ikev2_sa_v2_dump()
2274         self.assertEqual(len(ike_sas), 0)
2275         ipsec_sas = self.vapi.ipsec_sa_dump()
2276         self.assertEqual(len(ipsec_sas), 0)
2277
2278
2279 @tag_fixme_vpp_workers
2280 class TestResponderRekey(TestResponderPsk):
2281     """test ikev2 responder - rekey"""
2282
2283     WITH_KEX = False
2284
2285     def send_rekey_from_initiator(self):
2286         if self.WITH_KEX:
2287             self.sa.generate_dh_data()
2288         packet = self.create_rekey_request(kex=self.WITH_KEX)
2289         self.pg0.add_stream(packet)
2290         self.pg0.enable_capture()
2291         self.pg_start()
2292         capture = self.pg0.get_capture(1)
2293         return capture
2294
2295     def process_rekey_response(self, capture):
2296         ih = self.get_ike_header(capture[0])
2297         plain = self.sa.hmac_and_decrypt(ih)
2298         sa = ikev2.IKEv2_payload_SA(plain)
2299         prop = sa[ikev2.IKEv2_payload_Proposal]
2300         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2301         # update new responder SPI
2302         self.sa.child_sas[0].rspi = prop.SPI
2303         if self.WITH_KEX:
2304             self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2305             self.sa.complete_dh_data()
2306         self.sa.calc_child_keys(kex=self.WITH_KEX)
2307
2308     def test_responder(self):
2309         super(TestResponderRekey, self).test_responder()
2310         self.process_rekey_response(self.send_rekey_from_initiator())
2311         self.verify_ike_sas()
2312         self.verify_ike_sas_v2()
2313         self.verify_ipsec_sas(is_rekey=True)
2314         self.assert_counter(1, "rekey_req", "ip4")
2315         r = self.vapi.ikev2_sa_dump()
2316         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2317         r = self.vapi.ikev2_sa_v2_dump()
2318         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2319
2320
2321 @tag_fixme_vpp_workers
2322 class TestResponderRekeyRepeat(TestResponderRekey):
2323     """test ikev2 responder - rekey repeat"""
2324
2325     def test_responder(self):
2326         super(TestResponderRekeyRepeat, self).test_responder()
2327         # rekey request is not accepted until old IPsec SA is expired
2328         capture = self.send_rekey_from_initiator()
2329         ih = self.get_ike_header(capture[0])
2330         plain = self.sa.hmac_and_decrypt(ih)
2331         notify = ikev2.IKEv2_payload_Notify(plain)
2332         self.assertEqual(notify.type, 43)
2333         self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2334         # rekey request is accepted after old IPsec SA was expired
2335         for _ in range(50):
2336             if len(self.vapi.ipsec_sa_dump()) != 3:
2337                 break
2338             time.sleep(0.2)
2339         else:
2340             self.fail("old IPsec SA not expired")
2341         self.process_rekey_response(self.send_rekey_from_initiator())
2342         self.verify_ike_sas()
2343         self.verify_ike_sas_v2()
2344         self.verify_ipsec_sas(sa_count=3)
2345
2346
2347 @tag_fixme_vpp_workers
2348 class TestResponderRekeyKEX(TestResponderRekey):
2349     """test ikev2 responder - rekey with key exchange"""
2350
2351     WITH_KEX = True
2352
2353
2354 @tag_fixme_vpp_workers
2355 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2356     """test ikev2 responder - rekey repeat with key exchange"""
2357
2358     WITH_KEX = True
2359
2360
2361 @tag_fixme_vpp_workers
2362 class TestResponderRekeySA(TestResponderPsk):
2363     """test ikev2 responder - rekey IKE SA"""
2364
2365     def send_rekey_from_initiator(self, newsa):
2366         packet = self.create_sa_rekey_request(
2367             spi=newsa.ispi,
2368             dh_pub_key=newsa.my_dh_pub_key,
2369             nonce=newsa.i_nonce,
2370         )
2371         self.pg0.add_stream(packet)
2372         self.pg0.enable_capture()
2373         self.pg_start()
2374         capture = self.pg0.get_capture(1)
2375         return capture
2376
2377     def process_rekey_response(self, newsa, capture):
2378         ih = self.get_ike_header(capture[0])
2379         plain = self.sa.hmac_and_decrypt(ih)
2380         sa = ikev2.IKEv2_payload_SA(plain)
2381         prop = sa[ikev2.IKEv2_payload_Proposal]
2382         newsa.rspi = prop.SPI
2383         newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2384         newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2385         newsa.complete_dh_data()
2386         newsa.calc_keys(sk_d=self.sa.sk_d)
2387         newsa.child_sas = self.sa.child_sas
2388         self.sa.child_sas = []
2389
2390     def test_responder(self):
2391         super(TestResponderRekeySA, self).test_responder()
2392         newsa = self.sa.clone(self, spi=os.urandom(8))
2393         newsa.generate_dh_data()
2394         capture = self.send_rekey_from_initiator(newsa)
2395         self.process_rekey_response(newsa, capture)
2396         self.verify_ike_sas(is_rekey=True)
2397         self.assert_counter(1, "rekey_req", "ip4")
2398         r = self.vapi.ikev2_sa_dump()
2399         self.assertEqual(r[1].sa.stats.n_rekey_req, 1)
2400         self.initiate_del_sa_from_initiator()
2401         self.sa = newsa
2402         self.verify_ike_sas()
2403
2404
2405 @tag_fixme_ubuntu2204
2406 @tag_fixme_debian11
2407 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2408     """test ikev2 responder - non-default table id"""
2409
2410     @classmethod
2411     def setUpClass(cls):
2412         import scapy.contrib.ikev2 as _ikev2
2413
2414         globals()["ikev2"] = _ikev2
2415         super(IkePeer, cls).setUpClass()
2416         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
2417             cls, "vpp"
2418         ):
2419             return
2420         cls.create_pg_interfaces(range(1))
2421         cls.vapi.cli("ip table add 1")
2422         cls.vapi.cli("set interface ip table pg0 1")
2423         for i in cls.pg_interfaces:
2424             i.admin_up()
2425             i.config_ip4()
2426             i.resolve_arp()
2427             i.config_ip6()
2428             i.resolve_ndp()
2429
2430     def config_tc(self):
2431         self.config_params({"dpd_disabled": False})
2432
2433     def test_responder(self):
2434         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2435         super(TestResponderVrf, self).test_responder()
2436         self.pg0.enable_capture()
2437         self.pg_start()
2438         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2439         ih = self.get_ike_header(capture[0])
2440         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2441         plain = self.sa.hmac_and_decrypt(ih)
2442         self.assertEqual(plain, b"")
2443
2444
2445 @tag_fixme_vpp_workers
2446 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2447     """test ikev2 responder - cert based auth"""
2448
2449     def config_tc(self):
2450         self.config_params(
2451             {
2452                 "udp_encap": True,
2453                 "auth": "rsa-sig",
2454                 "server-key": "server-key.pem",
2455                 "client-key": "client-key.pem",
2456                 "client-cert": "client-cert.pem",
2457                 "server-cert": "server-cert.pem",
2458             }
2459         )
2460
2461
2462 @tag_fixme_vpp_workers
2463 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2464     TemplateResponder, Ikev2Params
2465 ):
2466     """
2467     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2468     """
2469
2470     def config_tc(self):
2471         self.config_params(
2472             {
2473                 "ike-crypto": ("AES-CBC", 16),
2474                 "ike-integ": "SHA2-256-128",
2475                 "esp-crypto": ("AES-CBC", 24),
2476                 "esp-integ": "SHA2-384-192",
2477                 "ike-dh": "2048MODPgr",
2478                 "nonce": os.urandom(256),
2479                 "no_idr_in_auth": True,
2480             }
2481         )
2482
2483
2484 @tag_fixme_vpp_workers
2485 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2486     TemplateResponder, Ikev2Params
2487 ):
2488     """
2489     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2490     """
2491
2492     def config_tc(self):
2493         self.config_params(
2494             {
2495                 "ike-crypto": ("AES-CBC", 32),
2496                 "ike-integ": "SHA2-256-128",
2497                 "esp-crypto": ("AES-GCM-16ICV", 32),
2498                 "esp-integ": "NULL",
2499                 "ike-dh": "3072MODPgr",
2500             }
2501         )
2502
2503
2504 @tag_fixme_vpp_workers
2505 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2506     """
2507     IKE:AES_GCM_16_256
2508     """
2509
2510     IKE_NODE_SUFFIX = "ip6"
2511
2512     def config_tc(self):
2513         self.config_params(
2514             {
2515                 "del_sa_from_responder": True,
2516                 "ip6": True,
2517                 "natt": True,
2518                 "ike-crypto": ("AES-GCM-16ICV", 32),
2519                 "ike-integ": "NULL",
2520                 "ike-dh": "2048MODPgr",
2521                 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2522                 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2523             }
2524         )
2525
2526
2527 @tag_fixme_vpp_workers
2528 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2529     """
2530     Test for keep alive messages
2531     """
2532
2533     def send_empty_req_from_responder(self):
2534         packet = self.create_empty_request()
2535         self.pg0.add_stream(packet)
2536         self.pg0.enable_capture()
2537         self.pg_start()
2538         capture = self.pg0.get_capture(1)
2539         ih = self.get_ike_header(capture[0])
2540         self.assertEqual(ih.id, self.sa.msg_id)
2541         plain = self.sa.hmac_and_decrypt(ih)
2542         self.assertEqual(plain, b"")
2543         self.assert_counter(1, "keepalive", "ip4")
2544         r = self.vapi.ikev2_sa_dump()
2545         self.assertEqual(1, r[0].sa.stats.n_keepalives)
2546         r = self.vapi.ikev2_sa_v2_dump()
2547         self.assertEqual(1, r[0].sa.stats.n_keepalives)
2548
2549     def test_initiator(self):
2550         super(TestInitiatorKeepaliveMsg, self).test_initiator()
2551         self.send_empty_req_from_responder()
2552
2553
2554 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2555     """malformed packet test"""
2556
2557     def tearDown(self):
2558         pass
2559
2560     def config_tc(self):
2561         self.config_params()
2562
2563     def create_ike_init_msg(self, length=None, payload=None):
2564         msg = ikev2.IKEv2(
2565             length=length,
2566             init_SPI="\x11" * 8,
2567             flags="Initiator",
2568             exch_type="IKE_SA_INIT",
2569         )
2570         if payload is not None:
2571             msg /= payload
2572         return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2573
2574     def verify_bad_packet_length(self):
2575         ike_msg = self.create_ike_init_msg(length=0xDEAD)
2576         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2577         self.assert_counter(self.pkt_count, "bad_length")
2578
2579     def verify_bad_sa_payload_length(self):
2580         p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2581         ike_msg = self.create_ike_init_msg(payload=p)
2582         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2583         self.assert_counter(self.pkt_count, "malformed_packet")
2584
2585     def test_responder(self):
2586         self.pkt_count = 254
2587         self.verify_bad_packet_length()
2588         self.verify_bad_sa_payload_length()
2589
2590
2591 if __name__ == "__main__":
2592     unittest.main(testRunner=VppTestRunner)