30ee2b981102f605a7f85e55b4d5dcd27379d970
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import VppTestCase
23 from asfframework import (
24     tag_fixme_vpp_workers,
25     tag_fixme_ubuntu2204,
26     tag_fixme_debian11,
27     is_distro_ubuntu2204,
28     is_distro_debian11,
29     VppTestRunner,
30 )
31 from vpp_ikev2 import Profile, IDType, AuthMethod
32 from vpp_papi import VppEnum
33
# Python 2/3 compatibility shim: the name ``unicode`` only exists on
# Python 2, so on Python 3 the NameError branch aliases text_type to str.
try:
    text_type = unicode
except NameError:
    text_type = str

# Constant pad string fed to the PRF together with the shared secret when
# computing shared-key AUTH data (see IKEv2SA.auth_init).
KEY_PAD = b"Key Pad for IKEv2"
# Number of trailing key bytes that CryptoAlgo treats as the AES-GCM salt.
SALT_SIZE = 4
# Length in bytes of the AES-GCM integrity check value (tag).
GCM_ICV_SIZE = 16
# Length in bytes of the explicit AES-GCM IV placed in front of the ciphertext.
GCM_IV_SIZE = 8
43
44
# MODP Diffie-Hellman groups defined in RFC 3526, keyed by group name.
# Each value is a tuple (p, g, key_len):
#   p       - the group prime, parsed from its hex dump by long_converter
#   g       - the generator
#   key_len - length in bytes of the group's public values and shared secret
DH = {
    "2048MODPgr": (
        long_converter(
            """
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
        ),
        2,
        256,
    ),
    "3072MODPgr": (
        long_converter(
            """
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
        ),
        2,
        384,
    ),
}
90
91
class CryptoAlgo(object):
    """Encryption transform used by the scapy side of the IKEv2 tests.

    Pairs a cipher algorithm class with a mode class and implements the
    IV/nonce handling for the plain (no AAD) and the AEAD (AAD given) cases.
    """

    def __init__(self, name, cipher, mode):
        """Describe one transform.

        :param name: transform name, e.g. "AES-CBC" or "AES-GCM-16ICV"
        :param cipher: cipher algorithm class, or None for the NULL transform
        :param mode: cipher mode class, or None for the NULL transform

        ``bs`` (block size in bytes) and ``iv_len`` are only set when a
        cipher is given.
        """
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # the cipher class reports its block size in bits
            self.bs = self.cipher.block_size // 8

            if self.name == "AES-GCM-16ICV":
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt *data* under *key* with a freshly generated random IV.

        :param data: plaintext, already padded by the caller
        :param key: encryption key; in AEAD mode its last SALT_SIZE bytes
            are the salt
        :param aad: additional authenticated data; selects AEAD mode when
            not None
        :returns: IV + ciphertext, followed by the ICV in AEAD mode
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(
                self.cipher(key), self.mode(iv), default_backend()
            ).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            # the nonce is the salt taken from the key tail plus the IV
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(
                self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
            ).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt *data* (IV followed by ciphertext) under *key*.

        :param data: IV + ciphertext, without the ICV
        :param key: decryption key; in AEAD mode its last SALT_SIZE bytes
            are the salt
        :param aad: additional authenticated data; selects AEAD mode when
            not None
        :param icv: integrity check value (tag), used in AEAD mode only
        :returns: the plaintext, still carrying its padding
        """
        if aad is None:
            iv = data[: self.iv_len]
            ct = data[self.iv_len :]
            # use the configured cipher, mirroring encrypt(), instead of a
            # hard-coded AES
            decryptor = Cipher(
                self.cipher(key), self.mode(iv), default_backend()
            ).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + data[:GCM_IV_SIZE]
            ct = data[GCM_IV_SIZE:]
            key = key[:-SALT_SIZE]
            decryptor = Cipher(
                self.cipher(key), self.mode(nonce, icv, len(icv)), default_backend()
            ).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad *data* up to the next multiple of the block size.

        At least one byte is always appended: zero bytes followed by a final
        byte holding the number of padding bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b"\x00" * (pad_len - 1)
        return data + bytes([pad_len - 1])
146
147
class AuthAlgo(object):
    """Description of an integrity or PRF transform."""

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        """Describe one transform.

        :param name: transform name
        :param mac: MAC class, called as mac(key, hash_instance, backend=...)
        :param mod: hash algorithm class, instantiated by the users of this
            object
        :param key_len: key length in bytes
        :param trunc_len: length in bytes the MAC output is truncated to;
            a falsy value (None or 0) falls back to key_len
        """
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
155
156
# Encryption transforms known to the test peer, keyed by transform name.
CRYPTO_ALGOS = {
    "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
    "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
    "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
}

# Integrity transforms, keyed by transform name. The positional arguments are
# (name, mac, hash, key_len, trunc_len), with both lengths in bytes. Each
# SHA2 transform is bound to the hash whose digest size equals its key
# length (SHA-256/384/512 for 32/48/64 bytes).
AUTH_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
    "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
    "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA384, 48, 24),
    "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA512, 64, 32),
}

# Pseudo-random functions, keyed by transform name. No trunc_len is given
# for the real PRF, so AuthAlgo falls back to the key length.
PRF_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
}

# Numeric encryption transform IDs mapped to CRYPTO_ALGOS names.
CRYPTO_IDS = {
    12: "AES-CBC",
    20: "AES-GCM-16ICV",
}

# Numeric integrity transform IDs mapped to AUTH_ALGOS names.
INTEG_IDS = {
    2: "HMAC-SHA1-96",
    12: "SHA2-256-128",
    13: "SHA2-384-192",
    14: "SHA2-512-256",
}
187
188
class IKEv2ChildSA(object):
    """Per-child-SA state kept by the test peer: SPIs and traffic selectors."""

    def __init__(self, local_ts, remote_ts, is_initiator):
        """Create the child SA with a random 4-byte SPI for our own side.

        :param local_ts: local traffic selector description
        :param remote_ts: remote traffic selector description
        :param is_initiator: True if the test peer is the initiator
        """
        # only our own SPI is generated here; the other side's stays None
        own_spi = os.urandom(4)
        self.ispi, self.rspi = (own_spi, None) if is_initiator else (None, own_spi)
        self.local_ts = local_ts
        self.remote_ts = remote_ts
200
201
class IKEv2SA(object):
    """Scapy-side model of one IKEv2 SA used by the test cases.

    Keeps the SPIs, nonces, DH material, negotiated transforms and derived
    keys of the test peer, and provides the key derivation, authentication,
    encryption and verification helpers built on top of them.

    Several attributes are not set by __init__ and must be provided before
    the methods that read them are called: the transforms are configured via
    set_ike_props()/set_esp_props(); the peer's nonce, the peer's DH public
    value (i_dh_data/r_dh_data) and the init_req_packet/init_resp_packet
    attributes are expected to be assigned from outside this class.
    """

    def __init__(
        self,
        test,
        is_initiator=True,
        i_id=None,
        r_id=None,
        spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
        id_type="fqdn",
        nonce=None,
        auth_data=None,
        local_ts=None,
        remote_ts=None,
        auth_method="shared-key",
        priv_key=None,
        i_natt=False,
        r_natt=False,
        udp_encap=False,
    ):
        """Initialise the SA state.

        :param test: test case object, used for its assert* methods
        :param is_initiator: True if the test peer initiates the exchange
        :param i_id: initiator identity bytes
        :param r_id: responder identity bytes
        :param spi: 8-byte IKE SPI of the test peer
        :param id_type: identity type, either a name understood by
            IDType.value() or an already resolved value
        :param nonce: nonce of the test peer; 32 random bytes if falsy
        :param auth_data: shared secret for "shared-key" authentication
        :param local_ts: local traffic selector of the first child SA
        :param remote_ts: remote traffic selector of the first child SA
        :param auth_method: "shared-key" or "rsa-sig"
        :param priv_key: private key used to sign for "rsa-sig"
        :param i_natt: initiator side is behind NAT
        :param r_natt: responder side is behind NAT
        :param udp_encap: UDP encapsulation is expected on the IPsec SAs
        """
        self.udp_encap = udp_encap
        self.i_natt = i_natt
        self.r_natt = r_natt
        # with NAT on either side both UDP ports are 4500 instead of 500
        if i_natt or r_natt:
            self.sport = 4500
            self.dport = 4500
        else:
            self.sport = 500
            self.dport = 500
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        if isinstance(id_type, str):
            self.id_type = IDType.value(id_type)
        else:
            self.id_type = id_type
        self.auth_method = auth_method
        # our own SPI and nonce go on our side; the peer's SPI starts as zeros
        if self.is_initiator:
            self.rspi = 8 * b"\x00"
            self.ispi = spi
            self.i_nonce = nonce
        else:
            self.rspi = spi
            self.ispi = 8 * b"\x00"
            self.r_nonce = nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]

    def new_msg_id(self):
        """Increment the message ID counter and return the new value."""
        self.msg_id += 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """DH public value of the test peer."""
        if self.is_initiator:
            return self.i_dh_data
        return self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """DH public value of the other side."""
        if self.is_initiator:
            return self.r_dh_data
        return self.i_dh_data

    @property
    def natt(self):
        """True if either side is flagged as being behind NAT."""
        return self.i_natt or self.r_natt

    def compute_secret(self):
        """Return the DH shared secret as a big-endian byte string.

        The result is peer_public ** private mod p, serialised to the key
        length of the configured group.
        """
        priv = self.dh_private_key
        peer = self.peer_dh_pub_key
        p, _g, key_len = self.ike_group
        return pow(
            int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
        ).to_bytes(key_len, "big")

    def generate_dh_data(self):
        """Generate a DH key pair for the configured IKE group.

        Stores the private value in dh_private_key and the public value in
        i_dh_data or r_dh_data, depending on our role.

        :raises NotImplementedError: if the configured group is not in DH
        """
        if self.ike_dh not in DH:
            raise NotImplementedError("%s not in DH group" % self.ike_dh)

        if self.dh_params is None:
            # build the parameter object once and reuse it afterwards
            dhg = DH[self.ike_dh]
            pn = dh.DHParameterNumbers(dhg[0], dhg[1])
            self.dh_params = pn.parameters(default_backend())

        priv = self.dh_params.generate_private_key()
        pub = priv.public_key()
        x = priv.private_numbers().x
        self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
        y = pub.public_numbers().y
        pub_bytes = y.to_bytes(pub.key_size // 8, "big")

        if self.is_initiator:
            self.i_dh_data = pub_bytes
        else:
            self.r_dh_data = pub_bytes

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self, kex=False):
        """Derive the keys of the first child SA from SK_d.

        The seed is Ni | Nr, prefixed with the DH shared secret when *kex*
        is True. The key material is laid out as encryption key followed by
        integrity key for the initiator direction, then the same pair for
        the responder direction. When the ESP integrity transform has a zero
        key length, a 4-byte salt takes the place of the integrity key.
        """
        prf = self.ike_prf_alg.mod()
        s = self.i_nonce + self.r_nonce
        if kex:
            s = self.dh_shared_secret + s
        c = self.child_sas[0]

        encr_key_len = self.esp_crypto_key_len
        integ_key_len = self.esp_integ_alg.key_len
        salt_len = 0 if integ_key_len else 4

        total_len = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
        keymat = self.calc_prfplus(prf, self.sk_d, s, total_len)

        pos = 0
        c.sk_ei = keymat[pos : pos + encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ai = keymat[pos : pos + integ_key_len]
            pos += integ_key_len
        else:
            c.salt_ei = keymat[pos : pos + salt_len]
            pos += salt_len

        c.sk_er = keymat[pos : pos + encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ar = keymat[pos : pos + integ_key_len]
        else:
            c.salt_er = keymat[pos : pos + salt_len]

    def calc_prfplus(self, prf, key, seed, length):
        """Expand *key* and *seed* into at least *length* bytes (prf+).

        Block n is prf(key, previous_block | seed | n) with a one-byte
        counter starting at 1, so at most 255 blocks can be produced.

        :returns: the concatenated blocks (whole blocks, so possibly longer
            than *length*), or None if 255 blocks are not enough
        """
        r = b""
        t = b""
        x = 1
        while len(r) < length and x <= 255:
            t = self.calc_prf(prf, key, t + seed + bytes([x]))
            r = r + t
            x = x + 1

        # decide on the amount of material produced, not on the counter
        # value: the last permitted block may be exactly the one that
        # completes the request
        if len(r) < length:
            return None
        return r

    def calc_prf(self, prf, key, data):
        """Return the IKE PRF of *data* under *key*.

        :param prf: hash algorithm instance handed to the MAC constructor
        """
        h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        h.update(data)
        return h.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the IKE SA keys.

        Sets skeyseed, sk_d, sk_ai, sk_ar, sk_ei, sk_er, sk_pi and sk_pr.
        When the IKE integrity transform has a zero key length, each
        encryption key carries an extra 4 bytes of salt at its end.
        """
        prf = self.ike_prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        s = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        s = s + self.ispi + self.rspi

        prf_key_trunc = self.ike_prf_alg.trunc_len
        encr_key_len = self.ike_crypto_key_len
        tr_prf_key_len = self.ike_prf_alg.key_len
        integ_key_len = self.ike_integ_alg.key_len
        if integ_key_len == 0:
            salt_size = 4
        else:
            salt_size = 0

        total_len = (
            prf_key_trunc
            + integ_key_len * 2
            + encr_key_len * 2
            + tr_prf_key_len * 2
            + salt_size * 2
        )
        keymat = self.calc_prfplus(prf, self.skeyseed, s, total_len)

        # slice the key material in the order d, ai, ar, ei, er, pi, pr
        pos = 0
        self.sk_d = keymat[pos : pos + prf_key_trunc]
        pos += prf_key_trunc

        self.sk_ai = keymat[pos : pos + integ_key_len]
        pos += integ_key_len
        self.sk_ar = keymat[pos : pos + integ_key_len]
        pos += integ_key_len

        self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
        pos += encr_key_len + salt_size
        self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
        pos += encr_key_len + salt_size

        self.sk_pi = keymat[pos : pos + tr_prf_key_len]
        pos += tr_prf_key_len
        self.sk_pr = keymat[pos : pos + tr_prf_key_len]

    def generate_authmsg(self, prf, packet):
        """Build the octets covered by our AUTH payload.

        :param prf: hash algorithm instance for calc_prf
        :param packet: raw bytes of our first message
        :returns: packet | peer nonce | prf(SK_px, id_type | 0 0 0 | id)
        """
        if self.is_initiator:
            id_data = self.i_id
            nonce = self.r_nonce
            key = self.sk_pi
        else:
            id_data = self.r_id
            nonce = self.i_nonce
            key = self.sk_pr
        data = bytes([self.id_type, 0, 0, 0]) + id_data
        id_hash = self.calc_prf(prf, key, data)
        return packet + nonce + id_hash

    def auth_init(self):
        """Compute our AUTH data and store it in auth_data.

        For "shared-key", auth_data must hold the shared secret on entry and
        is replaced by prf(prf(secret, KEY_PAD), authmsg). For "rsa-sig",
        the message is signed with priv_key using PKCS#1 v1.5 and SHA-1.

        :raises TypeError: for any other auth method
        """
        prf = self.ike_prf_alg.mod()
        if self.is_initiator:
            packet = self.init_req_packet
        else:
            packet = self.init_resp_packet
        authmsg = self.generate_authmsg(prf, raw(packet))
        if self.auth_method == "shared-key":
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif self.auth_method == "rsa-sig":
            self.auth_data = self.priv_key.sign(
                authmsg, padding.PKCS1v15(), hashes.SHA1()
            )
        else:
            raise TypeError("unknown auth method type!")

    def encrypt(self, data, aad=None):
        """Pad *data* and encrypt it with our IKE encryption key."""
        data = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key the other side uses for its messages."""
        if self.is_initiator:
            return self.sk_ar
        return self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key the test peer uses for its messages."""
        if self.is_initiator:
            return self.sk_ai
        return self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key the test peer uses for its messages."""
        if self.is_initiator:
            return self.sk_ei
        return self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key the other side uses for its messages."""
        if self.is_initiator:
            return self.sk_er
        return self.sk_ei

    def concat(self, alg, key_len):
        """Return "<alg>-<bits>" for a key length given in bytes."""
        return alg + "-" + str(key_len * 8)

    @property
    def vpp_ike_cypto_alg(self):
        """IKE encryption transform name with the key size in bits appended."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP encryption transform name with the key size in bits appended."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of *ikemsg* matches the peer's key."""
        integ_trunc = self.ike_integ_alg.trunc_len
        exp_hmac = ikemsg[-integ_trunc:]
        data = ikemsg[:-integ_trunc]
        computed_hmac = self.compute_hmac(
            self.ike_integ_alg.mod(), self.peer_authkey, data
        )
        self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated IKE integrity MAC of *data* under *key*.

        :param integ: hash algorithm instance handed to the MAC constructor
        """
        h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        h.update(data)
        return h.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt *data* with the peer's IKE encryption key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Verify and decrypt the Encrypted payload of a received message.

        For AES-GCM the IKE header plus the Encrypted payload header is the
        AAD and the trailing GCM_ICV_SIZE bytes are the tag. Otherwise the
        HMAC over the whole message is verified first and the ICV is removed
        before decrypting.

        :returns: the plaintext with its padding and pad-length byte removed
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto == "AES-GCM-16ICV":
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
        else:
            self.verify_hmac(raw(ike))
            integ_trunc = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            ct = ep.load[:-integ_trunc]
            plain = self.decrypt(ct)
        # remove padding: the last byte is the number of pad bytes before it
        pad_len = plain[-1]
        return plain[: -pad_len - 1]

    def build_ts_addr(self, ts, version):
        """Return the address keyword arguments for a traffic selector.

        :param ts: mapping with "start_addr" and "end_addr"
        :param version: "4" or "6", appended to the field names
        """
        return {
            "starting_address_v" + version: ts["start_addr"],
            "ending_address_v" + version: ts["end_addr"],
        }

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        Both selectors cover every protocol and the full port range.

        :returns: a tuple of two single-element lists; the selector built
            from local_ts comes first when we are the initiator, second
            otherwise
        """
        c = self.child_sas[0]
        ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
        if is_ip4:
            ts_data.update(self.build_ts_addr(c.local_ts, "4"))
            ts1 = ikev2.IPv4TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
            ts2 = ikev2.IPv4TrafficSelector(**ts_data)
        else:
            ts_data.update(self.build_ts_addr(c.local_ts, "6"))
            ts1 = ikev2.IPv6TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
            ts2 = ikev2.IPv6TrafficSelector(**ts_data)

        if self.is_initiator:
            return ([ts1], [ts2])
        return ([ts2], [ts1])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the IKE SA transforms.

        :param crypto: encryption transform name (key of CRYPTO_ALGOS)
        :param crypto_key_len: encryption key length in bytes
        :param integ: integrity transform name (key of AUTH_ALGOS)
        :param prf: PRF name (key of PRF_ALGOS)
        :param dh: DH group name (key of DH)
        :raises TypeError: if crypto, integ or prf is not supported
        :raises KeyError: if dh is not a known group
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError("unsupported encryption algo %r" % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError("unsupported auth algo %r" % integ)
        # the name is stored as None for "NULL"; the algo object is always set
        self.ike_integ = None if integ == "NULL" else integ
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError("unsupported prf algo %r" % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the ESP (child SA) transforms.

        :param crypto: encryption transform name (key of CRYPTO_ALGOS)
        :param crypto_key_len: encryption key length in bytes
        :param integ: integrity transform name (key of AUTH_ALGOS)
        :raises TypeError: if crypto or integ is not supported
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError("unsupported encryption algo %r" % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError("unsupported auth algo %r" % integ)
        self.esp_integ = None if integ == "NULL" else integ
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len):
        """Return the key length attribute tuple for a transform.

        The first element packs 0x800E into the upper 16 bits and the key
        length in bits into the lower ones; the second element is 12.

        :param key_len: key length in bytes
        :raises Exception: if the IKE encryption transform is not AES based
        """
        if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
            return (0x800E << 16 | key_len << 3, 12)
        else:
            raise Exception("unsupported attribute type")

    def ike_crypto_attr(self):
        """Key length attribute for the IKE encryption transform."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """Key length attribute for the ESP encryption transform."""
        return self.crypto_attr(self.esp_crypto_key_len)

    def compute_nat_sha1(self, ip, port, rspi=None):
        """Return SHA-1 over SPIi | SPIr | ip | port (NAT detection data).

        :param ip: packed IP address bytes
        :param port: UDP port, hashed as a 2-byte big-endian value
        :param rspi: responder SPI to use instead of the stored one
        """
        if rspi is None:
            rspi = self.rspi
        data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
        digest.update(data)
        return digest.finalize()
587
588
589 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
590 class IkePeer(VppTestCase):
591     """common class for initiator and responder"""
592
593     @classmethod
594     def setUpClass(cls):
595         import scapy.contrib.ikev2 as _ikev2
596
597         globals()["ikev2"] = _ikev2
598         super(IkePeer, cls).setUpClass()
599         cls.create_pg_interfaces(range(2))
600         for i in cls.pg_interfaces:
601             i.admin_up()
602             i.config_ip4()
603             i.resolve_arp()
604             i.config_ip6()
605             i.resolve_ndp()
606
607     @classmethod
608     def tearDownClass(cls):
609         super(IkePeer, cls).tearDownClass()
610
611     def tearDown(self):
612         super(IkePeer, self).tearDown()
613         if self.del_sa_from_responder:
614             self.initiate_del_sa_from_responder()
615         else:
616             self.initiate_del_sa_from_initiator()
617         r = self.vapi.ikev2_sa_dump()
618         self.assertEqual(len(r), 0)
619         sas = self.vapi.ipsec_sa_dump()
620         self.assertEqual(len(sas), 0)
621         self.p.remove_vpp_config()
622         self.assertIsNone(self.p.query_vpp_config())
623
624     def setUp(self):
625         super(IkePeer, self).setUp()
626         self.config_tc()
627         self.p.add_vpp_config()
628         self.assertIsNotNone(self.p.query_vpp_config())
629         if self.sa.is_initiator:
630             self.sa.generate_dh_data()
631         self.vapi.cli("ikev2 set logging level 4")
632         self.vapi.cli("event-lo clear")
633
634     def assert_counter(self, count, name, version="ip4"):
635         node_name = "/err/ikev2-%s/" % version + name
636         self.assertEqual(count, self.statistics.get_err_counter(node_name))
637
638     def create_rekey_request(self, kex=False):
639         sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
640         header = ikev2.IKEv2(
641             init_SPI=self.sa.ispi,
642             resp_SPI=self.sa.rspi,
643             id=self.sa.new_msg_id(),
644             flags="Initiator",
645             exch_type="CREATE_CHILD_SA",
646         )
647
648         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
649         return self.create_packet(
650             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
651         )
652
653     def create_empty_request(self):
654         header = ikev2.IKEv2(
655             init_SPI=self.sa.ispi,
656             resp_SPI=self.sa.rspi,
657             id=self.sa.new_msg_id(),
658             flags="Initiator",
659             exch_type="INFORMATIONAL",
660             next_payload="Encrypted",
661         )
662
663         msg = self.encrypt_ike_msg(header, b"", None)
664         return self.create_packet(
665             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
666         )
667
668     def create_packet(
669         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
670     ):
671         if use_ip6:
672             src_ip = src_if.remote_ip6
673             dst_ip = src_if.local_ip6
674             ip_layer = IPv6
675         else:
676             src_ip = src_if.remote_ip4
677             dst_ip = src_if.local_ip4
678             ip_layer = IP
679         res = (
680             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
681             / ip_layer(src=src_ip, dst=dst_ip)
682             / UDP(sport=sport, dport=dport)
683         )
684         if natt:
685             # insert non ESP marker
686             res = res / Raw(b"\x00" * 4)
687         return res / msg
688
689     def verify_udp(self, udp):
690         self.assertEqual(udp.sport, self.sa.sport)
691         self.assertEqual(udp.dport, self.sa.dport)
692
693     def get_ike_header(self, packet):
694         try:
695             ih = packet[ikev2.IKEv2]
696             ih = self.verify_and_remove_non_esp_marker(ih)
697         except IndexError as e:
698             # this is a workaround for getting IKEv2 layer as both ikev2 and
699             # ipsec register for port 4500
700             esp = packet[ESP]
701             ih = self.verify_and_remove_non_esp_marker(esp)
702         self.assertEqual(ih.version, 0x20)
703         self.assertNotIn("Version", ih.flags)
704         return ih
705
706     def verify_and_remove_non_esp_marker(self, packet):
707         if self.sa.natt:
708             # if we are in nat traversal mode check for non esp marker
709             # and remove it
710             data = raw(packet)
711             self.assertEqual(data[:4], b"\x00" * 4)
712             return ikev2.IKEv2(data[4:])
713         else:
714             return packet
715
716     def encrypt_ike_msg(self, header, plain, first_payload):
717         if self.sa.ike_crypto == "AES-GCM-16ICV":
718             data = self.sa.ike_crypto_alg.pad(raw(plain))
719             plen = (
720                 len(data)
721                 + GCM_IV_SIZE
722                 + GCM_ICV_SIZE
723                 + len(ikev2.IKEv2_payload_Encrypted())
724             )
725             tlen = plen + len(ikev2.IKEv2())
726
727             # prepare aad data
728             sk_p = ikev2.IKEv2_payload_Encrypted(
729                 next_payload=first_payload, length=plen
730             )
731             header.length = tlen
732             res = header / sk_p
733             encr = self.sa.encrypt(raw(plain), raw(res))
734             sk_p = ikev2.IKEv2_payload_Encrypted(
735                 next_payload=first_payload, length=plen, load=encr
736             )
737             res = header / sk_p
738         else:
739             encr = self.sa.encrypt(raw(plain))
740             trunc_len = self.sa.ike_integ_alg.trunc_len
741             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
742             tlen = plen + len(ikev2.IKEv2())
743
744             sk_p = ikev2.IKEv2_payload_Encrypted(
745                 next_payload=first_payload, length=plen, load=encr
746             )
747             header.length = tlen
748             res = header / sk_p
749
750             integ_data = raw(res)
751             hmac_data = self.sa.compute_hmac(
752                 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
753             )
754             res = res / Raw(hmac_data[:trunc_len])
755         assert len(res) == tlen
756         return res
757
758     def verify_udp_encap(self, ipsec_sa):
759         e = VppEnum.vl_api_ipsec_sad_flags_t
760         if self.sa.udp_encap or self.sa.natt:
761             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
762         else:
763             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
764
765     def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
766         sas = self.vapi.ipsec_sa_dump()
767         if sa_count is None:
768             if is_rekey:
769                 # after rekey there is a short period of time in which old
770                 # inbound SA is still present
771                 sa_count = 3
772             else:
773                 sa_count = 2
774         self.assertEqual(len(sas), sa_count)
775         if self.sa.is_initiator:
776             if is_rekey:
777                 sa0 = sas[0].entry
778                 sa1 = sas[2].entry
779             else:
780                 sa0 = sas[0].entry
781                 sa1 = sas[1].entry
782         else:
783             if is_rekey:
784                 sa0 = sas[2].entry
785                 sa1 = sas[0].entry
786             else:
787                 sa1 = sas[0].entry
788                 sa0 = sas[1].entry
789
790         c = self.sa.child_sas[0]
791
792         self.verify_udp_encap(sa0)
793         self.verify_udp_encap(sa1)
794         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
795         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
796         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
797
798         if self.sa.esp_integ is None:
799             vpp_integ_alg = 0
800         else:
801             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
802         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
803         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
804
805         # verify crypto keys
806         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
807         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
808         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
809         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
810
811         # verify integ keys
812         if vpp_integ_alg:
813             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
814             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
815             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
816             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
817         else:
818             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
819             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
820
821     def verify_keymat(self, api_keys, keys, name):
822         km = getattr(keys, name)
823         api_km = getattr(api_keys, name)
824         api_km_len = getattr(api_keys, name + "_len")
825         self.assertEqual(len(km), api_km_len)
826         self.assertEqual(km, api_km[:api_km_len])
827
828     def verify_id(self, api_id, exp_id):
829         self.assertEqual(api_id.type, IDType.value(exp_id.type))
830         self.assertEqual(api_id.data_len, exp_id.data_len)
831         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
832
833     def verify_ike_sas(self):
834         r = self.vapi.ikev2_sa_dump()
835         self.assertEqual(len(r), 1)
836         sa = r[0].sa
837         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
838         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
839         if self.ip6:
840             if self.sa.is_initiator:
841                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
842                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
843             else:
844                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
845                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
846         else:
847             if self.sa.is_initiator:
848                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
849                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
850             else:
851                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
852                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
853         self.verify_keymat(sa.keys, self.sa, "sk_d")
854         self.verify_keymat(sa.keys, self.sa, "sk_ai")
855         self.verify_keymat(sa.keys, self.sa, "sk_ar")
856         self.verify_keymat(sa.keys, self.sa, "sk_ei")
857         self.verify_keymat(sa.keys, self.sa, "sk_er")
858         self.verify_keymat(sa.keys, self.sa, "sk_pi")
859         self.verify_keymat(sa.keys, self.sa, "sk_pr")
860
861         self.assertEqual(sa.i_id.type, self.sa.id_type)
862         self.assertEqual(sa.r_id.type, self.sa.id_type)
863         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
864         self.assertEqual(sa.r_id.data_len, len(self.idr))
865         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
866         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
867
868         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
869         self.assertEqual(len(r), 1)
870         csa = r[0].child_sa
871         self.assertEqual(csa.sa_index, sa.sa_index)
872         c = self.sa.child_sas[0]
873         if hasattr(c, "sk_ai"):
874             self.verify_keymat(csa.keys, c, "sk_ai")
875             self.verify_keymat(csa.keys, c, "sk_ar")
876         self.verify_keymat(csa.keys, c, "sk_ei")
877         self.verify_keymat(csa.keys, c, "sk_er")
878         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
879         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
880
881         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
882         tsi = tsi[0]
883         tsr = tsr[0]
884         r = self.vapi.ikev2_traffic_selector_dump(
885             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
886         )
887         self.assertEqual(len(r), 1)
888         ts = r[0].ts
889         self.verify_ts(r[0].ts, tsi[0], True)
890
891         r = self.vapi.ikev2_traffic_selector_dump(
892             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
893         )
894         self.assertEqual(len(r), 1)
895         self.verify_ts(r[0].ts, tsr[0], False)
896
897         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
898         self.verify_nonce(n, self.sa.i_nonce)
899         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
900         self.verify_nonce(n, self.sa.r_nonce)
901
902     def verify_nonce(self, api_nonce, nonce):
903         self.assertEqual(api_nonce.data_len, len(nonce))
904         self.assertEqual(api_nonce.nonce, nonce)
905
906     def verify_ts(self, api_ts, ts, is_initiator):
907         if is_initiator:
908             self.assertTrue(api_ts.is_local)
909         else:
910             self.assertFalse(api_ts.is_local)
911
912         if self.p.ts_is_ip4:
913             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
914             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
915         else:
916             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
917             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
918         self.assertEqual(api_ts.start_port, ts.start_port)
919         self.assertEqual(api_ts.end_port, ts.end_port)
920         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
921
922
class TemplateInitiator(IkePeer):
    """initiator test template"""

    def initiate_del_sa_from_initiator(self):
        """Ask VPP to delete the IKE SA and acknowledge its request.

        The delete is triggered through the API; the captured request is
        checked (flags, SPIs, Delete payload for IKEv2) and answered with an
        empty encrypted response, after which no further packet is expected.
        """
        # NOTE(review): the SPI is converted little-endian here while the SA
        # dump is compared big-endian elsewhere - presumably what the API
        # expects; confirm.
        ispi = int.from_bytes(self.sa.ispi, "little")
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn("Response", ih.flags)
        self.assertIn("Initiator", ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type="INFORMATIONAL",
            id=ih.id,
            next_payload="Encrypted",
        )
        resp = self.encrypt_ike_msg(header, b"", None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check that ``packet`` is VPP's empty response to our delete request."""
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn("Response", ih.flags)
        self.assertIn("Initiator", ih.flags)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b"")

    def initiate_del_sa_from_responder(self):
        """Send a Delete for the IKE SA to VPP and verify its response."""
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            exch_type="INFORMATIONAL",
            id=self.sa.new_msg_id(),
        )
        del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
        ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    @staticmethod
    def find_notify_payload(packet, notify_type):
        """Return the first Notify payload of ``notify_type`` in ``packet``.

        Returns None when the packet carries no Notify payload of that type,
        so callers can report a missing payload with ``assertIsNotNone``.
        """
        # Ask scapy for the 1st, 2nd, ... Notify layer: getlayer() returns None
        # once there is no further match.  Walking ``.payload`` by hand cannot
        # be used for this, because the chain ends in scapy's NoPayload object
        # rather than in None.
        nb = 1
        while True:
            n = packet.getlayer(ikev2.IKEv2_payload_Notify, nb)
            if n is None or n.type == notify_type:
                return n
            nb += 1

    def verify_nat_detection(self, packet):
        """Check both NAT detection notifies of an IKE_SA_INIT request.

        The expected digests are computed from the packet's own IP addresses
        and UDP ports, using the address family selected by ``self.ip6``.
        """
        if self.ip6:
            iph = packet[IPv6]
            family = socket.AF_INET6
        else:
            iph = packet[IP]
            family = socket.AF_INET
        udp = packet[UDP]

        for notify_type, addr, port in (
            (16388, iph.src, udp.sport),  # NAT_DETECTION_SOURCE_IP
            (16389, iph.dst, udp.dport),  # NAT_DETECTION_DESTINATION_IP
        ):
            s = self.find_notify_payload(packet, notify_type)
            self.assertIsNotNone(s)
            # Third argument: an all-zero 8-byte value - presumably the not
            # yet assigned responder SPI; confirm against compute_nat_sha1.
            expected_sha = self.sa.compute_nat_sha1(
                inet_pton(family, addr), port, b"\x00" * 8
            )
            self.assertEqual(s.load, expected_sha)

    def verify_sa_init_request(self, packet):
        """Validate VPP's IKE_SA_INIT request and derive the IKE keys.

        Records the initiator SPI, nonce, DH data and source port on
        ``self.sa``, checks the proposed transforms and the NAT detection
        notifies, then configures the SA properties and computes the keys.
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = packet[ikev2.IKEv2]
        self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
        self.assertEqual(ih.exch_type, 34)  # SA_INIT
        self.sa.ispi = ih.init_SPI
        self.assertEqual(ih.resp_SPI, 8 * b"\x00")
        self.assertIn("Initiator", ih.flags)
        self.assertNotIn("Response", ih.flags)
        self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load

        prop = packet[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.proto, 1)  # proto = ikev2
        self.assertEqual(prop.proposal, 1)
        self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
        self.assertEqual(
            prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
        )
        self.assertEqual(prop.trans[1].transform_type, 2)  # prf
        self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
        self.assertEqual(prop.trans[2].transform_type, 4)  # dh
        self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])

        self.verify_nat_detection(packet)
        # The SA properties are fixed here rather than taken from the
        # received proposal.
        self.sa.set_ike_props(
            crypto="AES-GCM-16ICV",
            crypto_key_len=32,
            integ="NULL",
            prf="PRF_HMAC_SHA2_256",
            dh="3072MODPgr",
        )
        self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
        self.sa.generate_dh_data()
        self.sa.complete_dh_data()
        self.sa.calc_keys()

    def update_esp_transforms(self, trans, sa):
        """Copy the ESP encryption/integrity algorithms from ``trans`` to ``sa``.

        ``trans`` is the first transform of a chain; the chain is followed
        through ``.payload`` until it yields a falsy object.
        """
        while trans:
            if trans.transform_type == 1:  # encryption
                sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
            elif trans.transform_type == 3:  # integrity
                sa.esp_integ = INTEG_IDS[trans.transform_id]
            trans = trans.payload

    def verify_sa_auth_req(self, packet):
        """Validate VPP's IKE_AUTH request and learn its child SA parameters.

        Checks header, UDP and identities, advances ``self.sa.msg_id``, stores
        the initiator's child SPI and adopts the proposed ESP transforms.
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
        self.assertIn("Initiator", ih.flags)
        self.assertNotIn("Response", ih.flags)

        self.verify_udp(udp)
        self.assertEqual(ih.id, self.sa.msg_id + 1)
        self.sa.msg_id += 1
        plain = self.sa.hmac_and_decrypt(ih)
        idi = ikev2.IKEv2_payload_IDi(plain)
        self.assertEqual(idi.load, self.sa.i_id)
        if self.no_idr_auth:
            # Without IDr the AUTH payload must directly follow IDi.
            self.assertEqual(idi.next_payload, 39)  # AUTH
        else:
            idr = ikev2.IKEv2_payload_IDr(idi.payload)
            self.assertEqual(idr.load, self.sa.r_id)
        prop = idi[ikev2.IKEv2_payload_Proposal]
        c = self.sa.child_sas[0]
        c.ispi = prop.SPI
        self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)

    def send_init_response(self):
        """Send the IKE_SA_INIT response and verify the IKE_AUTH request.

        The response carries the SA proposal, KE, nonce and both NAT
        detection notifies; it is kept in ``self.sa.init_resp_packet``.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.ike_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.ike_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="GroupDesc", transform_id=self.sa.ike_dh
            )
        )
        props = ikev2.IKEv2_payload_Proposal(
            proposal=1, proto="IKEv2", trans_nb=4, trans=trans
        )

        src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
        if self.sa.natt:
            # Advertise an address other than the real one so that the
            # destination digest differs when NAT traversal is being tested.
            dst_address = b"\x0a\x0a\x0a\x0a"
        else:
            dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
        dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)

        self.sa.init_resp_packet = (
            ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi,
                exch_type="IKE_SA_INIT",
                flags="Response",
            )
            / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
            / ikev2.IKEv2_payload_KE(
                next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
            )
            / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
            / ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
            )
            / ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
            )
        )

        ike_msg = self.create_packet(
            self.pg0,
            self.sa.init_resp_packet,
            self.sa.sport,
            self.sa.dport,
            False,
            self.ip6,
        )
        self.pg_send(self.pg0, ike_msg)
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_req(capture[0])

    def initiate_sa_init(self):
        """Make VPP start the exchange, verify its request and respond."""
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)

        capture = self.pg0.get_capture(1)
        self.verify_sa_init_request(capture[0])
        self.send_init_response()

    def send_auth_response(self):
        """Build and send the encrypted IKE_AUTH response to VPP."""
        tr_attr = self.sa.esp_crypto_attr()
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.esp_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.esp_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="No ESN"
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="ESN"
            )
        )

        c = self.sa.child_sas[0]
        props = ikev2.IKEv2_payload_Proposal(
            proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
        )

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (
            ikev2.IKEv2_payload_IDi(
                next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
            )
            / ikev2.IKEv2_payload_IDr(
                next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
            )
            / ikev2.IKEv2_payload_AUTH(
                next_payload="SA",
                auth_type=AuthMethod.value(self.sa.auth_method),
                load=self.sa.auth_data,
            )
            / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
            / ikev2.IKEv2_payload_TSi(
                next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
            )
            / ikev2.IKEv2_payload_TSr(
                next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
            )
            / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
        )

        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            id=self.sa.new_msg_id(),
            flags="Response",
            exch_type="IKE_AUTH",
        )

        ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg_send(self.pg0, packet)

    # No docstring on purpose: unittest reports a test's docstring as its
    # description.
    def test_initiator(self):
        self.initiate_sa_init()
        self.sa.auth_init()
        self.sa.calc_child_keys()
        self.send_auth_response()
        self.verify_ike_sas()
1223
1224
class TemplateResponder(IkePeer):
    """responder test template"""

    def initiate_del_sa_from_responder(self):
        """Ask VPP to delete the IKE SA and acknowledge its request.

        The delete is triggered through the API; the captured INFORMATIONAL
        request is checked (flags, Delete payload for IKEv2, SPIs) and
        answered with an empty encrypted response, after which no further
        packet is expected.
        """
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn("Response", ih.flags)
        self.assertNotIn("Initiator", ih.flags)
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Initiator+Response",
            exch_type="INFORMATIONAL",
            id=ih.id,
            next_payload="Encrypted",
        )
        resp = self.encrypt_ike_msg(header, b"", None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check that ``packet`` is VPP's empty response to our delete request."""
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn("Response", ih.flags)
        self.assertNotIn("Initiator", ih.flags)
        self.assertEqual(ih.next_payload, 46)  # Encrypted
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b"")

    def initiate_del_sa_from_initiator(self):
        """Send a Delete for the IKE SA to VPP and verify its response."""
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Initiator",
            exch_type="INFORMATIONAL",
            id=self.sa.new_msg_id(),
        )
        del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
        ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    def send_sa_init_req(self):
        """Send the IKE_SA_INIT request to VPP and verify its response.

        The request is kept in ``self.sa.init_req_packet``.  NAT detection
        notifies are only appended for IPv4.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.ike_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.ike_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="GroupDesc", transform_id=self.sa.ike_dh
            )
        )

        props = ikev2.IKEv2_payload_Proposal(
            proposal=1, proto="IKEv2", trans_nb=4, trans=trans
        )

        # For IPv6 nothing follows the nonce, so it must not announce a Notify.
        next_payload = None if self.ip6 else "Notify"

        self.sa.init_req_packet = (
            ikev2.IKEv2(
                init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
            )
            / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
            / ikev2.IKEv2_payload_KE(
                next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
            )
            / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
        )

        if not self.ip6:
            # With NAT traversal enabled for a side, advertise an address
            # other than the real one so the corresponding digest differs.
            if self.sa.i_natt:
                src_address = b"\x0a\x0a\x0a\x01"
            else:
                src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)

            if self.sa.r_natt:
                dst_address = b"\x0a\x0a\x0a\x0a"
            else:
                dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)

            src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
            dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
            nat_src_detection = ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
            )
            nat_dst_detection = ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
            )
            self.sa.init_req_packet = (
                self.sa.init_req_packet / nat_src_detection / nat_dst_detection
            )

        ike_msg = self.create_packet(
            self.pg0,
            self.sa.init_req_packet,
            self.sa.sport,
            self.sa.dport,
            self.sa.natt,
            self.ip6,
        )
        self.pg0.add_stream(ike_msg)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_init(capture[0])

    def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
        """Build the plaintext payload chain for an IKE_AUTH or rekey request.

        :param last_payload: ``next_payload`` value set on the TSr payload;
            defaults to "Notify"
        :param is_rekey: build a child SA rekey request (Nonce first, REKEY_SA
            notify appended) instead of an IKE_AUTH request (IDi first)
        :param kex: additionally offer a DH group transform and, when
            rekeying, include a KE payload
        :returns: tuple ``(plain, first_payload)`` where ``first_payload`` is
            the name of the first payload in ``plain``
        """
        tr_attr = self.sa.esp_crypto_attr()
        last_payload = last_payload or "Notify"
        trans_nb = 4
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.esp_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.esp_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="No ESN"
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="ESN"
            )
        )

        if kex:
            trans_nb += 1
            trans /= ikev2.IKEv2_payload_Transform(
                transform_type="GroupDesc", transform_id=self.sa.ike_dh
            )

        c = self.sa.child_sas[0]
        props = ikev2.IKEv2_payload_Proposal(
            proposal=1,
            proto="ESP",
            SPIsize=4,
            SPI=c.ispi,
            trans_nb=trans_nb,
            trans=trans,
        )

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (
            ikev2.IKEv2_payload_AUTH(
                next_payload="SA",
                auth_type=AuthMethod.value(self.sa.auth_method),
                load=self.sa.auth_data,
            )
            / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
            / ikev2.IKEv2_payload_TSi(
                next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
            )
            / ikev2.IKEv2_payload_TSr(
                next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
            )
        )

        if is_rekey:
            first_payload = "Nonce"
            # NOTE(review): ``head`` announces "SA" as its next payload while
            # ``plain`` starts with an AUTH payload - confirm this is intended.
            if kex:
                head = ikev2.IKEv2_payload_Nonce(
                    load=self.sa.i_nonce, next_payload="KE"
                ) / ikev2.IKEv2_payload_KE(
                    group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
                )
            else:
                head = ikev2.IKEv2_payload_Nonce(
                    load=self.sa.i_nonce, next_payload="SA"
                )
            plain = (
                head
                / plain
                / ikev2.IKEv2_payload_Notify(
                    type="REKEY_SA",
                    proto="ESP",
                    SPI=c.ispi,
                    length=8 + len(c.ispi),
                    next_payload="Notify",
                )
                / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
            )
        else:
            first_payload = "IDi"
            if self.no_idr_auth:
                ids = ikev2.IKEv2_payload_IDi(
                    next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
                )
            else:
                ids = ikev2.IKEv2_payload_IDi(
                    next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
                ) / ikev2.IKEv2_payload_IDr(
                    next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
                )
            plain = ids / plain
        return plain, first_payload

    def send_sa_auth(self):
        """Send the encrypted IKE_AUTH request to VPP and verify its response."""
        plain, first_payload = self.generate_auth_payload(last_payload="Notify")
        plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            id=self.sa.new_msg_id(),
            flags="Initiator",
            exch_type="IKE_AUTH",
        )

        ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_resp(capture[0])

    def verify_sa_init(self, packet):
        """Validate VPP's IKE_SA_INIT response and derive the IKE keys.

        Stores the responder SPI, nonce and DH data on ``self.sa``, then
        completes the DH exchange, computes the keys and prepares the
        authentication data.

        :raises IndexError: re-raised (after logging the packet) when the
            response lacks an SA, Nonce or KE payload
        """
        ih = self.get_ike_header(packet)

        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 34)
        self.assertIn("Response", ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        # The SPI is an 8-byte string, so "unset" is eight zero bytes;
        # comparing it with the integer 0 could never fail.
        self.assertNotEqual(ih.resp_SPI, 8 * b"\x00")
        self.sa.rspi = ih.resp_SPI
        try:
            ih[ikev2.IKEv2_payload_SA]  # presence check only
            self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
        except IndexError:
            self.logger.error("unexpected reply: SA/Nonce/KE payload not found!")
            # show() prints and returns None unless dump=True is given.
            self.logger.error(ih.show(dump=True))
            raise
        self.sa.complete_dh_data()
        self.sa.calc_keys()
        self.sa.auth_init()

    def verify_sa_auth_resp(self, packet):
        """Validate VPP's IKE_AUTH response and derive the child SA keys.

        Stores the responder's child SPI taken from the returned proposal.
        """
        ike = self.get_ike_header(packet)
        udp = packet[UDP]
        self.verify_udp(udp)
        self.assertEqual(ike.id, self.sa.msg_id)
        plain = self.sa.hmac_and_decrypt(ike)
        idr = ikev2.IKEv2_payload_IDr(plain)
        prop = idr[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.SPIsize, 4)
        self.sa.child_sas[0].rspi = prop.SPI
        self.sa.calc_child_keys()

    # Suffix passed to assert_counter() by verify_counters().
    IKE_NODE_SUFFIX = "ip4"

    def verify_counters(self):
        """Check the IKE node counters and per-SA stats after one handshake."""
        self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
        self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
        self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)

        r = self.vapi.ikev2_sa_dump()
        s = r[0].sa.stats
        self.assertEqual(1, s.n_sa_auth_req)
        self.assertEqual(1, s.n_sa_init_req)

    # No docstring on purpose: unittest reports a test's docstring as its
    # description.
    def test_responder(self):
        self.send_sa_init_req()
        self.send_sa_auth()
        self.verify_ipsec_sas()
        self.verify_ike_sas()
        self.verify_counters()
1523
1524
1525 class Ikev2Params(object):
1526     def config_params(self, params={}):
1527         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1528         ei = VppEnum.vl_api_ipsec_integ_alg_t
1529         self.vpp_enums = {
1530             "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1531             "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1532             "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1533             "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1534             "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1535             "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1536             "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1537             "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1538             "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1539             "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1540         }
1541
1542         dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1543         if dpd_disabled:
1544             self.vapi.cli("ikev2 dpd disable")
1545         self.del_sa_from_responder = (
1546             False
1547             if "del_sa_from_responder" not in params
1548             else params["del_sa_from_responder"]
1549         )
1550         i_natt = False if "i_natt" not in params else params["i_natt"]
1551         r_natt = False if "r_natt" not in params else params["r_natt"]
1552         self.p = Profile(self, "pr1")
1553         self.ip6 = False if "ip6" not in params else params["ip6"]
1554
1555         if "auth" in params and params["auth"] == "rsa-sig":
1556             auth_method = "rsa-sig"
1557             work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1558             self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1559
1560             client_file = work_dir + params["client-cert"]
1561             server_pem = open(work_dir + params["server-cert"]).read()
1562             client_priv = open(work_dir + params["client-key"]).read()
1563             client_priv = load_pem_private_key(
1564                 str.encode(client_priv), None, default_backend()
1565             )
1566             self.peer_cert = x509.load_pem_x509_certificate(
1567                 str.encode(server_pem), default_backend()
1568             )
1569             self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1570             auth_data = None
1571         else:
1572             auth_data = b"$3cr3tpa$$w0rd"
1573             self.p.add_auth(method="shared-key", data=auth_data)
1574             auth_method = "shared-key"
1575             client_priv = None
1576
1577         is_init = True if "is_initiator" not in params else params["is_initiator"]
1578         self.no_idr_auth = params.get("no_idr_in_auth", False)
1579
1580         idr = {"id_type": "fqdn", "data": b"vpp.home"}
1581         idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1582         r_id = self.idr = idr["data"]
1583         i_id = self.idi = idi["data"]
1584         if is_init:
1585             # scapy is initiator, VPP is responder
1586             self.p.add_local_id(**idr)
1587             self.p.add_remote_id(**idi)
1588             if self.no_idr_auth:
1589                 r_id = None
1590         else:
1591             # VPP is initiator, scapy is responder
1592             self.p.add_local_id(**idi)
1593             if not self.no_idr_auth:
1594                 self.p.add_remote_id(**idr)
1595
1596         loc_ts = (
1597             {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1598             if "loc_ts" not in params
1599             else params["loc_ts"]
1600         )
1601         rem_ts = (
1602             {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1603             if "rem_ts" not in params
1604             else params["rem_ts"]
1605         )
1606         self.p.add_local_ts(**loc_ts)
1607         self.p.add_remote_ts(**rem_ts)
1608         if "responder" in params:
1609             self.p.add_responder(params["responder"])
1610         if "ike_transforms" in params:
1611             self.p.add_ike_transforms(params["ike_transforms"])
1612         if "esp_transforms" in params:
1613             self.p.add_esp_transforms(params["esp_transforms"])
1614
1615         udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1616         if udp_encap:
1617             self.p.set_udp_encap(True)
1618
1619         if "responder_hostname" in params:
1620             hn = params["responder_hostname"]
1621             self.p.add_responder_hostname(hn)
1622
1623             # configure static dns record
1624             self.vapi.dns_name_server_add_del(
1625                 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1626             )
1627             self.vapi.dns_enable_disable(enable=1)
1628
1629             cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1630             self.vapi.cli(cmd)
1631
1632         self.sa = IKEv2SA(
1633             self,
1634             i_id=i_id,
1635             r_id=r_id,
1636             is_initiator=is_init,
1637             id_type=self.p.local_id["id_type"],
1638             i_natt=i_natt,
1639             r_natt=r_natt,
1640             priv_key=client_priv,
1641             auth_method=auth_method,
1642             nonce=params.get("nonce"),
1643             auth_data=auth_data,
1644             udp_encap=udp_encap,
1645             local_ts=self.p.remote_ts,
1646             remote_ts=self.p.local_ts,
1647         )
1648
1649         if is_init:
1650             ike_crypto = (
1651                 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1652             )
1653             ike_integ = (
1654                 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1655             )
1656             ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1657
1658             esp_crypto = (
1659                 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1660             )
1661             esp_integ = (
1662                 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1663             )
1664
1665             self.sa.set_ike_props(
1666                 crypto=ike_crypto[0],
1667                 crypto_key_len=ike_crypto[1],
1668                 integ=ike_integ,
1669                 prf="PRF_HMAC_SHA2_256",
1670                 dh=ike_dh,
1671             )
1672             self.sa.set_esp_props(
1673                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1674             )
1675
1676
@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class TestApi(VppTestCase):
    """Test IKEV2 API

    Configures two profiles through the API and checks that the profile
    dump reports back exactly the values that were configured.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def tearDown(self):
        """Remove the profiles created by the test and check none are left."""
        super().tearDown()
        # The test can fail before p1/p2 are assigned; skipping the missing
        # ones keeps an AttributeError here from masking the real failure.
        for attr in ("p1", "p2"):
            profile = getattr(self, attr, None)
            if profile is not None:
                profile.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Create a Profile from ``cfg``, add it to VPP and return it.

        Required keys: "name", "loc_id" and "rem_id" (``(id_type, data)``
        pairs), "loc_ts", "rem_ts", "responder", "ike_ts", "esp_ts", "auth",
        "udp_encap" and "ipsec_over_udp_port".
        Optional keys: "lifetime_data", "tun_itf" and "natt_disabled".
        """
        p = Profile(self, cfg["name"])
        p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
        p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
        p.add_local_ts(**cfg["loc_ts"])
        p.add_remote_ts(**cfg["rem_ts"])
        p.add_responder(cfg["responder"])
        p.add_ike_transforms(cfg["ike_ts"])
        p.add_esp_transforms(cfg["esp_ts"])
        p.add_auth(**cfg["auth"])
        p.set_udp_encap(cfg["udp_encap"])
        p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
        if "lifetime_data" in cfg:
            p.set_lifetime_data(cfg["lifetime_data"])
        if "tun_itf" in cfg:
            p.set_tunnel_interface(cfg["tun_itf"])
        if cfg.get("natt_disabled"):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """test profile dump API"""
        loc_ts4 = {
            "proto": 8,
            "start_port": 1,
            "end_port": 19,
            "start_addr": "3.3.3.2",
            "end_addr": "3.3.3.3",
        }
        rem_ts4 = {
            "proto": 9,
            "start_port": 10,
            "end_port": 119,
            "start_addr": "4.5.76.80",
            "end_addr": "2.3.4.6",
        }

        loc_ts6 = {
            "proto": 8,
            "start_port": 1,
            "end_port": 19,
            "start_addr": "ab::1",
            "end_addr": "ab::4",
        }
        rem_ts6 = {
            "proto": 9,
            "start_port": 10,
            "end_port": 119,
            "start_addr": "cd::12",
            "end_addr": "cd::13",
        }

        # p1 is an IPv4 profile with NAT-T disabled and lifetime data;
        # p2 uses IPv6 traffic selectors and sets a tunnel interface.
        conf = {
            "p1": {
                "name": "p1",
                "natt_disabled": True,
                "loc_id": ("fqdn", b"vpp.home"),
                "rem_id": ("fqdn", b"roadwarrior.example.com"),
                "loc_ts": loc_ts4,
                "rem_ts": rem_ts4,
                "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
                "ike_ts": {
                    "crypto_alg": 20,
                    "crypto_key_size": 32,
                    "integ_alg": 0,
                    "dh_group": 1,
                },
                "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
                "auth": {"method": "shared-key", "data": b"sharedkeydata"},
                "udp_encap": True,
                "ipsec_over_udp_port": 4501,
                "lifetime_data": {
                    "lifetime": 123,
                    "lifetime_maxdata": 20192,
                    "lifetime_jitter": 9,
                    "handover": 132,
                },
            },
            "p2": {
                "name": "p2",
                "loc_id": ("ip4-addr", b"192.168.2.1"),
                "rem_id": ("ip6-addr", b"abcd::1"),
                "loc_ts": loc_ts6,
                "rem_ts": rem_ts6,
                "responder": {"sw_if_index": 4, "addr": "def::10"},
                "ike_ts": {
                    "crypto_alg": 12,
                    "crypto_key_size": 16,
                    "integ_alg": 3,
                    "dh_group": 3,
                },
                "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
                "auth": {"method": "shared-key", "data": b"sharedkeydata"},
                "udp_encap": False,
                "ipsec_over_udp_port": 4600,
                "tun_itf": 0,
            },
        }
        self.p1 = self.configure_profile(conf["p1"])
        self.p2 = self.configure_profile(conf["p2"])

        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        self.verify_profile(r[0].profile, conf["p1"])
        self.verify_profile(r[1].profile, conf["p2"])

    def verify_id(self, api_id, cfg_id):
        """Check a dumped identity against its ``(id_type, data)`` config."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Check a dumped traffic selector against its config dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
        self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
        self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
        self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
        self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))

    def verify_responder(self, api_r, cfg_r):
        """Check the dumped responder interface index and address."""
        self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
        self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the transform fields shared by IKE and ESP transforms."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
        self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check IKE transforms: the common fields plus the DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check ESP transforms (common fields only)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Check the dumped auth method, data and data length."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
        self.assertEqual(api_auth.data, cfg_auth["data"])
        self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))

    def verify_lifetime_data(self, p, ld):
        """Check the lifetime fields of dumped profile ``p`` against ``ld``."""
        self.assertEqual(p.lifetime, ld["lifetime"])
        self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
        self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
        self.assertEqual(p.handover, ld["handover"])

    def verify_profile(self, ap, cp):
        """Check dumped profile ``ap`` against the config dict ``cp``."""
        self.assertEqual(ap.name, cp["name"])
        self.assertEqual(ap.udp_encap, cp["udp_encap"])
        self.verify_id(ap.loc_id, cp["loc_id"])
        self.verify_id(ap.rem_id, cp["rem_id"])
        self.verify_ts(ap.loc_ts, cp["loc_ts"])
        self.verify_ts(ap.rem_ts, cp["rem_ts"])
        self.verify_responder(ap.responder, cp["responder"])
        self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
        self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
        self.verify_auth(ap.auth, cp["auth"])
        # assertEqual reports both values on mismatch, unlike assertTrue(a == b)
        self.assertEqual(cp.get("natt_disabled", False), ap.natt_disabled)

        if "lifetime_data" in cp:
            self.verify_lifetime_data(ap, cp["lifetime_data"])
        self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
        if "tun_itf" in cp:
            self.assertEqual(ap.tun_itf, cp["tun_itf"])
        else:
            # a profile configured without "tun_itf" is expected to dump ~0
            self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1862
