X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ikev2.py;h=5b699dd0d8d3479b0f4fba6d8a566ef29722e1e0;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=438a674977f77bab7f93a94a6e3bf336d0083d3d;hpb=c7cceeebb738b0fabd93d2c4fdfd561321a2be1d;p=vpp.git diff --git a/test/test_ikev2.py b/test/test_ikev2.py index 438a674977f..5b699dd0d8d 100644 --- a/test/test_ikev2.py +++ b/test/test_ikev2.py @@ -13,6 +13,7 @@ from cryptography.hazmat.primitives.ciphers import ( ) from ipaddress import IPv4Address, IPv6Address, ip_address import unittest +from config import config from scapy.layers.ipsec import ESP from scapy.layers.inet import IP, UDP, Ether from scapy.layers.inet6 import IPv6 @@ -37,7 +38,9 @@ GCM_IV_SIZE = 8 # defined in rfc3526 # tuple structure is (p, g, key_len) DH = { - '2048MODPgr': (long_converter(""" + "2048MODPgr": ( + long_converter( + """ FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245 @@ -48,9 +51,14 @@ DH = { 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510 - 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256), - - '3072MODPgr': (long_converter(""" + 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF""" + ), + 2, + 256, + ), + "3072MODPgr": ( + long_converter( + """ FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245 @@ -66,7 +74,11 @@ DH = { ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31 - 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384) + 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF""" + ), + 2, + 384, + ), } @@ -78,7 +90,7 @@ class CryptoAlgo(object): if self.cipher is not None: self.bs = self.cipher.block_size // 8 - if self.name == 'AES-GCM-16ICV': + if self.name == "AES-GCM-16ICV": self.iv_len = GCM_IV_SIZE else: self.iv_len = self.bs @@ -86,14 +98,16 @@ class CryptoAlgo(object): def encrypt(self, data, key, aad=None): iv = os.urandom(self.iv_len) if aad is None: - encryptor = Cipher(self.cipher(key), self.mode(iv), - default_backend()).encryptor() + encryptor = Cipher( + self.cipher(key), self.mode(iv), default_backend() + ).encryptor() return iv + encryptor.update(data) + encryptor.finalize() else: salt = key[-SALT_SIZE:] nonce = salt + iv - encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce), - default_backend()).encryptor() + encryptor = Cipher( + self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend() + ).encryptor() encryptor.authenticate_additional_data(aad) data = encryptor.update(data) + encryptor.finalize() data += encryptor.tag[:GCM_ICV_SIZE] @@ -101,26 +115,26 @@ class CryptoAlgo(object): def decrypt(self, data, key, aad=None, icv=None): if aad is None: - iv = data[:self.iv_len] - ct = data[self.iv_len:] - decryptor = Cipher(algorithms.AES(key), - self.mode(iv), - default_backend()).decryptor() + iv = data[: self.iv_len] + ct = data[self.iv_len :] + decryptor = Cipher( + algorithms.AES(key), self.mode(iv), default_backend() + ).decryptor() return decryptor.update(ct) + decryptor.finalize() else: salt = key[-SALT_SIZE:] nonce = salt + data[:GCM_IV_SIZE] ct = data[GCM_IV_SIZE:] key = key[:-SALT_SIZE] - decryptor = Cipher(algorithms.AES(key), - self.mode(nonce, icv, len(icv)), - default_backend()).decryptor() + decryptor = Cipher( + algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend() + ).decryptor() decryptor.authenticate_additional_data(aad) return decryptor.update(ct) + decryptor.finalize() def pad(self, data): pad_len = (len(data) // self.bs + 1) * self.bs - len(data) - data = data + b'\x00' * (pad_len - 1) + data = data + b"\x00" * (pad_len - 1) return data + bytes([pad_len - 1]) @@ -134,36 +148,34 @@ class AuthAlgo(object): CRYPTO_ALGOS = { - 'NULL': CryptoAlgo('NULL', cipher=None, mode=None), - 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC), - 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES, - mode=modes.GCM), + "NULL": CryptoAlgo("NULL", cipher=None, mode=None), + "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC), + "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM), } AUTH_ALGOS = { - 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0), - 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12), - 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16), - 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24), - 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32), + "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0), + "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12), + "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16), + "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24), + "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32), } PRF_ALGOS = { - 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0), - 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC, - hashes.SHA256, 32), + "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0), + "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32), } CRYPTO_IDS = { - 12: 'AES-CBC', - 20: 'AES-GCM-16ICV', + 12: "AES-CBC", + 20: "AES-GCM-16ICV", } INTEG_IDS = { - 2: 'HMAC-SHA1-96', - 12: 'SHA2-256-128', - 13: 'SHA2-384-192', - 14: 'SHA2-512-256', + 2: "HMAC-SHA1-96", + 12: "SHA2-256-128", + 13: "SHA2-384-192", + 14: "SHA2-512-256", } @@ -181,11 +193,24 @@ class IKEv2ChildSA(object): class IKEv2SA(object): - def __init__(self, test, is_initiator=True, i_id=None, r_id=None, - spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn', - nonce=None, auth_data=None, local_ts=None, remote_ts=None, - auth_method='shared-key', priv_key=None, i_natt=False, - r_natt=False, udp_encap=False): + def __init__( + self, + test, + is_initiator=True, + i_id=None, + r_id=None, + spi=b"\x01\x02\x03\x04\x05\x06\x07\x08", + id_type="fqdn", + nonce=None, + auth_data=None, + local_ts=None, + remote_ts=None, + auth_method="shared-key", + priv_key=None, + i_natt=False, + r_natt=False, + udp_encap=False, + ): self.udp_encap = udp_encap self.i_natt = i_natt self.r_natt = r_natt @@ -210,15 +235,14 @@ class IKEv2SA(object): self.id_type = id_type self.auth_method = auth_method if self.is_initiator: - self.rspi = 8 * b'\x00' + self.rspi = 8 * b"\x00" self.ispi = spi self.i_nonce = nonce else: self.rspi = spi - self.ispi = 8 * b'\x00' + self.ispi = 8 * b"\x00" self.r_nonce = nonce - self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, - self.is_initiator)] + self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)] def new_msg_id(self): self.msg_id += 1 @@ -244,13 +268,14 @@ class IKEv2SA(object): priv = self.dh_private_key peer = self.peer_dh_pub_key p, g, l = self.ike_group - return pow(int.from_bytes(peer, 'big'), - int.from_bytes(priv, 'big'), p).to_bytes(l, 'big') + return pow( + int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p + ).to_bytes(l, "big") def generate_dh_data(self): # generate DH keys if self.ike_dh not in DH: - raise NotImplementedError('%s not in DH group' % self.ike_dh) + raise NotImplementedError("%s not in DH group" % self.ike_dh) if self.dh_params is None: dhg = DH[self.ike_dh] @@ -260,13 +285,13 @@ class IKEv2SA(object): priv = self.dh_params.generate_private_key() pub = priv.public_key() x = priv.private_numbers().x - self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big') + self.dh_private_key = x.to_bytes(priv.key_size // 8, "big") y = pub.public_numbers().y if self.is_initiator: - self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big') + self.i_dh_data = y.to_bytes(pub.key_size // 8, "big") else: - self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big') + self.r_dh_data = y.to_bytes(pub.key_size // 8, "big") def complete_dh_data(self): self.dh_shared_secret = self.compute_secret() @@ -280,41 +305,39 @@ class IKEv2SA(object): integ_key_len = self.esp_integ_alg.key_len salt_len = 0 if integ_key_len else 4 - l = (integ_key_len * 2 + - encr_key_len * 2 + - salt_len * 2) + l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2 keymat = self.calc_prfplus(prf, self.sk_d, s, l) pos = 0 - c.sk_ei = keymat[pos:pos+encr_key_len] + c.sk_ei = keymat[pos : pos + encr_key_len] pos += encr_key_len if integ_key_len: - c.sk_ai = keymat[pos:pos+integ_key_len] + c.sk_ai = keymat[pos : pos + integ_key_len] pos += integ_key_len else: - c.salt_ei = keymat[pos:pos+salt_len] + c.salt_ei = keymat[pos : pos + salt_len] pos += salt_len - c.sk_er = keymat[pos:pos+encr_key_len] + c.sk_er = keymat[pos : pos + encr_key_len] pos += encr_key_len if integ_key_len: - c.sk_ar = keymat[pos:pos+integ_key_len] + c.sk_ar = keymat[pos : pos + integ_key_len] pos += integ_key_len else: - c.salt_er = keymat[pos:pos+salt_len] + c.salt_er = keymat[pos : pos + salt_len] pos += salt_len def calc_prfplus(self, prf, key, seed, length): - r = b'' + r = b"" t = None x = 1 while len(r) < length and x < 255: if t is not None: s = t else: - s = b'' + s = b"" s = s + seed + bytes([x]) t = self.calc_prf(prf, key, s) r = r + t @@ -347,30 +370,32 @@ class IKEv2SA(object): else: salt_size = 0 - l = (prf_key_trunc + - integ_key_len * 2 + - encr_key_len * 2 + - tr_prf_key_len * 2 + - salt_size * 2) + l = ( + prf_key_trunc + + integ_key_len * 2 + + encr_key_len * 2 + + tr_prf_key_len * 2 + + salt_size * 2 + ) keymat = self.calc_prfplus(prf, self.skeyseed, s, l) pos = 0 - self.sk_d = keymat[:pos+prf_key_trunc] + self.sk_d = keymat[: pos + prf_key_trunc] pos += prf_key_trunc - self.sk_ai = keymat[pos:pos+integ_key_len] + self.sk_ai = keymat[pos : pos + integ_key_len] pos += integ_key_len - self.sk_ar = keymat[pos:pos+integ_key_len] + self.sk_ar = keymat[pos : pos + integ_key_len] pos += integ_key_len - self.sk_ei = keymat[pos:pos+encr_key_len + salt_size] + self.sk_ei = keymat[pos : pos + encr_key_len + salt_size] pos += encr_key_len + salt_size - self.sk_er = keymat[pos:pos+encr_key_len + salt_size] + self.sk_er = keymat[pos : pos + encr_key_len + salt_size] pos += encr_key_len + salt_size - self.sk_pi = keymat[pos:pos+tr_prf_key_len] + self.sk_pi = keymat[pos : pos + tr_prf_key_len] pos += tr_prf_key_len - self.sk_pr = keymat[pos:pos+tr_prf_key_len] + self.sk_pr = keymat[pos : pos + tr_prf_key_len] def generate_authmsg(self, prf, packet): if self.is_initiator: @@ -392,14 +417,15 @@ class IKEv2SA(object): else: packet = self.init_resp_packet authmsg = self.generate_authmsg(prf, raw(packet)) - if self.auth_method == 'shared-key': + if self.auth_method == "shared-key": psk = self.calc_prf(prf, self.auth_data, KEY_PAD) self.auth_data = self.calc_prf(prf, psk, authmsg) - elif self.auth_method == 'rsa-sig': - self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(), - hashes.SHA1()) + elif self.auth_method == "rsa-sig": + self.auth_data = self.priv_key.sign( + authmsg, padding.PKCS1v15(), hashes.SHA1() + ) else: - raise TypeError('unknown auth method type!') + raise TypeError("unknown auth method type!") def encrypt(self, data, aad=None): data = self.ike_crypto_alg.pad(data) @@ -430,7 +456,7 @@ class IKEv2SA(object): return self.sk_ei def concat(self, alg, key_len): - return alg + '-' + str(key_len * 8) + return alg + "-" + str(key_len * 8) @property def vpp_ike_cypto_alg(self): @@ -444,8 +470,9 @@ class IKEv2SA(object): integ_trunc = self.ike_integ_alg.trunc_len exp_hmac = ikemsg[-integ_trunc:] data = ikemsg[:-integ_trunc] - computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(), - self.peer_authkey, data) + computed_hmac = self.compute_hmac( + self.ike_integ_alg.mod(), self.peer_authkey, data + ) self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac) def compute_hmac(self, integ, key, data): @@ -458,7 +485,7 @@ class IKEv2SA(object): def hmac_and_decrypt(self, ike): ep = ike[ikev2.IKEv2_payload_Encrypted] - if self.ike_crypto == 'AES-GCM-16ICV': + if self.ike_crypto == "AES-GCM-16ICV": aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2()) ct = ep.load[:-GCM_ICV_SIZE] tag = ep.load[-GCM_ICV_SIZE:] @@ -472,26 +499,26 @@ class IKEv2SA(object): plain = self.decrypt(ct) # remove padding pad_len = plain[-1] - return plain[:-pad_len - 1] + return plain[: -pad_len - 1] def build_ts_addr(self, ts, version): - return {'starting_address_v' + version: ts['start_addr'], - 'ending_address_v' + version: ts['end_addr']} + return { + "starting_address_v" + version: ts["start_addr"], + "ending_address_v" + version: ts["end_addr"], + } def generate_ts(self, is_ip4): c = self.child_sas[0] - ts_data = {'IP_protocol_ID': 0, - 'start_port': 0, - 'end_port': 0xffff} + ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF} if is_ip4: - ts_data.update(self.build_ts_addr(c.local_ts, '4')) + ts_data.update(self.build_ts_addr(c.local_ts, "4")) ts1 = ikev2.IPv4TrafficSelector(**ts_data) - ts_data.update(self.build_ts_addr(c.remote_ts, '4')) + ts_data.update(self.build_ts_addr(c.remote_ts, "4")) ts2 = ikev2.IPv4TrafficSelector(**ts_data) else: - ts_data.update(self.build_ts_addr(c.local_ts, '6')) + ts_data.update(self.build_ts_addr(c.local_ts, "6")) ts1 = ikev2.IPv6TrafficSelector(**ts_data) - ts_data.update(self.build_ts_addr(c.remote_ts, '6')) + ts_data.update(self.build_ts_addr(c.remote_ts, "6")) ts2 = ikev2.IPv6TrafficSelector(**ts_data) if self.is_initiator: @@ -500,18 +527,18 @@ class IKEv2SA(object): def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh): if crypto not in CRYPTO_ALGOS: - raise TypeError('unsupported encryption algo %r' % crypto) + raise TypeError("unsupported encryption algo %r" % crypto) self.ike_crypto = crypto self.ike_crypto_alg = CRYPTO_ALGOS[crypto] self.ike_crypto_key_len = crypto_key_len if integ not in AUTH_ALGOS: - raise TypeError('unsupported auth algo %r' % integ) - self.ike_integ = None if integ == 'NULL' else integ + raise TypeError("unsupported auth algo %r" % integ) + self.ike_integ = None if integ == "NULL" else integ self.ike_integ_alg = AUTH_ALGOS[integ] if prf not in PRF_ALGOS: - raise TypeError('unsupported prf algo %r' % prf) + raise TypeError("unsupported prf algo %r" % prf) self.ike_prf = prf self.ike_prf_alg = PRF_ALGOS[prf] self.ike_dh = dh @@ -520,20 +547,20 @@ class IKEv2SA(object): def set_esp_props(self, crypto, crypto_key_len, integ): self.esp_crypto_key_len = crypto_key_len if crypto not in CRYPTO_ALGOS: - raise TypeError('unsupported encryption algo %r' % crypto) + raise TypeError("unsupported encryption algo %r" % crypto) self.esp_crypto = crypto self.esp_crypto_alg = CRYPTO_ALGOS[crypto] if integ not in AUTH_ALGOS: - raise TypeError('unsupported auth algo %r' % integ) - self.esp_integ = None if integ == 'NULL' else integ + raise TypeError("unsupported auth algo %r" % integ) + self.esp_integ = None if integ == "NULL" else integ self.esp_integ_alg = AUTH_ALGOS[integ] def crypto_attr(self, key_len): - if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']: - return (0x800e << 16 | key_len << 3, 12) + if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]: + return (0x800E << 16 | key_len << 3, 12) else: - raise Exception('unsupported attribute type') + raise Exception("unsupported attribute type") def ike_crypto_attr(self): return self.crypto_attr(self.ike_crypto_key_len) @@ -544,19 +571,20 @@ class IKEv2SA(object): def compute_nat_sha1(self, ip, port, rspi=None): if rspi is None: rspi = self.rspi - data = self.ispi + rspi + ip + (port).to_bytes(2, 'big') + data = self.ispi + rspi + ip + (port).to_bytes(2, "big") digest = hashes.Hash(hashes.SHA1(), backend=default_backend()) digest.update(data) return digest.finalize() class IkePeer(VppTestCase): - """ common class for initiator and responder """ + """common class for initiator and responder""" @classmethod def setUpClass(cls): import scapy.contrib.ikev2 as _ikev2 - globals()['ikev2'] = _ikev2 + + globals()["ikev2"] = _ikev2 super(IkePeer, cls).setUpClass() cls.create_pg_interfaces(range(2)) for i in cls.pg_interfaces: @@ -590,36 +618,46 @@ class IkePeer(VppTestCase): self.assertIsNotNone(self.p.query_vpp_config()) if self.sa.is_initiator: self.sa.generate_dh_data() - self.vapi.cli('ikev2 set logging level 4') - self.vapi.cli('event-lo clear') + self.vapi.cli("ikev2 set logging level 4") + self.vapi.cli("event-lo clear") - def assert_counter(self, count, name, version='ip4'): - node_name = '/err/ikev2-%s/' % version + name + def assert_counter(self, count, name, version="ip4"): + node_name = "/err/ikev2-%s/" % version + name self.assertEqual(count, self.statistics.get_err_counter(node_name)) def create_rekey_request(self): sa, first_payload = self.generate_auth_payload(is_rekey=True) header = ikev2.IKEv2( - init_SPI=self.sa.ispi, - resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), - flags='Initiator', exch_type='CREATE_CHILD_SA') + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + id=self.sa.new_msg_id(), + flags="Initiator", + exch_type="CREATE_CHILD_SA", + ) ike_msg = self.encrypt_ike_msg(header, sa, first_payload) - return self.create_packet(self.pg0, ike_msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) + return self.create_packet( + self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) def create_empty_request(self): - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - id=self.sa.new_msg_id(), flags='Initiator', - exch_type='INFORMATIONAL', - next_payload='Encrypted') - - msg = self.encrypt_ike_msg(header, b'', None) - return self.create_packet(self.pg0, msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) - - def create_packet(self, src_if, msg, sport=500, dport=500, natt=False, - use_ip6=False): + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + id=self.sa.new_msg_id(), + flags="Initiator", + exch_type="INFORMATIONAL", + next_payload="Encrypted", + ) + + msg = self.encrypt_ike_msg(header, b"", None) + return self.create_packet( + self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) + + def create_packet( + self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False + ): if use_ip6: src_ip = src_if.remote_ip6 dst_ip = src_if.local_ip6 @@ -628,12 +666,14 @@ class IkePeer(VppTestCase): src_ip = src_if.remote_ip4 dst_ip = src_if.local_ip4 ip_layer = IP - res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - ip_layer(src=src_ip, dst=dst_ip) / - UDP(sport=sport, dport=dport)) + res = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / ip_layer(src=src_ip, dst=dst_ip) + / UDP(sport=sport, dport=dport) + ) if natt: # insert non ESP marker - res = res / Raw(b'\x00' * 4) + res = res / Raw(b"\x00" * 4) return res / msg def verify_udp(self, udp): @@ -650,7 +690,7 @@ class IkePeer(VppTestCase): esp = packet[ESP] ih = self.verify_and_remove_non_esp_marker(esp) self.assertEqual(ih.version, 0x20) - self.assertNotIn('Version', ih.flags) + self.assertNotIn("Version", ih.flags) return ih def verify_and_remove_non_esp_marker(self, packet): @@ -658,26 +698,32 @@ class IkePeer(VppTestCase): # if we are in nat traversal mode check for non esp marker # and remove it data = raw(packet) - self.assertEqual(data[:4], b'\x00' * 4) + self.assertEqual(data[:4], b"\x00" * 4) return ikev2.IKEv2(data[4:]) else: return packet def encrypt_ike_msg(self, header, plain, first_payload): - if self.sa.ike_crypto == 'AES-GCM-16ICV': + if self.sa.ike_crypto == "AES-GCM-16ICV": data = self.sa.ike_crypto_alg.pad(raw(plain)) - plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\ - len(ikev2.IKEv2_payload_Encrypted()) + plen = ( + len(data) + + GCM_IV_SIZE + + GCM_ICV_SIZE + + len(ikev2.IKEv2_payload_Encrypted()) + ) tlen = plen + len(ikev2.IKEv2()) # prepare aad data - sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload, - length=plen) + sk_p = ikev2.IKEv2_payload_Encrypted( + next_payload=first_payload, length=plen + ) header.length = tlen res = header / sk_p encr = self.sa.encrypt(raw(plain), raw(res)) - sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload, - length=plen, load=encr) + sk_p = ikev2.IKEv2_payload_Encrypted( + next_payload=first_payload, length=plen, load=encr + ) res = header / sk_p else: encr = self.sa.encrypt(raw(plain)) @@ -685,16 +731,18 @@ class IkePeer(VppTestCase): plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len tlen = plen + len(ikev2.IKEv2()) - sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload, - length=plen, load=encr) + sk_p = ikev2.IKEv2_payload_Encrypted( + next_payload=first_payload, length=plen, load=encr + ) header.length = tlen res = header / sk_p integ_data = raw(res) - hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(), - self.sa.my_authkey, integ_data) + hmac_data = self.sa.compute_hmac( + self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data + ) res = res / Raw(hmac_data[:trunc_len]) - assert(len(res) == tlen) + assert len(res) == tlen return res def verify_udp_encap(self, ipsec_sa): @@ -746,37 +794,37 @@ class IkePeer(VppTestCase): # verify crypto keys self.assertEqual(sa0.crypto_key.length, len(c.sk_er)) self.assertEqual(sa1.crypto_key.length, len(c.sk_ei)) - self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er) - self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei) + self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er) + self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei) # verify integ keys if vpp_integ_alg: self.assertEqual(sa0.integrity_key.length, len(c.sk_ar)) self.assertEqual(sa1.integrity_key.length, len(c.sk_ai)) - self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar) - self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai) + self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar) + self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai) else: - self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er) - self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei) + self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er) + self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei) def verify_keymat(self, api_keys, keys, name): km = getattr(keys, name) api_km = getattr(api_keys, name) - api_km_len = getattr(api_keys, name + '_len') + api_km_len = getattr(api_keys, name + "_len") self.assertEqual(len(km), api_km_len) self.assertEqual(km, api_km[:api_km_len]) def verify_id(self, api_id, exp_id): self.assertEqual(api_id.type, IDType.value(exp_id.type)) self.assertEqual(api_id.data_len, exp_id.data_len) - self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type) + self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type) def verify_ike_sas(self): r = self.vapi.ikev2_sa_dump() self.assertEqual(len(r), 1) sa = r[0].sa - self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big')) - self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big')) + self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big")) + self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big")) if self.ip6: if self.sa.is_initiator: self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6)) @@ -791,55 +839,53 @@ class IkePeer(VppTestCase): else: self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4)) self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4)) - self.verify_keymat(sa.keys, self.sa, 'sk_d') - self.verify_keymat(sa.keys, self.sa, 'sk_ai') - self.verify_keymat(sa.keys, self.sa, 'sk_ar') - self.verify_keymat(sa.keys, self.sa, 'sk_ei') - self.verify_keymat(sa.keys, self.sa, 'sk_er') - self.verify_keymat(sa.keys, self.sa, 'sk_pi') - self.verify_keymat(sa.keys, self.sa, 'sk_pr') + self.verify_keymat(sa.keys, self.sa, "sk_d") + self.verify_keymat(sa.keys, self.sa, "sk_ai") + self.verify_keymat(sa.keys, self.sa, "sk_ar") + self.verify_keymat(sa.keys, self.sa, "sk_ei") + self.verify_keymat(sa.keys, self.sa, "sk_er") + self.verify_keymat(sa.keys, self.sa, "sk_pi") + self.verify_keymat(sa.keys, self.sa, "sk_pr") self.assertEqual(sa.i_id.type, self.sa.id_type) self.assertEqual(sa.r_id.type, self.sa.id_type) self.assertEqual(sa.i_id.data_len, len(self.sa.i_id)) self.assertEqual(sa.r_id.data_len, len(self.idr)) - self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id) - self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.idr) + self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id) + self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr) r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index) self.assertEqual(len(r), 1) csa = r[0].child_sa self.assertEqual(csa.sa_index, sa.sa_index) c = self.sa.child_sas[0] - if hasattr(c, 'sk_ai'): - self.verify_keymat(csa.keys, c, 'sk_ai') - self.verify_keymat(csa.keys, c, 'sk_ar') - self.verify_keymat(csa.keys, c, 'sk_ei') - self.verify_keymat(csa.keys, c, 'sk_er') - self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi) - self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi) + if hasattr(c, "sk_ai"): + self.verify_keymat(csa.keys, c, "sk_ai") + self.verify_keymat(csa.keys, c, "sk_ar") + self.verify_keymat(csa.keys, c, "sk_ei") + self.verify_keymat(csa.keys, c, "sk_er") + self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi) + self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) tsi = tsi[0] tsr = tsr[0] r = self.vapi.ikev2_traffic_selector_dump( - is_initiator=True, sa_index=sa.sa_index, - child_sa_index=csa.child_sa_index) + is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index + ) self.assertEqual(len(r), 1) ts = r[0].ts self.verify_ts(r[0].ts, tsi[0], True) r = self.vapi.ikev2_traffic_selector_dump( - is_initiator=False, sa_index=sa.sa_index, - child_sa_index=csa.child_sa_index) + is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index + ) self.assertEqual(len(r), 1) self.verify_ts(r[0].ts, tsr[0], False) - n = self.vapi.ikev2_nonce_get(is_initiator=True, - sa_index=sa.sa_index) + n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index) self.verify_nonce(n, self.sa.i_nonce) - n = self.vapi.ikev2_nonce_get(is_initiator=False, - sa_index=sa.sa_index) + n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index) self.verify_nonce(n, self.sa.r_nonce) def verify_nonce(self, api_nonce, nonce): @@ -853,61 +899,65 @@ class IkePeer(VppTestCase): self.assertFalse(api_ts.is_local) if self.p.ts_is_ip4: - self.assertEqual(api_ts.start_addr, - IPv4Address(ts.starting_address_v4)) - self.assertEqual(api_ts.end_addr, - IPv4Address(ts.ending_address_v4)) + self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4)) + self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4)) else: - self.assertEqual(api_ts.start_addr, - IPv6Address(ts.starting_address_v6)) - self.assertEqual(api_ts.end_addr, - IPv6Address(ts.ending_address_v6)) + self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6)) + self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6)) self.assertEqual(api_ts.start_port, ts.start_port) self.assertEqual(api_ts.end_port, ts.end_port) self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID) class TemplateInitiator(IkePeer): - """ initiator test template """ + """initiator test template""" def initiate_del_sa_from_initiator(self): - ispi = int.from_bytes(self.sa.ispi, 'little') + ispi = int.from_bytes(self.sa.ispi, "little") self.pg0.enable_capture() self.pg_start() self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi) capture = self.pg0.get_capture(1) ih = self.get_ike_header(capture[0]) - self.assertNotIn('Response', ih.flags) - self.assertIn('Initiator', ih.flags) + self.assertNotIn("Response", ih.flags) + self.assertIn("Initiator", ih.flags) self.assertEqual(ih.init_SPI, self.sa.ispi) self.assertEqual(ih.resp_SPI, self.sa.rspi) plain = self.sa.hmac_and_decrypt(ih) d = ikev2.IKEv2_payload_Delete(plain) self.assertEqual(d.proto, 1) # proto=IKEv2 - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Response', exch_type='INFORMATIONAL', - id=ih.id, next_payload='Encrypted') - resp = self.encrypt_ike_msg(header, b'', None) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + flags="Response", + exch_type="INFORMATIONAL", + id=ih.id, + next_payload="Encrypted", + ) + resp = self.encrypt_ike_msg(header, b"", None) self.send_and_assert_no_replies(self.pg0, resp) def verify_del_sa(self, packet): ih = self.get_ike_header(packet) self.assertEqual(ih.id, self.sa.msg_id) self.assertEqual(ih.exch_type, 37) # exchange informational - self.assertIn('Response', ih.flags) - self.assertIn('Initiator', ih.flags) + self.assertIn("Response", ih.flags) + self.assertIn("Initiator", ih.flags) plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') + self.assertEqual(plain, b"") def initiate_del_sa_from_responder(self): - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - exch_type='INFORMATIONAL', - id=self.sa.new_msg_id()) - del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2') - ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete') - packet = self.create_packet(self.pg0, ike_msg, - self.sa.sport, self.sa.dport, - self.sa.natt, self.ip6) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + exch_type="INFORMATIONAL", + id=self.sa.new_msg_id(), + ) + del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2") + ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete") + packet = self.create_packet( + self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) self.pg0.add_stream(packet) self.pg0.enable_capture() self.pg_start() @@ -934,26 +984,28 @@ class TemplateInitiator(IkePeer): s = self.find_notify_payload(packet, 16388) self.assertIsNotNone(s) src_sha = self.sa.compute_nat_sha1( - inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8) + inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8 + ) self.assertEqual(s.load, src_sha) # NAT_DETECTION_DESTINATION_IP s = self.find_notify_payload(packet, 16389) self.assertIsNotNone(s) dst_sha = self.sa.compute_nat_sha1( - inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8) + inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8 + ) self.assertEqual(s.load, dst_sha) def verify_sa_init_request(self, packet): udp = packet[UDP] self.sa.dport = udp.sport ih = packet[ikev2.IKEv2] - self.assertNotEqual(ih.init_SPI, 8 * b'\x00') + self.assertNotEqual(ih.init_SPI, 8 * b"\x00") self.assertEqual(ih.exch_type, 34) # SA_INIT self.sa.ispi = ih.init_SPI - self.assertEqual(ih.resp_SPI, 8 * b'\x00') - self.assertIn('Initiator', ih.flags) - self.assertNotIn('Response', ih.flags) + self.assertEqual(ih.resp_SPI, 8 * b"\x00") + self.assertIn("Initiator", ih.flags) + self.assertNotIn("Response", ih.flags) self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load @@ -961,20 +1013,23 @@ class TemplateInitiator(IkePeer): self.assertEqual(prop.proto, 1) # proto = ikev2 self.assertEqual(prop.proposal, 1) self.assertEqual(prop.trans[0].transform_type, 1) # encryption - self.assertEqual(prop.trans[0].transform_id, - self.p.ike_transforms['crypto_alg']) + self.assertEqual( + prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"] + ) self.assertEqual(prop.trans[1].transform_type, 2) # prf self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256" self.assertEqual(prop.trans[2].transform_type, 4) # dh - self.assertEqual(prop.trans[2].transform_id, - self.p.ike_transforms['dh_group']) + self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"]) self.verify_nat_detection(packet) self.sa.set_ike_props( - crypto='AES-GCM-16ICV', crypto_key_len=32, - integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr') - self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32, - integ='SHA2-256-128') + crypto="AES-GCM-16ICV", + crypto_key_len=32, + integ="NULL", + prf="PRF_HMAC_SHA2_256", + dh="3072MODPgr", + ) + self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128") self.sa.generate_dh_data() self.sa.complete_dh_data() self.sa.calc_keys() @@ -994,8 +1049,8 @@ class TemplateInitiator(IkePeer): self.assertEqual(ih.resp_SPI, self.sa.rspi) self.assertEqual(ih.init_SPI, self.sa.ispi) self.assertEqual(ih.exch_type, 35) # IKE_AUTH - self.assertIn('Initiator', ih.flags) - self.assertNotIn('Response', ih.flags) + self.assertIn("Initiator", ih.flags) + self.assertNotIn("Response", ih.flags) udp = packet[UDP] self.verify_udp(udp) @@ -1012,48 +1067,67 @@ class TemplateInitiator(IkePeer): prop = idi[ikev2.IKEv2_payload_Proposal] c = self.sa.child_sas[0] c.ispi = prop.SPI - self.update_esp_transforms( - prop[ikev2.IKEv2_payload_Transform], self.sa) + self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa) def send_init_response(self): tr_attr = self.sa.ike_crypto_attr() - trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', - transform_id=self.sa.ike_crypto, length=tr_attr[1], - key_length=tr_attr[0]) / - ikev2.IKEv2_payload_Transform(transform_type='Integrity', - transform_id=self.sa.ike_integ) / - ikev2.IKEv2_payload_Transform(transform_type='PRF', - transform_id=self.sa.ike_prf_alg.name) / - ikev2.IKEv2_payload_Transform(transform_type='GroupDesc', - transform_id=self.sa.ike_dh)) - props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2', - trans_nb=4, trans=trans)) + trans = ( + ikev2.IKEv2_payload_Transform( + transform_type="Encryption", + transform_id=self.sa.ike_crypto, + length=tr_attr[1], + key_length=tr_attr[0], + ) + / ikev2.IKEv2_payload_Transform( + transform_type="Integrity", transform_id=self.sa.ike_integ + ) + / ikev2.IKEv2_payload_Transform( + transform_type="PRF", transform_id=self.sa.ike_prf_alg.name + ) + / ikev2.IKEv2_payload_Transform( + transform_type="GroupDesc", transform_id=self.sa.ike_dh + ) + ) + props = ikev2.IKEv2_payload_Proposal( + proposal=1, proto="IKEv2", trans_nb=4, trans=trans + ) src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) if self.sa.natt: - dst_address = b'\x0a\x0a\x0a\x0a' + dst_address = b"\x0a\x0a\x0a\x0a" else: dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4) src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport) self.sa.init_resp_packet = ( - ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - exch_type='IKE_SA_INIT', flags='Response') / - ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) / - ikev2.IKEv2_payload_KE(next_payload='Nonce', - group=self.sa.ike_dh, - load=self.sa.my_dh_pub_key) / - ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, - next_payload='Notify') / - ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_SOURCE_IP', load=src_nat, - next_payload='Notify') / ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)) - - ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet, - self.sa.sport, self.sa.dport, - False, self.ip6) + ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + exch_type="IKE_SA_INIT", + flags="Response", + ) + / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props) + / ikev2.IKEv2_payload_KE( + next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key + ) + / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify") + / ikev2.IKEv2_payload_Notify( + type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify" + ) + / ikev2.IKEv2_payload_Notify( + type="NAT_DETECTION_DESTINATION_IP", load=dst_nat + ) + ) + + ike_msg = self.create_packet( + self.pg0, + self.sa.init_resp_packet, + self.sa.sport, + self.sa.dport, + False, + self.ip6, + ) self.pg_send(self.pg0, ike_msg) capture = self.pg0.get_capture(1) self.verify_sa_auth_req(capture[0]) @@ -1069,47 +1143,64 @@ class TemplateInitiator(IkePeer): def send_auth_response(self): tr_attr = self.sa.esp_crypto_attr() - trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', - transform_id=self.sa.esp_crypto, length=tr_attr[1], - key_length=tr_attr[0]) / - ikev2.IKEv2_payload_Transform(transform_type='Integrity', - transform_id=self.sa.esp_integ) / - ikev2.IKEv2_payload_Transform( - transform_type='Extended Sequence Number', - transform_id='No ESN') / - ikev2.IKEv2_payload_Transform( - transform_type='Extended Sequence Number', - transform_id='ESN')) + trans = ( + ikev2.IKEv2_payload_Transform( + transform_type="Encryption", + transform_id=self.sa.esp_crypto, + length=tr_attr[1], + key_length=tr_attr[0], + ) + / ikev2.IKEv2_payload_Transform( + transform_type="Integrity", transform_id=self.sa.esp_integ + ) + / ikev2.IKEv2_payload_Transform( + transform_type="Extended Sequence Number", transform_id="No ESN" + ) + / ikev2.IKEv2_payload_Transform( + transform_type="Extended Sequence Number", transform_id="ESN" + ) + ) c = self.sa.child_sas[0] - props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP', - SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans)) + props = ikev2.IKEv2_payload_Proposal( + proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans + ) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) - plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr', - IDtype=self.sa.id_type, load=self.sa.i_id) / - ikev2.IKEv2_payload_IDr(next_payload='AUTH', - IDtype=self.sa.id_type, load=self.sa.r_id) / - ikev2.IKEv2_payload_AUTH(next_payload='SA', - auth_type=AuthMethod.value(self.sa.auth_method), - load=self.sa.auth_data) / - ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) / - ikev2.IKEv2_payload_TSi(next_payload='TSr', - number_of_TSs=len(tsi), - traffic_selector=tsi) / - ikev2.IKEv2_payload_TSr(next_payload='Notify', - number_of_TSs=len(tsr), - traffic_selector=tsr) / - ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')) + plain = ( + ikev2.IKEv2_payload_IDi( + next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id + ) + / ikev2.IKEv2_payload_IDr( + next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id + ) + / ikev2.IKEv2_payload_AUTH( + next_payload="SA", + auth_type=AuthMethod.value(self.sa.auth_method), + load=self.sa.auth_data, + ) + / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props) + / ikev2.IKEv2_payload_TSi( + next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi + ) + / ikev2.IKEv2_payload_TSr( + next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr + ) + / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT") + ) header = ikev2.IKEv2( - init_SPI=self.sa.ispi, - resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), - flags='Response', exch_type='IKE_AUTH') - - ike_msg = self.encrypt_ike_msg(header, plain, 'IDi') - packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + id=self.sa.new_msg_id(), + flags="Response", + exch_type="IKE_AUTH", + ) + + ike_msg = self.encrypt_ike_msg(header, plain, "IDi") + packet = self.create_packet( + self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) self.pg_send(self.pg0, packet) def test_initiator(self): @@ -1121,51 +1212,58 @@ class TemplateInitiator(IkePeer): class TemplateResponder(IkePeer): - """ responder test template """ + """responder test template""" def initiate_del_sa_from_responder(self): self.pg0.enable_capture() self.pg_start() - self.vapi.ikev2_initiate_del_ike_sa( - ispi=int.from_bytes(self.sa.ispi, 'little')) + self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little")) capture = self.pg0.get_capture(1) ih = self.get_ike_header(capture[0]) - self.assertNotIn('Response', ih.flags) - self.assertNotIn('Initiator', ih.flags) + self.assertNotIn("Response", ih.flags) + self.assertNotIn("Initiator", ih.flags) self.assertEqual(ih.exch_type, 37) # INFORMATIONAL plain = self.sa.hmac_and_decrypt(ih) d = ikev2.IKEv2_payload_Delete(plain) self.assertEqual(d.proto, 1) # proto=IKEv2 self.assertEqual(ih.init_SPI, self.sa.ispi) self.assertEqual(ih.resp_SPI, self.sa.rspi) - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Initiator+Response', - exch_type='INFORMATIONAL', - id=ih.id, next_payload='Encrypted') - resp = self.encrypt_ike_msg(header, b'', None) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + flags="Initiator+Response", + exch_type="INFORMATIONAL", + id=ih.id, + next_payload="Encrypted", + ) + resp = self.encrypt_ike_msg(header, b"", None) self.send_and_assert_no_replies(self.pg0, resp) def verify_del_sa(self, packet): ih = self.get_ike_header(packet) self.assertEqual(ih.id, self.sa.msg_id) self.assertEqual(ih.exch_type, 37) # exchange informational - self.assertIn('Response', ih.flags) - self.assertNotIn('Initiator', ih.flags) + self.assertIn("Response", ih.flags) + self.assertNotIn("Initiator", ih.flags) self.assertEqual(ih.next_payload, 46) # Encrypted self.assertEqual(ih.init_SPI, self.sa.ispi) self.assertEqual(ih.resp_SPI, self.sa.rspi) plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') + self.assertEqual(plain, b"") def initiate_del_sa_from_initiator(self): - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Initiator', exch_type='INFORMATIONAL', - id=self.sa.new_msg_id()) - del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2') - ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete') - packet = self.create_packet(self.pg0, ike_msg, - self.sa.sport, self.sa.dport, - self.sa.natt, self.ip6) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + flags="Initiator", + exch_type="INFORMATIONAL", + id=self.sa.new_msg_id(), + ) + del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2") + ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete") + packet = self.create_packet( + self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) self.pg0.add_stream(packet) self.pg0.enable_capture() self.pg_start() @@ -1174,56 +1272,72 @@ class TemplateResponder(IkePeer): def send_sa_init_req(self): tr_attr = self.sa.ike_crypto_attr() - trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', - transform_id=self.sa.ike_crypto, length=tr_attr[1], - key_length=tr_attr[0]) / - ikev2.IKEv2_payload_Transform(transform_type='Integrity', - transform_id=self.sa.ike_integ) / - ikev2.IKEv2_payload_Transform(transform_type='PRF', - transform_id=self.sa.ike_prf_alg.name) / - ikev2.IKEv2_payload_Transform(transform_type='GroupDesc', - transform_id=self.sa.ike_dh)) - - props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2', - trans_nb=4, trans=trans)) - - next_payload = None if self.ip6 else 'Notify' + trans = ( + ikev2.IKEv2_payload_Transform( + transform_type="Encryption", + transform_id=self.sa.ike_crypto, + length=tr_attr[1], + key_length=tr_attr[0], + ) + / ikev2.IKEv2_payload_Transform( + transform_type="Integrity", transform_id=self.sa.ike_integ + ) + / ikev2.IKEv2_payload_Transform( + transform_type="PRF", transform_id=self.sa.ike_prf_alg.name + ) + / ikev2.IKEv2_payload_Transform( + transform_type="GroupDesc", transform_id=self.sa.ike_dh + ) + ) + + props = ikev2.IKEv2_payload_Proposal( + proposal=1, proto="IKEv2", trans_nb=4, trans=trans + ) + + next_payload = None if self.ip6 else "Notify" self.sa.init_req_packet = ( - ikev2.IKEv2(init_SPI=self.sa.ispi, - flags='Initiator', exch_type='IKE_SA_INIT') / - ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) / - ikev2.IKEv2_payload_KE(next_payload='Nonce', - group=self.sa.ike_dh, - load=self.sa.my_dh_pub_key) / - ikev2.IKEv2_payload_Nonce(next_payload=next_payload, - load=self.sa.i_nonce)) + ikev2.IKEv2( + init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT" + ) + / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props) + / ikev2.IKEv2_payload_KE( + next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key + ) + / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce) + ) if not self.ip6: if self.sa.i_natt: - src_address = b'\x0a\x0a\x0a\x01' + src_address = b"\x0a\x0a\x0a\x01" else: src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) if self.sa.r_natt: - dst_address = b'\x0a\x0a\x0a\x0a' + dst_address = b"\x0a\x0a\x0a\x0a" else: dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4) src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport) nat_src_detection = ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_SOURCE_IP', load=src_nat, - next_payload='Notify') + type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify" + ) nat_dst_detection = ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_DESTINATION_IP', load=dst_nat) - self.sa.init_req_packet = (self.sa.init_req_packet / - nat_src_detection / - nat_dst_detection) - - ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet, - self.sa.sport, self.sa.dport, - self.sa.natt, self.ip6) + type="NAT_DETECTION_DESTINATION_IP", load=dst_nat + ) + self.sa.init_req_packet = ( + self.sa.init_req_packet / nat_src_detection / nat_dst_detection + ) + + ike_msg = self.create_packet( + self.pg0, + self.sa.init_req_packet, + self.sa.sport, + self.sa.dport, + self.sa.natt, + self.ip6, + ) self.pg0.add_stream(ike_msg) self.pg0.enable_capture() self.pg_start() @@ -1232,65 +1346,83 @@ class TemplateResponder(IkePeer): def generate_auth_payload(self, last_payload=None, is_rekey=False): tr_attr = self.sa.esp_crypto_attr() - last_payload = last_payload or 'Notify' - trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', - transform_id=self.sa.esp_crypto, length=tr_attr[1], - key_length=tr_attr[0]) / - ikev2.IKEv2_payload_Transform(transform_type='Integrity', - transform_id=self.sa.esp_integ) / - ikev2.IKEv2_payload_Transform( - transform_type='Extended Sequence Number', - transform_id='No ESN') / - ikev2.IKEv2_payload_Transform( - transform_type='Extended Sequence Number', - transform_id='ESN')) + last_payload = last_payload or "Notify" + trans = ( + ikev2.IKEv2_payload_Transform( + transform_type="Encryption", + transform_id=self.sa.esp_crypto, + length=tr_attr[1], + key_length=tr_attr[0], + ) + / ikev2.IKEv2_payload_Transform( + transform_type="Integrity", transform_id=self.sa.esp_integ + ) + / ikev2.IKEv2_payload_Transform( + transform_type="Extended Sequence Number", transform_id="No ESN" + ) + / ikev2.IKEv2_payload_Transform( + transform_type="Extended Sequence Number", transform_id="ESN" + ) + ) c = self.sa.child_sas[0] - props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP', - SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans)) + props = ikev2.IKEv2_payload_Proposal( + proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans + ) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) - plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA', - auth_type=AuthMethod.value(self.sa.auth_method), - load=self.sa.auth_data) / - ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) / - ikev2.IKEv2_payload_TSi(next_payload='TSr', - number_of_TSs=len(tsi), traffic_selector=tsi) / - ikev2.IKEv2_payload_TSr(next_payload=last_payload, - number_of_TSs=len(tsr), traffic_selector=tsr)) + plain = ( + ikev2.IKEv2_payload_AUTH( + next_payload="SA", + auth_type=AuthMethod.value(self.sa.auth_method), + load=self.sa.auth_data, + ) + / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props) + / ikev2.IKEv2_payload_TSi( + next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi + ) + / ikev2.IKEv2_payload_TSr( + next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr + ) + ) if is_rekey: - first_payload = 'Nonce' - plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, - next_payload='SA') / plain / - ikev2.IKEv2_payload_Notify(type='REKEY_SA', - proto='ESP', SPI=c.ispi)) + first_payload = "Nonce" + plain = ( + ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA") + / plain + / ikev2.IKEv2_payload_Notify(type="REKEY_SA", proto="ESP", SPI=c.ispi) + ) else: - first_payload = 'IDi' + first_payload = "IDi" if self.no_idr_auth: - ids = ikev2.IKEv2_payload_IDi(next_payload='AUTH', - IDtype=self.sa.id_type, - load=self.sa.i_id) + ids = ikev2.IKEv2_payload_IDi( + next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id + ) else: - ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr', - IDtype=self.sa.id_type, load=self.sa.i_id) / - ikev2.IKEv2_payload_IDr(next_payload='AUTH', - IDtype=self.sa.id_type, load=self.sa.r_id)) + ids = ikev2.IKEv2_payload_IDi( + next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id + ) / ikev2.IKEv2_payload_IDr( + next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id + ) plain = ids / plain return plain, first_payload def send_sa_auth(self): - plain, first_payload = self.generate_auth_payload( - last_payload='Notify') - plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT') + plain, first_payload = self.generate_auth_payload(last_payload="Notify") + plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT") header = ikev2.IKEv2( - init_SPI=self.sa.ispi, - resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), - flags='Initiator', exch_type='IKE_AUTH') + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + id=self.sa.new_msg_id(), + flags="Initiator", + exch_type="IKE_AUTH", + ) ike_msg = self.encrypt_ike_msg(header, plain, first_payload) - packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) + packet = self.create_packet( + self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) self.pg0.add_stream(packet) self.pg0.enable_capture() self.pg_start() @@ -1302,7 +1434,7 @@ class TemplateResponder(IkePeer): self.assertEqual(ih.id, self.sa.msg_id) self.assertEqual(ih.exch_type, 34) - self.assertIn('Response', ih.flags) + self.assertIn("Response", ih.flags) self.assertEqual(ih.init_SPI, self.sa.ispi) self.assertNotEqual(ih.resp_SPI, 0) self.sa.rspi = ih.resp_SPI @@ -1330,12 +1462,12 @@ class TemplateResponder(IkePeer): self.sa.child_sas[0].rspi = prop.SPI self.sa.calc_child_keys() - IKE_NODE_SUFFIX = 'ip4' + IKE_NODE_SUFFIX = "ip4" def verify_counters(self): - self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX) - self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX) - self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX) + self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX) + self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX) + self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX) r = self.vapi.ikev2_sa_dump() s = r[0].sa.stats @@ -1355,59 +1487,60 @@ class Ikev2Params(object): ec = VppEnum.vl_api_ipsec_crypto_alg_t ei = VppEnum.vl_api_ipsec_integ_alg_t self.vpp_enums = { - 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128, - 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192, - 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256, - 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128, - 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192, - 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256, - - 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96, - 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128, - 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192, - 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256} - - dpd_disabled = True if 'dpd_disabled' not in params else\ - params['dpd_disabled'] + "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128, + "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192, + "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256, + "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128, + "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192, + "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256, + "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96, + "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128, + "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192, + "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256, + } + + dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"] if dpd_disabled: - self.vapi.cli('ikev2 dpd disable') - self.del_sa_from_responder = False if 'del_sa_from_responder'\ - not in params else params['del_sa_from_responder'] - i_natt = False if 'i_natt' not in params else params['i_natt'] - r_natt = False if 'r_natt' not in params else params['r_natt'] - self.p = Profile(self, 'pr1') - self.ip6 = False if 'ip6' not in params else params['ip6'] - - if 'auth' in params and params['auth'] == 'rsa-sig': - auth_method = 'rsa-sig' - work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/' - self.vapi.ikev2_set_local_key( - key_file=work_dir + params['server-key']) - - client_file = work_dir + params['client-cert'] - server_pem = open(work_dir + params['server-cert']).read() - client_priv = open(work_dir + params['client-key']).read() - client_priv = load_pem_private_key(str.encode(client_priv), None, - default_backend()) + self.vapi.cli("ikev2 dpd disable") + self.del_sa_from_responder = ( + False + if "del_sa_from_responder" not in params + else params["del_sa_from_responder"] + ) + i_natt = False if "i_natt" not in params else params["i_natt"] + r_natt = False if "r_natt" not in params else params["r_natt"] + self.p = Profile(self, "pr1") + self.ip6 = False if "ip6" not in params else params["ip6"] + + if "auth" in params and params["auth"] == "rsa-sig": + auth_method = "rsa-sig" + work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/" + self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"]) + + client_file = work_dir + params["client-cert"] + server_pem = open(work_dir + params["server-cert"]).read() + client_priv = open(work_dir + params["client-key"]).read() + client_priv = load_pem_private_key( + str.encode(client_priv), None, default_backend() + ) self.peer_cert = x509.load_pem_x509_certificate( - str.encode(server_pem), - default_backend()) - self.p.add_auth(method='rsa-sig', data=str.encode(client_file)) + str.encode(server_pem), default_backend() + ) + self.p.add_auth(method="rsa-sig", data=str.encode(client_file)) auth_data = None else: - auth_data = b'$3cr3tpa$$w0rd' - self.p.add_auth(method='shared-key', data=auth_data) - auth_method = 'shared-key' + auth_data = b"$3cr3tpa$$w0rd" + self.p.add_auth(method="shared-key", data=auth_data) + auth_method = "shared-key" client_priv = None - is_init = True if 'is_initiator' not in params else\ - params['is_initiator'] - self.no_idr_auth = params.get('no_idr_in_auth', False) + is_init = True if "is_initiator" not in params else params["is_initiator"] + self.no_idr_auth = params.get("no_idr_in_auth", False) - idr = {'id_type': 'fqdn', 'data': b'vpp.home'} - idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'} - r_id = self.idr = idr['data'] - i_id = self.idi = idi['data'] + idr = {"id_type": "fqdn", "data": b"vpp.home"} + idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"} + r_id = self.idr = idr["data"] + i_id = self.idi = idi["data"] if is_init: # scapy is initiator, VPP is responder self.p.add_local_id(**idr) @@ -1420,70 +1553,90 @@ class Ikev2Params(object): if not self.no_idr_auth: self.p.add_remote_id(**idr) - loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\ - 'loc_ts' not in params else params['loc_ts'] - rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\ - 'rem_ts' not in params else params['rem_ts'] + loc_ts = ( + {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"} + if "loc_ts" not in params + else params["loc_ts"] + ) + rem_ts = ( + {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"} + if "rem_ts" not in params + else params["rem_ts"] + ) self.p.add_local_ts(**loc_ts) self.p.add_remote_ts(**rem_ts) - if 'responder' in params: - self.p.add_responder(params['responder']) - if 'ike_transforms' in params: - self.p.add_ike_transforms(params['ike_transforms']) - if 'esp_transforms' in params: - self.p.add_esp_transforms(params['esp_transforms']) - - udp_encap = False if 'udp_encap' not in params else\ - params['udp_encap'] + if "responder" in params: + self.p.add_responder(params["responder"]) + if "ike_transforms" in params: + self.p.add_ike_transforms(params["ike_transforms"]) + if "esp_transforms" in params: + self.p.add_esp_transforms(params["esp_transforms"]) + + udp_encap = False if "udp_encap" not in params else params["udp_encap"] if udp_encap: self.p.set_udp_encap(True) - if 'responder_hostname' in params: - hn = params['responder_hostname'] + if "responder_hostname" in params: + hn = params["responder_hostname"] self.p.add_responder_hostname(hn) # configure static dns record self.vapi.dns_name_server_add_del( - is_ip6=0, is_add=1, - server_address=IPv4Address(u'8.8.8.8').packed) + is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed + ) self.vapi.dns_enable_disable(enable=1) - cmd = "dns cache add {} {}".format(hn['hostname'], - self.pg0.remote_ip4) + cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4) self.vapi.cli(cmd) - self.sa = IKEv2SA(self, i_id=i_id, r_id=r_id, - is_initiator=is_init, - id_type=self.p.local_id['id_type'], - i_natt=i_natt, r_natt=r_natt, - priv_key=client_priv, auth_method=auth_method, - nonce=params.get('nonce'), - auth_data=auth_data, udp_encap=udp_encap, - local_ts=self.p.remote_ts, remote_ts=self.p.local_ts) + self.sa = IKEv2SA( + self, + i_id=i_id, + r_id=r_id, + is_initiator=is_init, + id_type=self.p.local_id["id_type"], + i_natt=i_natt, + r_natt=r_natt, + priv_key=client_priv, + auth_method=auth_method, + nonce=params.get("nonce"), + auth_data=auth_data, + udp_encap=udp_encap, + local_ts=self.p.remote_ts, + remote_ts=self.p.local_ts, + ) if is_init: - ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\ - params['ike-crypto'] - ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\ - params['ike-integ'] - ike_dh = '2048MODPgr' if 'ike-dh' not in params else\ - params['ike-dh'] - - esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\ - params['esp-crypto'] - esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\ - params['esp-integ'] + ike_crypto = ( + ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"] + ) + ike_integ = ( + "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"] + ) + ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"] + + esp_crypto = ( + ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"] + ) + esp_integ = ( + "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"] + ) self.sa.set_ike_props( - crypto=ike_crypto[0], crypto_key_len=ike_crypto[1], - integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh) + crypto=ike_crypto[0], + crypto_key_len=ike_crypto[1], + integ=ike_integ, + prf="PRF_HMAC_SHA2_256", + dh=ike_dh, + ) self.sa.set_esp_props( - crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], - integ=esp_integ) + crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ + ) class TestApi(VppTestCase): - """ Test IKEV2 API """ + """Test IKEV2 API""" + @classmethod def setUpClass(cls): super(TestApi, cls).setUpClass() @@ -1500,241 +1653,249 @@ class TestApi(VppTestCase): self.assertEqual(len(r), 0) def configure_profile(self, cfg): - p = Profile(self, cfg['name']) - p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1]) - p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1]) - p.add_local_ts(**cfg['loc_ts']) - p.add_remote_ts(**cfg['rem_ts']) - p.add_responder(cfg['responder']) - p.add_ike_transforms(cfg['ike_ts']) - p.add_esp_transforms(cfg['esp_ts']) - p.add_auth(**cfg['auth']) - p.set_udp_encap(cfg['udp_encap']) - p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port']) - if 'lifetime_data' in cfg: - p.set_lifetime_data(cfg['lifetime_data']) - if 'tun_itf' in cfg: - p.set_tunnel_interface(cfg['tun_itf']) - if 'natt_disabled' in cfg and cfg['natt_disabled']: + p = Profile(self, cfg["name"]) + p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1]) + p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1]) + p.add_local_ts(**cfg["loc_ts"]) + p.add_remote_ts(**cfg["rem_ts"]) + p.add_responder(cfg["responder"]) + p.add_ike_transforms(cfg["ike_ts"]) + p.add_esp_transforms(cfg["esp_ts"]) + p.add_auth(**cfg["auth"]) + p.set_udp_encap(cfg["udp_encap"]) + p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"]) + if "lifetime_data" in cfg: + p.set_lifetime_data(cfg["lifetime_data"]) + if "tun_itf" in cfg: + p.set_tunnel_interface(cfg["tun_itf"]) + if "natt_disabled" in cfg and cfg["natt_disabled"]: p.disable_natt() p.add_vpp_config() return p def test_profile_api(self): - """ test profile dump API """ + """test profile dump API""" loc_ts4 = { - 'proto': 8, - 'start_port': 1, - 'end_port': 19, - 'start_addr': '3.3.3.2', - 'end_addr': '3.3.3.3', - } + "proto": 8, + "start_port": 1, + "end_port": 19, + "start_addr": "3.3.3.2", + "end_addr": "3.3.3.3", + } rem_ts4 = { - 'proto': 9, - 'start_port': 10, - 'end_port': 119, - 'start_addr': '4.5.76.80', - 'end_addr': '2.3.4.6', - } + "proto": 9, + "start_port": 10, + "end_port": 119, + "start_addr": "4.5.76.80", + "end_addr": "2.3.4.6", + } loc_ts6 = { - 'proto': 8, - 'start_port': 1, - 'end_port': 19, - 'start_addr': 'ab::1', - 'end_addr': 'ab::4', - } + "proto": 8, + "start_port": 1, + "end_port": 19, + "start_addr": "ab::1", + "end_addr": "ab::4", + } rem_ts6 = { - 'proto': 9, - 'start_port': 10, - 'end_port': 119, - 'start_addr': 'cd::12', - 'end_addr': 'cd::13', - } + "proto": 9, + "start_port": 10, + "end_port": 119, + "start_addr": "cd::12", + "end_addr": "cd::13", + } conf = { - 'p1': { - 'name': 'p1', - 'natt_disabled': True, - 'loc_id': ('fqdn', b'vpp.home'), - 'rem_id': ('fqdn', b'roadwarrior.example.com'), - 'loc_ts': loc_ts4, - 'rem_ts': rem_ts4, - 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'}, - 'ike_ts': { - 'crypto_alg': 20, - 'crypto_key_size': 32, - 'integ_alg': 0, - 'dh_group': 1}, - 'esp_ts': { - 'crypto_alg': 13, - 'crypto_key_size': 24, - 'integ_alg': 2}, - 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'}, - 'udp_encap': True, - 'ipsec_over_udp_port': 4501, - 'lifetime_data': { - 'lifetime': 123, - 'lifetime_maxdata': 20192, - 'lifetime_jitter': 9, - 'handover': 132}, + "p1": { + "name": "p1", + "natt_disabled": True, + "loc_id": ("fqdn", b"vpp.home"), + "rem_id": ("fqdn", b"roadwarrior.example.com"), + "loc_ts": loc_ts4, + "rem_ts": rem_ts4, + "responder": {"sw_if_index": 0, "addr": "5.6.7.8"}, + "ike_ts": { + "crypto_alg": 20, + "crypto_key_size": 32, + "integ_alg": 0, + "dh_group": 1, + }, + "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2}, + "auth": {"method": "shared-key", "data": b"sharedkeydata"}, + "udp_encap": True, + "ipsec_over_udp_port": 4501, + "lifetime_data": { + "lifetime": 123, + "lifetime_maxdata": 20192, + "lifetime_jitter": 9, + "handover": 132, + }, + }, + "p2": { + "name": "p2", + "loc_id": ("ip4-addr", b"192.168.2.1"), + "rem_id": ("ip6-addr", b"abcd::1"), + "loc_ts": loc_ts6, + "rem_ts": rem_ts6, + "responder": {"sw_if_index": 4, "addr": "def::10"}, + "ike_ts": { + "crypto_alg": 12, + "crypto_key_size": 16, + "integ_alg": 3, + "dh_group": 3, + }, + "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4}, + "auth": {"method": "shared-key", "data": b"sharedkeydata"}, + "udp_encap": False, + "ipsec_over_udp_port": 4600, + "tun_itf": 0, }, - 'p2': { - 'name': 'p2', - 'loc_id': ('ip4-addr', b'192.168.2.1'), - 'rem_id': ('ip6-addr', b'abcd::1'), - 'loc_ts': loc_ts6, - 'rem_ts': rem_ts6, - 'responder': {'sw_if_index': 4, 'addr': 'def::10'}, - 'ike_ts': { - 'crypto_alg': 12, - 'crypto_key_size': 16, - 'integ_alg': 3, - 'dh_group': 3}, - 'esp_ts': { - 'crypto_alg': 9, - 'crypto_key_size': 24, - 'integ_alg': 4}, - 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'}, - 'udp_encap': False, - 'ipsec_over_udp_port': 4600, - 'tun_itf': 0} } - self.p1 = self.configure_profile(conf['p1']) - self.p2 = self.configure_profile(conf['p2']) + self.p1 = self.configure_profile(conf["p1"]) + self.p2 = self.configure_profile(conf["p2"]) r = self.vapi.ikev2_profile_dump() self.assertEqual(len(r), 2) - self.verify_profile(r[0].profile, conf['p1']) - self.verify_profile(r[1].profile, conf['p2']) + self.verify_profile(r[0].profile, conf["p1"]) + self.verify_profile(r[1].profile, conf["p2"]) def verify_id(self, api_id, cfg_id): self.assertEqual(api_id.type, IDType.value(cfg_id[0])) - self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1]) + self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1]) def verify_ts(self, api_ts, cfg_ts): - self.assertEqual(api_ts.protocol_id, cfg_ts['proto']) - self.assertEqual(api_ts.start_port, cfg_ts['start_port']) - self.assertEqual(api_ts.end_port, cfg_ts['end_port']) - self.assertEqual(api_ts.start_addr, - ip_address(text_type(cfg_ts['start_addr']))) - self.assertEqual(api_ts.end_addr, - ip_address(text_type(cfg_ts['end_addr']))) + self.assertEqual(api_ts.protocol_id, cfg_ts["proto"]) + self.assertEqual(api_ts.start_port, cfg_ts["start_port"]) + self.assertEqual(api_ts.end_port, cfg_ts["end_port"]) + self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"]))) + self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"]))) def verify_responder(self, api_r, cfg_r): - self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index']) - self.assertEqual(api_r.addr, ip_address(cfg_r['addr'])) + self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"]) + self.assertEqual(api_r.addr, ip_address(cfg_r["addr"])) def verify_transforms(self, api_ts, cfg_ts): - self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg']) - self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size']) - self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg']) + self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"]) + self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"]) + self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"]) def verify_ike_transforms(self, api_ts, cfg_ts): self.verify_transforms(api_ts, cfg_ts) - self.assertEqual(api_ts.dh_group, cfg_ts['dh_group']) + self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"]) def verify_esp_transforms(self, api_ts, cfg_ts): self.verify_transforms(api_ts, cfg_ts) def verify_auth(self, api_auth, cfg_auth): - self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method'])) - self.assertEqual(api_auth.data, cfg_auth['data']) - self.assertEqual(api_auth.data_len, len(cfg_auth['data'])) + self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"])) + self.assertEqual(api_auth.data, cfg_auth["data"]) + self.assertEqual(api_auth.data_len, len(cfg_auth["data"])) def verify_lifetime_data(self, p, ld): - self.assertEqual(p.lifetime, ld['lifetime']) - self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata']) - self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter']) - self.assertEqual(p.handover, ld['handover']) + self.assertEqual(p.lifetime, ld["lifetime"]) + self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"]) + self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"]) + self.assertEqual(p.handover, ld["handover"]) def verify_profile(self, ap, cp): - self.assertEqual(ap.name, cp['name']) - self.assertEqual(ap.udp_encap, cp['udp_encap']) - self.verify_id(ap.loc_id, cp['loc_id']) - self.verify_id(ap.rem_id, cp['rem_id']) - self.verify_ts(ap.loc_ts, cp['loc_ts']) - self.verify_ts(ap.rem_ts, cp['rem_ts']) - self.verify_responder(ap.responder, cp['responder']) - self.verify_ike_transforms(ap.ike_ts, cp['ike_ts']) - self.verify_esp_transforms(ap.esp_ts, cp['esp_ts']) - self.verify_auth(ap.auth, cp['auth']) - natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled'] + self.assertEqual(ap.name, cp["name"]) + self.assertEqual(ap.udp_encap, cp["udp_encap"]) + self.verify_id(ap.loc_id, cp["loc_id"]) + self.verify_id(ap.rem_id, cp["rem_id"]) + self.verify_ts(ap.loc_ts, cp["loc_ts"]) + self.verify_ts(ap.rem_ts, cp["rem_ts"]) + self.verify_responder(ap.responder, cp["responder"]) + self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"]) + self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"]) + self.verify_auth(ap.auth, cp["auth"]) + natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"] self.assertTrue(natt_dis == ap.natt_disabled) - if 'lifetime_data' in cp: - self.verify_lifetime_data(ap, cp['lifetime_data']) - self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port']) - if 'tun_itf' in cp: - self.assertEqual(ap.tun_itf, cp['tun_itf']) + if "lifetime_data" in cp: + self.verify_lifetime_data(ap, cp["lifetime_data"]) + self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"]) + if "tun_itf" in cp: + self.assertEqual(ap.tun_itf, cp["tun_itf"]) else: - self.assertEqual(ap.tun_itf, 0xffffffff) + self.assertEqual(ap.tun_itf, 0xFFFFFFFF) @tag_fixme_vpp_workers class TestResponderBehindNAT(TemplateResponder, Ikev2Params): - """ test responder - responder behind NAT """ + """test responder - responder behind NAT""" - IKE_NODE_SUFFIX = 'ip4-natt' + IKE_NODE_SUFFIX = "ip4-natt" def config_tc(self): - self.config_params({'r_natt': True}) + self.config_params({"r_natt": True}) @tag_fixme_vpp_workers class TestInitiatorNATT(TemplateInitiator, Ikev2Params): - """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """ + """test ikev2 initiator - NAT traversal (intitiator behind NAT)""" def config_tc(self): - self.config_params({ - 'i_natt': True, - 'is_initiator': False, # seen from test case perspective - # thus vpp is initiator - 'responder': {'sw_if_index': self.pg0.sw_if_index, - 'addr': self.pg0.remote_ip4}, - 'ike-crypto': ('AES-GCM-16ICV', 32), - 'ike-integ': 'NULL', - 'ike-dh': '3072MODPgr', - 'ike_transforms': { - 'crypto_alg': 20, # "aes-gcm-16" - 'crypto_key_size': 256, - 'dh_group': 15, # "modp-3072" - }, - 'esp_transforms': { - 'crypto_alg': 12, # "aes-cbc" - 'crypto_key_size': 256, - # "hmac-sha2-256-128" - 'integ_alg': 12}}) + self.config_params( + { + "i_natt": True, + "is_initiator": False, # seen from test case perspective + # thus vpp is initiator + "responder": { + "sw_if_index": self.pg0.sw_if_index, + "addr": self.pg0.remote_ip4, + }, + "ike-crypto": ("AES-GCM-16ICV", 32), + "ike-integ": "NULL", + "ike-dh": "3072MODPgr", + "ike_transforms": { + "crypto_alg": 20, # "aes-gcm-16" + "crypto_key_size": 256, + "dh_group": 15, # "modp-3072" + }, + "esp_transforms": { + "crypto_alg": 12, # "aes-cbc" + "crypto_key_size": 256, + # "hmac-sha2-256-128" + "integ_alg": 12, + }, + } + ) @tag_fixme_vpp_workers class TestInitiatorPsk(TemplateInitiator, Ikev2Params): - """ test ikev2 initiator - pre shared key auth """ + """test ikev2 initiator - pre shared key auth""" def config_tc(self): - self.config_params({ - 'is_initiator': False, # seen from test case perspective - # thus vpp is initiator - 'ike-crypto': ('AES-GCM-16ICV', 32), - 'ike-integ': 'NULL', - 'ike-dh': '3072MODPgr', - 'ike_transforms': { - 'crypto_alg': 20, # "aes-gcm-16" - 'crypto_key_size': 256, - 'dh_group': 15, # "modp-3072" - }, - 'esp_transforms': { - 'crypto_alg': 12, # "aes-cbc" - 'crypto_key_size': 256, - # "hmac-sha2-256-128" - 'integ_alg': 12}, - 'responder_hostname': {'hostname': 'vpp.responder.org', - 'sw_if_index': self.pg0.sw_if_index}}) + self.config_params( + { + "is_initiator": False, # seen from test case perspective + # thus vpp is initiator + "ike-crypto": ("AES-GCM-16ICV", 32), + "ike-integ": "NULL", + "ike-dh": "3072MODPgr", + "ike_transforms": { + "crypto_alg": 20, # "aes-gcm-16" + "crypto_key_size": 256, + "dh_group": 15, # "modp-3072" + }, + "esp_transforms": { + "crypto_alg": 12, # "aes-cbc" + "crypto_key_size": 256, + # "hmac-sha2-256-128" + "integ_alg": 12, + }, + "responder_hostname": { + "hostname": "vpp.responder.org", + "sw_if_index": self.pg0.sw_if_index, + }, + } + ) @tag_fixme_vpp_workers class TestInitiatorRequestWindowSize(TestInitiatorPsk): - """ test initiator - request window size (1) """ + """test initiator - request window size (1)""" def rekey_respond(self, req, update_child_sa_data): ih = self.get_ike_header(req) @@ -1748,19 +1909,25 @@ class TestInitiatorRequestWindowSize(TestInitiatorPsk): self.sa.child_sas[0].rspi = prop.SPI self.sa.calc_child_keys() - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Response', exch_type=36, - id=ih.id, next_payload='Encrypted') - resp = self.encrypt_ike_msg(header, sa, 'SA') - packet = self.create_packet(self.pg0, resp, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + flags="Response", + exch_type=36, + id=ih.id, + next_payload="Encrypted", + ) + resp = self.encrypt_ike_msg(header, sa, "SA") + packet = self.create_packet( + self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) self.send_and_assert_no_replies(self.pg0, packet) def test_initiator(self): super(TestInitiatorRequestWindowSize, self).test_initiator() self.pg0.enable_capture() self.pg_start() - ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little') + ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little") self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) capture = self.pg0.get_capture(2) @@ -1776,18 +1943,18 @@ class TestInitiatorRequestWindowSize(TestInitiatorPsk): @tag_fixme_vpp_workers class TestInitiatorRekey(TestInitiatorPsk): - """ test ikev2 initiator - rekey """ + """test ikev2 initiator - rekey""" def rekey_from_initiator(self): - ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little') + ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little") self.pg0.enable_capture() self.pg_start() self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) capture = self.pg0.get_capture(1) ih = self.get_ike_header(capture[0]) self.assertEqual(ih.exch_type, 36) # CHILD_SA - self.assertNotIn('Response', ih.flags) - self.assertIn('Initiator', ih.flags) + self.assertNotIn("Response", ih.flags) + self.assertIn("Initiator", ih.flags) plain = self.sa.hmac_and_decrypt(ih) sa = ikev2.IKEv2_payload_SA(plain) prop = sa[ikev2.IKEv2_payload_Proposal] @@ -1797,12 +1964,18 @@ class TestInitiatorRekey(TestInitiatorPsk): self.sa.child_sas[0].ispi = prop.SPI self.sa.child_sas[0].rspi = prop.SPI self.sa.calc_child_keys() - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Response', exch_type=36, - id=ih.id, next_payload='Encrypted') - resp = self.encrypt_ike_msg(header, sa, 'SA') - packet = self.create_packet(self.pg0, resp, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + flags="Response", + exch_type=36, + id=ih.id, + next_payload="Encrypted", + ) + resp = self.encrypt_ike_msg(header, sa, "SA") + packet = self.create_packet( + self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) self.send_and_assert_no_replies(self.pg0, packet) def test_initiator(self): @@ -1814,45 +1987,51 @@ class TestInitiatorRekey(TestInitiatorPsk): @tag_fixme_vpp_workers class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params): - """ test ikev2 initiator - delete IKE SA from responder """ + """test ikev2 initiator - delete IKE SA from responder""" def config_tc(self): - self.config_params({ - 'del_sa_from_responder': True, - 'is_initiator': False, # seen from test case perspective - # thus vpp is initiator - 'responder': {'sw_if_index': self.pg0.sw_if_index, - 'addr': self.pg0.remote_ip4}, - 'ike-crypto': ('AES-GCM-16ICV', 32), - 'ike-integ': 'NULL', - 'ike-dh': '3072MODPgr', - 'ike_transforms': { - 'crypto_alg': 20, # "aes-gcm-16" - 'crypto_key_size': 256, - 'dh_group': 15, # "modp-3072" - }, - 'esp_transforms': { - 'crypto_alg': 12, # "aes-cbc" - 'crypto_key_size': 256, - # "hmac-sha2-256-128" - 'integ_alg': 12}, - 'no_idr_in_auth': True}) + self.config_params( + { + "del_sa_from_responder": True, + "is_initiator": False, # seen from test case perspective + # thus vpp is initiator + "responder": { + "sw_if_index": self.pg0.sw_if_index, + "addr": self.pg0.remote_ip4, + }, + "ike-crypto": ("AES-GCM-16ICV", 32), + "ike-integ": "NULL", + "ike-dh": "3072MODPgr", + "ike_transforms": { + "crypto_alg": 20, # "aes-gcm-16" + "crypto_key_size": 256, + "dh_group": 15, # "modp-3072" + }, + "esp_transforms": { + "crypto_alg": 12, # "aes-cbc" + "crypto_key_size": 256, + # "hmac-sha2-256-128" + "integ_alg": 12, + }, + "no_idr_in_auth": True, + } + ) @tag_fixme_vpp_workers class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params): - """ test ikev2 responder - initiator behind NAT """ + """test ikev2 responder - initiator behind NAT""" - IKE_NODE_SUFFIX = 'ip4-natt' + IKE_NODE_SUFFIX = "ip4-natt" def config_tc(self): - self.config_params( - {'i_natt': True}) + self.config_params({"i_natt": True}) @tag_fixme_vpp_workers class TestResponderPsk(TemplateResponder, Ikev2Params): - """ test ikev2 responder - pre shared key auth """ + """test ikev2 responder - pre shared key auth""" + def config_tc(self): self.config_params() @@ -1862,8 +2041,9 @@ class TestResponderDpd(TestResponderPsk): """ Dead peer detection test """ + def config_tc(self): - self.config_params({'dpd_disabled': False}) + self.config_params({"dpd_disabled": False}) def tearDown(self): pass @@ -1878,7 +2058,7 @@ class TestResponderDpd(TestResponderPsk): ih = self.get_ike_header(capture[0]) self.assertEqual(ih.exch_type, 37) # INFORMATIONAL plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') + self.assertEqual(plain, b"") # wait for SA expiration time.sleep(3) ike_sas = self.vapi.ikev2_sa_dump() @@ -1889,7 +2069,7 @@ class TestResponderDpd(TestResponderPsk): @tag_fixme_vpp_workers class TestResponderRekey(TestResponderPsk): - """ test ikev2 responder - rekey """ + """test ikev2 responder - rekey""" def rekey_from_initiator(self): packet = self.create_rekey_request() @@ -1911,18 +2091,19 @@ class TestResponderRekey(TestResponderPsk): self.sa.calc_child_keys() self.verify_ike_sas() self.verify_ipsec_sas(is_rekey=True) - self.assert_counter(1, 'rekey_req', 'ip4') + self.assert_counter(1, "rekey_req", "ip4") r = self.vapi.ikev2_sa_dump() self.assertEqual(r[0].sa.stats.n_rekey_req, 1) class TestResponderVrf(TestResponderPsk, Ikev2Params): - """ test ikev2 responder - non-default table id """ + """test ikev2 responder - non-default table id""" @classmethod def setUpClass(cls): import scapy.contrib.ikev2 as _ikev2 - globals()['ikev2'] = _ikev2 + + globals()["ikev2"] = _ikev2 super(IkePeer, cls).setUpClass() cls.create_pg_interfaces(range(1)) cls.vapi.cli("ip table add 1") @@ -1935,7 +2116,7 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params): i.resolve_ndp() def config_tc(self): - self.config_params({'dpd_disabled': False}) + self.config_params({"dpd_disabled": False}) def test_responder(self): self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1) @@ -1946,53 +2127,67 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params): ih = self.get_ike_header(capture[0]) self.assertEqual(ih.exch_type, 37) # INFORMATIONAL plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') + self.assertEqual(plain, b"") @tag_fixme_vpp_workers class TestResponderRsaSign(TemplateResponder, Ikev2Params): - """ test ikev2 responder - cert based auth """ + """test ikev2 responder - cert based auth""" + def config_tc(self): - self.config_params({ - 'udp_encap': True, - 'auth': 'rsa-sig', - 'server-key': 'server-key.pem', - 'client-key': 'client-key.pem', - 'client-cert': 'client-cert.pem', - 'server-cert': 'server-cert.pem'}) + self.config_params( + { + "udp_encap": True, + "auth": "rsa-sig", + "server-key": "server-key.pem", + "client-key": "client-key.pem", + "client-cert": "client-cert.pem", + "server-cert": "server-cert.pem", + } + ) @tag_fixme_vpp_workers -class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\ - (TemplateResponder, Ikev2Params): +class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192( + TemplateResponder, Ikev2Params +): """ IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192 """ + def config_tc(self): - self.config_params({ - 'ike-crypto': ('AES-CBC', 16), - 'ike-integ': 'SHA2-256-128', - 'esp-crypto': ('AES-CBC', 24), - 'esp-integ': 'SHA2-384-192', - 'ike-dh': '2048MODPgr', - 'nonce': os.urandom(256), - 'no_idr_in_auth': True}) + self.config_params( + { + "ike-crypto": ("AES-CBC", 16), + "ike-integ": "SHA2-256-128", + "esp-crypto": ("AES-CBC", 24), + "esp-integ": "SHA2-384-192", + "ike-dh": "2048MODPgr", + "nonce": os.urandom(256), + "no_idr_in_auth": True, + } + ) @tag_fixme_vpp_workers -class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\ - (TemplateResponder, Ikev2Params): +class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16( + TemplateResponder, Ikev2Params +): """ IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16 """ + def config_tc(self): - self.config_params({ - 'ike-crypto': ('AES-CBC', 32), - 'ike-integ': 'SHA2-256-128', - 'esp-crypto': ('AES-GCM-16ICV', 32), - 'esp-integ': 'NULL', - 'ike-dh': '3072MODPgr'}) + self.config_params( + { + "ike-crypto": ("AES-CBC", 32), + "ike-integ": "SHA2-256-128", + "esp-crypto": ("AES-GCM-16ICV", 32), + "esp-integ": "NULL", + "ike-dh": "3072MODPgr", + } + ) @tag_fixme_vpp_workers @@ -2001,20 +2196,21 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params): IKE:AES_GCM_16_256 """ - IKE_NODE_SUFFIX = 'ip6' + IKE_NODE_SUFFIX = "ip6" def config_tc(self): - self.config_params({ - 'del_sa_from_responder': True, - 'ip6': True, - 'natt': True, - 'ike-crypto': ('AES-GCM-16ICV', 32), - 'ike-integ': 'NULL', - 'ike-dh': '2048MODPgr', - 'loc_ts': {'start_addr': 'ab:cd::0', - 'end_addr': 'ab:cd::10'}, - 'rem_ts': {'start_addr': '11::0', - 'end_addr': '11::100'}}) + self.config_params( + { + "del_sa_from_responder": True, + "ip6": True, + "natt": True, + "ike-crypto": ("AES-GCM-16ICV", 32), + "ike-integ": "NULL", + "ike-dh": "2048MODPgr", + "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"}, + "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"}, + } + ) @tag_fixme_vpp_workers @@ -2032,8 +2228,8 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk): ih = self.get_ike_header(capture[0]) self.assertEqual(ih.id, self.sa.msg_id) plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') - self.assert_counter(1, 'keepalive', 'ip4') + self.assertEqual(plain, b"") + self.assert_counter(1, "keepalive", "ip4") r = self.vapi.ikev2_sa_dump() self.assertEqual(1, r[0].sa.stats.n_keepalives) @@ -2043,7 +2239,7 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk): class TestMalformedMessages(TemplateResponder, Ikev2Params): - """ malformed packet test """ + """malformed packet test""" def tearDown(self): pass @@ -2052,23 +2248,26 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params): self.config_params() def create_ike_init_msg(self, length=None, payload=None): - msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8, - flags='Initiator', exch_type='IKE_SA_INIT') + msg = ikev2.IKEv2( + length=length, + init_SPI="\x11" * 8, + flags="Initiator", + exch_type="IKE_SA_INIT", + ) if payload is not None: msg /= payload - return self.create_packet(self.pg0, msg, self.sa.sport, - self.sa.dport) + return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport) def verify_bad_packet_length(self): - ike_msg = self.create_ike_init_msg(length=0xdead) + ike_msg = self.create_ike_init_msg(length=0xDEAD) self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count) - self.assert_counter(self.pkt_count, 'bad_length') + self.assert_counter(self.pkt_count, "bad_length") def verify_bad_sa_payload_length(self): - p = ikev2.IKEv2_payload_SA(length=0xdead) + p = ikev2.IKEv2_payload_SA(length=0xDEAD) ike_msg = self.create_ike_init_msg(payload=p) self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count) - self.assert_counter(self.pkt_count, 'malformed_packet') + self.assert_counter(self.pkt_count, "malformed_packet") def test_responder(self): self.pkt_count = 254 @@ -2076,5 +2275,5 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params): self.verify_bad_sa_payload_length() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)