3c5887195810b152d98467813e50aa3ccacf8680
[vpp.git] / test / test_ikev2.py
1 import os
2 import time
3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
10     Cipher,
11     algorithms,
12     modes,
13 )
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 import unittest
16 from config import config
17 from scapy.layers.ipsec import ESP
18 from scapy.layers.inet import IP, UDP, Ether
19 from scapy.layers.inet6 import IPv6
20 from scapy.packet import raw, Raw
21 from scapy.utils import long_converter
22 from framework import tag_fixme_vpp_workers
23 from framework import VppTestCase, VppTestRunner
24 from vpp_ikev2 import Profile, IDType, AuthMethod
25 from vpp_papi import VppEnum
26
27 try:
28     text_type = unicode
29 except NameError:
30     text_type = str
31
32 KEY_PAD = b"Key Pad for IKEv2"
33 SALT_SIZE = 4
34 GCM_ICV_SIZE = 16
35 GCM_IV_SIZE = 8
36
37
38 # defined in rfc3526
39 # tuple structure is (p, g, key_len)
40 DH = {
41     "2048MODPgr": (
42         long_converter(
43             """
44     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
45     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
46     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
47     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
48     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
49     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
50     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
51     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
52     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
53     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
54     15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
55         ),
56         2,
57         256,
58     ),
59     "3072MODPgr": (
60         long_converter(
61             """
62     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
63     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
64     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
65     E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
66     EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
67     C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
68     83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
69     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
70     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
71     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
72     15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
73     ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
74     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
75     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
76     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
77     43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
78         ),
79         2,
80         384,
81     ),
82 }
83
84
85 class CryptoAlgo(object):
86     def __init__(self, name, cipher, mode):
87         self.name = name
88         self.cipher = cipher
89         self.mode = mode
90         if self.cipher is not None:
91             self.bs = self.cipher.block_size // 8
92
93             if self.name == "AES-GCM-16ICV":
94                 self.iv_len = GCM_IV_SIZE
95             else:
96                 self.iv_len = self.bs
97
98     def encrypt(self, data, key, aad=None):
99         iv = os.urandom(self.iv_len)
100         if aad is None:
101             encryptor = Cipher(
102                 self.cipher(key), self.mode(iv), default_backend()
103             ).encryptor()
104             return iv + encryptor.update(data) + encryptor.finalize()
105         else:
106             salt = key[-SALT_SIZE:]
107             nonce = salt + iv
108             encryptor = Cipher(
109                 self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
110             ).encryptor()
111             encryptor.authenticate_additional_data(aad)
112             data = encryptor.update(data) + encryptor.finalize()
113             data += encryptor.tag[:GCM_ICV_SIZE]
114             return iv + data
115
116     def decrypt(self, data, key, aad=None, icv=None):
117         if aad is None:
118             iv = data[: self.iv_len]
119             ct = data[self.iv_len :]
120             decryptor = Cipher(
121                 algorithms.AES(key), self.mode(iv), default_backend()
122             ).decryptor()
123             return decryptor.update(ct) + decryptor.finalize()
124         else:
125             salt = key[-SALT_SIZE:]
126             nonce = salt + data[:GCM_IV_SIZE]
127             ct = data[GCM_IV_SIZE:]
128             key = key[:-SALT_SIZE]
129             decryptor = Cipher(
130                 algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
131             ).decryptor()
132             decryptor.authenticate_additional_data(aad)
133             return decryptor.update(ct) + decryptor.finalize()
134
135     def pad(self, data):
136         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
137         data = data + b"\x00" * (pad_len - 1)
138         return data + bytes([pad_len - 1])
139
140
141 class AuthAlgo(object):
142     def __init__(self, name, mac, mod, key_len, trunc_len=None):
143         self.name = name
144         self.mac = mac
145         self.mod = mod
146         self.key_len = key_len
147         self.trunc_len = trunc_len or key_len
148
149
150 CRYPTO_ALGOS = {
151     "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
152     "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
153     "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
154 }
155
156 AUTH_ALGOS = {
157     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
158     "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
159     "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
160     "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
161     "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
162 }
163
164 PRF_ALGOS = {
165     "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
166     "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
167 }
168
169 CRYPTO_IDS = {
170     12: "AES-CBC",
171     20: "AES-GCM-16ICV",
172 }
173
174 INTEG_IDS = {
175     2: "HMAC-SHA1-96",
176     12: "SHA2-256-128",
177     13: "SHA2-384-192",
178     14: "SHA2-512-256",
179 }
180
181
182 class IKEv2ChildSA(object):
183     def __init__(self, local_ts, remote_ts, is_initiator):
184         spi = os.urandom(4)
185         if is_initiator:
186             self.ispi = spi
187             self.rspi = None
188         else:
189             self.rspi = spi
190             self.ispi = None
191         self.local_ts = local_ts
192         self.remote_ts = remote_ts
193
194
195 class IKEv2SA(object):
196     def __init__(
197         self,
198         test,
199         is_initiator=True,
200         i_id=None,
201         r_id=None,
202         spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
203         id_type="fqdn",
204         nonce=None,
205         auth_data=None,
206         local_ts=None,
207         remote_ts=None,
208         auth_method="shared-key",
209         priv_key=None,
210         i_natt=False,
211         r_natt=False,
212         udp_encap=False,
213     ):
214         self.udp_encap = udp_encap
215         self.i_natt = i_natt
216         self.r_natt = r_natt
217         if i_natt or r_natt:
218             self.sport = 4500
219             self.dport = 4500
220         else:
221             self.sport = 500
222             self.dport = 500
223         self.msg_id = 0
224         self.dh_params = None
225         self.test = test
226         self.priv_key = priv_key
227         self.is_initiator = is_initiator
228         nonce = nonce or os.urandom(32)
229         self.auth_data = auth_data
230         self.i_id = i_id
231         self.r_id = r_id
232         if isinstance(id_type, str):
233             self.id_type = IDType.value(id_type)
234         else:
235             self.id_type = id_type
236         self.auth_method = auth_method
237         if self.is_initiator:
238             self.rspi = 8 * b"\x00"
239             self.ispi = spi
240             self.i_nonce = nonce
241         else:
242             self.rspi = spi
243             self.ispi = 8 * b"\x00"
244             self.r_nonce = nonce
245         self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
246
247     def new_msg_id(self):
248         self.msg_id += 1
249         return self.msg_id
250
251     @property
252     def my_dh_pub_key(self):
253         if self.is_initiator:
254             return self.i_dh_data
255         return self.r_dh_data
256
257     @property
258     def peer_dh_pub_key(self):
259         if self.is_initiator:
260             return self.r_dh_data
261         return self.i_dh_data
262
263     @property
264     def natt(self):
265         return self.i_natt or self.r_natt
266
267     def compute_secret(self):
268         priv = self.dh_private_key
269         peer = self.peer_dh_pub_key
270         p, g, l = self.ike_group
271         return pow(
272             int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
273         ).to_bytes(l, "big")
274
275     def generate_dh_data(self):
276         # generate DH keys
277         if self.ike_dh not in DH:
278             raise NotImplementedError("%s not in DH group" % self.ike_dh)
279
280         if self.dh_params is None:
281             dhg = DH[self.ike_dh]
282             pn = dh.DHParameterNumbers(dhg[0], dhg[1])
283             self.dh_params = pn.parameters(default_backend())
284
285         priv = self.dh_params.generate_private_key()
286         pub = priv.public_key()
287         x = priv.private_numbers().x
288         self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
289         y = pub.public_numbers().y
290
291         if self.is_initiator:
292             self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
293         else:
294             self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
295
296     def complete_dh_data(self):
297         self.dh_shared_secret = self.compute_secret()
298
299     def calc_child_keys(self):
300         prf = self.ike_prf_alg.mod()
301         s = self.i_nonce + self.r_nonce
302         c = self.child_sas[0]
303
304         encr_key_len = self.esp_crypto_key_len
305         integ_key_len = self.esp_integ_alg.key_len
306         salt_len = 0 if integ_key_len else 4
307
308         l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
309         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
310
311         pos = 0
312         c.sk_ei = keymat[pos : pos + encr_key_len]
313         pos += encr_key_len
314
315         if integ_key_len:
316             c.sk_ai = keymat[pos : pos + integ_key_len]
317             pos += integ_key_len
318         else:
319             c.salt_ei = keymat[pos : pos + salt_len]
320             pos += salt_len
321
322         c.sk_er = keymat[pos : pos + encr_key_len]
323         pos += encr_key_len
324
325         if integ_key_len:
326             c.sk_ar = keymat[pos : pos + integ_key_len]
327             pos += integ_key_len
328         else:
329             c.salt_er = keymat[pos : pos + salt_len]
330             pos += salt_len
331
332     def calc_prfplus(self, prf, key, seed, length):
333         r = b""
334         t = None
335         x = 1
336         while len(r) < length and x < 255:
337             if t is not None:
338                 s = t
339             else:
340                 s = b""
341             s = s + seed + bytes([x])
342             t = self.calc_prf(prf, key, s)
343             r = r + t
344             x = x + 1
345
346         if x == 255:
347             return None
348         return r
349
350     def calc_prf(self, prf, key, data):
351         h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
352         h.update(data)
353         return h.finalize()
354
355     def calc_keys(self):
356         prf = self.ike_prf_alg.mod()
357         # SKEYSEED = prf(Ni | Nr, g^ir)
358         s = self.i_nonce + self.r_nonce
359         self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
360
361         # calculate S = Ni | Nr | SPIi SPIr
362         s = s + self.ispi + self.rspi
363
364         prf_key_trunc = self.ike_prf_alg.trunc_len
365         encr_key_len = self.ike_crypto_key_len
366         tr_prf_key_len = self.ike_prf_alg.key_len
367         integ_key_len = self.ike_integ_alg.key_len
368         if integ_key_len == 0:
369             salt_size = 4
370         else:
371             salt_size = 0
372
373         l = (
374             prf_key_trunc
375             + integ_key_len * 2
376             + encr_key_len * 2
377             + tr_prf_key_len * 2
378             + salt_size * 2
379         )
380         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
381
382         pos = 0
383         self.sk_d = keymat[: pos + prf_key_trunc]
384         pos += prf_key_trunc
385
386         self.sk_ai = keymat[pos : pos + integ_key_len]
387         pos += integ_key_len
388         self.sk_ar = keymat[pos : pos + integ_key_len]
389         pos += integ_key_len
390
391         self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
392         pos += encr_key_len + salt_size
393         self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
394         pos += encr_key_len + salt_size
395
396         self.sk_pi = keymat[pos : pos + tr_prf_key_len]
397         pos += tr_prf_key_len
398         self.sk_pr = keymat[pos : pos + tr_prf_key_len]
399
400     def generate_authmsg(self, prf, packet):
401         if self.is_initiator:
402             id = self.i_id
403             nonce = self.r_nonce
404             key = self.sk_pi
405         else:
406             id = self.r_id
407             nonce = self.i_nonce
408             key = self.sk_pr
409         data = bytes([self.id_type, 0, 0, 0]) + id
410         id_hash = self.calc_prf(prf, key, data)
411         return packet + nonce + id_hash
412
413     def auth_init(self):
414         prf = self.ike_prf_alg.mod()
415         if self.is_initiator:
416             packet = self.init_req_packet
417         else:
418             packet = self.init_resp_packet
419         authmsg = self.generate_authmsg(prf, raw(packet))
420         if self.auth_method == "shared-key":
421             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
422             self.auth_data = self.calc_prf(prf, psk, authmsg)
423         elif self.auth_method == "rsa-sig":
424             self.auth_data = self.priv_key.sign(
425                 authmsg, padding.PKCS1v15(), hashes.SHA1()
426             )
427         else:
428             raise TypeError("unknown auth method type!")
429
430     def encrypt(self, data, aad=None):
431         data = self.ike_crypto_alg.pad(data)
432         return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
433
434     @property
435     def peer_authkey(self):
436         if self.is_initiator:
437             return self.sk_ar
438         return self.sk_ai
439
440     @property
441     def my_authkey(self):
442         if self.is_initiator:
443             return self.sk_ai
444         return self.sk_ar
445
446     @property
447     def my_cryptokey(self):
448         if self.is_initiator:
449             return self.sk_ei
450         return self.sk_er
451
452     @property
453     def peer_cryptokey(self):
454         if self.is_initiator:
455             return self.sk_er
456         return self.sk_ei
457
458     def concat(self, alg, key_len):
459         return alg + "-" + str(key_len * 8)
460
461     @property
462     def vpp_ike_cypto_alg(self):
463         return self.concat(self.ike_crypto, self.ike_crypto_key_len)
464
465     @property
466     def vpp_esp_cypto_alg(self):
467         return self.concat(self.esp_crypto, self.esp_crypto_key_len)
468
469     def verify_hmac(self, ikemsg):
470         integ_trunc = self.ike_integ_alg.trunc_len
471         exp_hmac = ikemsg[-integ_trunc:]
472         data = ikemsg[:-integ_trunc]
473         computed_hmac = self.compute_hmac(
474             self.ike_integ_alg.mod(), self.peer_authkey, data
475         )
476         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
477
478     def compute_hmac(self, integ, key, data):
479         h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
480         h.update(data)
481         return h.finalize()
482
483     def decrypt(self, data, aad=None, icv=None):
484         return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
485
486     def hmac_and_decrypt(self, ike):
487         ep = ike[ikev2.IKEv2_payload_Encrypted]
488         if self.ike_crypto == "AES-GCM-16ICV":
489             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
490             ct = ep.load[:-GCM_ICV_SIZE]
491             tag = ep.load[-GCM_ICV_SIZE:]
492             plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
493         else:
494             self.verify_hmac(raw(ike))
495             integ_trunc = self.ike_integ_alg.trunc_len
496
497             # remove ICV and decrypt payload
498             ct = ep.load[:-integ_trunc]
499             plain = self.decrypt(ct)
500         # remove padding
501         pad_len = plain[-1]
502         return plain[: -pad_len - 1]
503
504     def build_ts_addr(self, ts, version):
505         return {
506             "starting_address_v" + version: ts["start_addr"],
507             "ending_address_v" + version: ts["end_addr"],
508         }
509
510     def generate_ts(self, is_ip4):
511         c = self.child_sas[0]
512         ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
513         if is_ip4:
514             ts_data.update(self.build_ts_addr(c.local_ts, "4"))
515             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
516             ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
517             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
518         else:
519             ts_data.update(self.build_ts_addr(c.local_ts, "6"))
520             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
521             ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
522             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
523
524         if self.is_initiator:
525             return ([ts1], [ts2])
526         return ([ts2], [ts1])
527
528     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
529         if crypto not in CRYPTO_ALGOS:
530             raise TypeError("unsupported encryption algo %r" % crypto)
531         self.ike_crypto = crypto
532         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
533         self.ike_crypto_key_len = crypto_key_len
534
535         if integ not in AUTH_ALGOS:
536             raise TypeError("unsupported auth algo %r" % integ)
537         self.ike_integ = None if integ == "NULL" else integ
538         self.ike_integ_alg = AUTH_ALGOS[integ]
539
540         if prf not in PRF_ALGOS:
541             raise TypeError("unsupported prf algo %r" % prf)
542         self.ike_prf = prf
543         self.ike_prf_alg = PRF_ALGOS[prf]
544         self.ike_dh = dh
545         self.ike_group = DH[self.ike_dh]
546
547     def set_esp_props(self, crypto, crypto_key_len, integ):
548         self.esp_crypto_key_len = crypto_key_len
549         if crypto not in CRYPTO_ALGOS:
550             raise TypeError("unsupported encryption algo %r" % crypto)
551         self.esp_crypto = crypto
552         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
553
554         if integ not in AUTH_ALGOS:
555             raise TypeError("unsupported auth algo %r" % integ)
556         self.esp_integ = None if integ == "NULL" else integ
557         self.esp_integ_alg = AUTH_ALGOS[integ]
558
559     def crypto_attr(self, key_len):
560         if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
561             return (0x800E << 16 | key_len << 3, 12)
562         else:
563             raise Exception("unsupported attribute type")
564
565     def ike_crypto_attr(self):
566         return self.crypto_attr(self.ike_crypto_key_len)
567
568     def esp_crypto_attr(self):
569         return self.crypto_attr(self.esp_crypto_key_len)
570
571     def compute_nat_sha1(self, ip, port, rspi=None):
572         if rspi is None:
573             rspi = self.rspi
574         data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
575         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
576         digest.update(data)
577         return digest.finalize()
578
579
580 class IkePeer(VppTestCase):
581     """common class for initiator and responder"""
582
583     @classmethod
584     def setUpClass(cls):
585         import scapy.contrib.ikev2 as _ikev2
586
587         globals()["ikev2"] = _ikev2
588         super(IkePeer, cls).setUpClass()
589         cls.create_pg_interfaces(range(2))
590         for i in cls.pg_interfaces:
591             i.admin_up()
592             i.config_ip4()
593             i.resolve_arp()
594             i.config_ip6()
595             i.resolve_ndp()
596
597     @classmethod
598     def tearDownClass(cls):
599         super(IkePeer, cls).tearDownClass()
600
601     def tearDown(self):
602         super(IkePeer, self).tearDown()
603         if self.del_sa_from_responder:
604             self.initiate_del_sa_from_responder()
605         else:
606             self.initiate_del_sa_from_initiator()
607         r = self.vapi.ikev2_sa_dump()
608         self.assertEqual(len(r), 0)
609         sas = self.vapi.ipsec_sa_dump()
610         self.assertEqual(len(sas), 0)
611         self.p.remove_vpp_config()
612         self.assertIsNone(self.p.query_vpp_config())
613
614     def setUp(self):
615         super(IkePeer, self).setUp()
616         self.config_tc()
617         self.p.add_vpp_config()
618         self.assertIsNotNone(self.p.query_vpp_config())
619         if self.sa.is_initiator:
620             self.sa.generate_dh_data()
621         self.vapi.cli("ikev2 set logging level 4")
622         self.vapi.cli("event-lo clear")
623
624     def assert_counter(self, count, name, version="ip4"):
625         node_name = "/err/ikev2-%s/" % version + name
626         self.assertEqual(count, self.statistics.get_err_counter(node_name))
627
628     def create_rekey_request(self):
629         sa, first_payload = self.generate_auth_payload(is_rekey=True)
630         header = ikev2.IKEv2(
631             init_SPI=self.sa.ispi,
632             resp_SPI=self.sa.rspi,
633             id=self.sa.new_msg_id(),
634             flags="Initiator",
635             exch_type="CREATE_CHILD_SA",
636         )
637
638         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
639         return self.create_packet(
640             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
641         )
642
643     def create_empty_request(self):
644         header = ikev2.IKEv2(
645             init_SPI=self.sa.ispi,
646             resp_SPI=self.sa.rspi,
647             id=self.sa.new_msg_id(),
648             flags="Initiator",
649             exch_type="INFORMATIONAL",
650             next_payload="Encrypted",
651         )
652
653         msg = self.encrypt_ike_msg(header, b"", None)
654         return self.create_packet(
655             self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
656         )
657
658     def create_packet(
659         self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
660     ):
661         if use_ip6:
662             src_ip = src_if.remote_ip6
663             dst_ip = src_if.local_ip6
664             ip_layer = IPv6
665         else:
666             src_ip = src_if.remote_ip4
667             dst_ip = src_if.local_ip4
668             ip_layer = IP
669         res = (
670             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
671             / ip_layer(src=src_ip, dst=dst_ip)
672             / UDP(sport=sport, dport=dport)
673         )
674         if natt:
675             # insert non ESP marker
676             res = res / Raw(b"\x00" * 4)
677         return res / msg
678
679     def verify_udp(self, udp):
680         self.assertEqual(udp.sport, self.sa.sport)
681         self.assertEqual(udp.dport, self.sa.dport)
682
683     def get_ike_header(self, packet):
684         try:
685             ih = packet[ikev2.IKEv2]
686             ih = self.verify_and_remove_non_esp_marker(ih)
687         except IndexError as e:
688             # this is a workaround for getting IKEv2 layer as both ikev2 and
689             # ipsec register for port 4500
690             esp = packet[ESP]
691             ih = self.verify_and_remove_non_esp_marker(esp)
692         self.assertEqual(ih.version, 0x20)
693         self.assertNotIn("Version", ih.flags)
694         return ih
695
696     def verify_and_remove_non_esp_marker(self, packet):
697         if self.sa.natt:
698             # if we are in nat traversal mode check for non esp marker
699             # and remove it
700             data = raw(packet)
701             self.assertEqual(data[:4], b"\x00" * 4)
702             return ikev2.IKEv2(data[4:])
703         else:
704             return packet
705
706     def encrypt_ike_msg(self, header, plain, first_payload):
707         if self.sa.ike_crypto == "AES-GCM-16ICV":
708             data = self.sa.ike_crypto_alg.pad(raw(plain))
709             plen = (
710                 len(data)
711                 + GCM_IV_SIZE
712                 + GCM_ICV_SIZE
713                 + len(ikev2.IKEv2_payload_Encrypted())
714             )
715             tlen = plen + len(ikev2.IKEv2())
716
717             # prepare aad data
718             sk_p = ikev2.IKEv2_payload_Encrypted(
719                 next_payload=first_payload, length=plen
720             )
721             header.length = tlen
722             res = header / sk_p
723             encr = self.sa.encrypt(raw(plain), raw(res))
724             sk_p = ikev2.IKEv2_payload_Encrypted(
725                 next_payload=first_payload, length=plen, load=encr
726             )
727             res = header / sk_p
728         else:
729             encr = self.sa.encrypt(raw(plain))
730             trunc_len = self.sa.ike_integ_alg.trunc_len
731             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
732             tlen = plen + len(ikev2.IKEv2())
733
734             sk_p = ikev2.IKEv2_payload_Encrypted(
735                 next_payload=first_payload, length=plen, load=encr
736             )
737             header.length = tlen
738             res = header / sk_p
739
740             integ_data = raw(res)
741             hmac_data = self.sa.compute_hmac(
742                 self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
743             )
744             res = res / Raw(hmac_data[:trunc_len])
745         assert len(res) == tlen
746         return res
747
748     def verify_udp_encap(self, ipsec_sa):
749         e = VppEnum.vl_api_ipsec_sad_flags_t
750         if self.sa.udp_encap or self.sa.natt:
751             self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
752         else:
753             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
754
755     def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
756         sas = self.vapi.ipsec_sa_dump()
757         if sa_count is None:
758             if is_rekey:
759                 # after rekey there is a short period of time in which old
760                 # inbound SA is still present
761                 sa_count = 3
762             else:
763                 sa_count = 2
764         self.assertEqual(len(sas), sa_count)
765         if self.sa.is_initiator:
766             if is_rekey:
767                 sa0 = sas[0].entry
768                 sa1 = sas[2].entry
769             else:
770                 sa0 = sas[0].entry
771                 sa1 = sas[1].entry
772         else:
773             if is_rekey:
774                 sa0 = sas[2].entry
775                 sa1 = sas[0].entry
776             else:
777                 sa1 = sas[0].entry
778                 sa0 = sas[1].entry
779
780         c = self.sa.child_sas[0]
781
782         self.verify_udp_encap(sa0)
783         self.verify_udp_encap(sa1)
784         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
785         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
786         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
787
788         if self.sa.esp_integ is None:
789             vpp_integ_alg = 0
790         else:
791             vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
792         self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
793         self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
794
795         # verify crypto keys
796         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
797         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
798         self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
799         self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
800
801         # verify integ keys
802         if vpp_integ_alg:
803             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
804             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
805             self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
806             self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
807         else:
808             self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
809             self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
810
811     def verify_keymat(self, api_keys, keys, name):
812         km = getattr(keys, name)
813         api_km = getattr(api_keys, name)
814         api_km_len = getattr(api_keys, name + "_len")
815         self.assertEqual(len(km), api_km_len)
816         self.assertEqual(km, api_km[:api_km_len])
817
818     def verify_id(self, api_id, exp_id):
819         self.assertEqual(api_id.type, IDType.value(exp_id.type))
820         self.assertEqual(api_id.data_len, exp_id.data_len)
821         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
822
823     def verify_ike_sas(self):
824         r = self.vapi.ikev2_sa_dump()
825         self.assertEqual(len(r), 1)
826         sa = r[0].sa
827         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
828         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
829         if self.ip6:
830             if self.sa.is_initiator:
831                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
832                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
833             else:
834                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
835                 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
836         else:
837             if self.sa.is_initiator:
838                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
839                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
840             else:
841                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
842                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
843         self.verify_keymat(sa.keys, self.sa, "sk_d")
844         self.verify_keymat(sa.keys, self.sa, "sk_ai")
845         self.verify_keymat(sa.keys, self.sa, "sk_ar")
846         self.verify_keymat(sa.keys, self.sa, "sk_ei")
847         self.verify_keymat(sa.keys, self.sa, "sk_er")
848         self.verify_keymat(sa.keys, self.sa, "sk_pi")
849         self.verify_keymat(sa.keys, self.sa, "sk_pr")
850
851         self.assertEqual(sa.i_id.type, self.sa.id_type)
852         self.assertEqual(sa.r_id.type, self.sa.id_type)
853         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
854         self.assertEqual(sa.r_id.data_len, len(self.idr))
855         self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
856         self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
857
858         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
859         self.assertEqual(len(r), 1)
860         csa = r[0].child_sa
861         self.assertEqual(csa.sa_index, sa.sa_index)
862         c = self.sa.child_sas[0]
863         if hasattr(c, "sk_ai"):
864             self.verify_keymat(csa.keys, c, "sk_ai")
865             self.verify_keymat(csa.keys, c, "sk_ar")
866         self.verify_keymat(csa.keys, c, "sk_ei")
867         self.verify_keymat(csa.keys, c, "sk_er")
868         self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
869         self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
870
871         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
872         tsi = tsi[0]
873         tsr = tsr[0]
874         r = self.vapi.ikev2_traffic_selector_dump(
875             is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
876         )
877         self.assertEqual(len(r), 1)
878         ts = r[0].ts
879         self.verify_ts(r[0].ts, tsi[0], True)
880
881         r = self.vapi.ikev2_traffic_selector_dump(
882             is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
883         )
884         self.assertEqual(len(r), 1)
885         self.verify_ts(r[0].ts, tsr[0], False)
886
887         n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
888         self.verify_nonce(n, self.sa.i_nonce)
889         n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
890         self.verify_nonce(n, self.sa.r_nonce)
891
892     def verify_nonce(self, api_nonce, nonce):
893         self.assertEqual(api_nonce.data_len, len(nonce))
894         self.assertEqual(api_nonce.nonce, nonce)
895
896     def verify_ts(self, api_ts, ts, is_initiator):
897         if is_initiator:
898             self.assertTrue(api_ts.is_local)
899         else:
900             self.assertFalse(api_ts.is_local)
901
902         if self.p.ts_is_ip4:
903             self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
904             self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
905         else:
906             self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
907             self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
908         self.assertEqual(api_ts.start_port, ts.start_port)
909         self.assertEqual(api_ts.end_port, ts.end_port)
910         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
911
912
913 class TemplateInitiator(IkePeer):
914     """initiator test template"""
915
916     def initiate_del_sa_from_initiator(self):
917         ispi = int.from_bytes(self.sa.ispi, "little")
918         self.pg0.enable_capture()
919         self.pg_start()
920         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
921         capture = self.pg0.get_capture(1)
922         ih = self.get_ike_header(capture[0])
923         self.assertNotIn("Response", ih.flags)
924         self.assertIn("Initiator", ih.flags)
925         self.assertEqual(ih.init_SPI, self.sa.ispi)
926         self.assertEqual(ih.resp_SPI, self.sa.rspi)
927         plain = self.sa.hmac_and_decrypt(ih)
928         d = ikev2.IKEv2_payload_Delete(plain)
929         self.assertEqual(d.proto, 1)  # proto=IKEv2
930         header = ikev2.IKEv2(
931             init_SPI=self.sa.ispi,
932             resp_SPI=self.sa.rspi,
933             flags="Response",
934             exch_type="INFORMATIONAL",
935             id=ih.id,
936             next_payload="Encrypted",
937         )
938         resp = self.encrypt_ike_msg(header, b"", None)
939         self.send_and_assert_no_replies(self.pg0, resp)
940
941     def verify_del_sa(self, packet):
942         ih = self.get_ike_header(packet)
943         self.assertEqual(ih.id, self.sa.msg_id)
944         self.assertEqual(ih.exch_type, 37)  # exchange informational
945         self.assertIn("Response", ih.flags)
946         self.assertIn("Initiator", ih.flags)
947         plain = self.sa.hmac_and_decrypt(ih)
948         self.assertEqual(plain, b"")
949
950     def initiate_del_sa_from_responder(self):
951         header = ikev2.IKEv2(
952             init_SPI=self.sa.ispi,
953             resp_SPI=self.sa.rspi,
954             exch_type="INFORMATIONAL",
955             id=self.sa.new_msg_id(),
956         )
957         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
958         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
959         packet = self.create_packet(
960             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
961         )
962         self.pg0.add_stream(packet)
963         self.pg0.enable_capture()
964         self.pg_start()
965         capture = self.pg0.get_capture(1)
966         self.verify_del_sa(capture[0])
967
968     @staticmethod
969     def find_notify_payload(packet, notify_type):
970         n = packet[ikev2.IKEv2_payload_Notify]
971         while n is not None:
972             if n.type == notify_type:
973                 return n
974             n = n.payload
975         return None
976
977     def verify_nat_detection(self, packet):
978         if self.ip6:
979             iph = packet[IPv6]
980         else:
981             iph = packet[IP]
982         udp = packet[UDP]
983
984         # NAT_DETECTION_SOURCE_IP
985         s = self.find_notify_payload(packet, 16388)
986         self.assertIsNotNone(s)
987         src_sha = self.sa.compute_nat_sha1(
988             inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
989         )
990         self.assertEqual(s.load, src_sha)
991
992         # NAT_DETECTION_DESTINATION_IP
993         s = self.find_notify_payload(packet, 16389)
994         self.assertIsNotNone(s)
995         dst_sha = self.sa.compute_nat_sha1(
996             inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
997         )
998         self.assertEqual(s.load, dst_sha)
999
1000     def verify_sa_init_request(self, packet):
1001         udp = packet[UDP]
1002         self.sa.dport = udp.sport
1003         ih = packet[ikev2.IKEv2]
1004         self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
1005         self.assertEqual(ih.exch_type, 34)  # SA_INIT
1006         self.sa.ispi = ih.init_SPI
1007         self.assertEqual(ih.resp_SPI, 8 * b"\x00")
1008         self.assertIn("Initiator", ih.flags)
1009         self.assertNotIn("Response", ih.flags)
1010         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1011         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
1012
1013         prop = packet[ikev2.IKEv2_payload_Proposal]
1014         self.assertEqual(prop.proto, 1)  # proto = ikev2
1015         self.assertEqual(prop.proposal, 1)
1016         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
1017         self.assertEqual(
1018             prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
1019         )
1020         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
1021         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
1022         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
1023         self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
1024
1025         self.verify_nat_detection(packet)
1026         self.sa.set_ike_props(
1027             crypto="AES-GCM-16ICV",
1028             crypto_key_len=32,
1029             integ="NULL",
1030             prf="PRF_HMAC_SHA2_256",
1031             dh="3072MODPgr",
1032         )
1033         self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
1034         self.sa.generate_dh_data()
1035         self.sa.complete_dh_data()
1036         self.sa.calc_keys()
1037
1038     def update_esp_transforms(self, trans, sa):
1039         while trans:
1040             if trans.transform_type == 1:  # ecryption
1041                 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
1042             elif trans.transform_type == 3:  # integrity
1043                 sa.esp_integ = INTEG_IDS[trans.transform_id]
1044             trans = trans.payload
1045
1046     def verify_sa_auth_req(self, packet):
1047         udp = packet[UDP]
1048         self.sa.dport = udp.sport
1049         ih = self.get_ike_header(packet)
1050         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1051         self.assertEqual(ih.init_SPI, self.sa.ispi)
1052         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
1053         self.assertIn("Initiator", ih.flags)
1054         self.assertNotIn("Response", ih.flags)
1055
1056         udp = packet[UDP]
1057         self.verify_udp(udp)
1058         self.assertEqual(ih.id, self.sa.msg_id + 1)
1059         self.sa.msg_id += 1
1060         plain = self.sa.hmac_and_decrypt(ih)
1061         idi = ikev2.IKEv2_payload_IDi(plain)
1062         self.assertEqual(idi.load, self.sa.i_id)
1063         if self.no_idr_auth:
1064             self.assertEqual(idi.next_payload, 39)  # AUTH
1065         else:
1066             idr = ikev2.IKEv2_payload_IDr(idi.payload)
1067             self.assertEqual(idr.load, self.sa.r_id)
1068         prop = idi[ikev2.IKEv2_payload_Proposal]
1069         c = self.sa.child_sas[0]
1070         c.ispi = prop.SPI
1071         self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
1072
1073     def send_init_response(self):
1074         tr_attr = self.sa.ike_crypto_attr()
1075         trans = (
1076             ikev2.IKEv2_payload_Transform(
1077                 transform_type="Encryption",
1078                 transform_id=self.sa.ike_crypto,
1079                 length=tr_attr[1],
1080                 key_length=tr_attr[0],
1081             )
1082             / ikev2.IKEv2_payload_Transform(
1083                 transform_type="Integrity", transform_id=self.sa.ike_integ
1084             )
1085             / ikev2.IKEv2_payload_Transform(
1086                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1087             )
1088             / ikev2.IKEv2_payload_Transform(
1089                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1090             )
1091         )
1092         props = ikev2.IKEv2_payload_Proposal(
1093             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1094         )
1095
1096         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1097         if self.sa.natt:
1098             dst_address = b"\x0a\x0a\x0a\x0a"
1099         else:
1100             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1101         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1102         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1103
1104         self.sa.init_resp_packet = (
1105             ikev2.IKEv2(
1106                 init_SPI=self.sa.ispi,
1107                 resp_SPI=self.sa.rspi,
1108                 exch_type="IKE_SA_INIT",
1109                 flags="Response",
1110             )
1111             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1112             / ikev2.IKEv2_payload_KE(
1113                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1114             )
1115             / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
1116             / ikev2.IKEv2_payload_Notify(
1117                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1118             )
1119             / ikev2.IKEv2_payload_Notify(
1120                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1121             )
1122         )
1123
1124         ike_msg = self.create_packet(
1125             self.pg0,
1126             self.sa.init_resp_packet,
1127             self.sa.sport,
1128             self.sa.dport,
1129             False,
1130             self.ip6,
1131         )
1132         self.pg_send(self.pg0, ike_msg)
1133         capture = self.pg0.get_capture(1)
1134         self.verify_sa_auth_req(capture[0])
1135
1136     def initiate_sa_init(self):
1137         self.pg0.enable_capture()
1138         self.pg_start()
1139         self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1140
1141         capture = self.pg0.get_capture(1)
1142         self.verify_sa_init_request(capture[0])
1143         self.send_init_response()
1144
1145     def send_auth_response(self):
1146         tr_attr = self.sa.esp_crypto_attr()
1147         trans = (
1148             ikev2.IKEv2_payload_Transform(
1149                 transform_type="Encryption",
1150                 transform_id=self.sa.esp_crypto,
1151                 length=tr_attr[1],
1152                 key_length=tr_attr[0],
1153             )
1154             / ikev2.IKEv2_payload_Transform(
1155                 transform_type="Integrity", transform_id=self.sa.esp_integ
1156             )
1157             / ikev2.IKEv2_payload_Transform(
1158                 transform_type="Extended Sequence Number", transform_id="No ESN"
1159             )
1160             / ikev2.IKEv2_payload_Transform(
1161                 transform_type="Extended Sequence Number", transform_id="ESN"
1162             )
1163         )
1164
1165         c = self.sa.child_sas[0]
1166         props = ikev2.IKEv2_payload_Proposal(
1167             proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
1168         )
1169
1170         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1171         plain = (
1172             ikev2.IKEv2_payload_IDi(
1173                 next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1174             )
1175             / ikev2.IKEv2_payload_IDr(
1176                 next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1177             )
1178             / ikev2.IKEv2_payload_AUTH(
1179                 next_payload="SA",
1180                 auth_type=AuthMethod.value(self.sa.auth_method),
1181                 load=self.sa.auth_data,
1182             )
1183             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1184             / ikev2.IKEv2_payload_TSi(
1185                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1186             )
1187             / ikev2.IKEv2_payload_TSr(
1188                 next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
1189             )
1190             / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1191         )
1192
1193         header = ikev2.IKEv2(
1194             init_SPI=self.sa.ispi,
1195             resp_SPI=self.sa.rspi,
1196             id=self.sa.new_msg_id(),
1197             flags="Response",
1198             exch_type="IKE_AUTH",
1199         )
1200
1201         ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
1202         packet = self.create_packet(
1203             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1204         )
1205         self.pg_send(self.pg0, packet)
1206
1207     def test_initiator(self):
1208         self.initiate_sa_init()
1209         self.sa.auth_init()
1210         self.sa.calc_child_keys()
1211         self.send_auth_response()
1212         self.verify_ike_sas()
1213
1214
1215 class TemplateResponder(IkePeer):
1216     """responder test template"""
1217
1218     def initiate_del_sa_from_responder(self):
1219         self.pg0.enable_capture()
1220         self.pg_start()
1221         self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
1222         capture = self.pg0.get_capture(1)
1223         ih = self.get_ike_header(capture[0])
1224         self.assertNotIn("Response", ih.flags)
1225         self.assertNotIn("Initiator", ih.flags)
1226         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
1227         plain = self.sa.hmac_and_decrypt(ih)
1228         d = ikev2.IKEv2_payload_Delete(plain)
1229         self.assertEqual(d.proto, 1)  # proto=IKEv2
1230         self.assertEqual(ih.init_SPI, self.sa.ispi)
1231         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1232         header = ikev2.IKEv2(
1233             init_SPI=self.sa.ispi,
1234             resp_SPI=self.sa.rspi,
1235             flags="Initiator+Response",
1236             exch_type="INFORMATIONAL",
1237             id=ih.id,
1238             next_payload="Encrypted",
1239         )
1240         resp = self.encrypt_ike_msg(header, b"", None)
1241         self.send_and_assert_no_replies(self.pg0, resp)
1242
1243     def verify_del_sa(self, packet):
1244         ih = self.get_ike_header(packet)
1245         self.assertEqual(ih.id, self.sa.msg_id)
1246         self.assertEqual(ih.exch_type, 37)  # exchange informational
1247         self.assertIn("Response", ih.flags)
1248         self.assertNotIn("Initiator", ih.flags)
1249         self.assertEqual(ih.next_payload, 46)  # Encrypted
1250         self.assertEqual(ih.init_SPI, self.sa.ispi)
1251         self.assertEqual(ih.resp_SPI, self.sa.rspi)
1252         plain = self.sa.hmac_and_decrypt(ih)
1253         self.assertEqual(plain, b"")
1254
1255     def initiate_del_sa_from_initiator(self):
1256         header = ikev2.IKEv2(
1257             init_SPI=self.sa.ispi,
1258             resp_SPI=self.sa.rspi,
1259             flags="Initiator",
1260             exch_type="INFORMATIONAL",
1261             id=self.sa.new_msg_id(),
1262         )
1263         del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
1264         ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
1265         packet = self.create_packet(
1266             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1267         )
1268         self.pg0.add_stream(packet)
1269         self.pg0.enable_capture()
1270         self.pg_start()
1271         capture = self.pg0.get_capture(1)
1272         self.verify_del_sa(capture[0])
1273
1274     def send_sa_init_req(self):
1275         tr_attr = self.sa.ike_crypto_attr()
1276         trans = (
1277             ikev2.IKEv2_payload_Transform(
1278                 transform_type="Encryption",
1279                 transform_id=self.sa.ike_crypto,
1280                 length=tr_attr[1],
1281                 key_length=tr_attr[0],
1282             )
1283             / ikev2.IKEv2_payload_Transform(
1284                 transform_type="Integrity", transform_id=self.sa.ike_integ
1285             )
1286             / ikev2.IKEv2_payload_Transform(
1287                 transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
1288             )
1289             / ikev2.IKEv2_payload_Transform(
1290                 transform_type="GroupDesc", transform_id=self.sa.ike_dh
1291             )
1292         )
1293
1294         props = ikev2.IKEv2_payload_Proposal(
1295             proposal=1, proto="IKEv2", trans_nb=4, trans=trans
1296         )
1297
1298         next_payload = None if self.ip6 else "Notify"
1299
1300         self.sa.init_req_packet = (
1301             ikev2.IKEv2(
1302                 init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
1303             )
1304             / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
1305             / ikev2.IKEv2_payload_KE(
1306                 next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
1307             )
1308             / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
1309         )
1310
1311         if not self.ip6:
1312             if self.sa.i_natt:
1313                 src_address = b"\x0a\x0a\x0a\x01"
1314             else:
1315                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1316
1317             if self.sa.r_natt:
1318                 dst_address = b"\x0a\x0a\x0a\x0a"
1319             else:
1320                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1321
1322             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1323             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1324             nat_src_detection = ikev2.IKEv2_payload_Notify(
1325                 type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
1326             )
1327             nat_dst_detection = ikev2.IKEv2_payload_Notify(
1328                 type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
1329             )
1330             self.sa.init_req_packet = (
1331                 self.sa.init_req_packet / nat_src_detection / nat_dst_detection
1332             )
1333
1334         ike_msg = self.create_packet(
1335             self.pg0,
1336             self.sa.init_req_packet,
1337             self.sa.sport,
1338             self.sa.dport,
1339             self.sa.natt,
1340             self.ip6,
1341         )
1342         self.pg0.add_stream(ike_msg)
1343         self.pg0.enable_capture()
1344         self.pg_start()
1345         capture = self.pg0.get_capture(1)
1346         self.verify_sa_init(capture[0])
1347
1348     def generate_auth_payload(self, last_payload=None, is_rekey=False):
1349         tr_attr = self.sa.esp_crypto_attr()
1350         last_payload = last_payload or "Notify"
1351         trans = (
1352             ikev2.IKEv2_payload_Transform(
1353                 transform_type="Encryption",
1354                 transform_id=self.sa.esp_crypto,
1355                 length=tr_attr[1],
1356                 key_length=tr_attr[0],
1357             )
1358             / ikev2.IKEv2_payload_Transform(
1359                 transform_type="Integrity", transform_id=self.sa.esp_integ
1360             )
1361             / ikev2.IKEv2_payload_Transform(
1362                 transform_type="Extended Sequence Number", transform_id="No ESN"
1363             )
1364             / ikev2.IKEv2_payload_Transform(
1365                 transform_type="Extended Sequence Number", transform_id="ESN"
1366             )
1367         )
1368
1369         c = self.sa.child_sas[0]
1370         props = ikev2.IKEv2_payload_Proposal(
1371             proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
1372         )
1373
1374         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1375         plain = (
1376             ikev2.IKEv2_payload_AUTH(
1377                 next_payload="SA",
1378                 auth_type=AuthMethod.value(self.sa.auth_method),
1379                 load=self.sa.auth_data,
1380             )
1381             / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
1382             / ikev2.IKEv2_payload_TSi(
1383                 next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
1384             )
1385             / ikev2.IKEv2_payload_TSr(
1386                 next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
1387             )
1388         )
1389
1390         if is_rekey:
1391             first_payload = "Nonce"
1392             plain = (
1393                 ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
1394                 / plain
1395                 / ikev2.IKEv2_payload_Notify(
1396                     type="REKEY_SA",
1397                     proto="ESP",
1398                     SPI=c.ispi,
1399                     length=8 + len(c.ispi),
1400                     next_payload="Notify",
1401                 )
1402                 / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
1403             )
1404         else:
1405             first_payload = "IDi"
1406             if self.no_idr_auth:
1407                 ids = ikev2.IKEv2_payload_IDi(
1408                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
1409                 )
1410             else:
1411                 ids = ikev2.IKEv2_payload_IDi(
1412                     next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
1413                 ) / ikev2.IKEv2_payload_IDr(
1414                     next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
1415                 )
1416             plain = ids / plain
1417         return plain, first_payload
1418
1419     def send_sa_auth(self):
1420         plain, first_payload = self.generate_auth_payload(last_payload="Notify")
1421         plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
1422         header = ikev2.IKEv2(
1423             init_SPI=self.sa.ispi,
1424             resp_SPI=self.sa.rspi,
1425             id=self.sa.new_msg_id(),
1426             flags="Initiator",
1427             exch_type="IKE_AUTH",
1428         )
1429
1430         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1431         packet = self.create_packet(
1432             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1433         )
1434         self.pg0.add_stream(packet)
1435         self.pg0.enable_capture()
1436         self.pg_start()
1437         capture = self.pg0.get_capture(1)
1438         self.verify_sa_auth_resp(capture[0])
1439
1440     def verify_sa_init(self, packet):
1441         ih = self.get_ike_header(packet)
1442
1443         self.assertEqual(ih.id, self.sa.msg_id)
1444         self.assertEqual(ih.exch_type, 34)
1445         self.assertIn("Response", ih.flags)
1446         self.assertEqual(ih.init_SPI, self.sa.ispi)
1447         self.assertNotEqual(ih.resp_SPI, 0)
1448         self.sa.rspi = ih.resp_SPI
1449         try:
1450             sa = ih[ikev2.IKEv2_payload_SA]
1451             self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1452             self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1453         except IndexError as e:
1454             self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1455             self.logger.error(ih.show())
1456             raise
1457         self.sa.complete_dh_data()
1458         self.sa.calc_keys()
1459         self.sa.auth_init()
1460
1461     def verify_sa_auth_resp(self, packet):
1462         ike = self.get_ike_header(packet)
1463         udp = packet[UDP]
1464         self.verify_udp(udp)
1465         self.assertEqual(ike.id, self.sa.msg_id)
1466         plain = self.sa.hmac_and_decrypt(ike)
1467         idr = ikev2.IKEv2_payload_IDr(plain)
1468         prop = idr[ikev2.IKEv2_payload_Proposal]
1469         self.assertEqual(prop.SPIsize, 4)
1470         self.sa.child_sas[0].rspi = prop.SPI
1471         self.sa.calc_child_keys()
1472
1473     IKE_NODE_SUFFIX = "ip4"
1474
1475     def verify_counters(self):
1476         self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
1477         self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
1478         self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
1479
1480         r = self.vapi.ikev2_sa_dump()
1481         s = r[0].sa.stats
1482         self.assertEqual(1, s.n_sa_auth_req)
1483         self.assertEqual(1, s.n_sa_init_req)
1484
1485     def test_responder(self):
1486         self.send_sa_init_req()
1487         self.send_sa_auth()
1488         self.verify_ipsec_sas()
1489         self.verify_ike_sas()
1490         self.verify_counters()
1491
1492
1493 class Ikev2Params(object):
1494     def config_params(self, params={}):
1495         ec = VppEnum.vl_api_ipsec_crypto_alg_t
1496         ei = VppEnum.vl_api_ipsec_integ_alg_t
1497         self.vpp_enums = {
1498             "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1499             "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1500             "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1501             "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1502             "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1503             "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1504             "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
1505             "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1506             "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1507             "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
1508         }
1509
1510         dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
1511         if dpd_disabled:
1512             self.vapi.cli("ikev2 dpd disable")
1513         self.del_sa_from_responder = (
1514             False
1515             if "del_sa_from_responder" not in params
1516             else params["del_sa_from_responder"]
1517         )
1518         i_natt = False if "i_natt" not in params else params["i_natt"]
1519         r_natt = False if "r_natt" not in params else params["r_natt"]
1520         self.p = Profile(self, "pr1")
1521         self.ip6 = False if "ip6" not in params else params["ip6"]
1522
1523         if "auth" in params and params["auth"] == "rsa-sig":
1524             auth_method = "rsa-sig"
1525             work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
1526             self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
1527
1528             client_file = work_dir + params["client-cert"]
1529             server_pem = open(work_dir + params["server-cert"]).read()
1530             client_priv = open(work_dir + params["client-key"]).read()
1531             client_priv = load_pem_private_key(
1532                 str.encode(client_priv), None, default_backend()
1533             )
1534             self.peer_cert = x509.load_pem_x509_certificate(
1535                 str.encode(server_pem), default_backend()
1536             )
1537             self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
1538             auth_data = None
1539         else:
1540             auth_data = b"$3cr3tpa$$w0rd"
1541             self.p.add_auth(method="shared-key", data=auth_data)
1542             auth_method = "shared-key"
1543             client_priv = None
1544
1545         is_init = True if "is_initiator" not in params else params["is_initiator"]
1546         self.no_idr_auth = params.get("no_idr_in_auth", False)
1547
1548         idr = {"id_type": "fqdn", "data": b"vpp.home"}
1549         idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
1550         r_id = self.idr = idr["data"]
1551         i_id = self.idi = idi["data"]
1552         if is_init:
1553             # scapy is initiator, VPP is responder
1554             self.p.add_local_id(**idr)
1555             self.p.add_remote_id(**idi)
1556             if self.no_idr_auth:
1557                 r_id = None
1558         else:
1559             # VPP is initiator, scapy is responder
1560             self.p.add_local_id(**idi)
1561             if not self.no_idr_auth:
1562                 self.p.add_remote_id(**idr)
1563
1564         loc_ts = (
1565             {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
1566             if "loc_ts" not in params
1567             else params["loc_ts"]
1568         )
1569         rem_ts = (
1570             {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
1571             if "rem_ts" not in params
1572             else params["rem_ts"]
1573         )
1574         self.p.add_local_ts(**loc_ts)
1575         self.p.add_remote_ts(**rem_ts)
1576         if "responder" in params:
1577             self.p.add_responder(params["responder"])
1578         if "ike_transforms" in params:
1579             self.p.add_ike_transforms(params["ike_transforms"])
1580         if "esp_transforms" in params:
1581             self.p.add_esp_transforms(params["esp_transforms"])
1582
1583         udp_encap = False if "udp_encap" not in params else params["udp_encap"]
1584         if udp_encap:
1585             self.p.set_udp_encap(True)
1586
1587         if "responder_hostname" in params:
1588             hn = params["responder_hostname"]
1589             self.p.add_responder_hostname(hn)
1590
1591             # configure static dns record
1592             self.vapi.dns_name_server_add_del(
1593                 is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
1594             )
1595             self.vapi.dns_enable_disable(enable=1)
1596
1597             cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
1598             self.vapi.cli(cmd)
1599
1600         self.sa = IKEv2SA(
1601             self,
1602             i_id=i_id,
1603             r_id=r_id,
1604             is_initiator=is_init,
1605             id_type=self.p.local_id["id_type"],
1606             i_natt=i_natt,
1607             r_natt=r_natt,
1608             priv_key=client_priv,
1609             auth_method=auth_method,
1610             nonce=params.get("nonce"),
1611             auth_data=auth_data,
1612             udp_encap=udp_encap,
1613             local_ts=self.p.remote_ts,
1614             remote_ts=self.p.local_ts,
1615         )
1616
1617         if is_init:
1618             ike_crypto = (
1619                 ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
1620             )
1621             ike_integ = (
1622                 "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
1623             )
1624             ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
1625
1626             esp_crypto = (
1627                 ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
1628             )
1629             esp_integ = (
1630                 "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
1631             )
1632
1633             self.sa.set_ike_props(
1634                 crypto=ike_crypto[0],
1635                 crypto_key_len=ike_crypto[1],
1636                 integ=ike_integ,
1637                 prf="PRF_HMAC_SHA2_256",
1638                 dh=ike_dh,
1639             )
1640             self.sa.set_esp_props(
1641                 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
1642             )
1643
1644
1645 class TestApi(VppTestCase):
1646     """Test IKEV2 API"""
1647
1648     @classmethod
1649     def setUpClass(cls):
1650         super(TestApi, cls).setUpClass()
1651
1652     @classmethod
1653     def tearDownClass(cls):
1654         super(TestApi, cls).tearDownClass()
1655
1656     def tearDown(self):
1657         super(TestApi, self).tearDown()
1658         self.p1.remove_vpp_config()
1659         self.p2.remove_vpp_config()
1660         r = self.vapi.ikev2_profile_dump()
1661         self.assertEqual(len(r), 0)
1662
1663     def configure_profile(self, cfg):
1664         p = Profile(self, cfg["name"])
1665         p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
1666         p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
1667         p.add_local_ts(**cfg["loc_ts"])
1668         p.add_remote_ts(**cfg["rem_ts"])
1669         p.add_responder(cfg["responder"])
1670         p.add_ike_transforms(cfg["ike_ts"])
1671         p.add_esp_transforms(cfg["esp_ts"])
1672         p.add_auth(**cfg["auth"])
1673         p.set_udp_encap(cfg["udp_encap"])
1674         p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
1675         if "lifetime_data" in cfg:
1676             p.set_lifetime_data(cfg["lifetime_data"])
1677         if "tun_itf" in cfg:
1678             p.set_tunnel_interface(cfg["tun_itf"])
1679         if "natt_disabled" in cfg and cfg["natt_disabled"]:
1680             p.disable_natt()
1681         p.add_vpp_config()
1682         return p
1683
1684     def test_profile_api(self):
1685         """test profile dump API"""
1686         loc_ts4 = {
1687             "proto": 8,
1688             "start_port": 1,
1689             "end_port": 19,
1690             "start_addr": "3.3.3.2",
1691             "end_addr": "3.3.3.3",
1692         }
1693         rem_ts4 = {
1694             "proto": 9,
1695             "start_port": 10,
1696             "end_port": 119,
1697             "start_addr": "4.5.76.80",
1698             "end_addr": "2.3.4.6",
1699         }
1700
1701         loc_ts6 = {
1702             "proto": 8,
1703             "start_port": 1,
1704             "end_port": 19,
1705             "start_addr": "ab::1",
1706             "end_addr": "ab::4",
1707         }
1708         rem_ts6 = {
1709             "proto": 9,
1710             "start_port": 10,
1711             "end_port": 119,
1712             "start_addr": "cd::12",
1713             "end_addr": "cd::13",
1714         }
1715
1716         conf = {
1717             "p1": {
1718                 "name": "p1",
1719                 "natt_disabled": True,
1720                 "loc_id": ("fqdn", b"vpp.home"),
1721                 "rem_id": ("fqdn", b"roadwarrior.example.com"),
1722                 "loc_ts": loc_ts4,
1723                 "rem_ts": rem_ts4,
1724                 "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
1725                 "ike_ts": {
1726                     "crypto_alg": 20,
1727                     "crypto_key_size": 32,
1728                     "integ_alg": 0,
1729                     "dh_group": 1,
1730                 },
1731                 "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
1732                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1733                 "udp_encap": True,
1734                 "ipsec_over_udp_port": 4501,
1735                 "lifetime_data": {
1736                     "lifetime": 123,
1737                     "lifetime_maxdata": 20192,
1738                     "lifetime_jitter": 9,
1739                     "handover": 132,
1740                 },
1741             },
1742             "p2": {
1743                 "name": "p2",
1744                 "loc_id": ("ip4-addr", b"192.168.2.1"),
1745                 "rem_id": ("ip6-addr", b"abcd::1"),
1746                 "loc_ts": loc_ts6,
1747                 "rem_ts": rem_ts6,
1748                 "responder": {"sw_if_index": 4, "addr": "def::10"},
1749                 "ike_ts": {
1750                     "crypto_alg": 12,
1751                     "crypto_key_size": 16,
1752                     "integ_alg": 3,
1753                     "dh_group": 3,
1754                 },
1755                 "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
1756                 "auth": {"method": "shared-key", "data": b"sharedkeydata"},
1757                 "udp_encap": False,
1758                 "ipsec_over_udp_port": 4600,
1759                 "tun_itf": 0,
1760             },
1761         }
1762         self.p1 = self.configure_profile(conf["p1"])
1763         self.p2 = self.configure_profile(conf["p2"])
1764
1765         r = self.vapi.ikev2_profile_dump()
1766         self.assertEqual(len(r), 2)
1767         self.verify_profile(r[0].profile, conf["p1"])
1768         self.verify_profile(r[1].profile, conf["p2"])
1769
1770     def verify_id(self, api_id, cfg_id):
1771         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1772         self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
1773
1774     def verify_ts(self, api_ts, cfg_ts):
1775         self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
1776         self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
1777         self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
1778         self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
1779         self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
1780
1781     def verify_responder(self, api_r, cfg_r):
1782         self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
1783         self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
1784
1785     def verify_transforms(self, api_ts, cfg_ts):
1786         self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
1787         self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
1788         self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
1789
1790     def verify_ike_transforms(self, api_ts, cfg_ts):
1791         self.verify_transforms(api_ts, cfg_ts)
1792         self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
1793
1794     def verify_esp_transforms(self, api_ts, cfg_ts):
1795         self.verify_transforms(api_ts, cfg_ts)
1796
1797     def verify_auth(self, api_auth, cfg_auth):
1798         self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
1799         self.assertEqual(api_auth.data, cfg_auth["data"])
1800         self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
1801
1802     def verify_lifetime_data(self, p, ld):
1803         self.assertEqual(p.lifetime, ld["lifetime"])
1804         self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
1805         self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
1806         self.assertEqual(p.handover, ld["handover"])
1807
1808     def verify_profile(self, ap, cp):
1809         self.assertEqual(ap.name, cp["name"])
1810         self.assertEqual(ap.udp_encap, cp["udp_encap"])
1811         self.verify_id(ap.loc_id, cp["loc_id"])
1812         self.verify_id(ap.rem_id, cp["rem_id"])
1813         self.verify_ts(ap.loc_ts, cp["loc_ts"])
1814         self.verify_ts(ap.rem_ts, cp["rem_ts"])
1815         self.verify_responder(ap.responder, cp["responder"])
1816         self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
1817         self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
1818         self.verify_auth(ap.auth, cp["auth"])
1819         natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
1820         self.assertTrue(natt_dis == ap.natt_disabled)
1821
1822         if "lifetime_data" in cp:
1823             self.verify_lifetime_data(ap, cp["lifetime_data"])
1824         self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
1825         if "tun_itf" in cp:
1826             self.assertEqual(ap.tun_itf, cp["tun_itf"])
1827         else:
1828             self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
1829
1830
1831 @tag_fixme_vpp_workers
1832 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1833     """test responder - responder behind NAT"""
1834
1835     IKE_NODE_SUFFIX = "ip4-natt"
1836
1837     def config_tc(self):
1838         self.config_params({"r_natt": True})
1839
1840
1841 @tag_fixme_vpp_workers
1842 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1843     """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
1844
1845     def config_tc(self):
1846         self.config_params(
1847             {
1848                 "i_natt": True,
1849                 "is_initiator": False,  # seen from test case perspective
1850                 # thus vpp is initiator
1851                 "responder": {
1852                     "sw_if_index": self.pg0.sw_if_index,
1853                     "addr": self.pg0.remote_ip4,
1854                 },
1855                 "ike-crypto": ("AES-GCM-16ICV", 32),
1856                 "ike-integ": "NULL",
1857                 "ike-dh": "3072MODPgr",
1858                 "ike_transforms": {
1859                     "crypto_alg": 20,  # "aes-gcm-16"
1860                     "crypto_key_size": 256,
1861                     "dh_group": 15,  # "modp-3072"
1862                 },
1863                 "esp_transforms": {
1864                     "crypto_alg": 12,  # "aes-cbc"
1865                     "crypto_key_size": 256,
1866                     # "hmac-sha2-256-128"
1867                     "integ_alg": 12,
1868                 },
1869             }
1870         )
1871
1872
1873 @tag_fixme_vpp_workers
1874 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1875     """test ikev2 initiator - pre shared key auth"""
1876
1877     def config_tc(self):
1878         self.config_params(
1879             {
1880                 "is_initiator": False,  # seen from test case perspective
1881                 # thus vpp is initiator
1882                 "ike-crypto": ("AES-GCM-16ICV", 32),
1883                 "ike-integ": "NULL",
1884                 "ike-dh": "3072MODPgr",
1885                 "ike_transforms": {
1886                     "crypto_alg": 20,  # "aes-gcm-16"
1887                     "crypto_key_size": 256,
1888                     "dh_group": 15,  # "modp-3072"
1889                 },
1890                 "esp_transforms": {
1891                     "crypto_alg": 12,  # "aes-cbc"
1892                     "crypto_key_size": 256,
1893                     # "hmac-sha2-256-128"
1894                     "integ_alg": 12,
1895                 },
1896                 "responder_hostname": {
1897                     "hostname": "vpp.responder.org",
1898                     "sw_if_index": self.pg0.sw_if_index,
1899                 },
1900             }
1901         )
1902
1903
1904 @tag_fixme_vpp_workers
1905 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1906     """test initiator - request window size (1)"""
1907
1908     def rekey_respond(self, req, update_child_sa_data):
1909         ih = self.get_ike_header(req)
1910         plain = self.sa.hmac_and_decrypt(ih)
1911         sa = ikev2.IKEv2_payload_SA(plain)
1912         if update_child_sa_data:
1913             prop = sa[ikev2.IKEv2_payload_Proposal]
1914             self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1915             self.sa.r_nonce = self.sa.i_nonce
1916             self.sa.child_sas[0].ispi = prop.SPI
1917             self.sa.child_sas[0].rspi = prop.SPI
1918             self.sa.calc_child_keys()
1919
1920         header = ikev2.IKEv2(
1921             init_SPI=self.sa.ispi,
1922             resp_SPI=self.sa.rspi,
1923             flags="Response",
1924             exch_type=36,
1925             id=ih.id,
1926             next_payload="Encrypted",
1927         )
1928         resp = self.encrypt_ike_msg(header, sa, "SA")
1929         packet = self.create_packet(
1930             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1931         )
1932         self.send_and_assert_no_replies(self.pg0, packet)
1933
1934     def test_initiator(self):
1935         super(TestInitiatorRequestWindowSize, self).test_initiator()
1936         self.pg0.enable_capture()
1937         self.pg_start()
1938         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1939         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1940         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1941         capture = self.pg0.get_capture(2)
1942
1943         # reply in reverse order
1944         self.rekey_respond(capture[1], True)
1945         self.rekey_respond(capture[0], False)
1946
1947         # verify that only the second request was accepted
1948         self.verify_ike_sas()
1949         self.verify_ipsec_sas(is_rekey=True)
1950
1951
1952 @tag_fixme_vpp_workers
1953 class TestInitiatorRekey(TestInitiatorPsk):
1954     """test ikev2 initiator - rekey"""
1955
1956     def rekey_from_initiator(self):
1957         ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
1958         self.pg0.enable_capture()
1959         self.pg_start()
1960         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1961         capture = self.pg0.get_capture(1)
1962         ih = self.get_ike_header(capture[0])
1963         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
1964         self.assertNotIn("Response", ih.flags)
1965         self.assertIn("Initiator", ih.flags)
1966         plain = self.sa.hmac_and_decrypt(ih)
1967         sa = ikev2.IKEv2_payload_SA(plain)
1968         prop = sa[ikev2.IKEv2_payload_Proposal]
1969         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1970         self.sa.r_nonce = self.sa.i_nonce
1971         # update new responder SPI
1972         self.sa.child_sas[0].ispi = prop.SPI
1973         self.sa.child_sas[0].rspi = prop.SPI
1974         self.sa.calc_child_keys()
1975         header = ikev2.IKEv2(
1976             init_SPI=self.sa.ispi,
1977             resp_SPI=self.sa.rspi,
1978             flags="Response",
1979             exch_type=36,
1980             id=ih.id,
1981             next_payload="Encrypted",
1982         )
1983         resp = self.encrypt_ike_msg(header, sa, "SA")
1984         packet = self.create_packet(
1985             self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
1986         )
1987         self.send_and_assert_no_replies(self.pg0, packet)
1988
1989     def test_initiator(self):
1990         super(TestInitiatorRekey, self).test_initiator()
1991         self.rekey_from_initiator()
1992         self.verify_ike_sas()
1993         self.verify_ipsec_sas(is_rekey=True)
1994
1995
1996 @tag_fixme_vpp_workers
1997 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1998     """test ikev2 initiator - delete IKE SA from responder"""
1999
2000     def config_tc(self):
2001         self.config_params(
2002             {
2003                 "del_sa_from_responder": True,
2004                 "is_initiator": False,  # seen from test case perspective
2005                 # thus vpp is initiator
2006                 "responder": {
2007                     "sw_if_index": self.pg0.sw_if_index,
2008                     "addr": self.pg0.remote_ip4,
2009                 },
2010                 "ike-crypto": ("AES-GCM-16ICV", 32),
2011                 "ike-integ": "NULL",
2012                 "ike-dh": "3072MODPgr",
2013                 "ike_transforms": {
2014                     "crypto_alg": 20,  # "aes-gcm-16"
2015                     "crypto_key_size": 256,
2016                     "dh_group": 15,  # "modp-3072"
2017                 },
2018                 "esp_transforms": {
2019                     "crypto_alg": 12,  # "aes-cbc"
2020                     "crypto_key_size": 256,
2021                     # "hmac-sha2-256-128"
2022                     "integ_alg": 12,
2023                 },
2024                 "no_idr_in_auth": True,
2025             }
2026         )
2027
2028
2029 @tag_fixme_vpp_workers
2030 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
2031     """test ikev2 responder - initiator behind NAT"""
2032
2033     IKE_NODE_SUFFIX = "ip4-natt"
2034
2035     def config_tc(self):
2036         self.config_params({"i_natt": True})
2037
2038
2039 @tag_fixme_vpp_workers
2040 class TestResponderPsk(TemplateResponder, Ikev2Params):
2041     """test ikev2 responder - pre shared key auth"""
2042
2043     def config_tc(self):
2044         self.config_params()
2045
2046
2047 @tag_fixme_vpp_workers
2048 class TestResponderDpd(TestResponderPsk):
2049     """
2050     Dead peer detection test
2051     """
2052
2053     def config_tc(self):
2054         self.config_params({"dpd_disabled": False})
2055
2056     def tearDown(self):
2057         pass
2058
2059     def test_responder(self):
2060         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2061         super(TestResponderDpd, self).test_responder()
2062         self.pg0.enable_capture()
2063         self.pg_start()
2064         # capture empty request but don't reply
2065         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2066         ih = self.get_ike_header(capture[0])
2067         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2068         plain = self.sa.hmac_and_decrypt(ih)
2069         self.assertEqual(plain, b"")
2070         # wait for SA expiration
2071         time.sleep(3)
2072         ike_sas = self.vapi.ikev2_sa_dump()
2073         self.assertEqual(len(ike_sas), 0)
2074         ipsec_sas = self.vapi.ipsec_sa_dump()
2075         self.assertEqual(len(ipsec_sas), 0)
2076
2077
2078 @tag_fixme_vpp_workers
2079 class TestResponderRekey(TestResponderPsk):
2080     """test ikev2 responder - rekey"""
2081
2082     def send_rekey_from_initiator(self):
2083         packet = self.create_rekey_request()
2084         self.pg0.add_stream(packet)
2085         self.pg0.enable_capture()
2086         self.pg_start()
2087         capture = self.pg0.get_capture(1)
2088         return capture
2089
2090     def process_rekey_response(self, capture):
2091         ih = self.get_ike_header(capture[0])
2092         plain = self.sa.hmac_and_decrypt(ih)
2093         sa = ikev2.IKEv2_payload_SA(plain)
2094         prop = sa[ikev2.IKEv2_payload_Proposal]
2095         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
2096         # update new responder SPI
2097         self.sa.child_sas[0].rspi = prop.SPI
2098
2099     def test_responder(self):
2100         super(TestResponderRekey, self).test_responder()
2101         self.process_rekey_response(self.send_rekey_from_initiator())
2102         self.sa.calc_child_keys()
2103         self.verify_ike_sas()
2104         self.verify_ipsec_sas(is_rekey=True)
2105         self.assert_counter(1, "rekey_req", "ip4")
2106         r = self.vapi.ikev2_sa_dump()
2107         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
2108
2109
2110 @tag_fixme_vpp_workers
2111 class TestResponderRekeyRepeat(TestResponderRekey):
2112     """test ikev2 responder - rekey repeat"""
2113
2114     def test_responder(self):
2115         super(TestResponderRekeyRepeat, self).test_responder()
2116         # rekey request is not accepted until old IPsec SA is expired
2117         capture = self.send_rekey_from_initiator()
2118         ih = self.get_ike_header(capture[0])
2119         plain = self.sa.hmac_and_decrypt(ih)
2120         notify = ikev2.IKEv2_payload_Notify(plain)
2121         self.assertEqual(notify.type, 43)
2122         self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
2123         # rekey request is accepted after old IPsec SA was expired
2124         for _ in range(50):
2125             if len(self.vapi.ipsec_sa_dump()) != 3:
2126                 break
2127             time.sleep(0.2)
2128         else:
2129             self.fail("old IPsec SA not expired")
2130         self.process_rekey_response(self.send_rekey_from_initiator())
2131         self.sa.calc_child_keys()
2132         self.verify_ike_sas()
2133         self.verify_ipsec_sas(sa_count=3)
2134
2135
2136 class TestResponderVrf(TestResponderPsk, Ikev2Params):
2137     """test ikev2 responder - non-default table id"""
2138
2139     @classmethod
2140     def setUpClass(cls):
2141         import scapy.contrib.ikev2 as _ikev2
2142
2143         globals()["ikev2"] = _ikev2
2144         super(IkePeer, cls).setUpClass()
2145         cls.create_pg_interfaces(range(1))
2146         cls.vapi.cli("ip table add 1")
2147         cls.vapi.cli("set interface ip table pg0 1")
2148         for i in cls.pg_interfaces:
2149             i.admin_up()
2150             i.config_ip4()
2151             i.resolve_arp()
2152             i.config_ip6()
2153             i.resolve_ndp()
2154
2155     def config_tc(self):
2156         self.config_params({"dpd_disabled": False})
2157
2158     def test_responder(self):
2159         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
2160         super(TestResponderVrf, self).test_responder()
2161         self.pg0.enable_capture()
2162         self.pg_start()
2163         capture = self.pg0.get_capture(expected_count=1, timeout=5)
2164         ih = self.get_ike_header(capture[0])
2165         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
2166         plain = self.sa.hmac_and_decrypt(ih)
2167         self.assertEqual(plain, b"")
2168
2169
2170 @tag_fixme_vpp_workers
2171 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
2172     """test ikev2 responder - cert based auth"""
2173
2174     def config_tc(self):
2175         self.config_params(
2176             {
2177                 "udp_encap": True,
2178                 "auth": "rsa-sig",
2179                 "server-key": "server-key.pem",
2180                 "client-key": "client-key.pem",
2181                 "client-cert": "client-cert.pem",
2182                 "server-cert": "server-cert.pem",
2183             }
2184         )
2185
2186
2187 @tag_fixme_vpp_workers
2188 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
2189     TemplateResponder, Ikev2Params
2190 ):
2191     """
2192     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
2193     """
2194
2195     def config_tc(self):
2196         self.config_params(
2197             {
2198                 "ike-crypto": ("AES-CBC", 16),
2199                 "ike-integ": "SHA2-256-128",
2200                 "esp-crypto": ("AES-CBC", 24),
2201                 "esp-integ": "SHA2-384-192",
2202                 "ike-dh": "2048MODPgr",
2203                 "nonce": os.urandom(256),
2204                 "no_idr_in_auth": True,
2205             }
2206         )
2207
2208
2209 @tag_fixme_vpp_workers
2210 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
2211     TemplateResponder, Ikev2Params
2212 ):
2213
2214     """
2215     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
2216     """
2217
2218     def config_tc(self):
2219         self.config_params(
2220             {
2221                 "ike-crypto": ("AES-CBC", 32),
2222                 "ike-integ": "SHA2-256-128",
2223                 "esp-crypto": ("AES-GCM-16ICV", 32),
2224                 "esp-integ": "NULL",
2225                 "ike-dh": "3072MODPgr",
2226             }
2227         )
2228
2229
2230 @tag_fixme_vpp_workers
2231 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2232     """
2233     IKE:AES_GCM_16_256
2234     """
2235
2236     IKE_NODE_SUFFIX = "ip6"
2237
2238     def config_tc(self):
2239         self.config_params(
2240             {
2241                 "del_sa_from_responder": True,
2242                 "ip6": True,
2243                 "natt": True,
2244                 "ike-crypto": ("AES-GCM-16ICV", 32),
2245                 "ike-integ": "NULL",
2246                 "ike-dh": "2048MODPgr",
2247                 "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
2248                 "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
2249             }
2250         )
2251
2252
2253 @tag_fixme_vpp_workers
2254 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2255     """
2256     Test for keep alive messages
2257     """
2258
2259     def send_empty_req_from_responder(self):
2260         packet = self.create_empty_request()
2261         self.pg0.add_stream(packet)
2262         self.pg0.enable_capture()
2263         self.pg_start()
2264         capture = self.pg0.get_capture(1)
2265         ih = self.get_ike_header(capture[0])
2266         self.assertEqual(ih.id, self.sa.msg_id)
2267         plain = self.sa.hmac_and_decrypt(ih)
2268         self.assertEqual(plain, b"")
2269         self.assert_counter(1, "keepalive", "ip4")
2270         r = self.vapi.ikev2_sa_dump()
2271         self.assertEqual(1, r[0].sa.stats.n_keepalives)
2272
2273     def test_initiator(self):
2274         super(TestInitiatorKeepaliveMsg, self).test_initiator()
2275         self.send_empty_req_from_responder()
2276
2277
2278 class TestMalformedMessages(TemplateResponder, Ikev2Params):
2279     """malformed packet test"""
2280
2281     def tearDown(self):
2282         pass
2283
2284     def config_tc(self):
2285         self.config_params()
2286
2287     def create_ike_init_msg(self, length=None, payload=None):
2288         msg = ikev2.IKEv2(
2289             length=length,
2290             init_SPI="\x11" * 8,
2291             flags="Initiator",
2292             exch_type="IKE_SA_INIT",
2293         )
2294         if payload is not None:
2295             msg /= payload
2296         return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
2297
2298     def verify_bad_packet_length(self):
2299         ike_msg = self.create_ike_init_msg(length=0xDEAD)
2300         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2301         self.assert_counter(self.pkt_count, "bad_length")
2302
2303     def verify_bad_sa_payload_length(self):
2304         p = ikev2.IKEv2_payload_SA(length=0xDEAD)
2305         ike_msg = self.create_ike_init_msg(payload=p)
2306         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
2307         self.assert_counter(self.pkt_count, "malformed_packet")
2308
2309     def test_responder(self):
2310         self.pkt_count = 254
2311         self.verify_bad_packet_length()
2312         self.verify_bad_sa_payload_length()
2313
2314
2315 if __name__ == "__main__":
2316     unittest.main(testRunner=VppTestRunner)