1863
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """test responder - responder behind NAT

    Runs the responder template with the "r_natt" parameter enabled.
    """

    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        """Configure the test case with only "r_natt" turned on."""
        natt_params = {"r_natt": True}
        self.config_params(natt_params)
1872
1873
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - NAT traversal (initiator behind NAT)"""

    def config_tc(self):
        """Configure VPP as initiator with "i_natt" set and explicit transforms."""
        # peer VPP should contact: the remote side of pg0
        responder = {
            "sw_if_index": self.pg0.sw_if_index,
            "addr": self.pg0.remote_ip4,
        }
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        self.config_params(
            {
                "i_natt": True,
                # "is_initiator" is seen from the test case perspective,
                # so False means vpp is the initiator
                "is_initiator": False,
                "responder": responder,
                "ike-crypto": ("AES-GCM-16ICV", 32),
                "ike-integ": "NULL",
                "ike-dh": "3072MODPgr",
                "ike_transforms": ike_transforms,
                "esp_transforms": esp_transforms,
            }
        )
1904
1905
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - pre shared key auth"""

    def config_tc(self):
        """Configure VPP as initiator, naming the responder by hostname."""
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        responder_hostname = {
            "hostname": "vpp.responder.org",
            "sw_if_index": self.pg0.sw_if_index,
        }
        self.config_params(
            {
                # "is_initiator" is seen from the test case perspective,
                # so False means vpp is the initiator
                "is_initiator": False,
                "ike-crypto": ("AES-GCM-16ICV", 32),
                "ike-integ": "NULL",
                "ike-dh": "3072MODPgr",
                "ike_transforms": ike_transforms,
                "esp_transforms": esp_transforms,
                "responder_hostname": responder_hostname,
            }
        )
