build: add ability to disable some plugins from packaging and tests
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
23 from framework import is_distro_ubuntu2204, is_distro_debian11
24 from framework import VppTestCase, VppTestRunner
25 from vpp_ikev2 import Profile, IDType, AuthMethod
26 from vpp_papi import VppEnum
27
# Compatibility shim: use the ``unicode`` builtin when it exists, otherwise
# fall back to ``str``.
try:
    text_type = unicode
except NameError:
    text_type = str

# Constant byte string fed to the PRF when deriving the shared-key AUTH
# value (see IKEv2SA.auth_init).
KEY_PAD = b"Key Pad for IKEv2"
# Number of trailing key bytes that CryptoAlgo treats as salt in its AEAD path.
SALT_SIZE = 4
# Number of tag bytes appended to / stripped from AES-GCM payloads.
GCM_ICV_SIZE = 16
# Number of IV bytes carried in front of AES-GCM ciphertext.
GCM_IV_SIZE = 8
37
38
# MODP Diffie-Hellman groups, defined in RFC 3526.
# Each value is a tuple (p, g, key_len):
#   p       - the prime modulus, produced by long_converter from the hex text
#   g       - the generator
#   key_len - byte length IKEv2SA.compute_secret uses for the shared secret
DH = {
    "2048MODPgr": (
        long_converter(
            """
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
        ),
        2,
        256,
    ),
    "3072MODPgr": (
        long_converter(
            """
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
        ),
        2,
        384,
    ),
}
84
85
class CryptoAlgo(object):
    """Symmetric cipher wrapper used to protect and open IKE payloads.

    ``cipher`` and ``mode`` are the algorithm and mode factories that get
    passed to ``Cipher``; both are ``None`` for the "NULL" entry, in which
    case ``bs`` and ``iv_len`` are not set and no method may be called.
    """

    def __init__(self, name, cipher, mode):
        """Store the algorithm description and derive block and IV sizes.

        :param name: algorithm name; "AES-GCM-16ICV" selects the GCM IV size
        :param cipher: cipher algorithm factory, or None
        :param mode: cipher mode factory, or None
        """
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is divided by 8 to obtain the block size in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == "AES-GCM-16ICV":
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` under ``key`` and return IV-prefixed output.

        Without ``aad`` the result is ``iv + ciphertext``.  With ``aad`` the
        last SALT_SIZE bytes of ``key`` are used as salt (nonce is
        ``salt + iv``) and the result is ``iv + ciphertext + tag`` where the
        tag is truncated to GCM_ICV_SIZE bytes.

        :param data: plaintext bytes (already padded by the caller)
        :param key: key bytes; in the AEAD case the salt is appended to it
        :param aad: additional authenticated data, or None
        :returns: bytes as described above
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(
                self.cipher(key), self.mode(iv), default_backend()
            ).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + iv
            encryptor = Cipher(
                self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
            ).encryptor()
            encryptor.authenticate_additional_data(aad)
            data = encryptor.update(data) + encryptor.finalize()
            data += encryptor.tag[:GCM_ICV_SIZE]
            return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt IV-prefixed ``data`` and return the plaintext.

        Mirrors :meth:`encrypt`: the configured ``self.cipher`` is used
        (instead of a hard-coded AES) so both directions always agree.

        :param data: ``iv + ciphertext`` (the tag is passed separately)
        :param key: key bytes; in the AEAD case the salt is appended to it
        :param aad: additional authenticated data, or None
        :param icv: authentication tag, required when ``aad`` is given
        :returns: plaintext bytes, padding still included
        """
        if aad is None:
            iv = data[: self.iv_len]
            ct = data[self.iv_len :]
            decryptor = Cipher(
                self.cipher(key), self.mode(iv), default_backend()
            ).decryptor()
            return decryptor.update(ct) + decryptor.finalize()
        else:
            salt = key[-SALT_SIZE:]
            nonce = salt + data[:GCM_IV_SIZE]
            ct = data[GCM_IV_SIZE:]
            key = key[:-SALT_SIZE]
            decryptor = Cipher(
                self.cipher(key), self.mode(nonce, icv, len(icv)), default_backend()
            ).decryptor()
            decryptor.authenticate_additional_data(aad)
            return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` with zero bytes up to the next block boundary.

        At least one byte is always added; the final byte holds the number
        of padding bytes that precede it.

        :param data: bytes to pad
        :returns: padded bytes whose length is a multiple of ``self.bs``
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b"\x00" * (pad_len - 1)
        return data + bytes([pad_len - 1])
140
141
class AuthAlgo(object):
    """Plain record describing an integrity or PRF algorithm."""

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        """Remember the algorithm parameters.

        :param name: algorithm name
        :param mac: MAC constructor (called as ``mac(key, hash, backend=...)``)
        :param mod: hash constructor, instantiated by callers via ``mod()``
        :param key_len: key length
        :param trunc_len: truncated output length; any falsy value
            (None or 0) falls back to ``key_len``
        """
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
149
150
# Encryption algorithms known to the test, keyed by name.  The "NULL" entry
# carries no cipher/mode and therefore has no block or IV size.
CRYPTO_ALGOS = {
    "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
    "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
    "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
}
156
# Integrity algorithms known to the test, keyed by name.  Positional
# arguments are (name, mac, hash, key_len, trunc_len).  Each SHA-2 variant
# must use the hash named in its identifier: the 384 and 512 entries use
# SHA-384 and SHA-512 respectively, matching their 48/64-byte key lengths.
AUTH_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
    "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
    "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA384, 48, 24),
    "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA512, 64, 32),
}
164
# Pseudo-random functions known to the test, keyed by name.  No trunc_len is
# given for PRF_HMAC_SHA2_256, so AuthAlgo falls back to key_len (32).
PRF_ALGOS = {
    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
    "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
}

# Numeric id -> CRYPTO_ALGOS key.
# NOTE(review): presumably the IANA IKEv2 encryption transform ids - confirm.
CRYPTO_IDS = {
    12: "AES-CBC",
    20: "AES-GCM-16ICV",
}

# Numeric id -> AUTH_ALGOS key.
# NOTE(review): presumably the IANA IKEv2 integrity transform ids - confirm.
INTEG_IDS = {
    2: "HMAC-SHA1-96",
    12: "SHA2-256-128",
    13: "SHA2-384-192",
    14: "SHA2-512-256",
}
181
182
class IKEv2ChildSA(object):
    """State of one child SA: its two SPIs and its traffic selectors."""

    def __init__(self, local_ts, remote_ts, is_initiator):
        """Pick a random 4-byte SPI for our side; the peer's stays None.

        :param local_ts: local traffic selector description
        :param remote_ts: remote traffic selector description
        :param is_initiator: True when this side initiates, which decides
            whether the random SPI is stored as ``ispi`` or ``rspi``
        """
        own_spi = os.urandom(4)
        self.ispi = own_spi if is_initiator else None
        self.rspi = None if is_initiator else own_spi
        self.local_ts = local_ts
        self.remote_ts = remote_ts
