ikev2: fix rekeying with multiple notify payloads
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
26
27 try:
28     text_type = unicode
29 except NameError:
30     text_type = str
31
32 KEY_PAD = b"Key Pad for IKEv2"
33 SALT_SIZE = 4
34 GCM_ICV_SIZE = 16
35 GCM_IV_SIZE = 8
36
37
38 # defined in rfc3526
39 # tuple structure is (p, g, key_len)
40 DH = {
41     "2048MODPgr": (
42         long_converter(
43             """
44     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
45     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
46     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
47     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
48     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
49     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
50     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
51     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
52     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
53     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
54     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
55         ),
56         2,
57         256,
58     ),
59     "3072MODPgr": (
60         long_converter(
61             """
62     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
63     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
64     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
65     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
66     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
67     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
68     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
69     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
70     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
71     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
72     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
73     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
74     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
75     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
76     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
77     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
78         ),
79         2,
80         384,
81     ),
82 }
83
84
85 class CryptoAlgo(object):
86     def __init__(self, name, cipher, mode):
87         self.name = name
88         self.cipher = cipher
89         self.mode = mode
90         if self.cipher is not None:
91             self.bs = self.cipher.block_size // 8
92
93             if self.name == "AES-GCM-16ICV":
94                 self.iv_len = GCM_IV_SIZE
95             else:
96                 self.iv_len = self.bs
97
98     def encrypt(self, data, key, aad=None):
99         iv = os.urandom(self.iv_len)
100         if aad is None:
101             encryptor = Cipher(
102                 self.cipher(key), self.mode(iv), default_backend()
103             ).encryptor()
104             return iv + encryptor.update(data) + encryptor.finalize()
105         else:
106             salt = key[-SALT_SIZE:]
107             nonce = salt + iv
108             encryptor = Cipher(
109                 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
110             ).encryptor()
111             encryptor.authenticate_additional_data(aad)
112             data = encryptor.update(data) + encryptor.finalize()
113             data += encryptor.tag[:GCM_ICV_SIZE]
114             return iv + data
115
116     def decrypt(self, data, key, aad=None, icv=None):
117         if aad is None:
118             iv = data[: self.iv_len]
119             ct = data[self.iv_len :]
120             decryptor = Cipher(
121                 algorithms.AES(key), self.mode(iv), default_backend()
122             ).decryptor()
123             return decryptor.update(ct) + decryptor.finalize()
124         else:
125             salt = key[-SALT_SIZE:]
126             nonce = salt + data[:GCM_IV_SIZE]
127             ct = data[GCM_IV_SIZE:]
128             key = key[:-SALT_SIZE]
129             decryptor = Cipher(
130                 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
131             ).decryptor()
132             decryptor.authenticate_additional_data(aad)
133             return decryptor.update(ct) + decryptor.finalize()
134
135     def pad(self, data):
136         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
137         data = data + b"\x00" * (pad_len - 1)
138         return data + bytes([pad_len - 1])
139
140
141 class AuthAlgo(object):
142     def __init__(self, name, mac, mod, key_len, trunc_len=None):
143         self.name = name
144         self.mac = mac
145         self.mod = mod
146         self.key_len = key_len
147         self.trunc_len = trunc_len or key_len
148
149
150 CRYPTO_ALGOS = {
151     "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
152     "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
153     "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
154 }
155
156 AUTH_ALGOS = {
157     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
158     "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
159     "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
160     "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
161     "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
162 }
163
164 PRF_ALGOS = {
165     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166     "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
167 }
168
169 CRYPTO_IDS = {
170     12: "AES-CBC",
171     20: "AES-GCM-16ICV",
172 }
173
174 INTEG_IDS = {
175     2: "HMAC-SHA1-96",
176     12: "SHA2-256-128",
177     13: "SHA2-384-192",
178     14: "SHA2-512-256",
179 }
180
181
182 class IKEv2ChildSA(object):
183     def __init__(self, local_ts, remote_ts, is_initiator):
184         spi = os.urandom(4)
185         if is_initiator:
186             self.ispi = spi
187             self.rspi = None
188         else:
189             self.rspi = spi
190             self.ispi = None
191         self.local_ts = local_ts
192         self.remote_ts = remote_ts
193
194
195 class IKEv2SA(object):
196     def __init__(
197         self,
198         test,
199         is_initiator=True,
200         i_id=None,
201         r_id=None,
202         spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
203         id_type="fqdn",
204         nonce=None,
205         auth_data=None,
206         local_ts=None,
207         remote_ts=None,
208         auth_method="shared-key",
209         priv_key=None,
210         i_natt=False,
211         r_natt=False,
212         udp_encap=False,
213     ):
214         self.udp_encap = udp_encap
215         self.i_natt = i_natt
216         self.r_natt = r_natt
217         if i_natt or r_natt:
218             self.sport = 4500
219             self.dport = 4500
220         else:
221             self.sport = 500
222             self.dport = 500
223         self.msg_id = 0
224         self.dh_params = None
225         self.test = test
226         self.priv_key = priv_key
227         self.is_initiator = is_initiator
228         nonce = nonce or os.urandom(32)
229         self.auth_data = auth_data
230         self.i_id = i_id
231         self.r_id = r_id
232         if isinstance(id_type, str):
233             self.id_type = IDType.value(id_type)
234         else:
235             self.id_type = id_type
236         self.auth_method = auth_method
237         if self.is_initiator:
238             self.rspi = 8 * b"\x00"
239             self.ispi = spi
240             self.i_nonce = nonce
241         else:
242             self.rspi = spi
243             self.ispi = 8 * b"\x00"
244             self.r_nonce = nonce
245         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
246
247     def new_msg_id(self):
248         self.msg_id += 1
249         return self.msg_id
250
251     @property
252     def my_dh_pub_key(self):
253         if self.is_initiator:
254             return self.i_dh_data
255         return self.r_dh_data
256
257     @property
258     def peer_dh_pub_key(self):
259         if self.is_initiator:
260             return self.r_dh_data
261         return self.i_dh_data
262
263     @property
264     def natt(self):
265         return self.i_natt or self.r_natt
266
267     def compute_secret(self):
268         priv = self.dh_private_key
269         peer = self.peer_dh_pub_key
270         p, g, l = self.ike_group
271         return pow(
272             int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
273         ).to_bytes(l, "big")
274
275     def generate_dh_data(self):
276         # generate DH keys
277         if self.ike_dh not in DH:
278             raise NotImplementedError("%s not in DH group" % self.ike_dh)
279
280         if self.dh_params is None:
281             dhg = DH[self.ike_dh]
282             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
283             self.dh_params = pn.parameters(default_backend())
284
285         priv = self.dh_params.generate_private_key()
286         pub = priv.public_key()
287         x = priv.private_numbers().x
288         self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
289         y = pub.public_numbers().y
290
291         if self.is_initiator:
292             self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
293         else:
294             self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
295
296     def complete_dh_data(self):
297         self.dh_shared_secret = self.compute_secret()
298
299     def calc_child_keys(self):
300         prf = self.ike_prf_alg.mod()
301         s = self.i_nonce + self.r_nonce
302         c = self.child_sas[0]
303
304         encr_key_len = self.esp_crypto_key_len
305         integ_key_len = self.esp_integ_alg.key_len
306         salt_len = 0 if integ_key_len else 4
307
308         l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
309         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
310
311         pos = 0
312         c.sk_ei = keymat[pos : pos + encr_key_len]
313         pos += encr_key_len
314
315         if integ_key_len:
316             c.sk_ai = keymat[pos : pos + integ_key_len]
317             pos += integ_key_len
318         else:
319             c.salt_ei = keymat[pos : pos + salt_len]
320             pos += salt_len
321
322         c.sk_er = keymat[pos : pos + encr_key_len]
323         pos += encr_key_len
324
325         if integ_key_len:
326             c.sk_ar = keymat[pos : pos + integ_key_len]
327             pos += integ_key_len
328         else:
329             c.salt_er = keymat[pos : pos + salt_len]
330             pos += salt_len
331
332     def calc_prfplus(self, prf, key, seed, length):
333         r = b""
334         t = None
335         x = 1
336         while len(r) < length and x < 255:
337             if t is not None:
338                 s = t
339             else:
340                 s = b""
341             s = s + seed + bytes([x])
342             t = self.calc_prf(prf, key, s)
343             r = r + t
344             x = x + 1
345
346         if x == 255:
347             return None
348         return r
349
350     def calc_prf(self, prf, key, data):
351         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
352         h.update(data)
353         return h.finalize()
354
355     def calc_keys(self):
356         prf = self.ike_prf_alg.mod()
357         # SKEYSEED = prf(Ni | Nr, g^ir)
358         s = self.i_nonce + self.r_nonce
359         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
360
361         # calculate S = Ni | Nr | SPIi SPIr
362         s = s + self.ispi + self.rspi
363
364         prf_key_trunc = self.ike_prf_alg.trunc_len
365         encr_key_len = self.ike_crypto_key_len
366         tr_prf_key_len = self.ike_prf_alg.key_len
367         integ_key_len = self.ike_integ_alg.key_len
368         if integ_key_len == 0:
369             salt_size = 4
370         else:
371             salt_size = 0
372
373         l = (
374             prf_key_trunc
375             + integ_key_len * 2
376             + encr_key_len * 2
377             + tr_prf_key_len * 2
378             + salt_size * 2
379         )
380         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
381
382         pos = 0
383         self.sk_d = keymat[: pos + prf_key_trunc]
384         pos += prf_key_trunc
385
386         self.sk_ai = keymat[pos : pos + integ_key_len]
387         pos += integ_key_len
388         self.sk_ar = keymat[pos : pos + integ_key_len]
389         pos += integ_key_len
390
391         self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
392         pos += encr_key_len + salt_size
393         self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
394         pos += encr_key_len + salt_size
395
396         self.sk_pi = keymat[pos : pos + tr_prf_key_len]
397         pos += tr_prf_key_len
398         self.sk_pr = keymat[pos : pos + tr_prf_key_len]
399
400     def generate_authmsg(self, prf, packet):
401         if self.is_initiator:
402             id = self.i_id
403             nonce = self.r_nonce
404             key = self.sk_pi
405         else:
406             id = self.r_id
407             nonce = self.i_nonce
408             key = self.sk_pr
409         data = bytes([self.id_type, 0, 0, 0]) + id
410         id_hash = self.calc_prf(prf, key, data)
411         return packet + nonce + id_hash
412
413     def auth_init(self):
414         prf = self.ike_prf_alg.mod()
415         if self.is_initiator:
416             packet = self.init_req_packet
417         else:
418             packet = self.init_resp_packet
419         authmsg = self.generate_authmsg(prf, raw(packet))
420         if self.auth_method == "shared-key":
421             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
422             self.auth_data = self.calc_prf(prf, psk, authmsg)
423         elif self.auth_method == "rsa-sig":
424             self.auth_data = self.priv_key.sign(
425                 authmsg, padding.PKCS1v15(), hashes.SHA1()
426             )
427         else:
428             raise TypeError("unknown auth method type!")
429
430     def encrypt(self, data, aad=None):
431         data = self.ike_crypto_alg.pad(data)
432         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
433
434     @property
435     def peer_authkey(self):
436         if self.is_initiator:
437             return self.sk_ar
438         return self.sk_ai
439
440     @property
441     def my_authkey(self):
442         if self.is_initiator:
443             return self.sk_ai
444         return self.sk_ar
445
446     @property
447     def my_cryptokey(self):
448         if self.is_initiator:
449             return self.sk_ei
450         return self.sk_er
451
452     @property
453     def peer_cryptokey(self):
454         if self.is_initiator:
455             return self.sk_er
456         return self.sk_ei
457
458     def concat(self, alg, key_len):
459         return alg + "-" + str(key_len * 8)
460
461     @property
462     def vpp_ike_cypto_alg(self):
463         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
464
465     @property
466     def vpp_esp_cypto_alg(self):
467         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
468
469     def verify_hmac(self, ikemsg):
470         integ_trunc = self.ike_integ_alg.trunc_len
471         exp_hmac = ikemsg[-integ_trunc:]
472         data = ikemsg[:-integ_trunc]
473         computed_hmac = self.compute_hmac(
474             self.ike_integ_alg.mod(), self.peer_authkey, data
475         )
476         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
477
478     def compute_hmac(self, integ, key, data):
479         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
480         h.update(data)
481         return h.finalize()
482
483     def decrypt(self, data, aad=None, icv=None):
484         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
485
486     def hmac_and_decrypt(self, ike):
487         ep = ike[ikev2.IKEv2_payload_Encrypted]
488         if self.ike_crypto == "AES-GCM-16ICV":
489             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
490             ct = ep.load[:-GCM_ICV_SIZE]
491             tag = ep.load[-GCM_ICV_SIZE:]
492             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
493         else:
494             self.verify_hmac(raw(ike))
495             integ_trunc = self.ike_integ_alg.trunc_len
496
497             # remove ICV and decrypt payload
498             ct = ep.load[:-integ_trunc]
499             plain = self.decrypt(ct)
500         # remove padding
501         pad_len = plain[-1]
502         return plain[: -pad_len - 1]
503
504     def build_ts_addr(self, ts, version):
505         return {
506             "starting_address_v" + version: ts["start_addr"],
507             "ending_address_v" + version: ts["end_addr"],
508         }
509
510     def generate_ts(self, is_ip4):
511         c = self.child_sas[0]
512         ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
513         if is_ip4:
514             ts_data.update(self.build_ts_addr(c.local_ts, "4"))
515             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
516             ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
517             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
518         else:
519             ts_data.update(self.build_ts_addr(c.local_ts, "6"))
520             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
521             ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
522             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
523
524         if self.is_initiator:
525             return ([ts1], [ts2])
526         return ([ts2], [ts1])
527
528     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
529         if crypto not in CRYPTO_ALGOS:
530             raise TypeError("unsupported encryption algo %r" % crypto)
531         self.ike_crypto = crypto
532         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
533         self.ike_crypto_key_len = crypto_key_len
534
535         if integ not in AUTH_ALGOS:
536             raise TypeError("unsupported auth algo %r" % integ)
537         self.ike_integ = None if integ == "NULL" else integ
538         self.ike_integ_alg = AUTH_ALGOS[integ]
539
540         if prf not in PRF_ALGOS:
541             raise TypeError("unsupported prf algo %r" % prf)
542         self.ike_prf = prf
543         self.ike_prf_alg = PRF_ALGOS[prf]
544         self.ike_dh = dh
545         self.ike_group = DH[self.ike_dh]
546
547     def set_esp_props(self, crypto, crypto_key_len, integ):
548         self.esp_crypto_key_len = crypto_key_len
549         if crypto not in CRYPTO_ALGOS:
550             raise TypeError("unsupported encryption algo %r" % crypto)
551         self.esp_crypto = crypto
552         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
553
554         if integ not in AUTH_ALGOS:
555             raise TypeError("unsupported auth algo %r" % integ)
556         self.esp_integ = None if integ == "NULL" else integ
557         self.esp_integ_alg = AUTH_ALGOS[integ]
558
559     def crypto_attr(self, key_len):
560         if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
561             return (0x800E << 16 | key_len << 3, 12)
562         else:
563             raise Exception("unsupported attribute type")
564
565     def ike_crypto_attr(self):
566         return self.crypto_attr(self.ike_crypto_key_len)
567
568     def esp_crypto_attr(self):
569         return self.crypto_attr(self.esp_crypto_key_len)
570
571     def compute_nat_sha1(self, ip, port, rspi=None):
572         if rspi is None:
573             rspi = self.rspi
574         data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
575         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
576         digest.update(data)
577         return digest.finalize()
578
579
580 class IkePeer(VppTestCase):
581     """common class for initiator and responder"""
582
583     @classmethod
584     def setUpClass(cls):
585         import scapy.contrib.ikev2 as _ikev2
586
587         globals()["ikev2"] = _ikev2
588         super(IkePeer, cls).setUpClass()
589         cls.create_pg_interfaces(range(2))
590         for i in cls.pg_interfaces:
591             i.admin_up()
592             i.config_ip4()
593             i.resolve_arp()
594             i.config_ip6()
595             i.resolve_ndp()
596
597     @classmethod
598     def tearDownClass(cls):
599         super(IkePeer, cls).tearDownClass()
600
601     def tearDown(self):
602         super(IkePeer, self).tearDown()
603         if self.del_sa_from_responder:
604             self.initiate_del_sa_from_responder()
605         else:
606             self.initiate_del_sa_from_initiator()
607         r = self.vapi.ikev2_sa_dump()
608         self.assertEqual(len(r), 0)
609         sas = self.vapi.ipsec_sa_dump()
610         self.assertEqual(len(sas), 0)
611         self.p.remove_vpp_config()
612         self.assertIsNone(self.p.query_vpp_config())
613
614     def setUp(self):
615         super(IkePeer, self).setUp()
616         self.config_tc()
617         self.p.add_vpp_config()
618         self.assertIsNotNone(self.p.query_vpp_config())
619         if self.sa.is_initiator:
620             self.sa.generate_dh_data()
621         self.vapi.cli("ikev2 set logging level 4")
622         self.vapi.cli("event-lo clear")
623
624     def assert_counter(self, count, name, version="ip4"):
625         node_name = "/err/ikev2-%s/" % version + name
626         self.assertEqual(count, self.statistics.get_err_counter(node_name))
627
628     def create_rekey_request(self):
629         sa, first_payload = self.generate_auth_payload(is_rekey=True)
630         header = ikev2.IKEv2(
631             init_SPI=self.sa.ispi,
632             resp_SPI=self.sa.rspi,
633             id=self.sa.new_msg_id(),
634             flags="Initiator",
635             exch_type="CREATE_CHILD_SA",
636         )
637
638         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
639         return self.create_packet(
640             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
641         )
642
643     def create_empty_request(self):
644         header = ikev2.IKEv2(
645             init_SPI=self.sa.ispi,
646             resp_SPI=self.sa.rspi,
647             id=self.sa.new_msg_id(),
648             flags="Initiator",
649             exch_type="INFORMATIONAL",
650             next_payload="Encrypted",
651         )
652
653         msg = self.encrypt_ike_msg(header, b"", None)
654         return self.create_packet(
655             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
656         )
657
658     def create_packet(
659         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
660     ):
661         if use_ip6:
662             src_ip = src_if.remote_ip6
663             dst_ip = src_if.local_ip6
664             ip_layer = IPv6
665         else:
666             src_ip = src_if.remote_ip4
667             dst_ip = src_if.local_ip4
668             ip_layer = IP
669         res = (
670             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
671             / ip_layer(src=src_ip, dst=dst_ip)
672             / UDP(sport=sport, dport=dport)
673         )
674         if natt:
675             # insert non ESP marker
676             res = res / Raw(b"\x00" * 4)
677         return res / msg
678
679     def verify_udp(self, udp):
680         self.assertEqual(udp.sport, self.sa.sport)
681         self.assertEqual(udp.dport, self.sa.dport)
682
683     def get_ike_header(self, packet):
684         try:
685             ih = packet[ikev2.IKEv2]
686             ih = self.verify_and_remove_non_esp_marker(ih)
687         except IndexError as e:
688             # this is a workaround for getting IKEv2 layer as both ikev2 and
689             # ipsec register for port 4500
690             esp = packet[ESP]
691             ih = self.verify_and_remove_non_esp_marker(esp)
692         self.assertEqual(ih.version, 0x20)
693         self.assertNotIn("Version", ih.flags)
694         return ih
695
696     def verify_and_remove_non_esp_marker(self, packet):
697         if self.sa.natt:
698             # if we are in nat traversal mode check for non esp marker
699             # and remove it
700             data = raw(packet)
701             self.assertEqual(data[:4], b"\x00" * 4)
702             return ikev2.IKEv2(data[4:])
703         else:
704             return packet
705
706     def encrypt_ike_msg(self, header, plain, first_payload):
707         if self.sa.ike_crypto == "AES-GCM-16ICV":
708             data = self.sa.ike_crypto_alg.pad(raw(plain))
709             plen = (
710                 len(data)
711                 + GCM_IV_SIZE
712                 + GCM_ICV_SIZE
713                 + len(ikev2.IKEv2_payload_Encrypted())
714             )
715             tlen = plen + len(ikev2.IKEv2())
716
717             # prepare aad data
718             sk_p = ikev2.IKEv2_payload_Encrypted(
719                 next_payload=first_payload, length=plen
720             )
721             header.length = tlen
722             res = header / sk_p
723             encr = self.sa.encrypt(raw(plain), raw(res))
724             sk_p = ikev2.IKEv2_payload_Encrypted(
725                 next_payload=first_payload, length=plen, load=encr
726             )
727             res = header / sk_p
728         else:
729             encr = self.sa.encrypt(raw(plain))
730             trunc_len = self.sa.ike_integ_alg.trunc_len
731             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
732             tlen = plen + len(ikev2.IKEv2())
733
734             sk_p = ikev2.IKEv2_payload_Encrypted(
735                 next_payload=first_payload, length=plen, load=encr
736             )
737             header.length = tlen
738             res = header / sk_p
739
740             integ_data = raw(res)
741             hmac_data = self.sa.compute_hmac(
742                 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
743             )
744             res = res / Raw(hmac_data[:trunc_len])
745         assert len(res) == tlen
746         return res
747
748     def verify_udp_encap(self, ipsec_sa):
749         e = VppEnum.vl_api_ipsec_sad_flags_t
750         if self.sa.udp_encap or self.sa.natt:
751             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
752         else:
753             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
754
755     def verify_ipsec_sas(self, is_rekey=False):
756         sas = self.vapi.ipsec_sa_dump()
757         if is_rekey:
758             # after rekey there is a short period of time in which old
759             # inbound SA is still present
760             sa_count = 3
761         else:
762             sa_count = 2
763         self.assertEqual(len(sas), sa_count)
764         if self.sa.is_initiator:
765             if is_rekey:
766                 sa0 = sas[0].entry
767                 sa1 = sas[2].entry
768             else:
769                 sa0 = sas[0].entry
770                 sa1 = sas[1].entry
771         else:
772             if is_rekey:
773                 sa0 = sas[2].entry
774                 sa1 = sas[0].entry
775             else:
776                 sa1 = sas[0].entry
777                 sa0 = sas[1].entry
778
779         c = self.sa.child_sas[0]
780
781         self.verify_udp_encap(sa0)
782         self.verify_udp_encap(sa1)
783         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
784         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
785         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
786
787         if self.sa.esp_integ is None:
788             vpp_integ_alg = 0
789         else:
790             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
791         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
792         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
793
794         # verify crypto keys
795         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
796         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
797         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
798         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
799
800         # verify integ keys
801         if vpp_integ_alg:
802             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
803             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
804             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
805             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
806         else:
807             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
808             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
809
810     def verify_keymat(self, api_keys, keys, name):
811         km = getattr(keys, name)
812         api_km = getattr(api_keys, name)
813         api_km_len = getattr(api_keys, name + "_len")
814         self.assertEqual(len(km), api_km_len)
815         self.assertEqual(km, api_km[:api_km_len])
816
817     def verify_id(self, api_id, exp_id):
818         self.assertEqual(api_id.type, IDType.value(exp_id.type))
819         self.assertEqual(api_id.data_len, exp_id.data_len)
820         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
821
822     def verify_ike_sas(self):
823         r = self.vapi.ikev2_sa_dump()
824         self.assertEqual(len(r), 1)
825         sa = r[0].sa
826         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
827         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
828         if self.ip6:
829             if self.sa.is_initiator:
830                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
831                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
832             else:
833                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
834                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
835         else:
836             if self.sa.is_initiator:
837                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
838                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
839             else:
840                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
841                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
842         self.verify_keymat(sa.keys, self.sa, "sk_d")
843         self.verify_keymat(sa.keys, self.sa, "sk_ai")
844         self.verify_keymat(sa.keys, self.sa, "sk_ar")
845         self.verify_keymat(sa.keys, self.sa, "sk_ei")
846         self.verify_keymat(sa.keys, self.sa, "sk_er")
847         self.verify_keymat(sa.keys, self.sa, "sk_pi")
848         self.verify_keymat(sa.keys, self.sa, "sk_pr")
849
850         self.assertEqual(sa.i_id.type, self.sa.id_type)
851         self.assertEqual(sa.r_id.type, self.sa.id_type)
852         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
853         self.assertEqual(sa.r_id.data_len, len(self.idr))
854         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
855         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
856
857         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
858         self.assertEqual(len(r), 1)
859         csa = r[0].child_sa
860         self.assertEqual(csa.sa_index, sa.sa_index)
861         c = self.sa.child_sas[0]
862         if hasattr(c, "sk_ai"):
863             self.verify_keymat(csa.keys, c, "sk_ai")
864             self.verify_keymat(csa.keys, c, "sk_ar")
865         self.verify_keymat(csa.keys, c, "sk_ei")
866         self.verify_keymat(csa.keys, c, "sk_er")
867         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
868         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
869
870         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
871         tsi = tsi[0]
872         tsr = tsr[0]
873         r = self.vapi.ikev2_traffic_selector_dump(
874             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
875         )
876         self.assertEqual(len(r), 1)
877         ts = r[0].ts
878         self.verify_ts(r[0].ts, tsi[0], True)
879
880         r = self.vapi.ikev2_traffic_selector_dump(
881             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
882         )
883         self.assertEqual(len(r), 1)
884         self.verify_ts(r[0].ts, tsr[0], False)
885
886         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
887         self.verify_nonce(n, self.sa.i_nonce)
888         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
889         self.verify_nonce(n, self.sa.r_nonce)
890
891     def verify_nonce(self, api_nonce, nonce):
892         self.assertEqual(api_nonce.data_len, len(nonce))
893         self.assertEqual(api_nonce.nonce, nonce)
894
895     def verify_ts(self, api_ts, ts, is_initiator):
896         if is_initiator:
897             self.assertTrue(api_ts.is_local)
898         else:
899             self.assertFalse(api_ts.is_local)
900
901         if self.p.ts_is_ip4:
902             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
903             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
904         else:
905             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
906             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
907         self.assertEqual(api_ts.start_port, ts.start_port)
908         self.assertEqual(api_ts.end_port, ts.end_port)
909         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
910
911
912 class TemplateInitiator(IkePeer):
913     """initiator test template"""
914
915     def initiate_del_sa_from_initiator(self):
916         ispi = int.from_bytes(self.sa.ispi, "little")
917         self.pg0.enable_capture()
918         self.pg_start()
919         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
920         capture = self.pg0.get_capture(1)
921         ih = self.get_ike_header(capture[0])
922         self.assertNotIn("Response", ih.flags)
923         self.assertIn("Initiator", ih.flags)
924         self.assertEqual(ih.init_SPI, self.sa.ispi)
925         self.assertEqual(ih.resp_SPI, self.sa.rspi)
926         plain = self.sa.hmac_and_decrypt(ih)
927         d = ikev2.IKEv2_payload_Delete(plain)
928         self.assertEqual(d.proto, 1)  # proto=IKEv2
929         header = ikev2.IKEv2(
930             init_SPI=self.sa.ispi,
931             resp_SPI=self.sa.rspi,
932             flags="Response",
933             exch_type="INFORMATIONAL",
934             id=ih.id,
935             next_payload="Encrypted",
936         )
937         resp = self.encrypt_ike_msg(header, b"", None)
938         self.send_and_assert_no_replies(self.pg0, resp)
939
940     def verify_del_sa(self, packet):
941         ih = self.get_ike_header(packet)
942         self.assertEqual(ih.id, self.sa.msg_id)
943         self.assertEqual(ih.exch_type, 37)  # exchange informational
944         self.assertIn("Response", ih.flags)
945         self.assertIn("Initiator", ih.flags)
946         plain = self.sa.hmac_and_decrypt(ih)
947         self.assertEqual(plain, b"")
948
949     def initiate_del_sa_from_responder(self):
950         header = ikev2.IKEv2(
951             init_SPI=self.sa.ispi,
952             resp_SPI=self.sa.rspi,
953             exch_type="INFORMATIONAL",
954             id=self.sa.new_msg_id(),
955         )
956         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
957         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
958         packet = self.create_packet(
959             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
960         )
961         self.pg0.add_stream(packet)
962         self.pg0.enable_capture()
963         self.pg_start()
964         capture = self.pg0.get_capture(1)
965         self.verify_del_sa(capture[0])
966
967     @staticmethod
968     def find_notify_payload(packet, notify_type):
969         n = packet[ikev2.IKEv2_payload_Notify]
970         while n is not None:
971             if n.type == notify_type:
972                 return n
973             n = n.payload
974         return None
975
976     def verify_nat_detection(self, packet):
977         if self.ip6:
978             iph = packet[IPv6]
979         else:
980             iph = packet[IP]
981         udp = packet[UDP]
982
983         # NAT_DETECTION_SOURCE_IP
984         s = self.find_notify_payload(packet, 16388)
985         self.assertIsNotNone(s)
986         src_sha = self.sa.compute_nat_sha1(
987             inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
988         )
989         self.assertEqual(s.load, src_sha)
990
991         # NAT_DETECTION_DESTINATION_IP
992         s = self.find_notify_payload(packet, 16389)
993         self.assertIsNotNone(s)
994         dst_sha = self.sa.compute_nat_sha1(
995             inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
996         )
997         self.assertEqual(s.load, dst_sha)
998
999     def verify_sa_init_request(self, packet):
1000         udp = packet[UDP]
1001         self.sa.dport = udp.sport
1002         ih = packet[ikev2.IKEv2]
1003         self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1004         self.assertEqual(ih.exch_type, 34)  # SA_INIT
1005         self.sa.ispi = ih.init_SPI
1006         self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1007         self.assertIn("Initiator", ih.flags)
1008         self.assertNotIn("Response", ih.flags)
1009         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1010         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1011
1012         prop = packet[ikev2.IKEv2_payload_Proposal]
1013         self.assertEqual(prop.proto, 1)  # proto = ikev2
1014         self.assertEqual(prop.proposal, 1)
1015         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
1016         self.assertEqual(
1017             prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1018         )
1019         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
1020         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
1021         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
1022         self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1023
1024         self.verify_nat_detection(packet)
1025         self.sa.set_ike_props(
1026             crypto="AES-GCM-16ICV",
1027             crypto_key_len=32,
1028             integ="NULL",
1029             prf="PRF_HMAC_SHA2_256",
1030             dh="3072MODPgr",
1031         )
1032         self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1033         self.sa.generate_dh_data()
1034         self.sa.complete_dh_data()
1035         self.sa.calc_keys()
1036
1037     def update_esp_transforms(self, trans, sa):
1038         while trans:
1039             if trans.transform_type == 1:  # ecryption
1040                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1041             elif trans.transform_type == 3:  # integrity
1042                 sa.esp_integ = INTEG_IDS[trans.transform_id]
1043             trans = trans.payload
1044
1045     def verify_sa_auth_req(self, packet):
1046         udp = packet[UDP]
1047         self.sa.dport = udp.sport
1048         ih = self.get_ike_header(packet)
1049         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1050         self.assertEqual(ih.init_SPI, self.sa.ispi)
1051         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
1052         self.assertIn("Initiator", ih.flags)
1053         self.assertNotIn("Response", ih.flags)
1054
1055         udp = packet[UDP]
1056         self.verify_udp(udp)
1057         self.assertEqual(ih.id, self.sa.msg_id + 1)
1058         self.sa.msg_id += 1
1059         plain = self.sa.hmac_and_decrypt(ih)
1060         idi = ikev2.IKEv2_payload_IDi(plain)
1061         self.assertEqual(idi.load, self.sa.i_id)
1062         if self.no_idr_auth:
1063             self.assertEqual(idi.next_payload, 39)  # AUTH
1064         else:
1065             idr = ikev2.IKEv2_payload_IDr(idi.payload)
1066             self.assertEqual(idr.load, self.sa.r_id)
1067         prop = idi[ikev2.IKEv2_payload_Proposal]
1068         c = self.sa.child_sas[0]
1069         c.ispi = prop.SPI
1070         self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1071
1072     def send_init_response(self):
1073         tr_attr = self.sa.ike_crypto_attr()
1074         trans = (
1075             ikev2.IKEv2_payload_Transform(
1076                 transform_type="Encryption",
1077                 transform_id=self.sa.ike_crypto,
1078                 length=tr_attr[1],
1079                 key_length=tr_attr[0],
1080             )
1081             / ikev2.IKEv2_payload_Transform(
1082                 transform_type="Integrity", transform_id=self.sa.ike_integ
1083             )
1084             / ikev2.IKEv2_payload_Transform(
1085                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1086             )
1087             / ikev2.IKEv2_payload_Transform(
1088                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1089             )
1090         )
1091         props = ikev2.IKEv2_payload_Proposal(
1092             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1093         )
1094
1095         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1096         if self.sa.natt:
1097             dst_address = b"\x0a\x0a\x0a\x0a"
1098         else:
1099             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1100         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1101         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1102
1103         self.sa.init_resp_packet = (
1104             ikev2.IKEv2(
1105                 init_SPI=self.sa.ispi,
1106                 resp_SPI=self.sa.rspi,
1107                 exch_type="IKE_SA_INIT",
1108                 flags="Response",
1109             )
1110             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1111             / ikev2.IKEv2_payload_KE(
1112                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1113             )
1114             / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1115             / ikev2.IKEv2_payload_Notify(
1116                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1117             )
1118             / ikev2.IKEv2_payload_Notify(
1119                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1120             )
1121         )
1122
1123         ike_msg = self.create_packet(
1124             self.pg0,
1125             self.sa.init_resp_packet,
1126             self.sa.sport,
1127             self.sa.dport,
1128             False,
1129             self.ip6,
1130         )
1131         self.pg_send(self.pg0, ike_msg)
1132         capture = self.pg0.get_capture(1)
1133         self.verify_sa_auth_req(capture[0])
1134
1135     def initiate_sa_init(self):
1136         self.pg0.enable_capture()
1137         self.pg_start()
1138         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1139
1140         capture = self.pg0.get_capture(1)
1141         self.verify_sa_init_request(capture[0])
1142         self.send_init_response()
1143
1144     def send_auth_response(self):
1145         tr_attr = self.sa.esp_crypto_attr()
1146         trans = (
1147             ikev2.IKEv2_payload_Transform(
1148                 transform_type="Encryption",
1149                 transform_id=self.sa.esp_crypto,
1150                 length=tr_attr[1],
1151                 key_length=tr_attr[0],
1152             )
1153             / ikev2.IKEv2_payload_Transform(
1154                 transform_type="Integrity", transform_id=self.sa.esp_integ
1155             )
1156             / ikev2.IKEv2_payload_Transform(
1157                 transform_type="Extended Sequence Number", transform_id="No ESN"
1158             )
1159             / ikev2.IKEv2_payload_Transform(
1160                 transform_type="Extended Sequence Number", transform_id="ESN"
1161             )
1162         )
1163
1164         c = self.sa.child_sas[0]
1165         props = ikev2.IKEv2_payload_Proposal(
1166             proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1167         )
1168
1169         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1170         plain = (
1171             ikev2.IKEv2_payload_IDi(
1172                 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1173             )
1174             / ikev2.IKEv2_payload_IDr(
1175                 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1176             )
1177             / ikev2.IKEv2_payload_AUTH(
1178                 next_payload="SA",
1179                 auth_type=AuthMethod.value(self.sa.auth_method),
1180                 load=self.sa.auth_data,
1181             )
1182             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1183             / ikev2.IKEv2_payload_TSi(
1184                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1185             )
1186             / ikev2.IKEv2_payload_TSr(
1187                 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1188             )
1189             / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1190         )
1191
1192         header = ikev2.IKEv2(
1193             init_SPI=self.sa.ispi,
1194             resp_SPI=self.sa.rspi,
1195             id=self.sa.new_msg_id(),
1196             flags="Response",
1197             exch_type="IKE_AUTH",
1198         )
1199
1200         ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1201         packet = self.create_packet(
1202             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1203         )
1204         self.pg_send(self.pg0, packet)
1205
1206     def test_initiator(self):
1207         self.initiate_sa_init()
1208         self.sa.auth_init()
1209         self.sa.calc_child_keys()
1210         self.send_auth_response()
1211         self.verify_ike_sas()
1212
1213
1214 class TemplateResponder(IkePeer):
1215     """responder test template"""
1216
1217     def initiate_del_sa_from_responder(self):
1218         self.pg0.enable_capture()
1219         self.pg_start()
1220         self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1221         capture = self.pg0.get_capture(1)
1222         ih = self.get_ike_header(capture[0])
1223         self.assertNotIn("Response", ih.flags)
1224         self.assertNotIn("Initiator", ih.flags)
1225         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1226         plain = self.sa.hmac_and_decrypt(ih)
1227         d = ikev2.IKEv2_payload_Delete(plain)
1228         self.assertEqual(d.proto, 1)  # proto=IKEv2
1229         self.assertEqual(ih.init_SPI, self.sa.ispi)
1230         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1231         header = ikev2.IKEv2(
1232             init_SPI=self.sa.ispi,
1233             resp_SPI=self.sa.rspi,
1234             flags="Initiator+Response",
1235             exch_type="INFORMATIONAL",
1236             id=ih.id,
1237             next_payload="Encrypted",
1238         )
1239         resp = self.encrypt_ike_msg(header, b"", None)
1240         self.send_and_assert_no_replies(self.pg0, resp)
1241
1242     def verify_del_sa(self, packet):
1243         ih = self.get_ike_header(packet)
1244         self.assertEqual(ih.id, self.sa.msg_id)
1245         self.assertEqual(ih.exch_type, 37)  # exchange informational
1246         self.assertIn("Response", ih.flags)
1247         self.assertNotIn("Initiator", ih.flags)
1248         self.assertEqual(ih.next_payload, 46)  # Encrypted
1249         self.assertEqual(ih.init_SPI, self.sa.ispi)
1250         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1251         plain = self.sa.hmac_and_decrypt(ih)
1252         self.assertEqual(plain, b"")
1253
1254     def initiate_del_sa_from_initiator(self):
1255         header = ikev2.IKEv2(
1256             init_SPI=self.sa.ispi,
1257             resp_SPI=self.sa.rspi,
1258             flags="Initiator",
1259             exch_type="INFORMATIONAL",
1260             id=self.sa.new_msg_id(),
1261         )
1262         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1263         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1264         packet = self.create_packet(
1265             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1266         )
1267         self.pg0.add_stream(packet)
1268         self.pg0.enable_capture()
1269         self.pg_start()
1270         capture = self.pg0.get_capture(1)
1271         self.verify_del_sa(capture[0])
1272
1273     def send_sa_init_req(self):
1274         tr_attr = self.sa.ike_crypto_attr()
1275         trans = (
1276             ikev2.IKEv2_payload_Transform(
1277                 transform_type="Encryption",
1278                 transform_id=self.sa.ike_crypto,
1279                 length=tr_attr[1],
1280                 key_length=tr_attr[0],
1281             )
1282             / ikev2.IKEv2_payload_Transform(
1283                 transform_type="Integrity", transform_id=self.sa.ike_integ
1284             )
1285             / ikev2.IKEv2_payload_Transform(
1286                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1287             )
1288             / ikev2.IKEv2_payload_Transform(
1289                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1290             )
1291         )
1292
1293         props = ikev2.IKEv2_payload_Proposal(
1294             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1295         )
1296
1297         next_payload = None if self.ip6 else "Notify"
1298
1299         self.sa.init_req_packet = (
1300             ikev2.IKEv2(
1301                 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1302             )
1303             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1304             / ikev2.IKEv2_payload_KE(
1305                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1306             )
1307             / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1308         )
1309
1310         if not self.ip6:
1311             if self.sa.i_natt:
1312                 src_address = b"\x0a\x0a\x0a\x01"
1313             else:
1314                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1315
1316             if self.sa.r_natt:
1317                 dst_address = b"\x0a\x0a\x0a\x0a"
1318             else:
1319                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1320
1321             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1322             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1323             nat_src_detection = ikev2.IKEv2_payload_Notify(
1324                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1325             )
1326             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1327                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1328             )
1329             self.sa.init_req_packet = (
1330                 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1331             )
1332
1333         ike_msg = self.create_packet(
1334             self.pg0,
1335             self.sa.init_req_packet,
1336             self.sa.sport,
1337             self.sa.dport,
1338             self.sa.natt,
1339             self.ip6,
1340         )
1341         self.pg0.add_stream(ike_msg)
1342         self.pg0.enable_capture()
1343         self.pg_start()
1344         capture = self.pg0.get_capture(1)
1345         self.verify_sa_init(capture[0])
1346
1347     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1348         tr_attr = self.sa.esp_crypto_attr()
1349         last_payload = last_payload or "Notify"
1350         trans = (
1351             ikev2.IKEv2_payload_Transform(
1352                 transform_type="Encryption",
1353                 transform_id=self.sa.esp_crypto,
1354                 length=tr_attr[1],
1355                 key_length=tr_attr[0],
1356             )
1357             / ikev2.IKEv2_payload_Transform(
1358                 transform_type="Integrity", transform_id=self.sa.esp_integ
1359             )
1360             / ikev2.IKEv2_payload_Transform(
1361                 transform_type="Extended Sequence Number", transform_id="No ESN"
1362             )
1363             / ikev2.IKEv2_payload_Transform(
1364                 transform_type="Extended Sequence Number", transform_id="ESN"
1365             )
1366         )
1367
1368         c = self.sa.child_sas[0]
1369         props = ikev2.IKEv2_payload_Proposal(
1370             proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
1371         )
1372
1373         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1374         plain = (
1375             ikev2.IKEv2_payload_AUTH(
1376                 next_payload="SA",
1377                 auth_type=AuthMethod.value(self.sa.auth_method),
1378                 load=self.sa.auth_data,
1379             )
1380             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1381             / ikev2.IKEv2_payload_TSi(
1382                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1383             )
1384             / ikev2.IKEv2_payload_TSr(
1385                 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1386             )
1387         )
1388
1389         if is_rekey:
1390             first_payload = "Nonce"
1391             plain = (
1392                 ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
1393                 / plain
1394                 / ikev2.IKEv2_payload_Notify(
1395                     type="REKEY_SA",
1396                     proto="ESP",
1397                     SPI=c.ispi,
1398                     length=8 + len(c.ispi),
1399                     next_payload="Notify",
1400                 )
1401                 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1402             )
1403         else:
1404             first_payload = "IDi"
1405             if self.no_idr_auth:
1406                 ids = ikev2.IKEv2_payload_IDi(
1407                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1408                 )
1409             else:
1410                 ids = ikev2.IKEv2_payload_IDi(
1411                     next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1412                 ) / ikev2.IKEv2_payload_IDr(
1413                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1414                 )
1415             plain = ids / plain
1416         return plain, first_payload
1417
1418     def send_sa_auth(self):
1419         plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1420         plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1421         header = ikev2.IKEv2(
1422             init_SPI=self.sa.ispi,
1423             resp_SPI=self.sa.rspi,
1424             id=self.sa.new_msg_id(),
1425             flags="Initiator",
1426             exch_type="IKE_AUTH",
1427         )
1428
1429         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1430         packet = self.create_packet(
1431             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1432         )
1433         self.pg0.add_stream(packet)
1434         self.pg0.enable_capture()
1435         self.pg_start()
1436         capture = self.pg0.get_capture(1)
1437         self.verify_sa_auth_resp(capture[0])
1438
1439     def verify_sa_init(self, packet):
1440         ih = self.get_ike_header(packet)
1441
1442         self.assertEqual(ih.id, self.sa.msg_id)
1443         self.assertEqual(ih.exch_type, 34)
1444         self.assertIn("Response", ih.flags)
1445         self.assertEqual(ih.init_SPI, self.sa.ispi)
1446         self.assertNotEqual(ih.resp_SPI, 0)
1447         self.sa.rspi = ih.resp_SPI
1448         try:
1449             sa = ih[ikev2.IKEv2_payload_SA]
1450             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1451             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1452         except IndexError as e:
1453             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1454             self.logger.error(ih.show())
1455             raise
1456         self.sa.complete_dh_data()
1457         self.sa.calc_keys()
1458         self.sa.auth_init()
1459
1460     def verify_sa_auth_resp(self, packet):
1461         ike = self.get_ike_header(packet)
1462         udp = packet[UDP]
1463         self.verify_udp(udp)
1464         self.assertEqual(ike.id, self.sa.msg_id)
1465         plain = self.sa.hmac_and_decrypt(ike)
1466         idr = ikev2.IKEv2_payload_IDr(plain)
1467         prop = idr[ikev2.IKEv2_payload_Proposal]
1468         self.assertEqual(prop.SPIsize, 4)
1469         self.sa.child_sas[0].rspi = prop.SPI
1470         self.sa.calc_child_keys()
1471
1472     IKE_NODE_SUFFIX = "ip4"
1473
1474     def verify_counters(self):
1475         self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1476         self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1477         self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1478
1479         r = self.vapi.ikev2_sa_dump()
1480         s = r[0].sa.stats
1481         self.assertEqual(1, s.n_sa_auth_req)
1482         self.assertEqual(1, s.n_sa_init_req)
1483
1484     def test_responder(self):
1485         self.send_sa_init_req()
1486         self.send_sa_auth()
1487         self.verify_ipsec_sas()
1488         self.verify_ike_sas()
1489         self.verify_counters()
1490
1491
1492 class Ikev2Params(object):
1493     def config_params(self, params={}):
1494         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1495         ei = VppEnum.vl_api_ipsec_integ_alg_t
1496         self.vpp_enums = {
1497             "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1498             "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1499             "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1500             "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1501             "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1502             "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1503             "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1504             "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1505             "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1506             "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1507         }
1508
1509         dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1510         if dpd_disabled:
1511             self.vapi.cli("ikev2 dpd disable")
1512         self.del_sa_from_responder = (
1513             False
1514             if "del_sa_from_responder" not in params
1515             else params["del_sa_from_responder"]
1516         )
1517         i_natt = False if "i_natt" not in params else params["i_natt"]
1518         r_natt = False if "r_natt" not in params else params["r_natt"]
1519         self.p = Profile(self, "pr1")
1520         self.ip6 = False if "ip6" not in params else params["ip6"]
1521
1522         if "auth" in params and params["auth"] == "rsa-sig":
1523             auth_method = "rsa-sig"
1524             work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1525             self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1526
1527             client_file = work_dir + params["client-cert"]
1528             server_pem = open(work_dir + params["server-cert"]).read()
1529             client_priv = open(work_dir + params["client-key"]).read()
1530             client_priv = load_pem_private_key(
1531                 str.encode(client_priv), None, default_backend()
1532             )
1533             self.peer_cert = x509.load_pem_x509_certificate(
1534                 str.encode(server_pem), default_backend()
1535             )
1536             self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1537             auth_data = None
1538         else:
1539             auth_data = b"$3cr3tpa$$w0rd"
1540             self.p.add_auth(method="shared-key", data=auth_data)
1541             auth_method = "shared-key"
1542             client_priv = None
1543
1544         is_init = True if "is_initiator" not in params else params["is_initiator"]
1545         self.no_idr_auth = params.get("no_idr_in_auth", False)
1546
1547         idr = {"id_type": "fqdn", "data": b"vpp.home"}
1548         idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1549         r_id = self.idr = idr["data"]
1550         i_id = self.idi = idi["data"]
1551         if is_init:
1552             # scapy is initiator, VPP is responder
1553             self.p.add_local_id(**idr)
1554             self.p.add_remote_id(**idi)
1555             if self.no_idr_auth:
1556                 r_id = None
1557         else:
1558             # VPP is initiator, scapy is responder
1559             self.p.add_local_id(**idi)
1560             if not self.no_idr_auth:
1561                 self.p.add_remote_id(**idr)
1562
1563         loc_ts = (
1564             {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1565             if "loc_ts" not in params
1566             else params["loc_ts"]
1567         )
1568         rem_ts = (
1569             {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1570             if "rem_ts" not in params
1571             else params["rem_ts"]
1572         )
1573         self.p.add_local_ts(**loc_ts)
1574         self.p.add_remote_ts(**rem_ts)
1575         if "responder" in params:
1576             self.p.add_responder(params["responder"])
1577         if "ike_transforms" in params:
1578             self.p.add_ike_transforms(params["ike_transforms"])
1579         if "esp_transforms" in params:
1580             self.p.add_esp_transforms(params["esp_transforms"])
1581
1582         udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1583         if udp_encap:
1584             self.p.set_udp_encap(True)
1585
1586         if "responder_hostname" in params:
1587             hn = params["responder_hostname"]
1588             self.p.add_responder_hostname(hn)
1589
1590             # configure static dns record
1591             self.vapi.dns_name_server_add_del(
1592                 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1593             )
1594             self.vapi.dns_enable_disable(enable=1)
1595
1596             cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1597             self.vapi.cli(cmd)
1598
1599         self.sa = IKEv2SA(
1600             self,
1601             i_id=i_id,
1602             r_id=r_id,
1603             is_initiator=is_init,
1604             id_type=self.p.local_id["id_type"],
1605             i_natt=i_natt,
1606             r_natt=r_natt,
1607             priv_key=client_priv,
1608             auth_method=auth_method,
1609             nonce=params.get("nonce"),
1610             auth_data=auth_data,
1611             udp_encap=udp_encap,
1612             local_ts=self.p.remote_ts,
1613             remote_ts=self.p.local_ts,
1614         )
1615
1616         if is_init:
1617             ike_crypto = (
1618                 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1619             )
1620             ike_integ = (
1621                 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1622             )
1623             ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1624
1625             esp_crypto = (
1626                 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1627             )
1628             esp_integ = (
1629                 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1630             )
1631
1632             self.sa.set_ike_props(
1633                 crypto=ike_crypto[0],
1634                 crypto_key_len=ike_crypto[1],
1635                 integ=ike_integ,
1636                 prf="PRF_HMAC_SHA2_256",
1637                 dh=ike_dh,
1638             )
1639             self.sa.set_esp_props(
1640                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1641             )
1642
1643
1644 class TestApi(VppTestCase):
1645     """Test IKEV2 API"""
1646
1647     @classmethod
1648     def setUpClass(cls):
1649         super(TestApi, cls).setUpClass()
1650
1651     @classmethod
1652     def tearDownClass(cls):
1653         super(TestApi, cls).tearDownClass()
1654
1655     def tearDown(self):
1656         super(TestApi, self).tearDown()
1657         self.p1.remove_vpp_config()
1658         self.p2.remove_vpp_config()
1659         r = self.vapi.ikev2_profile_dump()
1660         self.assertEqual(len(r), 0)
1661
1662     def configure_profile(self, cfg):
1663         p = Profile(self, cfg["name"])
1664         p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1665         p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1666         p.add_local_ts(**cfg["loc_ts"])
1667         p.add_remote_ts(**cfg["rem_ts"])
1668         p.add_responder(cfg["responder"])
1669         p.add_ike_transforms(cfg["ike_ts"])
1670         p.add_esp_transforms(cfg["esp_ts"])
1671         p.add_auth(**cfg["auth"])
1672         p.set_udp_encap(cfg["udp_encap"])
1673         p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1674         if "lifetime_data" in cfg:
1675             p.set_lifetime_data(cfg["lifetime_data"])
1676         if "tun_itf" in cfg:
1677             p.set_tunnel_interface(cfg["tun_itf"])
1678         if "natt_disabled" in cfg and cfg["natt_disabled"]:
1679             p.disable_natt()
1680         p.add_vpp_config()
1681         return p
1682
1683     def test_profile_api(self):
1684         """test profile dump API"""
1685         loc_ts4 = {
1686             "proto": 8,
1687             "start_port": 1,
1688             "end_port": 19,
1689             "start_addr": "3.3.3.2",
1690             "end_addr": "3.3.3.3",
1691         }
1692         rem_ts4 = {
1693             "proto": 9,
1694             "start_port": 10,
1695             "end_port": 119,
1696             "start_addr": "4.5.76.80",
1697             "end_addr": "2.3.4.6",
1698         }
1699
1700         loc_ts6 = {
1701             "proto": 8,
1702             "start_port": 1,
1703             "end_port": 19,
1704             "start_addr": "ab::1",
1705             "end_addr": "ab::4",
1706         }
1707         rem_ts6 = {
1708             "proto": 9,
1709             "start_port": 10,
1710             "end_port": 119,
1711             "start_addr": "cd::12",
1712             "end_addr": "cd::13",
1713         }
1714
1715         conf = {
1716             "p1": {
1717                 "name": "p1",
1718                 "natt_disabled": True,
1719                 "loc_id": ("fqdn", b"vpp.home"),
1720                 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1721                 "loc_ts": loc_ts4,
1722                 "rem_ts": rem_ts4,
1723                 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1724                 "ike_ts": {
1725                     "crypto_alg": 20,
1726                     "crypto_key_size": 32,
1727                     "integ_alg": 0,
1728                     "dh_group": 1,
1729                 },
1730                 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1731                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1732                 "udp_encap": True,
1733                 "ipsec_over_udp_port": 4501,
1734                 "lifetime_data": {
1735                     "lifetime": 123,
1736                     "lifetime_maxdata": 20192,
1737                     "lifetime_jitter": 9,
1738                     "handover": 132,
1739                 },
1740             },
1741             "p2": {
1742                 "name": "p2",
1743                 "loc_id": ("ip4-addr", b"192.168.2.1"),
1744                 "rem_id": ("ip6-addr", b"abcd::1"),
1745                 "loc_ts": loc_ts6,
1746                 "rem_ts": rem_ts6,
1747                 "responder": {"sw_if_index": 4, "addr": "def::10"},
1748                 "ike_ts": {
1749                     "crypto_alg": 12,
1750                     "crypto_key_size": 16,
1751                     "integ_alg": 3,
1752                     "dh_group": 3,
1753                 },
1754                 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1755                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1756                 "udp_encap": False,
1757                 "ipsec_over_udp_port": 4600,
1758                 "tun_itf": 0,
1759             },
1760         }
1761         self.p1 = self.configure_profile(conf["p1"])
1762         self.p2 = self.configure_profile(conf["p2"])
1763
1764         r = self.vapi.ikev2_profile_dump()
1765         self.assertEqual(len(r), 2)
1766         self.verify_profile(r[0].profile, conf["p1"])
1767         self.verify_profile(r[1].profile, conf["p2"])
1768
1769     def verify_id(self, api_id, cfg_id):
1770         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1771         self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1772
1773     def verify_ts(self, api_ts, cfg_ts):
1774         self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1775         self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1776         self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1777         self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1778         self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1779
1780     def verify_responder(self, api_r, cfg_r):
1781         self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1782         self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1783
1784     def verify_transforms(self, api_ts, cfg_ts):
1785         self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1786         self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1787         self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1788
1789     def verify_ike_transforms(self, api_ts, cfg_ts):
1790         self.verify_transforms(api_ts, cfg_ts)
1791         self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1792
1793     def verify_esp_transforms(self, api_ts, cfg_ts):
1794         self.verify_transforms(api_ts, cfg_ts)
1795
1796     def verify_auth(self, api_auth, cfg_auth):
1797         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1798         self.assertEqual(api_auth.data, cfg_auth["data"])
1799         self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1800
1801     def verify_lifetime_data(self, p, ld):
1802         self.assertEqual(p.lifetime, ld["lifetime"])
1803         self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1804         self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1805         self.assertEqual(p.handover, ld["handover"])
1806
1807     def verify_profile(self, ap, cp):
1808         self.assertEqual(ap.name, cp["name"])
1809         self.assertEqual(ap.udp_encap, cp["udp_encap"])
1810         self.verify_id(ap.loc_id, cp["loc_id"])
1811         self.verify_id(ap.rem_id, cp["rem_id"])
1812         self.verify_ts(ap.loc_ts, cp["loc_ts"])
1813         self.verify_ts(ap.rem_ts, cp["rem_ts"])
1814         self.verify_responder(ap.responder, cp["responder"])
1815         self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1816         self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1817         self.verify_auth(ap.auth, cp["auth"])
1818         natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1819         self.assertTrue(natt_dis == ap.natt_disabled)
1820
1821         if "lifetime_data" in cp:
1822             self.verify_lifetime_data(ap, cp["lifetime_data"])
1823         self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1824         if "tun_itf" in cp:
1825             self.assertEqual(ap.tun_itf, cp["tun_itf"])
1826         else:
1827             self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1828
1829
1830 @tag_fixme_vpp_workers
1831 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1832     """test responder - responder behind NAT"""
1833
1834     IKE_NODE_SUFFIX = "ip4-natt"
1835
1836     def config_tc(self):
1837         self.config_params({"r_natt": True})
1838
1839
1840 @tag_fixme_vpp_workers
1841 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1842     """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1843
1844     def config_tc(self):
1845         self.config_params(
1846             {
1847                 "i_natt": True,
1848                 "is_initiator": False,  # seen from test case perspective
1849                 # thus vpp is initiator
1850                 "responder": {
1851                     "sw_if_index": self.pg0.sw_if_index,
1852                     "addr": self.pg0.remote_ip4,
1853                 },
1854                 "ike-crypto": ("AES-GCM-16ICV", 32),
1855                 "ike-integ": "NULL",
1856                 "ike-dh": "3072MODPgr",
1857                 "ike_transforms": {
1858                     "crypto_alg": 20,  # "aes-gcm-16"
1859                     "crypto_key_size": 256,
1860                     "dh_group": 15,  # "modp-3072"
1861                 },
1862                 "esp_transforms": {
1863                     "crypto_alg": 12,  # "aes-cbc"
1864                     "crypto_key_size": 256,
1865                     # "hmac-sha2-256-128"
1866                     "integ_alg": 12,
1867                 },
1868             }
1869         )
1870
1871
1872 @tag_fixme_vpp_workers
1873 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1874     """test ikev2 initiator - pre shared key auth"""
1875
1876     def config_tc(self):
1877         self.config_params(
1878             {
1879                 "is_initiator": False,  # seen from test case perspective
1880                 # thus vpp is initiator
1881                 "ike-crypto": ("AES-GCM-16ICV", 32),
1882                 "ike-integ": "NULL",
1883                 "ike-dh": "3072MODPgr",
1884                 "ike_transforms": {
1885                     "crypto_alg": 20,  # "aes-gcm-16"
1886                     "crypto_key_size": 256,
1887                     "dh_group": 15,  # "modp-3072"
1888                 },
1889                 "esp_transforms": {
1890                     "crypto_alg": 12,  # "aes-cbc"
1891                     "crypto_key_size": 256,
1892                     # "hmac-sha2-256-128"
1893                     "integ_alg": 12,
1894                 },
1895                 "responder_hostname": {
1896                     "hostname": "vpp.responder.org",
1897                     "sw_if_index": self.pg0.sw_if_index,
1898                 },
1899             }
1900         )
1901
1902
1903 @tag_fixme_vpp_workers
1904 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1905     """test initiator - request window size (1)"""
1906
1907     def rekey_respond(self, req, update_child_sa_data):
1908         ih = self.get_ike_header(req)
1909         plain = self.sa.hmac_and_decrypt(ih)
1910         sa = ikev2.IKEv2_payload_SA(plain)
1911         if update_child_sa_data:
1912             prop = sa[ikev2.IKEv2_payload_Proposal]
1913             self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1914             self.sa.r_nonce = self.sa.i_nonce
1915             self.sa.child_sas[0].ispi = prop.SPI
1916             self.sa.child_sas[0].rspi = prop.SPI
1917             self.sa.calc_child_keys()
1918
1919         header = ikev2.IKEv2(
1920             init_SPI=self.sa.ispi,
1921             resp_SPI=self.sa.rspi,
1922             flags="Response",
1923             exch_type=36,
1924             id=ih.id,
1925             next_payload="Encrypted",
1926         )
1927         resp = self.encrypt_ike_msg(header, sa, "SA")
1928         packet = self.create_packet(
1929             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1930         )
1931         self.send_and_assert_no_replies(self.pg0, packet)
1932
1933     def test_initiator(self):
1934         super(TestInitiatorRequestWindowSize, self).test_initiator()
1935         self.pg0.enable_capture()
1936         self.pg_start()
1937         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1938         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1939         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1940         capture = self.pg0.get_capture(2)
1941
1942         # reply in reverse order
1943         self.rekey_respond(capture[1], True)
1944         self.rekey_respond(capture[0], False)
1945
1946         # verify that only the second request was accepted
1947         self.verify_ike_sas()
1948         self.verify_ipsec_sas(is_rekey=True)
1949
1950
1951 @tag_fixme_vpp_workers
1952 class TestInitiatorRekey(TestInitiatorPsk):
1953     """test ikev2 initiator - rekey"""
1954
1955     def rekey_from_initiator(self):
1956         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1957         self.pg0.enable_capture()
1958         self.pg_start()
1959         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1960         capture = self.pg0.get_capture(1)
1961         ih = self.get_ike_header(capture[0])
1962         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1963         self.assertNotIn("Response", ih.flags)
1964         self.assertIn("Initiator", ih.flags)
1965         plain = self.sa.hmac_and_decrypt(ih)
1966         sa = ikev2.IKEv2_payload_SA(plain)
1967         prop = sa[ikev2.IKEv2_payload_Proposal]
1968         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1969         self.sa.r_nonce = self.sa.i_nonce
1970         # update new responder SPI
1971         self.sa.child_sas[0].ispi = prop.SPI
1972         self.sa.child_sas[0].rspi = prop.SPI
1973         self.sa.calc_child_keys()
1974         header = ikev2.IKEv2(
1975             init_SPI=self.sa.ispi,
1976             resp_SPI=self.sa.rspi,
1977             flags="Response",
1978             exch_type=36,
1979             id=ih.id,
1980             next_payload="Encrypted",
1981         )
1982         resp = self.encrypt_ike_msg(header, sa, "SA")
1983         packet = self.create_packet(
1984             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1985         )
1986         self.send_and_assert_no_replies(self.pg0, packet)
1987
1988     def test_initiator(self):
1989         super(TestInitiatorRekey, self).test_initiator()
1990         self.rekey_from_initiator()
1991         self.verify_ike_sas()
1992         self.verify_ipsec_sas(is_rekey=True)
1993
1994
1995 @tag_fixme_vpp_workers
1996 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1997     """test ikev2 initiator - delete IKE SA from responder"""
1998
1999     def config_tc(self):
2000         self.config_params(
2001             {
2002                 "del_sa_from_responder": True,
2003                 "is_initiator": False,  # seen from test case perspective
2004                 # thus vpp is initiator
2005                 "responder": {
2006                     "sw_if_index": self.pg0.sw_if_index,
2007                     "addr": self.pg0.remote_ip4,
2008                 },
2009                 "ike-crypto": ("AES-GCM-16ICV", 32),
2010                 "ike-integ": "NULL",
2011                 "ike-dh": "3072MODPgr",
2012                 "ike_transforms": {
2013                     "crypto_alg": 20,  # "aes-gcm-16"
2014                     "crypto_key_size": 256,
2015                     "dh_group": 15,  # "modp-3072"
2016                 },
2017                 "esp_transforms": {
2018                     "crypto_alg": 12,  # "aes-cbc"
2019                     "crypto_key_size": 256,
2020                     # "hmac-sha2-256-128"
2021                     "integ_alg": 12,
2022                 },
2023                 "no_idr_in_auth": True,
2024             }
2025         )
2026
2027
2028 @tag_fixme_vpp_workers
2029 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2030     """test ikev2 responder - initiator behind NAT"""
2031
2032     IKE_NODE_SUFFIX = "ip4-natt"
2033
2034     def config_tc(self):
2035         self.config_params({"i_natt": True})
2036
2037
2038 @tag_fixme_vpp_workers
2039 class TestResponderPsk(TemplateResponder, Ikev2Params):
2040     """test ikev2 responder - pre shared key auth"""
2041
2042     def config_tc(self):
2043         self.config_params()
2044
2045
2046 @tag_fixme_vpp_workers
2047 class TestResponderDpd(TestResponderPsk):
2048     """
2049     Dead peer detection test
2050     """
2051
2052     def config_tc(self):
2053         self.config_params({"dpd_disabled": False})
2054
2055     def tearDown(self):
2056         pass
2057
2058     def test_responder(self):
2059         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2060         super(TestResponderDpd, self).test_responder()
2061         self.pg0.enable_capture()
2062         self.pg_start()
2063         # capture empty request but don't reply
2064         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2065         ih = self.get_ike_header(capture[0])
2066         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2067         plain = self.sa.hmac_and_decrypt(ih)
2068         self.assertEqual(plain, b"")
2069         # wait for SA expiration
2070         time.sleep(3)
2071         ike_sas = self.vapi.ikev2_sa_dump()
2072         self.assertEqual(len(ike_sas), 0)
2073         ipsec_sas = self.vapi.ipsec_sa_dump()
2074         self.assertEqual(len(ipsec_sas), 0)
2075
2076
2077 @tag_fixme_vpp_workers
2078 class TestResponderRekey(TestResponderPsk):
2079     """test ikev2 responder - rekey"""
2080
2081     def rekey_from_initiator(self):
2082         packet = self.create_rekey_request()
2083         self.pg0.add_stream(packet)
2084         self.pg0.enable_capture()
2085         self.pg_start()
2086         capture = self.pg0.get_capture(1)
2087         ih = self.get_ike_header(capture[0])
2088         plain = self.sa.hmac_and_decrypt(ih)
2089         sa = ikev2.IKEv2_payload_SA(plain)
2090         prop = sa[ikev2.IKEv2_payload_Proposal]
2091         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2092         # update new responder SPI
2093         self.sa.child_sas[0].rspi = prop.SPI
2094
2095     def test_responder(self):
2096         super(TestResponderRekey, self).test_responder()
2097         self.rekey_from_initiator()
2098         self.sa.calc_child_keys()
2099         self.verify_ike_sas()
2100         self.verify_ipsec_sas(is_rekey=True)
2101         self.assert_counter(1, "rekey_req", "ip4")
2102         r = self.vapi.ikev2_sa_dump()
2103         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2104
2105
2106 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2107     """test ikev2 responder - non-default table id"""
2108
2109     @classmethod
2110     def setUpClass(cls):
2111         import scapy.contrib.ikev2 as _ikev2
2112
2113         globals()["ikev2"] = _ikev2
2114         super(IkePeer, cls).setUpClass()
2115         cls.create_pg_interfaces(range(1))
2116         cls.vapi.cli("ip table add 1")
2117         cls.vapi.cli("set interface ip table pg0 1")
2118         for i in cls.pg_interfaces:
2119             i.admin_up()
2120             i.config_ip4()
2121             i.resolve_arp()
2122             i.config_ip6()
2123             i.resolve_ndp()
2124
2125     def config_tc(self):
2126         self.config_params({"dpd_disabled": False})
2127
2128     def test_responder(self):
2129         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2130         super(TestResponderVrf, self).test_responder()
2131         self.pg0.enable_capture()
2132         self.pg_start()
2133         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2134         ih = self.get_ike_header(capture[0])
2135         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2136         plain = self.sa.hmac_and_decrypt(ih)
2137         self.assertEqual(plain, b"")
2138
2139
2140 @tag_fixme_vpp_workers
2141 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2142     """test ikev2 responder - cert based auth"""
2143
2144     def config_tc(self):
2145         self.config_params(
2146             {
2147                 "udp_encap": True,
2148                 "auth": "rsa-sig",
2149                 "server-key": "server-key.pem",
2150                 "client-key": "client-key.pem",
2151                 "client-cert": "client-cert.pem",
2152                 "server-cert": "server-cert.pem",
2153             }
2154         )
2155
2156
2157 @tag_fixme_vpp_workers
2158 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2159     TemplateResponder, Ikev2Params
2160 ):
2161     """
2162     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2163     """
2164
2165     def config_tc(self):
2166         self.config_params(
2167             {
2168                 "ike-crypto": ("AES-CBC", 16),
2169                 "ike-integ": "SHA2-256-128",
2170                 "esp-crypto": ("AES-CBC", 24),
2171                 "esp-integ": "SHA2-384-192",
2172                 "ike-dh": "2048MODPgr",
2173                 "nonce": os.urandom(256),
2174                 "no_idr_in_auth": True,
2175             }
2176         )
2177
2178
2179 @tag_fixme_vpp_workers
2180 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2181     TemplateResponder, Ikev2Params
2182 ):
2183
2184     """
2185     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2186     """
2187
2188     def config_tc(self):
2189         self.config_params(
2190             {
2191                 "ike-crypto": ("AES-CBC", 32),
2192                 "ike-integ": "SHA2-256-128",
2193                 "esp-crypto": ("AES-GCM-16ICV", 32),
2194                 "esp-integ": "NULL",
2195                 "ike-dh": "3072MODPgr",
2196             }
2197         )
2198
2199
2200 @tag_fixme_vpp_workers
2201 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2202     """
2203     IKE:AES_GCM_16_256
2204     """
2205
2206     IKE_NODE_SUFFIX = "ip6"
2207
2208     def config_tc(self):
2209         self.config_params(
2210             {
2211                 "del_sa_from_responder": True,
2212                 "ip6": True,
2213                 "natt": True,
2214                 "ike-crypto": ("AES-GCM-16ICV", 32),
2215                 "ike-integ": "NULL",
2216                 "ike-dh": "2048MODPgr",
2217                 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2218                 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2219             }
2220         )
2221
2222
2223 @tag_fixme_vpp_workers
2224 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2225     """
2226     Test for keep alive messages
2227     """
2228
2229     def send_empty_req_from_responder(self):
2230         packet = self.create_empty_request()
2231         self.pg0.add_stream(packet)
2232         self.pg0.enable_capture()
2233         self.pg_start()
2234         capture = self.pg0.get_capture(1)
2235         ih = self.get_ike_header(capture[0])
2236         self.assertEqual(ih.id, self.sa.msg_id)
2237         plain = self.sa.hmac_and_decrypt(ih)
2238         self.assertEqual(plain, b"")
2239         self.assert_counter(1, "keepalive", "ip4")
2240         r = self.vapi.ikev2_sa_dump()
2241         self.assertEqual(1, r[0].sa.stats.n_keepalives)
2242
2243     def test_initiator(self):
2244         super(TestInitiatorKeepaliveMsg, self).test_initiator()
2245         self.send_empty_req_from_responder()
2246
2247
2248 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2249     """malformed packet test"""
2250
2251     def tearDown(self):
2252         pass
2253
2254     def config_tc(self):
2255         self.config_params()
2256
2257     def create_ike_init_msg(self, length=None, payload=None):
2258         msg = ikev2.IKEv2(
2259             length=length,
2260             init_SPI="\x11" * 8,
2261             flags="Initiator",
2262             exch_type="IKE_SA_INIT",
2263         )
2264         if payload is not None:
2265             msg /= payload
2266         return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2267
2268     def verify_bad_packet_length(self):
2269         ike_msg = self.create_ike_init_msg(length=0xDEAD)
2270         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2271         self.assert_counter(self.pkt_count, "bad_length")
2272
2273     def verify_bad_sa_payload_length(self):
2274         p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2275         ike_msg = self.create_ike_init_msg(payload=p)
2276         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2277         self.assert_counter(self.pkt_count, "malformed_packet")
2278
2279     def test_responder(self):
2280         self.pkt_count = 254
2281         self.verify_bad_packet_length()
2282         self.verify_bad_sa_payload_length()
2283
2284
2285 if __name__ == "__main__":
2286     unittest.main(testRunner=VppTestRunner)