1935
1936
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """test initiator - request window size (1)"""

    def rekey_respond(self, req, update_child_sa_data):
        """Answer a captured rekey request by echoing its SA payload back.

        :param req: captured packet carrying the rekey request
        :param update_child_sa_data: when true, first copy the nonce and SPI
            from the request into self.sa and recompute the child keys
        """
        request_hdr = self.get_ike_header(req)
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(request_hdr))
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            child_sa = self.sa.child_sas[0]
            child_sa.ispi = proposal.SPI
            child_sa.rspi = proposal.SPI
            self.sa.calc_child_keys()

        response_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type=36,
            id=request_hdr.id,  # reply carries the request's message id
            next_payload="Encrypted",
        )
        encrypted = self.encrypt_ike_msg(response_hdr, sa_payload, "SA")
        reply = self.create_packet(
            self.pg0,
            encrypted,
            self.sa.sport,
            self.sa.dport,
            self.sa.natt,
            self.ip6,
        )
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # run the base initiator scenario, then issue two rekey requests
        # for the same child SA back to back
        super().test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        first_req, second_req = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(second_req, True)
        self.rekey_respond(first_req, False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1983
1984
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """test ikev2 initiator - rekey"""

    def rekey_from_initiator(self):
        """Trigger a child SA rekey in VPP and answer the captured request."""
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        request_hdr = self.get_ike_header(self.pg0.get_capture(1)[0])

        # the captured message must be a CHILD_SA request from the initiator
        self.assertEqual(request_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn("Response", request_hdr.flags)
        self.assertIn("Initiator", request_hdr.flags)

        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(request_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        child_sa = self.sa.child_sas[0]
        child_sa.ispi = proposal.SPI
        child_sa.rspi = proposal.SPI
        self.sa.calc_child_keys()

        response_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type=36,
            id=request_hdr.id,
            next_payload="Encrypted",
        )
        # the response reuses the SA payload taken from the request
        encrypted = self.encrypt_ike_msg(response_hdr, sa_payload, "SA")
        reply = self.create_packet(
            self.pg0,
            encrypted,
            self.sa.sport,
            self.sa.dport,
            self.sa.natt,
            self.ip6,
        )
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # base initiator scenario first, then rekey and re-verify the SAs
        super().test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
2027
2028
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - delete IKE SA from responder"""

    def config_tc(self):
        """Configure VPP as initiator with "del_sa_from_responder" set."""
        responder = {
            "sw_if_index": self.pg0.sw_if_index,
            "addr": self.pg0.remote_ip4,
        }
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        self.config_params(
            {
                "del_sa_from_responder": True,
                # "is_initiator" is seen from the test case perspective,
                # so False means vpp is the initiator
                "is_initiator": False,
                "responder": responder,
                "ike-crypto": ("AES-GCM-16ICV", 32),
                "ike-integ": "NULL",
                "ike-dh": "3072MODPgr",
                "ike_transforms": ike_transforms,
                "esp_transforms": esp_transforms,
                "no_idr_in_auth": True,
            }
        )
2060
2061
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """test ikev2 responder - initiator behind NAT

    Runs the responder template with the "i_natt" parameter enabled.
    """

    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        """Configure the test case with only "i_natt" turned on."""
        natt_params = {"i_natt": True}
        self.config_params(natt_params)
2070
2071
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """test ikev2 responder - pre shared key auth

    Uses config_params() without arguments, i.e. its built-in defaults.
    """

    def config_tc(self):
        """Configure the test case without overriding any parameter."""
        self.config_params()
2078
2079
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test

    After the base responder scenario, VPP's liveness request is captured
    but left unanswered; the IKE and IPsec SAs must then disappear.
    """

    def config_tc(self):
        """Configure the test case with "dpd_disabled" set to False."""
        dpd_params = {"dpd_disabled": False}
        self.config_params(dpd_params)

    def tearDown(self):
        # intentionally a no-op: overrides the inherited tearDown
        pass

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super().test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # capture empty request but don't reply
        liveness_req = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(liveness_req)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b"")
        # wait for SA expiration
        time.sleep(3)
        # both the IKE SA and the IPsec SAs must be gone by now
        self.assertEqual(len(self.vapi.ikev2_sa_dump()), 0)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 0)