194
195
class IKEv2SA(object):
    """Test-side model of one IKEv2 security association.

    Keeps the SPIs, nonces, identities, Diffie-Hellman material, selected
    algorithms and derived keys, and provides the crypto helpers the test
    cases use to build and check IKE messages.
    """

    def __init__(
        self,
        test,
        is_initiator=True,
        i_id=None,
        r_id=None,
        spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
        id_type="fqdn",
        nonce=None,
        auth_data=None,
        local_ts=None,
        remote_ts=None,
        auth_method="shared-key",
        priv_key=None,
        i_natt=False,
        r_natt=False,
        udp_encap=False,
    ):
        """Initialise the SA state.

        :param test: test case object; its ``assertEqual`` is used by
            :meth:`verify_hmac`
        :param is_initiator: True when the test plays the initiator
        :param i_id: initiator identity bytes
        :param r_id: responder identity bytes
        :param spi: our 8-byte IKE SPI; the peer's is set to eight zero bytes
        :param id_type: identity type, either a name (resolved through
            ``IDType.value``) or an already resolved value
        :param nonce: our nonce; 32 random bytes when not given
        :param auth_data: pre-shared key (replaced by the AUTH value in
            :meth:`auth_init`)
        :param local_ts: local traffic selector of the first child SA
        :param remote_ts: remote traffic selector of the first child SA
        :param auth_method: "shared-key" or "rsa-sig"
        :param priv_key: private key used for "rsa-sig"
        :param i_natt: initiator is behind NAT
        :param r_natt: responder is behind NAT
        :param udp_encap: UDP encapsulation requested
        """
        self.udp_encap = udp_encap
        self.i_natt = i_natt
        self.r_natt = r_natt
        # with NAT traversal both UDP ports switch from 500 to 4500
        port = 4500 if (i_natt or r_natt) else 500
        self.sport = port
        self.dport = port
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        if isinstance(id_type, str):
            self.id_type = IDType.value(id_type)
        else:
            self.id_type = id_type
        self.auth_method = auth_method
        if self.is_initiator:
            self.rspi = 8 * b"\x00"
            self.ispi = spi
            self.i_nonce = nonce
        else:
            self.rspi = spi
            self.ispi = 8 * b"\x00"
            self.r_nonce = nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]

    def new_msg_id(self):
        """Increment the message id counter and return the new value."""
        self.msg_id += 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """DH public value of our own side."""
        return self.i_dh_data if self.is_initiator else self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """DH public value of the peer."""
        return self.r_dh_data if self.is_initiator else self.i_dh_data

    @property
    def natt(self):
        """Truthy when either side is behind NAT."""
        return self.i_natt or self.r_natt

    def compute_secret(self):
        """Return the DH shared secret as big-endian bytes.

        Computes ``peer_pub ** priv mod p`` and serialises the result to the
        key length stored in the group tuple.
        """
        p, _g, key_len = self.ike_group
        peer = int.from_bytes(self.peer_dh_pub_key, "big")
        priv = int.from_bytes(self.dh_private_key, "big")
        return pow(peer, priv, p).to_bytes(key_len, "big")

    def generate_dh_data(self):
        """Generate our DH key pair and store it on the SA.

        The private value goes to ``dh_private_key``; the public value goes
        to ``i_dh_data`` or ``r_dh_data`` depending on our role.

        :raises NotImplementedError: if ``ike_dh`` is not a known group
        """
        if self.ike_dh not in DH:
            raise NotImplementedError("%s not in DH group" % self.ike_dh)

        # the parameter object is built once and reused on later calls
        if self.dh_params is None:
            dhg = DH[self.ike_dh]
            pn = dh.DHParameterNumbers(dhg[0], dhg[1])
            self.dh_params = pn.parameters(default_backend())

        priv = self.dh_params.generate_private_key()
        pub = priv.public_key()
        x = priv.private_numbers().x
        self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
        y = pub.public_numbers().y
        pub_bytes = y.to_bytes(pub.key_size // 8, "big")

        if self.is_initiator:
            self.i_dh_data = pub_bytes
        else:
            self.r_dh_data = pub_bytes

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self, kex=False):
        """Derive the keys of the first child SA from ``sk_d``.

        The key material is laid out as, for the initiator direction and
        then the responder direction: encryption key followed by either the
        integrity key (when the ESP integrity algorithm has a key) or a
        4-byte salt.

        :param kex: when True the DH shared secret is prepended to the seed
        """
        prf = self.ike_prf_alg.mod()
        seed = self.i_nonce + self.r_nonce
        if kex:
            seed = self.dh_shared_secret + seed
        child = self.child_sas[0]

        encr_key_len = self.esp_crypto_key_len
        integ_key_len = self.esp_integ_alg.key_len
        salt_len = 0 if integ_key_len else 4

        total_len = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
        keymat = self.calc_prfplus(prf, self.sk_d, seed, total_len)

        pos = 0
        for side in ("i", "r"):
            setattr(child, "sk_e" + side, keymat[pos : pos + encr_key_len])
            pos += encr_key_len
            if integ_key_len:
                setattr(child, "sk_a" + side, keymat[pos : pos + integ_key_len])
                pos += integ_key_len
            else:
                setattr(child, "salt_e" + side, keymat[pos : pos + salt_len])
                pos += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Expand ``key`` and ``seed`` into at least ``length`` bytes.

        Implements the iterated construction
        ``T1 = prf(key, seed | 0x01)``, ``Tn = prf(key, Tn-1 | seed | n)``
        with the one-byte counter running from 1 to 255 inclusive.

        :param prf: hash instance handed on to :meth:`calc_prf`
        :param key: PRF key
        :param seed: seed bytes
        :param length: minimum number of bytes required
        :returns: the concatenated blocks (not truncated to ``length``), or
            None if 255 blocks are not enough
        """
        out = b""
        block = b""
        for counter in range(1, 256):
            if len(out) >= length:
                break
            block = self.calc_prf(prf, key, block + seed + bytes([counter]))
            out += block

        # only report failure when the material really is too short
        if len(out) < length:
            return None
        return out

    def calc_prf(self, prf, key, data):
        """Return the IKE PRF MAC of ``data`` under ``key``."""
        h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        h.update(data)
        return h.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the IKE SA keys.

        Sets ``skeyseed`` and then, in this order from the expanded key
        material, ``sk_d``, ``sk_ai``, ``sk_ar``, ``sk_ei``, ``sk_er``,
        ``sk_pi`` and ``sk_pr``.  When the integrity algorithm has no key,
        each encryption key is extended by a 4-byte salt.
        """
        prf = self.ike_prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        seed = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, seed, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        seed = seed + self.ispi + self.rspi

        prf_key_trunc = self.ike_prf_alg.trunc_len
        encr_key_len = self.ike_crypto_key_len
        tr_prf_key_len = self.ike_prf_alg.key_len
        integ_key_len = self.ike_integ_alg.key_len
        salt_size = 4 if integ_key_len == 0 else 0

        total_len = (
            prf_key_trunc
            + integ_key_len * 2
            + encr_key_len * 2
            + tr_prf_key_len * 2
            + salt_size * 2
        )
        keymat = self.calc_prfplus(prf, self.skeyseed, seed, total_len)

        layout = (
            ("sk_d", prf_key_trunc),
            ("sk_ai", integ_key_len),
            ("sk_ar", integ_key_len),
            ("sk_ei", encr_key_len + salt_size),
            ("sk_er", encr_key_len + salt_size),
            ("sk_pi", tr_prf_key_len),
            ("sk_pr", tr_prf_key_len),
        )
        pos = 0
        for attr, size in layout:
            setattr(self, attr, keymat[pos : pos + size])
            pos += size

    def generate_authmsg(self, prf, packet):
        """Build the octets that get signed / MACed for authentication.

        :param prf: hash instance handed on to :meth:`calc_prf`
        :param packet: raw bytes of our IKE_SA_INIT message
        :returns: ``packet + peer_nonce + prf(sk_p, id_type | 0 0 0 | id)``
        """
        if self.is_initiator:
            ident = self.i_id
            nonce = self.r_nonce
            key = self.sk_pi
        else:
            ident = self.r_id
            nonce = self.i_nonce
            key = self.sk_pr
        data = bytes([self.id_type, 0, 0, 0]) + ident
        id_hash = self.calc_prf(prf, key, data)
        return packet + nonce + id_hash

    def auth_init(self):
        """Compute our AUTH value and store it in ``auth_data``.

        :raises TypeError: if ``auth_method`` is neither "shared-key" nor
            "rsa-sig"
        """
        prf = self.ike_prf_alg.mod()
        if self.is_initiator:
            packet = self.init_req_packet
        else:
            packet = self.init_resp_packet
        authmsg = self.generate_authmsg(prf, raw(packet))
        if self.auth_method == "shared-key":
            # auth_data holds the pre-shared key until this point
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif self.auth_method == "rsa-sig":
            self.auth_data = self.priv_key.sign(
                authmsg, padding.PKCS1v15(), hashes.SHA1()
            )
        else:
            raise TypeError("unknown auth method type!")

    def encrypt(self, data, aad=None):
        """Pad ``data`` and encrypt it with our own encryption key."""
        data = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key used by the peer."""
        return self.sk_ar if self.is_initiator else self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key used by our side."""
        return self.sk_ai if self.is_initiator else self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key used by our side."""
        return self.sk_ei if self.is_initiator else self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key used by the peer."""
        return self.sk_er if self.is_initiator else self.sk_ei

    def concat(self, alg, key_len):
        """Return ``"<alg>-<key length in bits>"`` for a byte key length."""
        return alg + "-" + str(key_len * 8)

    @property
    def vpp_ike_cypto_alg(self):
        """IKE encryption algorithm name with the key size in bits appended."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP encryption algorithm name with the key size in bits appended."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of ``ikemsg`` matches our own MAC."""
        integ_trunc = self.ike_integ_alg.trunc_len
        exp_hmac = ikemsg[-integ_trunc:]
        data = ikemsg[:-integ_trunc]
        computed_hmac = self.compute_hmac(
            self.ike_integ_alg.mod(), self.peer_authkey, data
        )
        self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated IKE integrity MAC of ``data``."""
        h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        h.update(data)
        return h.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt ``data`` with the peer's encryption key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Authenticate and decrypt the Encrypted payload of ``ike``.

        For AES-GCM the IKE header plus Encrypted payload header serve as
        AAD and the last GCM_ICV_SIZE bytes of the payload as tag; otherwise
        the HMAC is verified first and the ICV stripped before decryption.

        :returns: the plaintext with the padding and pad-length byte removed
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto == "AES-GCM-16ICV":
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
        else:
            self.verify_hmac(raw(ike))
            integ_trunc = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            ct = ep.load[:-integ_trunc]
            plain = self.decrypt(ct)
        # remove padding
        pad_len = plain[-1]
        return plain[: -pad_len - 1]

    def build_ts_addr(self, ts, version):
        """Map a ``start_addr``/``end_addr`` dict to selector keyword names."""
        return {
            "starting_address_v" + version: ts["start_addr"],
            "ending_address_v" + version: ts["end_addr"],
        }

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        :param is_ip4: choose IPv4 selectors when true, IPv6 otherwise
        :returns: ``([tsi], [tsr])`` - our local selector comes first when
            we are the initiator, second otherwise
        """
        child = self.child_sas[0]
        if is_ip4:
            version, selector = "4", ikev2.IPv4TrafficSelector
        else:
            version, selector = "6", ikev2.IPv6TrafficSelector

        ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
        ts_data.update(self.build_ts_addr(child.local_ts, version))
        ts1 = selector(**ts_data)
        ts_data.update(self.build_ts_addr(child.remote_ts, version))
        ts2 = selector(**ts_data)

        if self.is_initiator:
            return ([ts1], [ts2])
        return ([ts2], [ts1])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the IKE SA algorithms.

        :param crypto: key of CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key of AUTH_ALGOS ("NULL" is stored as None)
        :param prf: key of PRF_ALGOS
        :param dh: key of DH
        :raises TypeError: for an unknown crypto, integ or prf name
        :raises KeyError: for an unknown DH group name
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError("unsupported encryption algo %r" % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError("unsupported auth algo %r" % integ)
        self.ike_integ = None if integ == "NULL" else integ
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError("unsupported prf algo %r" % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the ESP (child SA) algorithms.

        :param crypto: key of CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key of AUTH_ALGOS ("NULL" is stored as None)
        :raises TypeError: for an unknown crypto or integ name
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError("unsupported encryption algo %r" % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError("unsupported auth algo %r" % integ)
        self.esp_integ = None if integ == "NULL" else integ
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len):
        """Return ``(attribute, 12)`` for a key length given in bytes.

        The attribute packs 0x800E in the upper 16 bits and the key length
        in bits in the lower ones.
        NOTE(review): the constant 12 is presumably the transform length
        expected by the callers - confirm.

        :raises Exception: if the IKE encryption algorithm is not AES based
        """
        if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
            return (0x800E << 16 | key_len << 3, 12)
        else:
            raise Exception("unsupported attribute type")

    def ike_crypto_attr(self):
        """:meth:`crypto_attr` for the IKE encryption key length."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """:meth:`crypto_attr` for the ESP encryption key length."""
        return self.crypto_attr(self.esp_crypto_key_len)

    def compute_nat_sha1(self, ip, port, rspi=None):
        """Return SHA-1 over ``ispi | rspi | ip | port``.

        :param ip: address bytes
        :param port: port number, serialised as two big-endian bytes
        :param rspi: responder SPI to use; defaults to ``self.rspi``
        """
        if rspi is None:
            rspi = self.rspi
        data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
        digest.update(data)
        return digest.finalize()
581
582
583 @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
584 class IkePeer(VppTestCase):
585     """common class for initiator and responder"""
586
587     @classmethod
588     def setUpClass(cls):
589         import scapy.contrib.ikev2 as _ikev2
590
591         globals()["ikev2"] = _ikev2
592         super(IkePeer, cls).setUpClass()
593         cls.create_pg_interfaces(range(2))
594         for i in cls.pg_interfaces:
595             i.admin_up()
596             i.config_ip4()
597             i.resolve_arp()
598             i.config_ip6()
599             i.resolve_ndp()
600
601     @classmethod
602     def tearDownClass(cls):
603         super(IkePeer, cls).tearDownClass()
604
605     def tearDown(self):
606         super(IkePeer, self).tearDown()
607         if self.del_sa_from_responder:
608             self.initiate_del_sa_from_responder()
609         else:
610             self.initiate_del_sa_from_initiator()
611         r = self.vapi.ikev2_sa_dump()
612         self.assertEqual(len(r), 0)
613         sas = self.vapi.ipsec_sa_dump()
614         self.assertEqual(len(sas), 0)
615         self.p.remove_vpp_config()
616         self.assertIsNone(self.p.query_vpp_config())
617
618     def setUp(self):
619         super(IkePeer, self).setUp()
620         self.config_tc()
621         self.p.add_vpp_config()
622         self.assertIsNotNone(self.p.query_vpp_config())
623         if self.sa.is_initiator:
624             self.sa.generate_dh_data()
625         self.vapi.cli("ikev2 set logging level 4")
626         self.vapi.cli("event-lo clear")
627
628     def assert_counter(self, count, name, version="ip4"):
629         node_name = "/err/ikev2-%s/" % version + name
630         self.assertEqual(count, self.statistics.get_err_counter(node_name))
631
632     def create_rekey_request(self, kex=False):
633         sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
634         header = ikev2.IKEv2(
635             init_SPI=self.sa.ispi,
636             resp_SPI=self.sa.rspi,
637             id=self.sa.new_msg_id(),
638             flags="Initiator",
639             exch_type="CREATE_CHILD_SA",
640         )
641
642         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
643         return self.create_packet(
644             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
645         )
646
647     def create_empty_request(self):
648         header = ikev2.IKEv2(
649             init_SPI=self.sa.ispi,
650             resp_SPI=self.sa.rspi,
651             id=self.sa.new_msg_id(),
652             flags="Initiator",
653             exch_type="INFORMATIONAL",
654             next_payload="Encrypted",
655         )
656
657         msg = self.encrypt_ike_msg(header, b"", None)
658         return self.create_packet(
659             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
660         )
661
662     def create_packet(
663         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
664     ):
665         if use_ip6:
666             src_ip = src_if.remote_ip6
667             dst_ip = src_if.local_ip6
668             ip_layer = IPv6
669         else:
670             src_ip = src_if.remote_ip4
671             dst_ip = src_if.local_ip4
672             ip_layer = IP
673         res = (
674             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
675             / ip_layer(src=src_ip, dst=dst_ip)
676             / UDP(sport=sport, dport=dport)
677         )
678         if natt:
679             # insert non ESP marker
680             res = res / Raw(b"\x00" * 4)
681         return res / msg
682
683     def verify_udp(self, udp):
684         self.assertEqual(udp.sport, self.sa.sport)
685         self.assertEqual(udp.dport, self.sa.dport)
686
687     def get_ike_header(self, packet):
688         try:
689             ih = packet[ikev2.IKEv2]
690             ih = self.verify_and_remove_non_esp_marker(ih)
691         except IndexError as e:
692             # this is a workaround for getting IKEv2 layer as both ikev2 and
693             # ipsec register for port 4500
694             esp = packet[ESP]
695             ih = self.verify_and_remove_non_esp_marker(esp)
696         self.assertEqual(ih.version, 0x20)
697         self.assertNotIn("Version", ih.flags)
698         return ih
699
700     def verify_and_remove_non_esp_marker(self, packet):
701         if self.sa.natt:
702             # if we are in nat traversal mode check for non esp marker
703             # and remove it
704             data = raw(packet)
705             self.assertEqual(data[:4], b"\x00" * 4)
706             return ikev2.IKEv2(data[4:])
707         else:
708             return packet
709
710     def encrypt_ike_msg(self, header, plain, first_payload):
711         if self.sa.ike_crypto == "AES-GCM-16ICV":
712             data = self.sa.ike_crypto_alg.pad(raw(plain))
713             plen = (
714                 len(data)
715                 + GCM_IV_SIZE
716                 + GCM_ICV_SIZE
717                 + len(ikev2.IKEv2_payload_Encrypted())
718             )
719             tlen = plen + len(ikev2.IKEv2())
720
721             # prepare aad data
722             sk_p = ikev2.IKEv2_payload_Encrypted(
723                 next_payload=first_payload, length=plen
724             )
725             header.length = tlen
726             res = header / sk_p
727             encr = self.sa.encrypt(raw(plain), raw(res))
728             sk_p = ikev2.IKEv2_payload_Encrypted(
729                 next_payload=first_payload, length=plen, load=encr
730             )
731             res = header / sk_p
732         else:
733             encr = self.sa.encrypt(raw(plain))
734             trunc_len = self.sa.ike_integ_alg.trunc_len
735             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
736             tlen = plen + len(ikev2.IKEv2())
737
738             sk_p = ikev2.IKEv2_payload_Encrypted(
739                 next_payload=first_payload, length=plen, load=encr
740             )
741             header.length = tlen
742             res = header / sk_p
743
744             integ_data = raw(res)
745             hmac_data = self.sa.compute_hmac(
746                 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
747             )
748             res = res / Raw(hmac_data[:trunc_len])
749         assert len(res) == tlen
750         return res
751
752     def verify_udp_encap(self, ipsec_sa):
753         e = VppEnum.vl_api_ipsec_sad_flags_t
754         if self.sa.udp_encap or self.sa.natt:
755             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
756         else:
757             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
758
759     def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
760         sas = self.vapi.ipsec_sa_dump()
761         if sa_count is None:
762             if is_rekey:
763                 # after rekey there is a short period of time in which old
764                 # inbound SA is still present
765                 sa_count = 3
766             else:
767                 sa_count = 2
768         self.assertEqual(len(sas), sa_count)
769         if self.sa.is_initiator:
770             if is_rekey:
771                 sa0 = sas[0].entry
772                 sa1 = sas[2].entry
773             else:
774                 sa0 = sas[0].entry
775                 sa1 = sas[1].entry
776         else:
777             if is_rekey:
778                 sa0 = sas[2].entry
779                 sa1 = sas[0].entry
780             else:
781                 sa1 = sas[0].entry
782                 sa0 = sas[1].entry
783
784         c = self.sa.child_sas[0]
785
786         self.verify_udp_encap(sa0)
787         self.verify_udp_encap(sa1)
788         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
789         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
790         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
791
792         if self.sa.esp_integ is None:
793             vpp_integ_alg = 0
794         else:
795             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
796         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
797         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
798
799         # verify crypto keys
800         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
801         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
802         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
803         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
804
805         # verify integ keys
806         if vpp_integ_alg:
807             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
808             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
809             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
810             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
811         else:
812             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
813             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
814
815     def verify_keymat(self, api_keys, keys, name):
816         km = getattr(keys, name)
817         api_km = getattr(api_keys, name)
818         api_km_len = getattr(api_keys, name + "_len")
819         self.assertEqual(len(km), api_km_len)
820         self.assertEqual(km, api_km[:api_km_len])
821
822     def verify_id(self, api_id, exp_id):
823         self.assertEqual(api_id.type, IDType.value(exp_id.type))
824         self.assertEqual(api_id.data_len, exp_id.data_len)
825         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
826
827     def verify_ike_sas(self):
828         r = self.vapi.ikev2_sa_dump()
829         self.assertEqual(len(r), 1)
830         sa = r[0].sa
831         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
832         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
833         if self.ip6:
834             if self.sa.is_initiator:
835                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
836                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
837             else:
838                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
839                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
840         else:
841             if self.sa.is_initiator:
842                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
843                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
844             else:
845                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
846                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
847         self.verify_keymat(sa.keys, self.sa, "sk_d")
848         self.verify_keymat(sa.keys, self.sa, "sk_ai")
849         self.verify_keymat(sa.keys, self.sa, "sk_ar")
850         self.verify_keymat(sa.keys, self.sa, "sk_ei")
851         self.verify_keymat(sa.keys, self.sa, "sk_er")
852         self.verify_keymat(sa.keys, self.sa, "sk_pi")
853         self.verify_keymat(sa.keys, self.sa, "sk_pr")
854
855         self.assertEqual(sa.i_id.type, self.sa.id_type)
856         self.assertEqual(sa.r_id.type, self.sa.id_type)
857         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
858         self.assertEqual(sa.r_id.data_len, len(self.idr))
859         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
860         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
861
862         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
863         self.assertEqual(len(r), 1)
864         csa = r[0].child_sa
865         self.assertEqual(csa.sa_index, sa.sa_index)
866         c = self.sa.child_sas[0]
867         if hasattr(c, "sk_ai"):
868             self.verify_keymat(csa.keys, c, "sk_ai")
869             self.verify_keymat(csa.keys, c, "sk_ar")
870         self.verify_keymat(csa.keys, c, "sk_ei")
871         self.verify_keymat(csa.keys, c, "sk_er")
872         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
873         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
874
875         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
876         tsi = tsi[0]
877         tsr = tsr[0]
878         r = self.vapi.ikev2_traffic_selector_dump(
879             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
880         )
881         self.assertEqual(len(r), 1)
882         ts = r[0].ts
883         self.verify_ts(r[0].ts, tsi[0], True)
884
885         r = self.vapi.ikev2_traffic_selector_dump(
886             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
887         )
888         self.assertEqual(len(r), 1)
889         self.verify_ts(r[0].ts, tsr[0], False)
890
891         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
892         self.verify_nonce(n, self.sa.i_nonce)
893         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
894         self.verify_nonce(n, self.sa.r_nonce)
895
896     def verify_nonce(self, api_nonce, nonce):
897         self.assertEqual(api_nonce.data_len, len(nonce))
898         self.assertEqual(api_nonce.nonce, nonce)
899
900     def verify_ts(self, api_ts, ts, is_initiator):
901         if is_initiator:
902             self.assertTrue(api_ts.is_local)
903         else:
904             self.assertFalse(api_ts.is_local)
905
906         if self.p.ts_is_ip4:
907             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
908             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
909         else:
910             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
911             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
912         self.assertEqual(api_ts.start_port, ts.start_port)
913         self.assertEqual(api_ts.end_port, ts.end_port)
914         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
915
916
class TemplateInitiator(IkePeer):
    """initiator test template"""

    # In this template VPP starts the exchange (see initiate_sa_init) and the
    # test answers with scapy-built responses.

    def initiate_del_sa_from_initiator(self):
        """Ask VPP to delete the IKE SA and acknowledge its Delete request.

        The captured request must be an "Initiator" non-response carrying a
        Delete payload for the IKE protocol.  It is answered with an empty
        encrypted INFORMATIONAL response, to which no reply is expected.
        """
        ispi = int.from_bytes(self.sa.ispi, "little")
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn("Response", ih.flags)
        self.assertIn("Initiator", ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type="INFORMATIONAL",
            id=ih.id,
            next_payload="Encrypted",
        )
        resp = self.encrypt_ike_msg(header, b"", None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check that ``packet`` is VPP's empty response to our Delete."""
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn("Response", ih.flags)
        self.assertIn("Initiator", ih.flags)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b"")

    def initiate_del_sa_from_responder(self):
        """Send a Delete request for the IKE SA and verify VPP's response."""
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            exch_type="INFORMATIONAL",
            id=self.sa.new_msg_id(),
        )
        del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
        ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    @staticmethod
    def find_notify_payload(packet, notify_type):
        """Return the first Notify payload of ``notify_type`` in ``packet``.

        Returns None when the packet holds no such Notify payload, so callers
        can report the problem with a plain ``assertIsNotNone``.
        """
        # scapy terminates a payload chain with a NoPayload object rather than
        # None, so walk the chain with getlayer(), which yields None once no
        # further Notify payload exists.
        n = packet.getlayer(ikev2.IKEv2_payload_Notify)
        while n is not None:
            if n.type == notify_type:
                return n
            n = n.payload.getlayer(ikev2.IKEv2_payload_Notify)
        return None

    def verify_nat_detection(self, packet):
        """Verify both NAT detection notifies against the packet's addresses.

        The expected hashes are computed from the packet's own IP source and
        destination addresses and UDP ports, with an all-zero 8-byte value
        passed as the last argument of ``compute_nat_sha1``.
        """
        # The address family handed to inet_pton must match the IP header that
        # is inspected; AF_INET cannot parse an IPv6 address string.
        if self.ip6:
            iph = packet[IPv6]
            family = socket.AF_INET6
        else:
            iph = packet[IP]
            family = socket.AF_INET
        udp = packet[UDP]

        # NAT_DETECTION_SOURCE_IP
        s = self.find_notify_payload(packet, 16388)
        self.assertIsNotNone(s)
        src_sha = self.sa.compute_nat_sha1(
            inet_pton(family, iph.src), udp.sport, b"\x00" * 8
        )
        self.assertEqual(s.load, src_sha)

        # NAT_DETECTION_DESTINATION_IP
        s = self.find_notify_payload(packet, 16389)
        self.assertIsNotNone(s)
        dst_sha = self.sa.compute_nat_sha1(
            inet_pton(family, iph.dst), udp.dport, b"\x00" * 8
        )
        self.assertEqual(s.load, dst_sha)

    def verify_sa_init_request(self, packet):
        """Validate VPP's IKE_SA_INIT request and derive the IKE keys.

        Records the peer's UDP port, initiator SPI, nonce and DH data on
        ``self.sa``, checks the first proposal's transforms and the NAT
        detection notifies, then configures the algorithms used by this
        template and computes the keys.
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = packet[ikev2.IKEv2]
        self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
        self.assertEqual(ih.exch_type, 34)  # SA_INIT
        self.sa.ispi = ih.init_SPI
        self.assertEqual(ih.resp_SPI, 8 * b"\x00")
        self.assertIn("Initiator", ih.flags)
        self.assertNotIn("Response", ih.flags)
        self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
        self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load

        prop = packet[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.proto, 1)  # proto = ikev2
        self.assertEqual(prop.proposal, 1)
        self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
        self.assertEqual(
            prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
        )
        self.assertEqual(prop.trans[1].transform_type, 2)  # prf
        self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
        self.assertEqual(prop.trans[2].transform_type, 4)  # dh
        self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])

        self.verify_nat_detection(packet)
        self.sa.set_ike_props(
            crypto="AES-GCM-16ICV",
            crypto_key_len=32,
            integ="NULL",
            prf="PRF_HMAC_SHA2_256",
            dh="3072MODPgr",
        )
        self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
        self.sa.generate_dh_data()
        self.sa.complete_dh_data()
        self.sa.calc_keys()

    def update_esp_transforms(self, trans, sa):
        """Copy the ESP encryption/integrity choice from ``trans`` onto ``sa``.

        ``trans`` is the first transform of a chain; the chain is followed
        through ``payload`` until it evaluates false.
        """
        while trans:
            if trans.transform_type == 1:  # encryption
                sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
            elif trans.transform_type == 3:  # integrity
                sa.esp_integ = INTEG_IDS[trans.transform_id]
            trans = trans.payload

    def verify_sa_auth_req(self, packet):
        """Validate VPP's IKE_AUTH request and record the child SA details.

        Checks the header, the message id (which must advance by one), the
        identities, then stores the proposed initiator SPI on the first child
        SA and adopts the proposed ESP transforms.
        """
        udp = packet[UDP]
        self.sa.dport = udp.sport
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
        self.assertIn("Initiator", ih.flags)
        self.assertNotIn("Response", ih.flags)

        self.verify_udp(udp)
        self.assertEqual(ih.id, self.sa.msg_id + 1)
        self.sa.msg_id += 1
        plain = self.sa.hmac_and_decrypt(ih)
        idi = ikev2.IKEv2_payload_IDi(plain)
        self.assertEqual(idi.load, self.sa.i_id)
        if self.no_idr_auth:
            # without an IDr payload, AUTH directly follows IDi
            self.assertEqual(idi.next_payload, 39)  # AUTH
        else:
            idr = ikev2.IKEv2_payload_IDr(idi.payload)
            self.assertEqual(idr.load, self.sa.r_id)
        prop = idi[ikev2.IKEv2_payload_Proposal]
        c = self.sa.child_sas[0]
        c.ispi = prop.SPI
        self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)

    def send_init_response(self):
        """Send the IKE_SA_INIT response and verify the IKE_AUTH request.

        The response carries one IKE proposal built from ``self.sa``, our DH
        public key and nonce, and the two NAT detection notifies.  When
        ``self.sa.natt`` is set, a fixed address is hashed as destination
        instead of pg0's local address.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.ike_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.ike_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="GroupDesc", transform_id=self.sa.ike_dh
            )
        )
        props = ikev2.IKEv2_payload_Proposal(
            proposal=1, proto="IKEv2", trans_nb=4, trans=trans
        )

        src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
        if self.sa.natt:
            dst_address = b"\x0a\x0a\x0a\x0a"
        else:
            dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
        dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)

        self.sa.init_resp_packet = (
            ikev2.IKEv2(
                init_SPI=self.sa.ispi,
                resp_SPI=self.sa.rspi,
                exch_type="IKE_SA_INIT",
                flags="Response",
            )
            / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
            / ikev2.IKEv2_payload_KE(
                next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
            )
            / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
            / ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
            )
            / ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
            )
        )

        ike_msg = self.create_packet(
            self.pg0,
            self.sa.init_resp_packet,
            self.sa.sport,
            self.sa.dport,
            False,
            self.ip6,
        )
        self.pg_send(self.pg0, ike_msg)
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_req(capture[0])

    def initiate_sa_init(self):
        """Trigger IKE_SA_INIT from VPP, verify it and send our response."""
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)

        capture = self.pg0.get_capture(1)
        self.verify_sa_init_request(capture[0])
        self.send_init_response()

    def send_auth_response(self):
        """Build, encrypt and send the IKE_AUTH response.

        The response contains both identities, the AUTH payload, one ESP
        proposal carrying the child SA's responder SPI, the traffic selectors
        and an INITIAL_CONTACT notify.
        """
        tr_attr = self.sa.esp_crypto_attr()
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.esp_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.esp_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="No ESN"
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="ESN"
            )
        )

        c = self.sa.child_sas[0]
        props = ikev2.IKEv2_payload_Proposal(
            proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
        )

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (
            ikev2.IKEv2_payload_IDi(
                next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
            )
            / ikev2.IKEv2_payload_IDr(
                next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
            )
            / ikev2.IKEv2_payload_AUTH(
                next_payload="SA",
                auth_type=AuthMethod.value(self.sa.auth_method),
                load=self.sa.auth_data,
            )
            / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
            / ikev2.IKEv2_payload_TSi(
                next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
            )
            / ikev2.IKEv2_payload_TSr(
                next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
            )
            / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
        )

        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            id=self.sa.new_msg_id(),
            flags="Response",
            exch_type="IKE_AUTH",
        )

        ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg_send(self.pg0, packet)

    def test_initiator(self):
        # Full exchange: SA_INIT, key derivation, AUTH response, then compare
        # the resulting SAs with the state reported by the API.
        self.initiate_sa_init()
        self.sa.auth_init()
        self.sa.calc_child_keys()
        self.send_auth_response()
        self.verify_ike_sas()
