nat: improve nat44-ed outside address distribution
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
23 from framework import is_distro_ubuntu2204, is_distro_debian11
24 from framework import VppTestCase, VppTestRunner
25 from vpp_ikev2 import Profile, IDType, AuthMethod
26 from vpp_papi import VppEnum
27
28 try:
29     text_type = unicode
30 except NameError:
31     text_type = str
32
33 KEY_PAD = b"Key Pad for IKEv2"
34 SALT_SIZE = 4
35 GCM_ICV_SIZE = 16
36 GCM_IV_SIZE = 8
37
38
39 # defined in rfc3526
40 # tuple structure is (p, g, key_len)
41 DH = {
42     "2048MODPgr": (
43         long_converter(
44             """
45     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
46     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
47     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
48     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
49     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
50     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
51     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
52     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
53     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
54     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
55     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
56         ),
57         2,
58         256,
59     ),
60     "3072MODPgr": (
61         long_converter(
62             """
63     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
64     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
65     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
66     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
67     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
68     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
69     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
70     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
71     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
72     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
73     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
74     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
75     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
76     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
77     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
78     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
79         ),
80         2,
81         384,
82     ),
83 }
84
85
86 class CryptoAlgo(object):
87     def __init__(self, name, cipher, mode):
88         self.name = name
89         self.cipher = cipher
90         self.mode = mode
91         if self.cipher is not None:
92             self.bs = self.cipher.block_size // 8
93
94             if self.name == "AES-GCM-16ICV":
95                 self.iv_len = GCM_IV_SIZE
96             else:
97                 self.iv_len = self.bs
98
99     def encrypt(self, data, key, aad=None):
100         iv = os.urandom(self.iv_len)
101         if aad is None:
102             encryptor = Cipher(
103                 self.cipher(key), self.mode(iv), default_backend()
104             ).encryptor()
105             return iv + encryptor.update(data) + encryptor.finalize()
106         else:
107             salt = key[-SALT_SIZE:]
108             nonce = salt + iv
109             encryptor = Cipher(
110                 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
111             ).encryptor()
112             encryptor.authenticate_additional_data(aad)
113             data = encryptor.update(data) + encryptor.finalize()
114             data += encryptor.tag[:GCM_ICV_SIZE]
115             return iv + data
116
117     def decrypt(self, data, key, aad=None, icv=None):
118         if aad is None:
119             iv = data[: self.iv_len]
120             ct = data[self.iv_len :]
121             decryptor = Cipher(
122                 algorithms.AES(key), self.mode(iv), default_backend()
123             ).decryptor()
124             return decryptor.update(ct) + decryptor.finalize()
125         else:
126             salt = key[-SALT_SIZE:]
127             nonce = salt + data[:GCM_IV_SIZE]
128             ct = data[GCM_IV_SIZE:]
129             key = key[:-SALT_SIZE]
130             decryptor = Cipher(
131                 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
132             ).decryptor()
133             decryptor.authenticate_additional_data(aad)
134             return decryptor.update(ct) + decryptor.finalize()
135
136     def pad(self, data):
137         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
138         data = data + b"\x00" * (pad_len - 1)
139         return data + bytes([pad_len - 1])
140
141
142 class AuthAlgo(object):
143     def __init__(self, name, mac, mod, key_len, trunc_len=None):
144         self.name = name
145         self.mac = mac
146         self.mod = mod
147         self.key_len = key_len
148         self.trunc_len = trunc_len or key_len
149
150
151 CRYPTO_ALGOS = {
152     "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
153     "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
154     "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
155 }
156
157 AUTH_ALGOS = {
158     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
159     "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
160     "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
161     "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
162     "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
163 }
164
165 PRF_ALGOS = {
166     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
167     "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
168 }
169
170 CRYPTO_IDS = {
171     12: "AES-CBC",
172     20: "AES-GCM-16ICV",
173 }
174
175 INTEG_IDS = {
176     2: "HMAC-SHA1-96",
177     12: "SHA2-256-128",
178     13: "SHA2-384-192",
179     14: "SHA2-512-256",
180 }
181
182
183 class IKEv2ChildSA(object):
184     def __init__(self, local_ts, remote_ts, is_initiator):
185         spi = os.urandom(4)
186         if is_initiator:
187             self.ispi = spi
188             self.rspi = None
189         else:
190             self.rspi = spi
191             self.ispi = None
192         self.local_ts = local_ts
193         self.remote_ts = remote_ts
194
195
196 class IKEv2SA(object):
197     def __init__(
198         self,
199         test,
200         is_initiator=True,
201         i_id=None,
202         r_id=None,
203         spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
204         id_type="fqdn",
205         nonce=None,
206         auth_data=None,
207         local_ts=None,
208         remote_ts=None,
209         auth_method="shared-key",
210         priv_key=None,
211         i_natt=False,
212         r_natt=False,
213         udp_encap=False,
214     ):
215         self.udp_encap = udp_encap
216         self.i_natt = i_natt
217         self.r_natt = r_natt
218         if i_natt or r_natt:
219             self.sport = 4500
220             self.dport = 4500
221         else:
222             self.sport = 500
223             self.dport = 500
224         self.msg_id = 0
225         self.dh_params = None
226         self.test = test
227         self.priv_key = priv_key
228         self.is_initiator = is_initiator
229         nonce = nonce or os.urandom(32)
230         self.auth_data = auth_data
231         self.i_id = i_id
232         self.r_id = r_id
233         if isinstance(id_type, str):
234             self.id_type = IDType.value(id_type)
235         else:
236             self.id_type = id_type
237         self.auth_method = auth_method
238         if self.is_initiator:
239             self.rspi = 8 * b"\x00"
240             self.ispi = spi
241             self.i_nonce = nonce
242         else:
243             self.rspi = spi
244             self.ispi = 8 * b"\x00"
245             self.r_nonce = nonce
246         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
247
248     def new_msg_id(self):
249         self.msg_id += 1
250         return self.msg_id
251
252     @property
253     def my_dh_pub_key(self):
254         if self.is_initiator:
255             return self.i_dh_data
256         return self.r_dh_data
257
258     @property
259     def peer_dh_pub_key(self):
260         if self.is_initiator:
261             return self.r_dh_data
262         return self.i_dh_data
263
264     @property
265     def natt(self):
266         return self.i_natt or self.r_natt
267
268     def compute_secret(self):
269         priv = self.dh_private_key
270         peer = self.peer_dh_pub_key
271         p, g, l = self.ike_group
272         return pow(
273             int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
274         ).to_bytes(l, "big")
275
276     def generate_dh_data(self):
277         # generate DH keys
278         if self.ike_dh not in DH:
279             raise NotImplementedError("%s not in DH group" % self.ike_dh)
280
281         if self.dh_params is None:
282             dhg = DH[self.ike_dh]
283             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
284             self.dh_params = pn.parameters(default_backend())
285
286         priv = self.dh_params.generate_private_key()
287         pub = priv.public_key()
288         x = priv.private_numbers().x
289         self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
290         y = pub.public_numbers().y
291
292         if self.is_initiator:
293             self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
294         else:
295             self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
296
297     def complete_dh_data(self):
298         self.dh_shared_secret = self.compute_secret()
299
300     def calc_child_keys(self, kex=False):
301         prf = self.ike_prf_alg.mod()
302         s = self.i_nonce + self.r_nonce
303         if kex:
304             s = self.dh_shared_secret + s
305         c = self.child_sas[0]
306
307         encr_key_len = self.esp_crypto_key_len
308         integ_key_len = self.esp_integ_alg.key_len
309         salt_len = 0 if integ_key_len else 4
310
311         l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
312         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
313
314         pos = 0
315         c.sk_ei = keymat[pos : pos + encr_key_len]
316         pos += encr_key_len
317
318         if integ_key_len:
319             c.sk_ai = keymat[pos : pos + integ_key_len]
320             pos += integ_key_len
321         else:
322             c.salt_ei = keymat[pos : pos + salt_len]
323             pos += salt_len
324
325         c.sk_er = keymat[pos : pos + encr_key_len]
326         pos += encr_key_len
327
328         if integ_key_len:
329             c.sk_ar = keymat[pos : pos + integ_key_len]
330             pos += integ_key_len
331         else:
332             c.salt_er = keymat[pos : pos + salt_len]
333             pos += salt_len
334
335     def calc_prfplus(self, prf, key, seed, length):
336         r = b""
337         t = None
338         x = 1
339         while len(r) < length and x < 255:
340             if t is not None:
341                 s = t
342             else:
343                 s = b""
344             s = s + seed + bytes([x])
345             t = self.calc_prf(prf, key, s)
346             r = r + t
347             x = x + 1
348
349         if x == 255:
350             return None
351         return r
352
353     def calc_prf(self, prf, key, data):
354         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
355         h.update(data)
356         return h.finalize()
357
358     def calc_keys(self):
359         prf = self.ike_prf_alg.mod()
360         # SKEYSEED = prf(Ni | Nr, g^ir)
361         s = self.i_nonce + self.r_nonce
362         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
363
364         # calculate S = Ni | Nr | SPIi SPIr
365         s = s + self.ispi + self.rspi
366
367         prf_key_trunc = self.ike_prf_alg.trunc_len
368         encr_key_len = self.ike_crypto_key_len
369         tr_prf_key_len = self.ike_prf_alg.key_len
370         integ_key_len = self.ike_integ_alg.key_len
371         if integ_key_len == 0:
372             salt_size = 4
373         else:
374             salt_size = 0
375
376         l = (
377             prf_key_trunc
378             + integ_key_len * 2
379             + encr_key_len * 2
380             + tr_prf_key_len * 2
381             + salt_size * 2
382         )
383         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
384
385         pos = 0
386         self.sk_d = keymat[: pos + prf_key_trunc]
387         pos += prf_key_trunc
388
389         self.sk_ai = keymat[pos : pos + integ_key_len]
390         pos += integ_key_len
391         self.sk_ar = keymat[pos : pos + integ_key_len]
392         pos += integ_key_len
393
394         self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
395         pos += encr_key_len + salt_size
396         self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
397         pos += encr_key_len + salt_size
398
399         self.sk_pi = keymat[pos : pos + tr_prf_key_len]
400         pos += tr_prf_key_len
401         self.sk_pr = keymat[pos : pos + tr_prf_key_len]
402
403     def generate_authmsg(self, prf, packet):
404         if self.is_initiator:
405             id = self.i_id
406             nonce = self.r_nonce
407             key = self.sk_pi
408         else:
409             id = self.r_id
410             nonce = self.i_nonce
411             key = self.sk_pr
412         data = bytes([self.id_type, 0, 0, 0]) + id
413         id_hash = self.calc_prf(prf, key, data)
414         return packet + nonce + id_hash
415
416     def auth_init(self):
417         prf = self.ike_prf_alg.mod()
418         if self.is_initiator:
419             packet = self.init_req_packet
420         else:
421             packet = self.init_resp_packet
422         authmsg = self.generate_authmsg(prf, raw(packet))
423         if self.auth_method == "shared-key":
424             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
425             self.auth_data = self.calc_prf(prf, psk, authmsg)
426         elif self.auth_method == "rsa-sig":
427             self.auth_data = self.priv_key.sign(
428                 authmsg, padding.PKCS1v15(), hashes.SHA1()
429             )
430         else:
431             raise TypeError("unknown auth method type!")
432
433     def encrypt(self, data, aad=None):
434         data = self.ike_crypto_alg.pad(data)
435         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
436
437     @property
438     def peer_authkey(self):
439         if self.is_initiator:
440             return self.sk_ar
441         return self.sk_ai
442
443     @property
444     def my_authkey(self):
445         if self.is_initiator:
446             return self.sk_ai
447         return self.sk_ar
448
449     @property
450     def my_cryptokey(self):
451         if self.is_initiator:
452             return self.sk_ei
453         return self.sk_er
454
455     @property
456     def peer_cryptokey(self):
457         if self.is_initiator:
458             return self.sk_er
459         return self.sk_ei
460
461     def concat(self, alg, key_len):
462         return alg + "-" + str(key_len * 8)
463
464     @property
465     def vpp_ike_cypto_alg(self):
466         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
467
468     @property
469     def vpp_esp_cypto_alg(self):
470         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
471
472     def verify_hmac(self, ikemsg):
473         integ_trunc = self.ike_integ_alg.trunc_len
474         exp_hmac = ikemsg[-integ_trunc:]
475         data = ikemsg[:-integ_trunc]
476         computed_hmac = self.compute_hmac(
477             self.ike_integ_alg.mod(), self.peer_authkey, data
478         )
479         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
480
481     def compute_hmac(self, integ, key, data):
482         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
483         h.update(data)
484         return h.finalize()
485
486     def decrypt(self, data, aad=None, icv=None):
487         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
488
489     def hmac_and_decrypt(self, ike):
490         ep = ike[ikev2.IKEv2_payload_Encrypted]
491         if self.ike_crypto == "AES-GCM-16ICV":
492             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
493             ct = ep.load[:-GCM_ICV_SIZE]
494             tag = ep.load[-GCM_ICV_SIZE:]
495             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
496         else:
497             self.verify_hmac(raw(ike))
498             integ_trunc = self.ike_integ_alg.trunc_len
499
500             # remove ICV and decrypt payload
501             ct = ep.load[:-integ_trunc]
502             plain = self.decrypt(ct)
503         # remove padding
504         pad_len = plain[-1]
505         return plain[: -pad_len - 1]
506
507     def build_ts_addr(self, ts, version):
508         return {
509             "starting_address_v" + version: ts["start_addr"],
510             "ending_address_v" + version: ts["end_addr"],
511         }
512
513     def generate_ts(self, is_ip4):
514         c = self.child_sas[0]
515         ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
516         if is_ip4:
517             ts_data.update(self.build_ts_addr(c.local_ts, "4"))
518             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
519             ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
520             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
521         else:
522             ts_data.update(self.build_ts_addr(c.local_ts, "6"))
523             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
524             ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
525             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
526
527         if self.is_initiator:
528             return ([ts1], [ts2])
529         return ([ts2], [ts1])
530
531     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
532         if crypto not in CRYPTO_ALGOS:
533             raise TypeError("unsupported encryption algo %r" % crypto)
534         self.ike_crypto = crypto
535         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
536         self.ike_crypto_key_len = crypto_key_len
537
538         if integ not in AUTH_ALGOS:
539             raise TypeError("unsupported auth algo %r" % integ)
540         self.ike_integ = None if integ == "NULL" else integ
541         self.ike_integ_alg = AUTH_ALGOS[integ]
542
543         if prf not in PRF_ALGOS:
544             raise TypeError("unsupported prf algo %r" % prf)
545         self.ike_prf = prf
546         self.ike_prf_alg = PRF_ALGOS[prf]
547         self.ike_dh = dh
548         self.ike_group = DH[self.ike_dh]
549
550     def set_esp_props(self, crypto, crypto_key_len, integ):
551         self.esp_crypto_key_len = crypto_key_len
552         if crypto not in CRYPTO_ALGOS:
553             raise TypeError("unsupported encryption algo %r" % crypto)
554         self.esp_crypto = crypto
555         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
556
557         if integ not in AUTH_ALGOS:
558             raise TypeError("unsupported auth algo %r" % integ)
559         self.esp_integ = None if integ == "NULL" else integ
560         self.esp_integ_alg = AUTH_ALGOS[integ]
561
562     def crypto_attr(self, key_len):
563         if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
564             return (0x800E << 16 | key_len << 3, 12)
565         else:
566             raise Exception("unsupported attribute type")
567
568     def ike_crypto_attr(self):
569         return self.crypto_attr(self.ike_crypto_key_len)
570
571     def esp_crypto_attr(self):
572         return self.crypto_attr(self.esp_crypto_key_len)
573
574     def compute_nat_sha1(self, ip, port, rspi=None):
575         if rspi is None:
576             rspi = self.rspi
577         data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
578         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
579         digest.update(data)
580         return digest.finalize()
581
582
583 class IkePeer(VppTestCase):
584     """common class for initiator and responder"""
585
586     @classmethod
587     def setUpClass(cls):
588         import scapy.contrib.ikev2 as _ikev2
589
590         globals()["ikev2"] = _ikev2
591         super(IkePeer, cls).setUpClass()
592         cls.create_pg_interfaces(range(2))
593         for i in cls.pg_interfaces:
594             i.admin_up()
595             i.config_ip4()
596             i.resolve_arp()
597             i.config_ip6()
598             i.resolve_ndp()
599
600     @classmethod
601     def tearDownClass(cls):
602         super(IkePeer, cls).tearDownClass()
603
604     def tearDown(self):
605         super(IkePeer, self).tearDown()
606         if self.del_sa_from_responder:
607             self.initiate_del_sa_from_responder()
608         else:
609             self.initiate_del_sa_from_initiator()
610         r = self.vapi.ikev2_sa_dump()
611         self.assertEqual(len(r), 0)
612         sas = self.vapi.ipsec_sa_dump()
613         self.assertEqual(len(sas), 0)
614         self.p.remove_vpp_config()
615         self.assertIsNone(self.p.query_vpp_config())
616
617     def setUp(self):
618         super(IkePeer, self).setUp()
619         self.config_tc()
620         self.p.add_vpp_config()
621         self.assertIsNotNone(self.p.query_vpp_config())
622         if self.sa.is_initiator:
623             self.sa.generate_dh_data()
624         self.vapi.cli("ikev2 set logging level 4")
625         self.vapi.cli("event-lo clear")
626
627     def assert_counter(self, count, name, version="ip4"):
628         node_name = "/err/ikev2-%s/" % version + name
629         self.assertEqual(count, self.statistics.get_err_counter(node_name))
630
631     def create_rekey_request(self, kex=False):
632         sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
633         header = ikev2.IKEv2(
634             init_SPI=self.sa.ispi,
635             resp_SPI=self.sa.rspi,
636             id=self.sa.new_msg_id(),
637             flags="Initiator",
638             exch_type="CREATE_CHILD_SA",
639         )
640
641         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
642         return self.create_packet(
643             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
644         )
645
646     def create_empty_request(self):
647         header = ikev2.IKEv2(
648             init_SPI=self.sa.ispi,
649             resp_SPI=self.sa.rspi,
650             id=self.sa.new_msg_id(),
651             flags="Initiator",
652             exch_type="INFORMATIONAL",
653             next_payload="Encrypted",
654         )
655
656         msg = self.encrypt_ike_msg(header, b"", None)
657         return self.create_packet(
658             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
659         )
660
661     def create_packet(
662         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
663     ):
664         if use_ip6:
665             src_ip = src_if.remote_ip6
666             dst_ip = src_if.local_ip6
667             ip_layer = IPv6
668         else:
669             src_ip = src_if.remote_ip4
670             dst_ip = src_if.local_ip4
671             ip_layer = IP
672         res = (
673             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
674             / ip_layer(src=src_ip, dst=dst_ip)
675             / UDP(sport=sport, dport=dport)
676         )
677         if natt:
678             # insert non ESP marker
679             res = res / Raw(b"\x00" * 4)
680         return res / msg
681
682     def verify_udp(self, udp):
683         self.assertEqual(udp.sport, self.sa.sport)
684         self.assertEqual(udp.dport, self.sa.dport)
685
686     def get_ike_header(self, packet):
687         try:
688             ih = packet[ikev2.IKEv2]
689             ih = self.verify_and_remove_non_esp_marker(ih)
690         except IndexError as e:
691             # this is a workaround for getting IKEv2 layer as both ikev2 and
692             # ipsec register for port 4500
693             esp = packet[ESP]
694             ih = self.verify_and_remove_non_esp_marker(esp)
695         self.assertEqual(ih.version, 0x20)
696         self.assertNotIn("Version", ih.flags)
697         return ih
698
699     def verify_and_remove_non_esp_marker(self, packet):
700         if self.sa.natt:
701             # if we are in nat traversal mode check for non esp marker
702             # and remove it
703             data = raw(packet)
704             self.assertEqual(data[:4], b"\x00" * 4)
705             return ikev2.IKEv2(data[4:])
706         else:
707             return packet
708
709     def encrypt_ike_msg(self, header, plain, first_payload):
710         if self.sa.ike_crypto == "AES-GCM-16ICV":
711             data = self.sa.ike_crypto_alg.pad(raw(plain))
712             plen = (
713                 len(data)
714                 + GCM_IV_SIZE
715                 + GCM_ICV_SIZE
716                 + len(ikev2.IKEv2_payload_Encrypted())
717             )
718             tlen = plen + len(ikev2.IKEv2())
719
720             # prepare aad data
721             sk_p = ikev2.IKEv2_payload_Encrypted(
722                 next_payload=first_payload, length=plen
723             )
724             header.length = tlen
725             res = header / sk_p
726             encr = self.sa.encrypt(raw(plain), raw(res))
727             sk_p = ikev2.IKEv2_payload_Encrypted(
728                 next_payload=first_payload, length=plen, load=encr
729             )
730             res = header / sk_p
731         else:
732             encr = self.sa.encrypt(raw(plain))
733             trunc_len = self.sa.ike_integ_alg.trunc_len
734             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
735             tlen = plen + len(ikev2.IKEv2())
736
737             sk_p = ikev2.IKEv2_payload_Encrypted(
738                 next_payload=first_payload, length=plen, load=encr
739             )
740             header.length = tlen
741             res = header / sk_p
742
743             integ_data = raw(res)
744             hmac_data = self.sa.compute_hmac(
745                 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
746             )
747             res = res / Raw(hmac_data[:trunc_len])
748         assert len(res) == tlen
749         return res
750
751     def verify_udp_encap(self, ipsec_sa):
752         e = VppEnum.vl_api_ipsec_sad_flags_t
753         if self.sa.udp_encap or self.sa.natt:
754             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
755         else:
756             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
757
758     def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
759         sas = self.vapi.ipsec_sa_dump()
760         if sa_count is None:
761             if is_rekey:
762                 # after rekey there is a short period of time in which old
763                 # inbound SA is still present
764                 sa_count = 3
765             else:
766                 sa_count = 2
767         self.assertEqual(len(sas), sa_count)
768         if self.sa.is_initiator:
769             if is_rekey:
770                 sa0 = sas[0].entry
771                 sa1 = sas[2].entry
772             else:
773                 sa0 = sas[0].entry
774                 sa1 = sas[1].entry
775         else:
776             if is_rekey:
777                 sa0 = sas[2].entry
778                 sa1 = sas[0].entry
779             else:
780                 sa1 = sas[0].entry
781                 sa0 = sas[1].entry
782
783         c = self.sa.child_sas[0]
784
785         self.verify_udp_encap(sa0)
786         self.verify_udp_encap(sa1)
787         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
788         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
789         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
790
791         if self.sa.esp_integ is None:
792             vpp_integ_alg = 0
793         else:
794             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
795         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
796         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
797
798         # verify crypto keys
799         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
800         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
801         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
802         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
803
804         # verify integ keys
805         if vpp_integ_alg:
806             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
807             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
808             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
809             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
810         else:
811             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
812             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
813
814     def verify_keymat(self, api_keys, keys, name):
815         km = getattr(keys, name)
816         api_km = getattr(api_keys, name)
817         api_km_len = getattr(api_keys, name + "_len")
818         self.assertEqual(len(km), api_km_len)
819         self.assertEqual(km, api_km[:api_km_len])
820
821     def verify_id(self, api_id, exp_id):
822         self.assertEqual(api_id.type, IDType.value(exp_id.type))
823         self.assertEqual(api_id.data_len, exp_id.data_len)
824         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
825
826     def verify_ike_sas(self):
827         r = self.vapi.ikev2_sa_dump()
828         self.assertEqual(len(r), 1)
829         sa = r[0].sa
830         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
831         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
832         if self.ip6:
833             if self.sa.is_initiator:
834                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
835                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
836             else:
837                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
838                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
839         else:
840             if self.sa.is_initiator:
841                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
842                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
843             else:
844                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
845                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
846         self.verify_keymat(sa.keys, self.sa, "sk_d")
847         self.verify_keymat(sa.keys, self.sa, "sk_ai")
848         self.verify_keymat(sa.keys, self.sa, "sk_ar")
849         self.verify_keymat(sa.keys, self.sa, "sk_ei")
850         self.verify_keymat(sa.keys, self.sa, "sk_er")
851         self.verify_keymat(sa.keys, self.sa, "sk_pi")
852         self.verify_keymat(sa.keys, self.sa, "sk_pr")
853
854         self.assertEqual(sa.i_id.type, self.sa.id_type)
855         self.assertEqual(sa.r_id.type, self.sa.id_type)
856         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
857         self.assertEqual(sa.r_id.data_len, len(self.idr))
858         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
859         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
860
861         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
862         self.assertEqual(len(r), 1)
863         csa = r[0].child_sa
864         self.assertEqual(csa.sa_index, sa.sa_index)
865         c = self.sa.child_sas[0]
866         if hasattr(c, "sk_ai"):
867             self.verify_keymat(csa.keys, c, "sk_ai")
868             self.verify_keymat(csa.keys, c, "sk_ar")
869         self.verify_keymat(csa.keys, c, "sk_ei")
870         self.verify_keymat(csa.keys, c, "sk_er")
871         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
872         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
873
874         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
875         tsi = tsi[0]
876         tsr = tsr[0]
877         r = self.vapi.ikev2_traffic_selector_dump(
878             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
879         )
880         self.assertEqual(len(r), 1)
881         ts = r[0].ts
882         self.verify_ts(r[0].ts, tsi[0], True)
883
884         r = self.vapi.ikev2_traffic_selector_dump(
885             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
886         )
887         self.assertEqual(len(r), 1)
888         self.verify_ts(r[0].ts, tsr[0], False)
889
890         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
891         self.verify_nonce(n, self.sa.i_nonce)
892         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
893         self.verify_nonce(n, self.sa.r_nonce)
894
895     def verify_nonce(self, api_nonce, nonce):
896         self.assertEqual(api_nonce.data_len, len(nonce))
897         self.assertEqual(api_nonce.nonce, nonce)
898
899     def verify_ts(self, api_ts, ts, is_initiator):
900         if is_initiator:
901             self.assertTrue(api_ts.is_local)
902         else:
903             self.assertFalse(api_ts.is_local)
904
905         if self.p.ts_is_ip4:
906             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
907             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
908         else:
909             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
910             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
911         self.assertEqual(api_ts.start_port, ts.start_port)
912         self.assertEqual(api_ts.end_port, ts.end_port)
913         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
914
915
916 class TemplateInitiator(IkePeer):
917     """initiator test template"""
918
919     def initiate_del_sa_from_initiator(self):
920         ispi = int.from_bytes(self.sa.ispi, "little")
921         self.pg0.enable_capture()
922         self.pg_start()
923         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
924         capture = self.pg0.get_capture(1)
925         ih = self.get_ike_header(capture[0])
926         self.assertNotIn("Response", ih.flags)
927         self.assertIn("Initiator", ih.flags)
928         self.assertEqual(ih.init_SPI, self.sa.ispi)
929         self.assertEqual(ih.resp_SPI, self.sa.rspi)
930         plain = self.sa.hmac_and_decrypt(ih)
931         d = ikev2.IKEv2_payload_Delete(plain)
932         self.assertEqual(d.proto, 1)  # proto=IKEv2
933         header = ikev2.IKEv2(
934             init_SPI=self.sa.ispi,
935             resp_SPI=self.sa.rspi,
936             flags="Response",
937             exch_type="INFORMATIONAL",
938             id=ih.id,
939             next_payload="Encrypted",
940         )
941         resp = self.encrypt_ike_msg(header, b"", None)
942         self.send_and_assert_no_replies(self.pg0, resp)
943
944     def verify_del_sa(self, packet):
945         ih = self.get_ike_header(packet)
946         self.assertEqual(ih.id, self.sa.msg_id)
947         self.assertEqual(ih.exch_type, 37)  # exchange informational
948         self.assertIn("Response", ih.flags)
949         self.assertIn("Initiator", ih.flags)
950         plain = self.sa.hmac_and_decrypt(ih)
951         self.assertEqual(plain, b"")
952
953     def initiate_del_sa_from_responder(self):
954         header = ikev2.IKEv2(
955             init_SPI=self.sa.ispi,
956             resp_SPI=self.sa.rspi,
957             exch_type="INFORMATIONAL",
958             id=self.sa.new_msg_id(),
959         )
960         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
961         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
962         packet = self.create_packet(
963             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
964         )
965         self.pg0.add_stream(packet)
966         self.pg0.enable_capture()
967         self.pg_start()
968         capture = self.pg0.get_capture(1)
969         self.verify_del_sa(capture[0])
970
971     @staticmethod
972     def find_notify_payload(packet, notify_type):
973         n = packet[ikev2.IKEv2_payload_Notify]
974         while n is not None:
975             if n.type == notify_type:
976                 return n
977             n = n.payload
978         return None
979
980     def verify_nat_detection(self, packet):
981         if self.ip6:
982             iph = packet[IPv6]
983         else:
984             iph = packet[IP]
985         udp = packet[UDP]
986
987         # NAT_DETECTION_SOURCE_IP
988         s = self.find_notify_payload(packet, 16388)
989         self.assertIsNotNone(s)
990         src_sha = self.sa.compute_nat_sha1(
991             inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
992         )
993         self.assertEqual(s.load, src_sha)
994
995         # NAT_DETECTION_DESTINATION_IP
996         s = self.find_notify_payload(packet, 16389)
997         self.assertIsNotNone(s)
998         dst_sha = self.sa.compute_nat_sha1(
999             inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
1000         )
1001         self.assertEqual(s.load, dst_sha)
1002
1003     def verify_sa_init_request(self, packet):
1004         udp = packet[UDP]
1005         self.sa.dport = udp.sport
1006         ih = packet[ikev2.IKEv2]
1007         self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1008         self.assertEqual(ih.exch_type, 34)  # SA_INIT
1009         self.sa.ispi = ih.init_SPI
1010         self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1011         self.assertIn("Initiator", ih.flags)
1012         self.assertNotIn("Response", ih.flags)
1013         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1014         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1015
1016         prop = packet[ikev2.IKEv2_payload_Proposal]
1017         self.assertEqual(prop.proto, 1)  # proto = ikev2
1018         self.assertEqual(prop.proposal, 1)
1019         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
1020         self.assertEqual(
1021             prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1022         )
1023         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
1024         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
1025         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
1026         self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1027
1028         self.verify_nat_detection(packet)
1029         self.sa.set_ike_props(
1030             crypto="AES-GCM-16ICV",
1031             crypto_key_len=32,
1032             integ="NULL",
1033             prf="PRF_HMAC_SHA2_256",
1034             dh="3072MODPgr",
1035         )
1036         self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1037         self.sa.generate_dh_data()
1038         self.sa.complete_dh_data()
1039         self.sa.calc_keys()
1040
1041     def update_esp_transforms(self, trans, sa):
1042         while trans:
1043             if trans.transform_type == 1:  # ecryption
1044                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1045             elif trans.transform_type == 3:  # integrity
1046                 sa.esp_integ = INTEG_IDS[trans.transform_id]
1047             trans = trans.payload
1048
1049     def verify_sa_auth_req(self, packet):
1050         udp = packet[UDP]
1051         self.sa.dport = udp.sport
1052         ih = self.get_ike_header(packet)
1053         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1054         self.assertEqual(ih.init_SPI, self.sa.ispi)
1055         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
1056         self.assertIn("Initiator", ih.flags)
1057         self.assertNotIn("Response", ih.flags)
1058
1059         udp = packet[UDP]
1060         self.verify_udp(udp)
1061         self.assertEqual(ih.id, self.sa.msg_id + 1)
1062         self.sa.msg_id += 1
1063         plain = self.sa.hmac_and_decrypt(ih)
1064         idi = ikev2.IKEv2_payload_IDi(plain)
1065         self.assertEqual(idi.load, self.sa.i_id)
1066         if self.no_idr_auth:
1067             self.assertEqual(idi.next_payload, 39)  # AUTH
1068         else:
1069             idr = ikev2.IKEv2_payload_IDr(idi.payload)
1070             self.assertEqual(idr.load, self.sa.r_id)
1071         prop = idi[ikev2.IKEv2_payload_Proposal]
1072         c = self.sa.child_sas[0]
1073         c.ispi = prop.SPI
1074         self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1075
1076     def send_init_response(self):
1077         tr_attr = self.sa.ike_crypto_attr()
1078         trans = (
1079             ikev2.IKEv2_payload_Transform(
1080                 transform_type="Encryption",
1081                 transform_id=self.sa.ike_crypto,
1082                 length=tr_attr[1],
1083                 key_length=tr_attr[0],
1084             )
1085             / ikev2.IKEv2_payload_Transform(
1086                 transform_type="Integrity", transform_id=self.sa.ike_integ
1087             )
1088             / ikev2.IKEv2_payload_Transform(
1089                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1090             )
1091             / ikev2.IKEv2_payload_Transform(
1092                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1093             )
1094         )
1095         props = ikev2.IKEv2_payload_Proposal(
1096             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1097         )
1098
1099         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1100         if self.sa.natt:
1101             dst_address = b"\x0a\x0a\x0a\x0a"
1102         else:
1103             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1104         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1105         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1106
1107         self.sa.init_resp_packet = (
1108             ikev2.IKEv2(
1109                 init_SPI=self.sa.ispi,
1110                 resp_SPI=self.sa.rspi,
1111                 exch_type="IKE_SA_INIT",
1112                 flags="Response",
1113             )
1114             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1115             / ikev2.IKEv2_payload_KE(
1116                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1117             )
1118             / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1119             / ikev2.IKEv2_payload_Notify(
1120                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1121             )
1122             / ikev2.IKEv2_payload_Notify(
1123                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1124             )
1125         )
1126
1127         ike_msg = self.create_packet(
1128             self.pg0,
1129             self.sa.init_resp_packet,
1130             self.sa.sport,
1131             self.sa.dport,
1132             False,
1133             self.ip6,
1134         )
1135         self.pg_send(self.pg0, ike_msg)
1136         capture = self.pg0.get_capture(1)
1137         self.verify_sa_auth_req(capture[0])
1138
1139     def initiate_sa_init(self):
1140         self.pg0.enable_capture()
1141         self.pg_start()
1142         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1143
1144         capture = self.pg0.get_capture(1)
1145         self.verify_sa_init_request(capture[0])
1146         self.send_init_response()
1147
1148     def send_auth_response(self):
1149         tr_attr = self.sa.esp_crypto_attr()
1150         trans = (
1151             ikev2.IKEv2_payload_Transform(
1152                 transform_type="Encryption",
1153                 transform_id=self.sa.esp_crypto,
1154                 length=tr_attr[1],
1155                 key_length=tr_attr[0],
1156             )
1157             / ikev2.IKEv2_payload_Transform(
1158                 transform_type="Integrity", transform_id=self.sa.esp_integ
1159             )
1160             / ikev2.IKEv2_payload_Transform(
1161                 transform_type="Extended Sequence Number", transform_id="No ESN"
1162             )
1163             / ikev2.IKEv2_payload_Transform(
1164                 transform_type="Extended Sequence Number", transform_id="ESN"
1165             )
1166         )
1167
1168         c = self.sa.child_sas[0]
1169         props = ikev2.IKEv2_payload_Proposal(
1170             proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1171         )
1172
1173         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1174         plain = (
1175             ikev2.IKEv2_payload_IDi(
1176                 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1177             )
1178             / ikev2.IKEv2_payload_IDr(
1179                 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1180             )
1181             / ikev2.IKEv2_payload_AUTH(
1182                 next_payload="SA",
1183                 auth_type=AuthMethod.value(self.sa.auth_method),
1184                 load=self.sa.auth_data,
1185             )
1186             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1187             / ikev2.IKEv2_payload_TSi(
1188                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1189             )
1190             / ikev2.IKEv2_payload_TSr(
1191                 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1192             )
1193             / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1194         )
1195
1196         header = ikev2.IKEv2(
1197             init_SPI=self.sa.ispi,
1198             resp_SPI=self.sa.rspi,
1199             id=self.sa.new_msg_id(),
1200             flags="Response",
1201             exch_type="IKE_AUTH",
1202         )
1203
1204         ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1205         packet = self.create_packet(
1206             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1207         )
1208         self.pg_send(self.pg0, packet)
1209
1210     def test_initiator(self):
1211         self.initiate_sa_init()
1212         self.sa.auth_init()
1213         self.sa.calc_child_keys()
1214         self.send_auth_response()
1215         self.verify_ike_sas()
1216
1217
1218 class TemplateResponder(IkePeer):
1219     """responder test template"""
1220
1221     def initiate_del_sa_from_responder(self):
1222         self.pg0.enable_capture()
1223         self.pg_start()
1224         self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1225         capture = self.pg0.get_capture(1)
1226         ih = self.get_ike_header(capture[0])
1227         self.assertNotIn("Response", ih.flags)
1228         self.assertNotIn("Initiator", ih.flags)
1229         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1230         plain = self.sa.hmac_and_decrypt(ih)
1231         d = ikev2.IKEv2_payload_Delete(plain)
1232         self.assertEqual(d.proto, 1)  # proto=IKEv2
1233         self.assertEqual(ih.init_SPI, self.sa.ispi)
1234         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1235         header = ikev2.IKEv2(
1236             init_SPI=self.sa.ispi,
1237             resp_SPI=self.sa.rspi,
1238             flags="Initiator+Response",
1239             exch_type="INFORMATIONAL",
1240             id=ih.id,
1241             next_payload="Encrypted",
1242         )
1243         resp = self.encrypt_ike_msg(header, b"", None)
1244         self.send_and_assert_no_replies(self.pg0, resp)
1245
1246     def verify_del_sa(self, packet):
1247         ih = self.get_ike_header(packet)
1248         self.assertEqual(ih.id, self.sa.msg_id)
1249         self.assertEqual(ih.exch_type, 37)  # exchange informational
1250         self.assertIn("Response", ih.flags)
1251         self.assertNotIn("Initiator", ih.flags)
1252         self.assertEqual(ih.next_payload, 46)  # Encrypted
1253         self.assertEqual(ih.init_SPI, self.sa.ispi)
1254         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1255         plain = self.sa.hmac_and_decrypt(ih)
1256         self.assertEqual(plain, b"")
1257
1258     def initiate_del_sa_from_initiator(self):
1259         header = ikev2.IKEv2(
1260             init_SPI=self.sa.ispi,
1261             resp_SPI=self.sa.rspi,
1262             flags="Initiator",
1263             exch_type="INFORMATIONAL",
1264             id=self.sa.new_msg_id(),
1265         )
1266         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1267         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1268         packet = self.create_packet(
1269             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1270         )
1271         self.pg0.add_stream(packet)
1272         self.pg0.enable_capture()
1273         self.pg_start()
1274         capture = self.pg0.get_capture(1)
1275         self.verify_del_sa(capture[0])
1276
1277     def send_sa_init_req(self):
1278         tr_attr = self.sa.ike_crypto_attr()
1279         trans = (
1280             ikev2.IKEv2_payload_Transform(
1281                 transform_type="Encryption",
1282                 transform_id=self.sa.ike_crypto,
1283                 length=tr_attr[1],
1284                 key_length=tr_attr[0],
1285             )
1286             / ikev2.IKEv2_payload_Transform(
1287                 transform_type="Integrity", transform_id=self.sa.ike_integ
1288             )
1289             / ikev2.IKEv2_payload_Transform(
1290                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1291             )
1292             / ikev2.IKEv2_payload_Transform(
1293                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1294             )
1295         )
1296
1297         props = ikev2.IKEv2_payload_Proposal(
1298             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1299         )
1300
1301         next_payload = None if self.ip6 else "Notify"
1302
1303         self.sa.init_req_packet = (
1304             ikev2.IKEv2(
1305                 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1306             )
1307             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1308             / ikev2.IKEv2_payload_KE(
1309                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1310             )
1311             / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1312         )
1313
1314         if not self.ip6:
1315             if self.sa.i_natt:
1316                 src_address = b"\x0a\x0a\x0a\x01"
1317             else:
1318                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1319
1320             if self.sa.r_natt:
1321                 dst_address = b"\x0a\x0a\x0a\x0a"
1322             else:
1323                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1324
1325             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1326             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1327             nat_src_detection = ikev2.IKEv2_payload_Notify(
1328                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1329             )
1330             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1331                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1332             )
1333             self.sa.init_req_packet = (
1334                 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1335             )
1336
1337         ike_msg = self.create_packet(
1338             self.pg0,
1339             self.sa.init_req_packet,
1340             self.sa.sport,
1341             self.sa.dport,
1342             self.sa.natt,
1343             self.ip6,
1344         )
1345         self.pg0.add_stream(ike_msg)
1346         self.pg0.enable_capture()
1347         self.pg_start()
1348         capture = self.pg0.get_capture(1)
1349         self.verify_sa_init(capture[0])
1350
1351     def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
1352         tr_attr = self.sa.esp_crypto_attr()
1353         last_payload = last_payload or "Notify"
1354         trans_nb = 4
1355         trans = (
1356             ikev2.IKEv2_payload_Transform(
1357                 transform_type="Encryption",
1358                 transform_id=self.sa.esp_crypto,
1359                 length=tr_attr[1],
1360                 key_length=tr_attr[0],
1361             )
1362             / ikev2.IKEv2_payload_Transform(
1363                 transform_type="Integrity", transform_id=self.sa.esp_integ
1364             )
1365             / ikev2.IKEv2_payload_Transform(
1366                 transform_type="Extended Sequence Number", transform_id="No ESN"
1367             )
1368             / ikev2.IKEv2_payload_Transform(
1369                 transform_type="Extended Sequence Number", transform_id="ESN"
1370             )
1371         )
1372
1373         if kex:
1374             trans_nb += 1
1375             trans /= ikev2.IKEv2_payload_Transform(
1376                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1377             )
1378
1379         c = self.sa.child_sas[0]
1380         props = ikev2.IKEv2_payload_Proposal(
1381             proposal=1,
1382             proto="ESP",
1383             SPIsize=4,
1384             SPI=c.ispi,
1385             trans_nb=trans_nb,
1386             trans=trans,
1387         )
1388
1389         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1390         plain = (
1391             ikev2.IKEv2_payload_AUTH(
1392                 next_payload="SA",
1393                 auth_type=AuthMethod.value(self.sa.auth_method),
1394                 load=self.sa.auth_data,
1395             )
1396             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1397             / ikev2.IKEv2_payload_TSi(
1398                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1399             )
1400             / ikev2.IKEv2_payload_TSr(
1401                 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1402             )
1403         )
1404
1405         if is_rekey:
1406             first_payload = "Nonce"
1407             if kex:
1408                 head = ikev2.IKEv2_payload_Nonce(
1409                     load=self.sa.i_nonce, next_payload="KE"
1410                 ) / ikev2.IKEv2_payload_KE(
1411                     group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
1412                 )
1413             else:
1414                 head = ikev2.IKEv2_payload_Nonce(
1415                     load=self.sa.i_nonce, next_payload="SA"
1416                 )
1417             plain = (
1418                 head
1419                 / plain
1420                 / ikev2.IKEv2_payload_Notify(
1421                     type="REKEY_SA",
1422                     proto="ESP",
1423                     SPI=c.ispi,
1424                     length=8 + len(c.ispi),
1425                     next_payload="Notify",
1426                 )
1427                 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1428             )
1429         else:
1430             first_payload = "IDi"
1431             if self.no_idr_auth:
1432                 ids = ikev2.IKEv2_payload_IDi(
1433                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1434                 )
1435             else:
1436                 ids = ikev2.IKEv2_payload_IDi(
1437                     next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1438                 ) / ikev2.IKEv2_payload_IDr(
1439                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1440                 )
1441             plain = ids / plain
1442         return plain, first_payload
1443
1444     def send_sa_auth(self):
1445         plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1446         plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1447         header = ikev2.IKEv2(
1448             init_SPI=self.sa.ispi,
1449             resp_SPI=self.sa.rspi,
1450             id=self.sa.new_msg_id(),
1451             flags="Initiator",
1452             exch_type="IKE_AUTH",
1453         )
1454
1455         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1456         packet = self.create_packet(
1457             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1458         )
1459         self.pg0.add_stream(packet)
1460         self.pg0.enable_capture()
1461         self.pg_start()
1462         capture = self.pg0.get_capture(1)
1463         self.verify_sa_auth_resp(capture[0])
1464
1465     def verify_sa_init(self, packet):
1466         ih = self.get_ike_header(packet)
1467
1468         self.assertEqual(ih.id, self.sa.msg_id)
1469         self.assertEqual(ih.exch_type, 34)
1470         self.assertIn("Response", ih.flags)
1471         self.assertEqual(ih.init_SPI, self.sa.ispi)
1472         self.assertNotEqual(ih.resp_SPI, 0)
1473         self.sa.rspi = ih.resp_SPI
1474         try:
1475             sa = ih[ikev2.IKEv2_payload_SA]
1476             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1477             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1478         except IndexError as e:
1479             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1480             self.logger.error(ih.show())
1481             raise
1482         self.sa.complete_dh_data()
1483         self.sa.calc_keys()
1484         self.sa.auth_init()
1485
1486     def verify_sa_auth_resp(self, packet):
1487         ike = self.get_ike_header(packet)
1488         udp = packet[UDP]
1489         self.verify_udp(udp)
1490         self.assertEqual(ike.id, self.sa.msg_id)
1491         plain = self.sa.hmac_and_decrypt(ike)
1492         idr = ikev2.IKEv2_payload_IDr(plain)
1493         prop = idr[ikev2.IKEv2_payload_Proposal]
1494         self.assertEqual(prop.SPIsize, 4)
1495         self.sa.child_sas[0].rspi = prop.SPI
1496         self.sa.calc_child_keys()
1497
1498     IKE_NODE_SUFFIX = "ip4"
1499
1500     def verify_counters(self):
1501         self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1502         self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1503         self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1504
1505         r = self.vapi.ikev2_sa_dump()
1506         s = r[0].sa.stats
1507         self.assertEqual(1, s.n_sa_auth_req)
1508         self.assertEqual(1, s.n_sa_init_req)
1509
1510     def test_responder(self):
1511         self.send_sa_init_req()
1512         self.send_sa_auth()
1513         self.verify_ipsec_sas()
1514         self.verify_ike_sas()
1515         self.verify_counters()
1516
1517
1518 class Ikev2Params(object):
1519     def config_params(self, params={}):
1520         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1521         ei = VppEnum.vl_api_ipsec_integ_alg_t
1522         self.vpp_enums = {
1523             "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1524             "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1525             "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1526             "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1527             "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1528             "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1529             "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1530             "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1531             "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1532             "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1533         }
1534
1535         dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1536         if dpd_disabled:
1537             self.vapi.cli("ikev2 dpd disable")
1538         self.del_sa_from_responder = (
1539             False
1540             if "del_sa_from_responder" not in params
1541             else params["del_sa_from_responder"]
1542         )
1543         i_natt = False if "i_natt" not in params else params["i_natt"]
1544         r_natt = False if "r_natt" not in params else params["r_natt"]
1545         self.p = Profile(self, "pr1")
1546         self.ip6 = False if "ip6" not in params else params["ip6"]
1547
1548         if "auth" in params and params["auth"] == "rsa-sig":
1549             auth_method = "rsa-sig"
1550             work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1551             self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1552
1553             client_file = work_dir + params["client-cert"]
1554             server_pem = open(work_dir + params["server-cert"]).read()
1555             client_priv = open(work_dir + params["client-key"]).read()
1556             client_priv = load_pem_private_key(
1557                 str.encode(client_priv), None, default_backend()
1558             )
1559             self.peer_cert = x509.load_pem_x509_certificate(
1560                 str.encode(server_pem), default_backend()
1561             )
1562             self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1563             auth_data = None
1564         else:
1565             auth_data = b"$3cr3tpa$$w0rd"
1566             self.p.add_auth(method="shared-key", data=auth_data)
1567             auth_method = "shared-key"
1568             client_priv = None
1569
1570         is_init = True if "is_initiator" not in params else params["is_initiator"]
1571         self.no_idr_auth = params.get("no_idr_in_auth", False)
1572
1573         idr = {"id_type": "fqdn", "data": b"vpp.home"}
1574         idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1575         r_id = self.idr = idr["data"]
1576         i_id = self.idi = idi["data"]
1577         if is_init:
1578             # scapy is initiator, VPP is responder
1579             self.p.add_local_id(**idr)
1580             self.p.add_remote_id(**idi)
1581             if self.no_idr_auth:
1582                 r_id = None
1583         else:
1584             # VPP is initiator, scapy is responder
1585             self.p.add_local_id(**idi)
1586             if not self.no_idr_auth:
1587                 self.p.add_remote_id(**idr)
1588
1589         loc_ts = (
1590             {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1591             if "loc_ts" not in params
1592             else params["loc_ts"]
1593         )
1594         rem_ts = (
1595             {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1596             if "rem_ts" not in params
1597             else params["rem_ts"]
1598         )
1599         self.p.add_local_ts(**loc_ts)
1600         self.p.add_remote_ts(**rem_ts)
1601         if "responder" in params:
1602             self.p.add_responder(params["responder"])
1603         if "ike_transforms" in params:
1604             self.p.add_ike_transforms(params["ike_transforms"])
1605         if "esp_transforms" in params:
1606             self.p.add_esp_transforms(params["esp_transforms"])
1607
1608         udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1609         if udp_encap:
1610             self.p.set_udp_encap(True)
1611
1612         if "responder_hostname" in params:
1613             hn = params["responder_hostname"]
1614             self.p.add_responder_hostname(hn)
1615
1616             # configure static dns record
1617             self.vapi.dns_name_server_add_del(
1618                 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1619             )
1620             self.vapi.dns_enable_disable(enable=1)
1621
1622             cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1623             self.vapi.cli(cmd)
1624
1625         self.sa = IKEv2SA(
1626             self,
1627             i_id=i_id,
1628             r_id=r_id,
1629             is_initiator=is_init,
1630             id_type=self.p.local_id["id_type"],
1631             i_natt=i_natt,
1632             r_natt=r_natt,
1633             priv_key=client_priv,
1634             auth_method=auth_method,
1635             nonce=params.get("nonce"),
1636             auth_data=auth_data,
1637             udp_encap=udp_encap,
1638             local_ts=self.p.remote_ts,
1639             remote_ts=self.p.local_ts,
1640         )
1641
1642         if is_init:
1643             ike_crypto = (
1644                 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1645             )
1646             ike_integ = (
1647                 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1648             )
1649             ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1650
1651             esp_crypto = (
1652                 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1653             )
1654             esp_integ = (
1655                 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1656             )
1657
1658             self.sa.set_ike_props(
1659                 crypto=ike_crypto[0],
1660                 crypto_key_len=ike_crypto[1],
1661                 integ=ike_integ,
1662                 prf="PRF_HMAC_SHA2_256",
1663                 dh=ike_dh,
1664             )
1665             self.sa.set_esp_props(
1666                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1667             )
1668
1669
1670 class TestApi(VppTestCase):
1671     """Test IKEV2 API"""
1672
1673     @classmethod
1674     def setUpClass(cls):
1675         super(TestApi, cls).setUpClass()
1676
1677     @classmethod
1678     def tearDownClass(cls):
1679         super(TestApi, cls).tearDownClass()
1680
1681     def tearDown(self):
1682         super(TestApi, self).tearDown()
1683         self.p1.remove_vpp_config()
1684         self.p2.remove_vpp_config()
1685         r = self.vapi.ikev2_profile_dump()
1686         self.assertEqual(len(r), 0)
1687
1688     def configure_profile(self, cfg):
1689         p = Profile(self, cfg["name"])
1690         p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1691         p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1692         p.add_local_ts(**cfg["loc_ts"])
1693         p.add_remote_ts(**cfg["rem_ts"])
1694         p.add_responder(cfg["responder"])
1695         p.add_ike_transforms(cfg["ike_ts"])
1696         p.add_esp_transforms(cfg["esp_ts"])
1697         p.add_auth(**cfg["auth"])
1698         p.set_udp_encap(cfg["udp_encap"])
1699         p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1700         if "lifetime_data" in cfg:
1701             p.set_lifetime_data(cfg["lifetime_data"])
1702         if "tun_itf" in cfg:
1703             p.set_tunnel_interface(cfg["tun_itf"])
1704         if "natt_disabled" in cfg and cfg["natt_disabled"]:
1705             p.disable_natt()
1706         p.add_vpp_config()
1707         return p
1708
1709     def test_profile_api(self):
1710         """test profile dump API"""
1711         loc_ts4 = {
1712             "proto": 8,
1713             "start_port": 1,
1714             "end_port": 19,
1715             "start_addr": "3.3.3.2",
1716             "end_addr": "3.3.3.3",
1717         }
1718         rem_ts4 = {
1719             "proto": 9,
1720             "start_port": 10,
1721             "end_port": 119,
1722             "start_addr": "4.5.76.80",
1723             "end_addr": "2.3.4.6",
1724         }
1725
1726         loc_ts6 = {
1727             "proto": 8,
1728             "start_port": 1,
1729             "end_port": 19,
1730             "start_addr": "ab::1",
1731             "end_addr": "ab::4",
1732         }
1733         rem_ts6 = {
1734             "proto": 9,
1735             "start_port": 10,
1736             "end_port": 119,
1737             "start_addr": "cd::12",
1738             "end_addr": "cd::13",
1739         }
1740
1741         conf = {
1742             "p1": {
1743                 "name": "p1",
1744                 "natt_disabled": True,
1745                 "loc_id": ("fqdn", b"vpp.home"),
1746                 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1747                 "loc_ts": loc_ts4,
1748                 "rem_ts": rem_ts4,
1749                 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1750                 "ike_ts": {
1751                     "crypto_alg": 20,
1752                     "crypto_key_size": 32,
1753                     "integ_alg": 0,
1754                     "dh_group": 1,
1755                 },
1756                 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1757                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1758                 "udp_encap": True,
1759                 "ipsec_over_udp_port": 4501,
1760                 "lifetime_data": {
1761                     "lifetime": 123,
1762                     "lifetime_maxdata": 20192,
1763                     "lifetime_jitter": 9,
1764                     "handover": 132,
1765                 },
1766             },
1767             "p2": {
1768                 "name": "p2",
1769                 "loc_id": ("ip4-addr", b"192.168.2.1"),
1770                 "rem_id": ("ip6-addr", b"abcd::1"),
1771                 "loc_ts": loc_ts6,
1772                 "rem_ts": rem_ts6,
1773                 "responder": {"sw_if_index": 4, "addr": "def::10"},
1774                 "ike_ts": {
1775                     "crypto_alg": 12,
1776                     "crypto_key_size": 16,
1777                     "integ_alg": 3,
1778                     "dh_group": 3,
1779                 },
1780                 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1781                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1782                 "udp_encap": False,
1783                 "ipsec_over_udp_port": 4600,
1784                 "tun_itf": 0,
1785             },
1786         }
1787         self.p1 = self.configure_profile(conf["p1"])
1788         self.p2 = self.configure_profile(conf["p2"])
1789
1790         r = self.vapi.ikev2_profile_dump()
1791         self.assertEqual(len(r), 2)
1792         self.verify_profile(r[0].profile, conf["p1"])
1793         self.verify_profile(r[1].profile, conf["p2"])
1794
1795     def verify_id(self, api_id, cfg_id):
1796         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1797         self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1798
1799     def verify_ts(self, api_ts, cfg_ts):
1800         self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1801         self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1802         self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1803         self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1804         self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1805
1806     def verify_responder(self, api_r, cfg_r):
1807         self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1808         self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1809
1810     def verify_transforms(self, api_ts, cfg_ts):
1811         self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1812         self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1813         self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1814
1815     def verify_ike_transforms(self, api_ts, cfg_ts):
1816         self.verify_transforms(api_ts, cfg_ts)
1817         self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1818
1819     def verify_esp_transforms(self, api_ts, cfg_ts):
1820         self.verify_transforms(api_ts, cfg_ts)
1821
1822     def verify_auth(self, api_auth, cfg_auth):
1823         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1824         self.assertEqual(api_auth.data, cfg_auth["data"])
1825         self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1826
1827     def verify_lifetime_data(self, p, ld):
1828         self.assertEqual(p.lifetime, ld["lifetime"])
1829         self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1830         self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1831         self.assertEqual(p.handover, ld["handover"])
1832
1833     def verify_profile(self, ap, cp):
1834         self.assertEqual(ap.name, cp["name"])
1835         self.assertEqual(ap.udp_encap, cp["udp_encap"])
1836         self.verify_id(ap.loc_id, cp["loc_id"])
1837         self.verify_id(ap.rem_id, cp["rem_id"])
1838         self.verify_ts(ap.loc_ts, cp["loc_ts"])
1839         self.verify_ts(ap.rem_ts, cp["rem_ts"])
1840         self.verify_responder(ap.responder, cp["responder"])
1841         self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1842         self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1843         self.verify_auth(ap.auth, cp["auth"])
1844         natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1845         self.assertTrue(natt_dis == ap.natt_disabled)
1846
1847         if "lifetime_data" in cp:
1848             self.verify_lifetime_data(ap, cp["lifetime_data"])
1849         self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1850         if "tun_itf" in cp:
1851             self.assertEqual(ap.tun_itf, cp["tun_itf"])
1852         else:
1853             self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1854
1855
1856 @tag_fixme_vpp_workers
1857 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1858     """test responder - responder behind NAT"""
1859
1860     IKE_NODE_SUFFIX = "ip4-natt"
1861
1862     def config_tc(self):
1863         self.config_params({"r_natt": True})
1864
1865
1866 @tag_fixme_vpp_workers
1867 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1868     """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1869
1870     def config_tc(self):
1871         self.config_params(
1872             {
1873                 "i_natt": True,
1874                 "is_initiator": False,  # seen from test case perspective
1875                 # thus vpp is initiator
1876                 "responder": {
1877                     "sw_if_index": self.pg0.sw_if_index,
1878                     "addr": self.pg0.remote_ip4,
1879                 },
1880                 "ike-crypto": ("AES-GCM-16ICV", 32),
1881                 "ike-integ": "NULL",
1882                 "ike-dh": "3072MODPgr",
1883                 "ike_transforms": {
1884                     "crypto_alg": 20,  # "aes-gcm-16"
1885                     "crypto_key_size": 256,
1886                     "dh_group": 15,  # "modp-3072"
1887                 },
1888                 "esp_transforms": {
1889                     "crypto_alg": 12,  # "aes-cbc"
1890                     "crypto_key_size": 256,
1891                     # "hmac-sha2-256-128"
1892                     "integ_alg": 12,
1893                 },
1894             }
1895         )
1896
1897
1898 @tag_fixme_vpp_workers
1899 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1900     """test ikev2 initiator - pre shared key auth"""
1901
1902     def config_tc(self):
1903         self.config_params(
1904             {
1905                 "is_initiator": False,  # seen from test case perspective
1906                 # thus vpp is initiator
1907                 "ike-crypto": ("AES-GCM-16ICV", 32),
1908                 "ike-integ": "NULL",
1909                 "ike-dh": "3072MODPgr",
1910                 "ike_transforms": {
1911                     "crypto_alg": 20,  # "aes-gcm-16"
1912                     "crypto_key_size": 256,
1913                     "dh_group": 15,  # "modp-3072"
1914                 },
1915                 "esp_transforms": {
1916                     "crypto_alg": 12,  # "aes-cbc"
1917                     "crypto_key_size": 256,
1918                     # "hmac-sha2-256-128"
1919                     "integ_alg": 12,
1920                 },
1921                 "responder_hostname": {
1922                     "hostname": "vpp.responder.org",
1923                     "sw_if_index": self.pg0.sw_if_index,
1924                 },
1925             }
1926         )
1927
1928
1929 @tag_fixme_vpp_workers
1930 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1931     """test initiator - request window size (1)"""
1932
1933     def rekey_respond(self, req, update_child_sa_data):
1934         ih = self.get_ike_header(req)
1935         plain = self.sa.hmac_and_decrypt(ih)
1936         sa = ikev2.IKEv2_payload_SA(plain)
1937         if update_child_sa_data:
1938             prop = sa[ikev2.IKEv2_payload_Proposal]
1939             self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1940             self.sa.r_nonce = self.sa.i_nonce
1941             self.sa.child_sas[0].ispi = prop.SPI
1942             self.sa.child_sas[0].rspi = prop.SPI
1943             self.sa.calc_child_keys()
1944
1945         header = ikev2.IKEv2(
1946             init_SPI=self.sa.ispi,
1947             resp_SPI=self.sa.rspi,
1948             flags="Response",
1949             exch_type=36,
1950             id=ih.id,
1951             next_payload="Encrypted",
1952         )
1953         resp = self.encrypt_ike_msg(header, sa, "SA")
1954         packet = self.create_packet(
1955             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1956         )
1957         self.send_and_assert_no_replies(self.pg0, packet)
1958
1959     def test_initiator(self):
1960         super(TestInitiatorRequestWindowSize, self).test_initiator()
1961         self.pg0.enable_capture()
1962         self.pg_start()
1963         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1964         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1965         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1966         capture = self.pg0.get_capture(2)
1967
1968         # reply in reverse order
1969         self.rekey_respond(capture[1], True)
1970         self.rekey_respond(capture[0], False)
1971
1972         # verify that only the second request was accepted
1973         self.verify_ike_sas()
1974         self.verify_ipsec_sas(is_rekey=True)
1975
1976
1977 @tag_fixme_vpp_workers
1978 class TestInitiatorRekey(TestInitiatorPsk):
1979     """test ikev2 initiator - rekey"""
1980
1981     def rekey_from_initiator(self):
1982         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1983         self.pg0.enable_capture()
1984         self.pg_start()
1985         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1986         capture = self.pg0.get_capture(1)
1987         ih = self.get_ike_header(capture[0])
1988         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1989         self.assertNotIn("Response", ih.flags)
1990         self.assertIn("Initiator", ih.flags)
1991         plain = self.sa.hmac_and_decrypt(ih)
1992         sa = ikev2.IKEv2_payload_SA(plain)
1993         prop = sa[ikev2.IKEv2_payload_Proposal]
1994         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1995         self.sa.r_nonce = self.sa.i_nonce
1996         # update new responder SPI
1997         self.sa.child_sas[0].ispi = prop.SPI
1998         self.sa.child_sas[0].rspi = prop.SPI
1999         self.sa.calc_child_keys()
2000         header = ikev2.IKEv2(
2001             init_SPI=self.sa.ispi,
2002             resp_SPI=self.sa.rspi,
2003             flags="Response",
2004             exch_type=36,
2005             id=ih.id,
2006             next_payload="Encrypted",
2007         )
2008         resp = self.encrypt_ike_msg(header, sa, "SA")
2009         packet = self.create_packet(
2010             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
2011         )
2012         self.send_and_assert_no_replies(self.pg0, packet)
2013
2014     def test_initiator(self):
2015         super(TestInitiatorRekey, self).test_initiator()
2016         self.rekey_from_initiator()
2017         self.verify_ike_sas()
2018         self.verify_ipsec_sas(is_rekey=True)
2019
2020
2021 @tag_fixme_vpp_workers
2022 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
2023     """test ikev2 initiator - delete IKE SA from responder"""
2024
2025     def config_tc(self):
2026         self.config_params(
2027             {
2028                 "del_sa_from_responder": True,
2029                 "is_initiator": False,  # seen from test case perspective
2030                 # thus vpp is initiator
2031                 "responder": {
2032                     "sw_if_index": self.pg0.sw_if_index,
2033                     "addr": self.pg0.remote_ip4,
2034                 },
2035                 "ike-crypto": ("AES-GCM-16ICV", 32),
2036                 "ike-integ": "NULL",
2037                 "ike-dh": "3072MODPgr",
2038                 "ike_transforms": {
2039                     "crypto_alg": 20,  # "aes-gcm-16"
2040                     "crypto_key_size": 256,
2041                     "dh_group": 15,  # "modp-3072"
2042                 },
2043                 "esp_transforms": {
2044                     "crypto_alg": 12,  # "aes-cbc"
2045                     "crypto_key_size": 256,
2046                     # "hmac-sha2-256-128"
2047                     "integ_alg": 12,
2048                 },
2049                 "no_idr_in_auth": True,
2050             }
2051         )
2052
2053
2054 @tag_fixme_vpp_workers
2055 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2056     """test ikev2 responder - initiator behind NAT"""
2057
2058     IKE_NODE_SUFFIX = "ip4-natt"
2059
2060     def config_tc(self):
2061         self.config_params({"i_natt": True})
2062
2063
2064 @tag_fixme_vpp_workers
2065 class TestResponderPsk(TemplateResponder, Ikev2Params):
2066     """test ikev2 responder - pre shared key auth"""
2067
2068     def config_tc(self):
2069         self.config_params()
2070
2071
2072 @tag_fixme_vpp_workers
2073 class TestResponderDpd(TestResponderPsk):
2074     """
2075     Dead peer detection test
2076     """
2077
2078     def config_tc(self):
2079         self.config_params({"dpd_disabled": False})
2080
2081     def tearDown(self):
2082         pass
2083
2084     def test_responder(self):
2085         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2086         super(TestResponderDpd, self).test_responder()
2087         self.pg0.enable_capture()
2088         self.pg_start()
2089         # capture empty request but don't reply
2090         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2091         ih = self.get_ike_header(capture[0])
2092         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2093         plain = self.sa.hmac_and_decrypt(ih)
2094         self.assertEqual(plain, b"")
2095         # wait for SA expiration
2096         time.sleep(3)
2097         ike_sas = self.vapi.ikev2_sa_dump()
2098         self.assertEqual(len(ike_sas), 0)
2099         ipsec_sas = self.vapi.ipsec_sa_dump()
2100         self.assertEqual(len(ipsec_sas), 0)
2101
2102
2103 @tag_fixme_vpp_workers
2104 class TestResponderRekey(TestResponderPsk):
2105     """test ikev2 responder - rekey"""
2106
2107     WITH_KEX = False
2108
2109     def send_rekey_from_initiator(self):
2110         if self.WITH_KEX:
2111             self.sa.generate_dh_data()
2112         packet = self.create_rekey_request(kex=self.WITH_KEX)
2113         self.pg0.add_stream(packet)
2114         self.pg0.enable_capture()
2115         self.pg_start()
2116         capture = self.pg0.get_capture(1)
2117         return capture
2118
2119     def process_rekey_response(self, capture):
2120         ih = self.get_ike_header(capture[0])
2121         plain = self.sa.hmac_and_decrypt(ih)
2122         sa = ikev2.IKEv2_payload_SA(plain)
2123         prop = sa[ikev2.IKEv2_payload_Proposal]
2124         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2125         # update new responder SPI
2126         self.sa.child_sas[0].rspi = prop.SPI
2127         if self.WITH_KEX:
2128             self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
2129             self.sa.complete_dh_data()
2130         self.sa.calc_child_keys(kex=self.WITH_KEX)
2131
2132     def test_responder(self):
2133         super(TestResponderRekey, self).test_responder()
2134         self.process_rekey_response(self.send_rekey_from_initiator())
2135         self.verify_ike_sas()
2136         self.verify_ipsec_sas(is_rekey=True)
2137         self.assert_counter(1, "rekey_req", "ip4")
2138         r = self.vapi.ikev2_sa_dump()
2139         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2140
2141
2142 @tag_fixme_vpp_workers
2143 class TestResponderRekeyRepeat(TestResponderRekey):
2144     """test ikev2 responder - rekey repeat"""
2145
2146     def test_responder(self):
2147         super(TestResponderRekeyRepeat, self).test_responder()
2148         # rekey request is not accepted until old IPsec SA is expired
2149         capture = self.send_rekey_from_initiator()
2150         ih = self.get_ike_header(capture[0])
2151         plain = self.sa.hmac_and_decrypt(ih)
2152         notify = ikev2.IKEv2_payload_Notify(plain)
2153         self.assertEqual(notify.type, 43)
2154         self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2155         # rekey request is accepted after old IPsec SA was expired
2156         for _ in range(50):
2157             if len(self.vapi.ipsec_sa_dump()) != 3:
2158                 break
2159             time.sleep(0.2)
2160         else:
2161             self.fail("old IPsec SA not expired")
2162         self.process_rekey_response(self.send_rekey_from_initiator())
2163         self.verify_ike_sas()
2164         self.verify_ipsec_sas(sa_count=3)
2165
2166
2167 @tag_fixme_vpp_workers
2168 class TestResponderRekeyKEX(TestResponderRekey):
2169     """test ikev2 responder - rekey with key exchange"""
2170
2171     WITH_KEX = True
2172
2173
2174 @tag_fixme_vpp_workers
2175 class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
2176     """test ikev2 responder - rekey repeat with key exchange"""
2177
2178     WITH_KEX = True
2179
2180
2181 @tag_fixme_ubuntu2204
2182 @tag_fixme_debian11
2183 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2184     """test ikev2 responder - non-default table id"""
2185
2186     @classmethod
2187     def setUpClass(cls):
2188         import scapy.contrib.ikev2 as _ikev2
2189
2190         globals()["ikev2"] = _ikev2
2191         super(IkePeer, cls).setUpClass()
2192         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
2193             cls, "vpp"
2194         ):
2195             return
2196         cls.create_pg_interfaces(range(1))
2197         cls.vapi.cli("ip table add 1")
2198         cls.vapi.cli("set interface ip table pg0 1")
2199         for i in cls.pg_interfaces:
2200             i.admin_up()
2201             i.config_ip4()
2202             i.resolve_arp()
2203             i.config_ip6()
2204             i.resolve_ndp()
2205
2206     def config_tc(self):
2207         self.config_params({"dpd_disabled": False})
2208
2209     def test_responder(self):
2210         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2211         super(TestResponderVrf, self).test_responder()
2212         self.pg0.enable_capture()
2213         self.pg_start()
2214         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2215         ih = self.get_ike_header(capture[0])
2216         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2217         plain = self.sa.hmac_and_decrypt(ih)
2218         self.assertEqual(plain, b"")
2219
2220
2221 @tag_fixme_vpp_workers
2222 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2223     """test ikev2 responder - cert based auth"""
2224
2225     def config_tc(self):
2226         self.config_params(
2227             {
2228                 "udp_encap": True,
2229                 "auth": "rsa-sig",
2230                 "server-key": "server-key.pem",
2231                 "client-key": "client-key.pem",
2232                 "client-cert": "client-cert.pem",
2233                 "server-cert": "server-cert.pem",
2234             }
2235         )
2236
2237
2238 @tag_fixme_vpp_workers
2239 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2240     TemplateResponder, Ikev2Params
2241 ):
2242     """
2243     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2244     """
2245
2246     def config_tc(self):
2247         self.config_params(
2248             {
2249                 "ike-crypto": ("AES-CBC", 16),
2250                 "ike-integ": "SHA2-256-128",
2251                 "esp-crypto": ("AES-CBC", 24),
2252                 "esp-integ": "SHA2-384-192",
2253                 "ike-dh": "2048MODPgr",
2254                 "nonce": os.urandom(256),
2255                 "no_idr_in_auth": True,
2256             }
2257         )
2258
2259
2260 @tag_fixme_vpp_workers
2261 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2262     TemplateResponder, Ikev2Params
2263 ):
2264
2265     """
2266     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2267     """
2268
2269     def config_tc(self):
2270         self.config_params(
2271             {
2272                 "ike-crypto": ("AES-CBC", 32),
2273                 "ike-integ": "SHA2-256-128",
2274                 "esp-crypto": ("AES-GCM-16ICV", 32),
2275                 "esp-integ": "NULL",
2276                 "ike-dh": "3072MODPgr",
2277             }
2278         )
2279
2280
2281 @tag_fixme_vpp_workers
2282 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2283     """
2284     IKE:AES_GCM_16_256
2285     """
2286
2287     IKE_NODE_SUFFIX = "ip6"
2288
2289     def config_tc(self):
2290         self.config_params(
2291             {
2292                 "del_sa_from_responder": True,
2293                 "ip6": True,
2294                 "natt": True,
2295                 "ike-crypto": ("AES-GCM-16ICV", 32),
2296                 "ike-integ": "NULL",
2297                 "ike-dh": "2048MODPgr",
2298                 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2299                 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2300             }
2301         )
2302
2303
2304 @tag_fixme_vpp_workers
2305 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2306     """
2307     Test for keep alive messages
2308     """
2309
2310     def send_empty_req_from_responder(self):
2311         packet = self.create_empty_request()
2312         self.pg0.add_stream(packet)
2313         self.pg0.enable_capture()
2314         self.pg_start()
2315         capture = self.pg0.get_capture(1)
2316         ih = self.get_ike_header(capture[0])
2317         self.assertEqual(ih.id, self.sa.msg_id)
2318         plain = self.sa.hmac_and_decrypt(ih)
2319         self.assertEqual(plain, b"")
2320         self.assert_counter(1, "keepalive", "ip4")
2321         r = self.vapi.ikev2_sa_dump()
2322         self.assertEqual(1, r[0].sa.stats.n_keepalives)
2323
2324     def test_initiator(self):
2325         super(TestInitiatorKeepaliveMsg, self).test_initiator()
2326         self.send_empty_req_from_responder()
2327
2328
2329 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2330     """malformed packet test"""
2331
2332     def tearDown(self):
2333         pass
2334
2335     def config_tc(self):
2336         self.config_params()
2337
2338     def create_ike_init_msg(self, length=None, payload=None):
2339         msg = ikev2.IKEv2(
2340             length=length,
2341             init_SPI="\x11" * 8,
2342             flags="Initiator",
2343             exch_type="IKE_SA_INIT",
2344         )
2345         if payload is not None:
2346             msg /= payload
2347         return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2348
2349     def verify_bad_packet_length(self):
2350         ike_msg = self.create_ike_init_msg(length=0xDEAD)
2351         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2352         self.assert_counter(self.pkt_count, "bad_length")
2353
2354     def verify_bad_sa_payload_length(self):
2355         p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2356         ike_msg = self.create_ike_init_msg(payload=p)
2357         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2358         self.assert_counter(self.pkt_count, "malformed_packet")
2359
2360     def test_responder(self):
2361         self.pkt_count = 254
2362         self.verify_bad_packet_length()
2363         self.verify_bad_sa_payload_length()
2364
2365
2366 if __name__ == "__main__":
2367     unittest.main(testRunner=VppTestRunner)