2109
2110
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """test ikev2 responder - rekey"""

    # subclasses set this to True to rekey with a key exchange
    WITH_KEX = False

    def send_rekey_from_initiator(self):
        """Send a rekey request to VPP and return the one-packet capture."""
        use_kex = self.WITH_KEX
        if use_kex:
            self.sa.generate_dh_data()
        self.pg0.add_stream(self.create_rekey_request(kex=use_kex))
        self.pg0.enable_capture()
        self.pg_start()
        return self.pg0.get_capture(1)

    def process_rekey_response(self, capture):
        """Update self.sa from the rekey response in ``capture[0]``."""
        ike_hdr = self.get_ike_header(capture[0])
        sa_payload = ikev2.IKEv2_payload_SA(self.sa.hmac_and_decrypt(ike_hdr))
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        if self.WITH_KEX:
            # take the responder's KE data and finish the DH exchange
            self.sa.r_dh_data = sa_payload[ikev2.IKEv2_payload_KE].load
            self.sa.complete_dh_data()
        self.sa.calc_child_keys(kex=self.WITH_KEX)

    def test_responder(self):
        super().test_responder()
        rekey_capture = self.send_rekey_from_initiator()
        self.process_rekey_response(rekey_capture)
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
        # exactly one rekey request must be counted, both in the node
        # counter and in the per-SA statistics
        self.assert_counter(1, "rekey_req", "ip4")
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
2148
2149
@tag_fixme_vpp_workers
class TestResponderRekeyRepeat(TestResponderRekey):
    """test ikev2 responder - rekey repeat"""

    def test_responder(self):
        super().test_responder()
        # rekey request is not accepted until old IPsec SA is expired
        rejected = self.send_rekey_from_initiator()
        ike_hdr = self.get_ike_header(rejected[0])
        notify = ikev2.IKEv2_payload_Notify(self.sa.hmac_and_decrypt(ike_hdr))
        # 43 is presumably TEMPORARY_FAILURE (RFC 7296) - confirm
        self.assertEqual(notify.type, 43)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
        # rekey request is accepted after old IPsec SA was expired;
        # poll up to 50 times, 0.2 s apart, for the SA count to change
        polls_left = 50
        while polls_left and len(self.vapi.ipsec_sa_dump()) == 3:
            time.sleep(0.2)
            polls_left -= 1
        if not polls_left:
            self.fail("old IPsec SA not expired")
        accepted = self.send_rekey_from_initiator()
        self.process_rekey_response(accepted)
        self.verify_ike_sas()
        self.verify_ipsec_sas(sa_count=3)