1217
1218
class TemplateResponder(IkePeer):
    """responder test template"""

    # In this template the test starts the exchange with scapy-built requests
    # (see send_sa_init_req) and checks the responses captured on pg0.

    # Suffix handed to assert_counter() when checking the IKE node counters.
    IKE_NODE_SUFFIX = "ip4"

    def initiate_del_sa_from_responder(self):
        """Ask VPP to delete the IKE SA and acknowledge its Delete request.

        The captured request must be an INFORMATIONAL non-response without
        the "Initiator" flag, carrying a Delete payload for the IKE protocol.
        It is answered with an empty encrypted response, to which no reply is
        expected.
        """
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
        capture = self.pg0.get_capture(1)
        ih = self.get_ike_header(capture[0])
        self.assertNotIn("Response", ih.flags)
        self.assertNotIn("Initiator", ih.flags)
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        d = ikev2.IKEv2_payload_Delete(plain)
        self.assertEqual(d.proto, 1)  # proto=IKEv2
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Initiator+Response",
            exch_type="INFORMATIONAL",
            id=ih.id,
            next_payload="Encrypted",
        )
        resp = self.encrypt_ike_msg(header, b"", None)
        self.send_and_assert_no_replies(self.pg0, resp)

    def verify_del_sa(self, packet):
        """Check that ``packet`` is VPP's empty response to our Delete."""
        ih = self.get_ike_header(packet)
        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 37)  # exchange informational
        self.assertIn("Response", ih.flags)
        self.assertNotIn("Initiator", ih.flags)
        self.assertEqual(ih.next_payload, 46)  # Encrypted
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        self.assertEqual(ih.resp_SPI, self.sa.rspi)
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b"")

    def initiate_del_sa_from_initiator(self):
        """Send a Delete request for the IKE SA and verify VPP's response."""
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Initiator",
            exch_type="INFORMATIONAL",
            id=self.sa.new_msg_id(),
        )
        del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
        ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_del_sa(capture[0])

    def send_sa_init_req(self):
        """Send an IKE_SA_INIT request and verify VPP's response.

        The request carries one IKE proposal built from ``self.sa``, our DH
        public key and nonce.  For IPv4 it also carries the two NAT detection
        notifies; when ``i_natt``/``r_natt`` is set, a fixed address is hashed
        instead of the real source/destination address.
        """
        tr_attr = self.sa.ike_crypto_attr()
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.ike_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.ike_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="GroupDesc", transform_id=self.sa.ike_dh
            )
        )

        props = ikev2.IKEv2_payload_Proposal(
            proposal=1, proto="IKEv2", trans_nb=4, trans=trans
        )

        # The Nonce payload is last for IPv6, where no notifies are appended.
        next_payload = None if self.ip6 else "Notify"

        self.sa.init_req_packet = (
            ikev2.IKEv2(
                init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
            )
            / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
            / ikev2.IKEv2_payload_KE(
                next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
            )
            / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
        )

        if not self.ip6:
            if self.sa.i_natt:
                src_address = b"\x0a\x0a\x0a\x01"
            else:
                src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)

            if self.sa.r_natt:
                dst_address = b"\x0a\x0a\x0a\x0a"
            else:
                dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)

            src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
            dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
            nat_src_detection = ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
            )
            nat_dst_detection = ikev2.IKEv2_payload_Notify(
                type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
            )
            self.sa.init_req_packet = (
                self.sa.init_req_packet / nat_src_detection / nat_dst_detection
            )

        ike_msg = self.create_packet(
            self.pg0,
            self.sa.init_req_packet,
            self.sa.sport,
            self.sa.dport,
            self.sa.natt,
            self.ip6,
        )
        self.pg0.add_stream(ike_msg)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_init(capture[0])

    def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
        """Build the plaintext payload chain for IKE_AUTH or a child rekey.

        :param last_payload: ``next_payload`` value set on the TSr payload;
            defaults to "Notify" when falsy.
        :param is_rekey: build a rekey request (Nonce first, REKEY_SA and
            ESP_TFC_PADDING_NOT_SUPPORTED notifies appended) instead of an
            IKE_AUTH request (identity payloads first).
        :param kex: add a GroupDesc transform to the proposal and, for a
            rekey, a KE payload after the Nonce.
        :returns: tuple ``(plain, first_payload)`` where ``first_payload`` is
            the name of the first payload in ``plain``.
        """
        tr_attr = self.sa.esp_crypto_attr()
        last_payload = last_payload or "Notify"
        trans_nb = 4
        trans = (
            ikev2.IKEv2_payload_Transform(
                transform_type="Encryption",
                transform_id=self.sa.esp_crypto,
                length=tr_attr[1],
                key_length=tr_attr[0],
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Integrity", transform_id=self.sa.esp_integ
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="No ESN"
            )
            / ikev2.IKEv2_payload_Transform(
                transform_type="Extended Sequence Number", transform_id="ESN"
            )
        )

        if kex:
            trans_nb += 1
            trans /= ikev2.IKEv2_payload_Transform(
                transform_type="GroupDesc", transform_id=self.sa.ike_dh
            )

        c = self.sa.child_sas[0]
        props = ikev2.IKEv2_payload_Proposal(
            proposal=1,
            proto="ESP",
            SPIsize=4,
            SPI=c.ispi,
            trans_nb=trans_nb,
            trans=trans,
        )

        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
        plain = (
            ikev2.IKEv2_payload_AUTH(
                next_payload="SA",
                auth_type=AuthMethod.value(self.sa.auth_method),
                load=self.sa.auth_data,
            )
            / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
            / ikev2.IKEv2_payload_TSi(
                next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
            )
            / ikev2.IKEv2_payload_TSr(
                next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
            )
        )

        if is_rekey:
            first_payload = "Nonce"
            # NOTE(review): the head announces "SA" as its next payload while
            # ``plain`` starts with the AUTH payload - confirm this is intended.
            if kex:
                head = ikev2.IKEv2_payload_Nonce(
                    load=self.sa.i_nonce, next_payload="KE"
                ) / ikev2.IKEv2_payload_KE(
                    group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
                )
            else:
                head = ikev2.IKEv2_payload_Nonce(
                    load=self.sa.i_nonce, next_payload="SA"
                )
            plain = (
                head
                / plain
                / ikev2.IKEv2_payload_Notify(
                    type="REKEY_SA",
                    proto="ESP",
                    SPI=c.ispi,
                    length=8 + len(c.ispi),
                    next_payload="Notify",
                )
                / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
            )
        else:
            first_payload = "IDi"
            if self.no_idr_auth:
                ids = ikev2.IKEv2_payload_IDi(
                    next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
                )
            else:
                ids = ikev2.IKEv2_payload_IDi(
                    next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
                ) / ikev2.IKEv2_payload_IDr(
                    next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
                )
            plain = ids / plain
        return plain, first_payload

    def send_sa_auth(self):
        """Send the encrypted IKE_AUTH request and verify VPP's response."""
        plain, first_payload = self.generate_auth_payload(last_payload="Notify")
        plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
        header = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            id=self.sa.new_msg_id(),
            flags="Initiator",
            exch_type="IKE_AUTH",
        )

        ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
        packet = self.create_packet(
            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
        )
        self.pg0.add_stream(packet)
        self.pg0.enable_capture()
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_sa_auth_resp(capture[0])

    def verify_sa_init(self, packet):
        """Validate VPP's IKE_SA_INIT response and derive the IKE keys.

        Stores the responder SPI, nonce and DH data on ``self.sa``.

        :raises IndexError: re-raised, after logging the packet, when the SA,
            Nonce or KE payload is missing from the response.
        """
        ih = self.get_ike_header(packet)

        self.assertEqual(ih.id, self.sa.msg_id)
        self.assertEqual(ih.exch_type, 34)  # IKE_SA_INIT
        self.assertIn("Response", ih.flags)
        self.assertEqual(ih.init_SPI, self.sa.ispi)
        # The SPI is an 8-byte string; comparing it with the integer 0 could
        # never fail, so compare with the all-zero SPI instead.
        self.assertNotEqual(ih.resp_SPI, 8 * b"\x00")
        self.sa.rspi = ih.resp_SPI
        try:
            # the SA payload is looked up only to make sure it is present
            ih[ikev2.IKEv2_payload_SA]
            self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
        except IndexError:
            self.logger.error("unexpected reply: SA/Nonce/KE payload not found!")
            # dump=True makes show() return the text instead of printing it
            self.logger.error(ih.show(dump=True))
            raise
        self.sa.complete_dh_data()
        self.sa.calc_keys()
        self.sa.auth_init()

    def verify_sa_auth_resp(self, packet):
        """Validate VPP's IKE_AUTH response and derive the child SA keys.

        The responder SPI of the proposal is stored on the first child SA
        before the child keys are computed.
        """
        ike = self.get_ike_header(packet)
        udp = packet[UDP]
        self.verify_udp(udp)
        self.assertEqual(ike.id, self.sa.msg_id)
        plain = self.sa.hmac_and_decrypt(ike)
        idr = ikev2.IKEv2_payload_IDr(plain)
        prop = idr[ikev2.IKEv2_payload_Proposal]
        self.assertEqual(prop.SPIsize, 4)
        self.sa.child_sas[0].rspi = prop.SPI
        self.sa.calc_child_keys()

    def verify_counters(self):
        """Check the IKE node counters and the per-SA request statistics."""
        self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
        self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
        self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)

        r = self.vapi.ikev2_sa_dump()
        s = r[0].sa.stats
        self.assertEqual(1, s.n_sa_auth_req)
        self.assertEqual(1, s.n_sa_init_req)

    def test_responder(self):
        # Full exchange: SA_INIT, AUTH, then compare the resulting SAs and
        # counters with the state reported by VPP.
        self.send_sa_init_req()
        self.send_sa_auth()
        self.verify_ipsec_sas()
        self.verify_ike_sas()
        self.verify_counters()