2173
2174
@tag_fixme_vpp_workers
class TestResponderRekeyKEX(TestResponderRekey):
    """test ikev2 responder - rekey with key exchange

    Same scenario as the parent class, with WITH_KEX switched on.
    """

    # makes the inherited rekey helpers generate and process DH data
    WITH_KEX = True
2180
2181
@tag_fixme_vpp_workers
class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
    """test ikev2 responder - rekey repeat with key exchange

    Same scenario as the parent class, with WITH_KEX switched on.
    """

    # makes the inherited rekey helpers generate and process DH data
    WITH_KEX = True
2187
2188
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """test ikev2 responder - non-default table id

    Runs the responder scenario with pg0 placed in IP table 1.
    """

    @classmethod
    def setUpClass(cls):
        """Set up a single pg interface bound to IP table 1."""
        import scapy.contrib.ikev2 as _ikev2

        # publish the scapy module under the module-level name "ikev2"
        globals()["ikev2"] = _ikev2
        # super(IkePeer, cls) starts the lookup *after* IkePeer in the MRO,
        # so IkePeer.setUpClass itself is not run; presumably the interface
        # setup below replaces it - confirm against IkePeer.
        super(IkePeer, cls).setUpClass()
        # on the fixme-tagged distros bail out when no "vpp" attribute was set
        if (is_distro_ubuntu2204 or is_distro_debian11) and not hasattr(cls, "vpp"):
            return
        cls.create_pg_interfaces(range(1))
        cls.vapi.cli("ip table add 1")
        cls.vapi.cli("set interface ip table pg0 1")
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def config_tc(self):
        """Configure the test case with "dpd_disabled" set to False."""
        self.config_params({"dpd_disabled": False})

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super().test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # the liveness check must show up as an empty INFORMATIONAL request
        capture = self.pg0.get_capture(expected_count=1, timeout=5)
        ih = self.get_ike_header(capture[0])
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b"")
2227
2228
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """test ikev2 responder - cert based auth"""

    def config_tc(self):
        """Configure rsa-sig authentication with UDP encapsulation."""
        # file names that config_params appends to its certificate directory
        pem_files = {
            "server-key": "server-key.pem",
            "client-key": "client-key.pem",
            "client-cert": "client-cert.pem",
            "server-cert": "server-cert.pem",
        }
        self.config_params({"udp_encap": True, "auth": "rsa-sig", **pem_files})
2244
2245
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
    TemplateResponder, Ikev2Params
):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """

    def config_tc(self):
        """Configure the IKE/ESP algorithms named in the class title."""
        ike_algs = {
            "ike-crypto": ("AES-CBC", 16),
            "ike-integ": "SHA2-256-128",
            "ike-dh": "2048MODPgr",
        }
        esp_algs = {
            "esp-crypto": ("AES-CBC", 24),
            "esp-integ": "SHA2-384-192",
        }
        # a fresh random 256-byte nonce is generated for every run
        self.config_params(
            {
                **ike_algs,
                **esp_algs,
                "nonce": os.urandom(256),
                "no_idr_in_auth": True,
            }
        )
2266
2267
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
    TemplateResponder, Ikev2Params
):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """

    def config_tc(self):
        """Configure AES-CBC/SHA2-256 for IKE and AES-GCM for ESP."""
        # NOTE(review): the class name says AES_CBC_128 while the IKE key
        # length configured here is 32 - confirm which one is intended.
        ike_algs = {
            "ike-crypto": ("AES-CBC", 32),
            "ike-integ": "SHA2-256-128",
            "ike-dh": "3072MODPgr",
        }
        esp_algs = {
            "esp-crypto": ("AES-GCM-16ICV", 32),
            "esp-integ": "NULL",
        }
        self.config_params({**ike_algs, **esp_algs})