1517
1518
1519 class Ikev2Params(object):
1520     def config_params(self, params={}):
1521         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1522         ei = VppEnum.vl_api_ipsec_integ_alg_t
1523         self.vpp_enums = {
1524             "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1525             "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1526             "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1527             "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1528             "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1529             "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1530             "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1531             "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1532             "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1533             "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1534         }
1535
1536         dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1537         if dpd_disabled:
1538             self.vapi.cli("ikev2 dpd disable")
1539         self.del_sa_from_responder = (
1540             False
1541             if "del_sa_from_responder" not in params
1542             else params["del_sa_from_responder"]
1543         )
1544         i_natt = False if "i_natt" not in params else params["i_natt"]
1545         r_natt = False if "r_natt" not in params else params["r_natt"]
1546         self.p = Profile(self, "pr1")
1547         self.ip6 = False if "ip6" not in params else params["ip6"]
1548
1549         if "auth" in params and params["auth"] == "rsa-sig":
1550             auth_method = "rsa-sig"
1551             work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1552             self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1553
1554             client_file = work_dir + params["client-cert"]
1555             server_pem = open(work_dir + params["server-cert"]).read()
1556             client_priv = open(work_dir + params["client-key"]).read()
1557             client_priv = load_pem_private_key(
1558                 str.encode(client_priv), None, default_backend()
1559             )
1560             self.peer_cert = x509.load_pem_x509_certificate(
1561                 str.encode(server_pem), default_backend()
1562             )
1563             self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1564             auth_data = None
1565         else:
1566             auth_data = b"$3cr3tpa$$w0rd"
1567             self.p.add_auth(method="shared-key", data=auth_data)
1568             auth_method = "shared-key"
1569             client_priv = None
1570
1571         is_init = True if "is_initiator" not in params else params["is_initiator"]
1572         self.no_idr_auth = params.get("no_idr_in_auth", False)
1573
1574         idr = {"id_type": "fqdn", "data": b"vpp.home"}
1575         idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1576         r_id = self.idr = idr["data"]
1577         i_id = self.idi = idi["data"]
1578         if is_init:
1579             # scapy is initiator, VPP is responder
1580             self.p.add_local_id(**idr)
1581             self.p.add_remote_id(**idi)
1582             if self.no_idr_auth:
1583                 r_id = None
1584         else:
1585             # VPP is initiator, scapy is responder
1586             self.p.add_local_id(**idi)
1587             if not self.no_idr_auth:
1588                 self.p.add_remote_id(**idr)
1589
1590         loc_ts = (
1591             {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1592             if "loc_ts" not in params
1593             else params["loc_ts"]
1594         )
1595         rem_ts = (
1596             {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1597             if "rem_ts" not in params
1598             else params["rem_ts"]
1599         )
1600         self.p.add_local_ts(**loc_ts)
1601         self.p.add_remote_ts(**rem_ts)
1602         if "responder" in params:
1603             self.p.add_responder(params["responder"])
1604         if "ike_transforms" in params:
1605             self.p.add_ike_transforms(params["ike_transforms"])
1606         if "esp_transforms" in params:
1607             self.p.add_esp_transforms(params["esp_transforms"])
1608
1609         udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1610         if udp_encap:
1611             self.p.set_udp_encap(True)
1612
1613         if "responder_hostname" in params:
1614             hn = params["responder_hostname"]
1615             self.p.add_responder_hostname(hn)
1616
1617             # configure static dns record
1618             self.vapi.dns_name_server_add_del(
1619                 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1620             )
1621             self.vapi.dns_enable_disable(enable=1)
1622
1623             cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1624             self.vapi.cli(cmd)
1625
1626         self.sa = IKEv2SA(
1627             self,
1628             i_id=i_id,
1629             r_id=r_id,
1630             is_initiator=is_init,
1631             id_type=self.p.local_id["id_type"],
1632             i_natt=i_natt,
1633             r_natt=r_natt,
1634             priv_key=client_priv,
1635             auth_method=auth_method,
1636             nonce=params.get("nonce"),
1637             auth_data=auth_data,
1638             udp_encap=udp_encap,
1639             local_ts=self.p.remote_ts,
1640             remote_ts=self.p.local_ts,
1641         )
1642
1643         if is_init:
1644             ike_crypto = (
1645                 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1646             )
1647             ike_integ = (
1648                 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1649             )
1650             ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1651
1652             esp_crypto = (
1653                 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1654             )
1655             esp_integ = (
1656                 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1657             )
1658
1659             self.sa.set_ike_props(
1660                 crypto=ike_crypto[0],
1661                 crypto_key_len=ike_crypto[1],
1662                 integ=ike_integ,
1663                 prf="PRF_HMAC_SHA2_256",
1664                 dh=ike_dh,
1665             )
1666             self.sa.set_esp_props(
1667                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1668             )
1669
1670
@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class TestApi(VppTestCase):
    """Test IKEV2 API"""

    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        super(TestApi, self).tearDown()
        # A test can fail before both profiles have been created; remove only
        # the ones that exist so that the original failure is not buried
        # under an AttributeError raised from here.
        for attr in ("p1", "p2"):
            profile = getattr(self, attr, None)
            if profile is not None:
                profile.remove_vpp_config()
        # every profile must be gone once the test is over
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """Create a Profile from the ``cfg`` dict and add it to VPP.

        Mandatory keys: name, loc_id, rem_id, loc_ts, rem_ts, responder,
        ike_ts, esp_ts, auth, udp_encap, ipsec_over_udp_port.
        Optional keys: lifetime_data, tun_itf, natt_disabled.

        Returns the Profile object after add_vpp_config() has been called.
        """
        p = Profile(self, cfg["name"])
        # ids are (id_type, data) tuples
        p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
        p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
        p.add_local_ts(**cfg["loc_ts"])
        p.add_remote_ts(**cfg["rem_ts"])
        p.add_responder(cfg["responder"])
        p.add_ike_transforms(cfg["ike_ts"])
        p.add_esp_transforms(cfg["esp_ts"])
        p.add_auth(**cfg["auth"])
        p.set_udp_encap(cfg["udp_encap"])
        p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
        if "lifetime_data" in cfg:
            p.set_lifetime_data(cfg["lifetime_data"])
        if "tun_itf" in cfg:
            p.set_tunnel_interface(cfg["tun_itf"])
        if cfg.get("natt_disabled"):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """test profile dump API"""
        loc_ts4 = {
            "proto": 8,
            "start_port": 1,
            "end_port": 19,
            "start_addr": "3.3.3.2",
            "end_addr": "3.3.3.3",
        }
        rem_ts4 = {
            "proto": 9,
            "start_port": 10,
            "end_port": 119,
            "start_addr": "4.5.76.80",
            "end_addr": "2.3.4.6",
        }

        loc_ts6 = {
            "proto": 8,
            "start_port": 1,
            "end_port": 19,
            "start_addr": "ab::1",
            "end_addr": "ab::4",
        }
        rem_ts6 = {
            "proto": 9,
            "start_port": 10,
            "end_port": 119,
            "start_addr": "cd::12",
            "end_addr": "cd::13",
        }

        # p1: IPv4 selectors, NAT-T disabled, explicit lifetime data
        # p2: IPv6 selectors, explicit tunnel interface, no lifetime data
        conf = {
            "p1": {
                "name": "p1",
                "natt_disabled": True,
                "loc_id": ("fqdn", b"vpp.home"),
                "rem_id": ("fqdn", b"roadwarrior.example.com"),
                "loc_ts": loc_ts4,
                "rem_ts": rem_ts4,
                "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
                "ike_ts": {
                    "crypto_alg": 20,
                    "crypto_key_size": 32,
                    "integ_alg": 0,
                    "dh_group": 1,
                },
                "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
                "auth": {"method": "shared-key", "data": b"sharedkeydata"},
                "udp_encap": True,
                "ipsec_over_udp_port": 4501,
                "lifetime_data": {
                    "lifetime": 123,
                    "lifetime_maxdata": 20192,
                    "lifetime_jitter": 9,
                    "handover": 132,
                },
            },
            "p2": {
                "name": "p2",
                "loc_id": ("ip4-addr", b"192.168.2.1"),
                "rem_id": ("ip6-addr", b"abcd::1"),
                "loc_ts": loc_ts6,
                "rem_ts": rem_ts6,
                "responder": {"sw_if_index": 4, "addr": "def::10"},
                "ike_ts": {
                    "crypto_alg": 12,
                    "crypto_key_size": 16,
                    "integ_alg": 3,
                    "dh_group": 3,
                },
                "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
                "auth": {"method": "shared-key", "data": b"sharedkeydata"},
                "udp_encap": False,
                "ipsec_over_udp_port": 4600,
                "tun_itf": 0,
            },
        }
        self.p1 = self.configure_profile(conf["p1"])
        self.p2 = self.configure_profile(conf["p2"])

        # the dump is expected to list the profiles in creation order
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        self.verify_profile(r[0].profile, conf["p1"])
        self.verify_profile(r[1].profile, conf["p2"])

    def verify_id(self, api_id, cfg_id):
        """Check a dumped id against the configured (id_type, data) tuple."""
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """Check a dumped traffic selector against its configuration dict."""
        self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
        self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
        self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
        self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
        self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))

    def verify_responder(self, api_r, cfg_r):
        """Check the dumped responder interface index and address."""
        self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
        self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))

    def verify_transforms(self, api_ts, cfg_ts):
        """Check the transform fields shared by IKE and ESP transforms."""
        self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
        self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """Check IKE transforms: the common fields plus the DH group."""
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """Check ESP transforms (only the common fields apply)."""
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """Check the dumped auth method, data and data length."""
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
        self.assertEqual(api_auth.data, cfg_auth["data"])
        self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))

    def verify_lifetime_data(self, p, ld):
        """Check the lifetime fields of dumped profile ``p`` against ``ld``."""
        self.assertEqual(p.lifetime, ld["lifetime"])
        self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
        self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
        self.assertEqual(p.handover, ld["handover"])

    def verify_profile(self, ap, cp):
        """Check dumped profile ``ap`` against its configuration dict ``cp``."""
        self.assertEqual(ap.name, cp["name"])
        self.assertEqual(ap.udp_encap, cp["udp_encap"])
        self.verify_id(ap.loc_id, cp["loc_id"])
        self.verify_id(ap.rem_id, cp["rem_id"])
        self.verify_ts(ap.loc_ts, cp["loc_ts"])
        self.verify_ts(ap.rem_ts, cp["rem_ts"])
        self.verify_responder(ap.responder, cp["responder"])
        self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
        self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
        self.verify_auth(ap.auth, cp["auth"])
        # NAT-T counts as enabled unless the configuration says otherwise
        natt_dis = cp.get("natt_disabled", False)
        self.assertEqual(natt_dis, ap.natt_disabled)

        if "lifetime_data" in cp:
            self.verify_lifetime_data(ap, cp["lifetime_data"])
        self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
        if "tun_itf" in cp:
            self.assertEqual(ap.tun_itf, cp["tun_itf"])
        else:
            # no tunnel interface configured: all-ones index is expected
            self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1856
1857
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """test responder - responder behind NAT"""

    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        """Configure the test case with only the ``r_natt`` flag set."""
        overrides = {"r_natt": True}
        self.config_params(overrides)
1866
1867
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - NAT traversal (initiator behind NAT)"""

    def config_tc(self):
        """Configure a VPP-initiated session with the ``i_natt`` flag set."""
        # responder: the remote IPv4 address of pg0, via pg0 itself
        responder = {
            "sw_if_index": self.pg0.sw_if_index,
            "addr": self.pg0.remote_ip4,
        }
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        params = {
            "i_natt": True,
            # seen from test case perspective, thus vpp is initiator
            "is_initiator": False,
            "responder": responder,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
        }
        self.config_params(params)
1898
1899
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - pre shared key auth"""

    def config_tc(self):
        """Configure a VPP-initiated session whose responder is a hostname."""
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        # the responder is given by name instead of by address
        responder_hostname = {
            "hostname": "vpp.responder.org",
            "sw_if_index": self.pg0.sw_if_index,
        }
        params = {
            # seen from test case perspective, thus vpp is initiator
            "is_initiator": False,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
            "responder_hostname": responder_hostname,
        }
        self.config_params(params)
1929
1930
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """test initiator - request window size (1)"""

    def rekey_respond(self, req, update_child_sa_data):
        """Answer a captured rekey request by sending its SA payload back.

        :param req: captured packet carrying the rekey request
        :param update_child_sa_data: when true, adopt the nonce and SPI from
            the request for the first child SA and recompute the child keys
            before replying
        """
        ike_hdr = self.get_ike_header(req)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)

        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            # the request nonce is used for both directions
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            # the proposed SPI is used for both directions as well
            child_sa = self.sa.child_sas[0]
            child_sa.ispi = proposal.SPI
            child_sa.rspi = proposal.SPI
            self.sa.calc_child_keys()

        # exchange type 36 with the message id of the request
        response_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type=36,
            id=ike_hdr.id,
            next_payload="Encrypted",
        )
        encrypted = self.encrypt_ike_msg(response_hdr, sa_payload, "SA")
        reply = self.create_packet(
            self.pg0,
            encrypted,
            self.sa.sport,
            self.sa.dport,
            self.sa.natt,
            self.ip6,
        )
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()

        # trigger two rekeys of the same child SA back to back
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
        for _ in range(2):
            self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        first_req, second_req = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(second_req, True)
        self.rekey_respond(first_req, False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1977
1978
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """test ikev2 initiator - rekey"""

    def rekey_from_initiator(self):
        """Make VPP start a child SA rekey and answer it from the test side.

        The captured request is checked (exchange type, flags), its nonce and
        proposed SPI are adopted for the first child SA, the child keys are
        recomputed and the SA payload is sent back as the response.
        """
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)

        request = self.pg0.get_capture(1)[0]
        ike_hdr = self.get_ike_header(request)
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn("Response", ike_hdr.flags)
        self.assertIn("Initiator", ike_hdr.flags)

        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        child_sa = self.sa.child_sas[0]
        child_sa.ispi = proposal.SPI
        child_sa.rspi = proposal.SPI
        self.sa.calc_child_keys()

        response_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi,
            resp_SPI=self.sa.rspi,
            flags="Response",
            exch_type=36,
            id=ike_hdr.id,
            next_payload="Encrypted",
        )
        encrypted = self.encrypt_ike_msg(response_hdr, sa_payload, "SA")
        reply = self.create_packet(
            self.pg0,
            encrypted,
            self.sa.sport,
            self.sa.dport,
            self.sa.natt,
            self.ip6,
        )
        self.send_and_assert_no_replies(self.pg0, reply)

    def test_initiator(self):
        # establish the SAs first, then rekey and re-verify
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
2021
2022
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """test ikev2 initiator - delete IKE SA from responder"""

    def config_tc(self):
        """Configure a VPP-initiated session that the responder deletes."""
        # responder: the remote IPv4 address of pg0, via pg0 itself
        responder = {
            "sw_if_index": self.pg0.sw_if_index,
            "addr": self.pg0.remote_ip4,
        }
        ike_transforms = {
            "crypto_alg": 20,  # "aes-gcm-16"
            "crypto_key_size": 256,
            "dh_group": 15,  # "modp-3072"
        }
        esp_transforms = {
            "crypto_alg": 12,  # "aes-cbc"
            "crypto_key_size": 256,
            "integ_alg": 12,  # "hmac-sha2-256-128"
        }
        params = {
            "del_sa_from_responder": True,
            # seen from test case perspective, thus vpp is initiator
            "is_initiator": False,
            "responder": responder,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "3072MODPgr",
            "ike_transforms": ike_transforms,
            "esp_transforms": esp_transforms,
            "no_idr_in_auth": True,
        }
        self.config_params(params)