2287
2288
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    IKE_NODE_SUFFIX = "ip6"

    def config_tc(self):
        """Configure an IPv6 run using AES-GCM for IKE and IPv6 selectors."""
        local_selector = {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"}
        remote_selector = {"start_addr": "11::0", "end_addr": "11::100"}
        self.config_params(
            {
                "del_sa_from_responder": True,
                "ip6": True,
                # NOTE(review): other test cases pass "i_natt"/"r_natt";
                # confirm that a plain "natt" key is consumed anywhere.
                "natt": True,
                "ike-crypto": ("AES-GCM-16ICV", 32),
                "ike-integ": "NULL",
                "ike-dh": "2048MODPgr",
                "loc_ts": local_selector,
                "rem_ts": remote_selector,
            }
        )
2310
2311
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request to VPP and check its reply and counters."""
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()
        reply_hdr = self.get_ike_header(self.pg0.get_capture(1)[0])
        # the reply must carry our message id and an empty payload
        self.assertEqual(reply_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(reply_hdr), b"")
        # one keepalive must be counted in the node counter and per-SA stats
        self.assert_counter(1, "keepalive", "ip4")
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        # establish the SAs with the base scenario, then send the keepalive
        super().test_initiator()
        self.send_empty_req_from_responder()
2335
2336
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """malformed packet test

    Sends IKE_SA_INIT messages with bogus length fields and checks that
    they get no reply and are accounted in the matching error counters.
    """

    def tearDown(self):
        # intentionally a no-op: overrides the inherited tearDown
        pass

    def config_tc(self):
        """Configure the test case without overriding any parameter."""
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet for pg0.

        :param length: value for the IKE header length field
        :param payload: optional payload appended after the IKE header
        :returns: the packet produced by create_packet()
        """
        ike_hdr = ikev2.IKEv2(
            length=length,
            init_SPI="\x11" * 8,
            flags="Initiator",
            exch_type="IKE_SA_INIT",
        )
        if payload is not None:
            ike_hdr /= payload
        return self.create_packet(self.pg0, ike_hdr, self.sa.sport, self.sa.dport)

    def verify_bad_packet_length(self):
        """A bogus IKE header length must be counted as "bad_length"."""
        n_pkts = self.pkt_count
        bad_msg = self.create_ike_init_msg(length=0xDEAD)
        self.send_and_assert_no_replies(self.pg0, bad_msg * n_pkts)
        self.assert_counter(n_pkts, "bad_length")

    def verify_bad_sa_payload_length(self):
        """A bogus SA payload length must be counted as "malformed_packet"."""
        n_pkts = self.pkt_count
        bad_sa = ikev2.IKEv2_payload_SA(length=0xDEAD)
        bad_msg = self.create_ike_init_msg(payload=bad_sa)
        self.send_and_assert_no_replies(self.pg0, bad_msg * n_pkts)
        self.assert_counter(n_pkts, "malformed_packet")

    def test_responder(self):
        # number of copies of each malformed message to send
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
2372
2373
# When executed as a script, run the tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)