2054
2055
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """test ikev2 responder - initiator behind NAT"""

    IKE_NODE_SUFFIX = "ip4-natt"

    def config_tc(self):
        """Configure the test case with only the ``i_natt`` flag set."""
        overrides = {"i_natt": True}
        self.config_params(overrides)
2064
2065
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """test ikev2 responder - pre shared key auth"""

    def config_tc(self):
        # no overrides: run with whatever config_params() uses by default
        self.config_params()
2072
2073
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """

    def config_tc(self):
        """Configure the test case with ``dpd_disabled`` set to False."""
        overrides = {"dpd_disabled": False}
        self.config_params(overrides)

    def tearDown(self):
        """Intentionally empty; the inherited tearDown is not run.

        NOTE(review): presumably the inherited checks no longer apply once
        the SAs have expired - confirm against the parent tearDown.
        """

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()

        # capture empty request but don't reply
        probe = self.pg0.get_capture(expected_count=1, timeout=5)[0]
        ike_hdr = self.get_ike_header(probe)
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b"")

        # wait for SA expiration
        time.sleep(3)
        # neither IKE SAs nor IPsec SAs may be left
        self.assertEqual(len(self.vapi.ikev2_sa_dump()), 0)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 0)
2103
2104
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """test ikev2 responder - rekey"""

    # subclasses set this to True to rekey with a key exchange
    WITH_KEX = False

    def send_rekey_from_initiator(self):
        """Send a rekey request on pg0 and return the single-packet capture."""
        with_kex = self.WITH_KEX
        if with_kex:
            # new DH data is generated before the request is built
            self.sa.generate_dh_data()
        request = self.create_rekey_request(kex=with_kex)
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()
        return self.pg0.get_capture(1)

    def process_rekey_response(self, capture):
        """Update the local SA state from the rekey response in ``capture``.

        Takes the responder nonce and the new responder SPI from the first
        captured packet (plus the peer's DH data when WITH_KEX is set) and
        recomputes the child keys.
        """
        ike_hdr = self.get_ike_header(capture[0])
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        with_kex = self.WITH_KEX
        if with_kex:
            self.sa.r_dh_data = sa_payload[ikev2.IKEv2_payload_KE].load
            self.sa.complete_dh_data()
        self.sa.calc_child_keys(kex=with_kex)

    def test_responder(self):
        super(TestResponderRekey, self).test_responder()
        rekey_capture = self.send_rekey_from_initiator()
        self.process_rekey_response(rekey_capture)
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
        # exactly one rekey request must have been counted, both in the
        # node counter and in the per-SA statistics
        self.assert_counter(1, "rekey_req", "ip4")
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
2142
2143
@tag_fixme_vpp_workers
class TestResponderRekeyRepeat(TestResponderRekey):
    """test ikev2 responder - rekey repeat"""

    def test_responder(self):
        super(TestResponderRekeyRepeat, self).test_responder()

        # rekey request is not accepted until old IPsec SA is expired
        rejected = self.send_rekey_from_initiator()
        ike_hdr = self.get_ike_header(rejected[0])
        notify = ikev2.IKEv2_payload_Notify(self.sa.hmac_and_decrypt(ike_hdr))
        # 43 is TEMPORARY_FAILURE in the IKEv2 notify message type registry
        self.assertEqual(notify.type, 43)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)

        # rekey request is accepted after old IPsec SA was expired;
        # poll up to 50 times, 0.2 s apart, for the SA count to change
        old_sa_expired = False
        for _ in range(50):
            if len(self.vapi.ipsec_sa_dump()) != 3:
                old_sa_expired = True
                break
            time.sleep(0.2)
        if not old_sa_expired:
            self.fail("old IPsec SA not expired")

        accepted = self.send_rekey_from_initiator()
        self.process_rekey_response(accepted)
        self.verify_ike_sas()
        self.verify_ipsec_sas(sa_count=3)
2167
2168
@tag_fixme_vpp_workers
class TestResponderRekeyKEX(TestResponderRekey):
    """test ikev2 responder - rekey with key exchange"""

    # overrides the TestResponderRekey default (False) so the inherited
    # helpers take their key-exchange branches
    WITH_KEX = True
2174
2175
@tag_fixme_vpp_workers
class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
    """test ikev2 responder - rekey repeat with key exchange"""

    # overrides the inherited WITH_KEX default so the rekey helpers take
    # their key-exchange branches
    WITH_KEX = True
2181
2182
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """test ikev2 responder - non-default table id"""

    @classmethod
    def setUpClass(cls):
        """Set up pg0 inside IP table 1 instead of the default table."""
        import scapy.contrib.ikev2 as _ikev2

        # publish the scapy module under the module-level name ``ikev2``
        globals()["ikev2"] = _ikev2
        # start the lookup *after* IkePeer, i.e. skip IkePeer.setUpClass
        # NOTE(review): presumably because the interface setup below replaces
        # the one done there - confirm against IkePeer.setUpClass
        super(IkePeer, cls).setUpClass()
        # on the distros tagged above, stop here when the class has no
        # ``vpp`` attribute after the base setup
        # NOTE(review): presumably VPP was not started in that case - confirm
        if (is_distro_ubuntu2204 or is_distro_debian11) and not hasattr(cls, "vpp"):
            return
        cls.create_pg_interfaces(range(1))
        # pg0 must be moved to table 1 before its addresses are configured
        cls.vapi.cli("ip table add 1")
        cls.vapi.cli("set interface ip table pg0 1")
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def config_tc(self):
        """Configure the test case with ``dpd_disabled`` set to False."""
        self.config_params({"dpd_disabled": False})

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderVrf, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # an empty INFORMATIONAL request is expected on pg0 within 5 seconds
        capture = self.pg0.get_capture(expected_count=1, timeout=5)
        ih = self.get_ike_header(capture[0])
        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
        plain = self.sa.hmac_and_decrypt(ih)
        self.assertEqual(plain, b"")
2221
2222
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """test ikev2 responder - cert based auth"""

    def config_tc(self):
        """Configure rsa-sig authentication with UDP encapsulation."""
        # key and certificate file names handed to config_params()
        pem_files = {
            "server-key": "server-key.pem",
            "client-key": "client-key.pem",
            "client-cert": "client-cert.pem",
            "server-cert": "server-cert.pem",
        }
        params = {"udp_encap": True, "auth": "rsa-sig"}
        params.update(pem_files)
        self.config_params(params)
2238
2239
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
    TemplateResponder, Ikev2Params
):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """

    def config_tc(self):
        """Configure the IKE/ESP algorithms named by the class."""
        ike_settings = {
            "ike-crypto": ("AES-CBC", 16),  # (algorithm, key length in bytes)
            "ike-integ": "SHA2-256-128",
            "ike-dh": "2048MODPgr",
        }
        esp_settings = {
            "esp-crypto": ("AES-CBC", 24),
            "esp-integ": "SHA2-384-192",
        }
        params = dict(ike_settings)
        params.update(esp_settings)
        # a 256-byte random nonce is supplied explicitly
        params["nonce"] = os.urandom(256)
        params["no_idr_in_auth"] = True
        self.config_params(params)
2260
2261
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
    TemplateResponder, Ikev2Params
):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """

    def config_tc(self):
        """Configure AES-CBC/SHA2-256 for IKE and AES-GCM for ESP."""
        # NOTE(review): the class name says AES_CBC_128 but the key length
        # passed below is 32 - confirm which one is intended
        ike_settings = {
            "ike-crypto": ("AES-CBC", 32),
            "ike-integ": "SHA2-256-128",
            "ike-dh": "3072MODPgr",
        }
        esp_settings = {
            "esp-crypto": ("AES-GCM-16ICV", 32),
            "esp-integ": "NULL",
        }
        params = dict(ike_settings)
        params.update(esp_settings)
        self.config_params(params)
2281
2282
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    IKE_NODE_SUFFIX = "ip6"

    def config_tc(self):
        """Configure an IPv6 session using AES-GCM for IKE."""
        # IPv6 traffic selectors replace the defaults of config_params()
        local_ts = {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"}
        remote_ts = {"start_addr": "11::0", "end_addr": "11::100"}
        params = {
            "del_sa_from_responder": True,
            "ip6": True,
            "natt": True,
            "ike-crypto": ("AES-GCM-16ICV", 32),
            "ike-integ": "NULL",
            "ike-dh": "2048MODPgr",
            "loc_ts": local_ts,
            "rem_ts": remote_ts,
        }
        self.config_params(params)
2304
2305
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """Send an empty request to VPP and check the reply and counters."""
        request = self.create_empty_request()
        self.pg0.add_stream(request)
        self.pg0.enable_capture()
        self.pg_start()

        reply = self.pg0.get_capture(1)[0]
        reply_hdr = self.get_ike_header(reply)
        # the reply carries the current message id and an empty payload
        self.assertEqual(reply_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(reply_hdr), b"")

        # one keepalive must be counted, both in the node counter and in
        # the per-SA statistics
        self.assert_counter(1, "keepalive", "ip4")
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        # establish the session first, then exercise the keepalive path
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
2329
2330
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """malformed packet test"""

    def tearDown(self):
        """Intentionally empty; the inherited tearDown is not run."""

    def config_tc(self):
        """Configure the test case without any overrides."""
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """Build an IKE_SA_INIT packet for pg0.

        :param length: value for the IKE header length field (None is passed
            through to scapy unchanged)
        :param payload: optional payload appended to the IKE header
        :returns: the packet produced by create_packet()
        """
        ike_hdr = ikev2.IKEv2(
            length=length,
            init_SPI="\x11" * 8,
            flags="Initiator",
            exch_type="IKE_SA_INIT",
        )
        if payload is not None:
            ike_hdr /= payload
        return self.create_packet(self.pg0, ike_hdr, self.sa.sport, self.sa.dport)

    def verify_bad_packet_length(self):
        """Send headers with a bogus length and check ``bad_length``."""
        bad_pkt = self.create_ike_init_msg(length=0xDEAD)
        self.send_and_assert_no_replies(self.pg0, bad_pkt * self.pkt_count)
        self.assert_counter(self.pkt_count, "bad_length")

    def verify_bad_sa_payload_length(self):
        """Send SA payloads with a bogus length and check ``malformed_packet``."""
        bad_sa = ikev2.IKEv2_payload_SA(length=0xDEAD)
        bad_pkt = self.create_ike_init_msg(payload=bad_sa)
        self.send_and_assert_no_replies(self.pg0, bad_pkt * self.pkt_count)
        self.assert_counter(self.pkt_count, "malformed_packet")

    def test_responder(self):
        # number of malformed packets sent by each check below
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
2366
2367
# When executed as a script, run the tests of this module with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)