tests: replace pycodestyle with black
[vpp.git] / test / test_ikev2.py
index 58a7ec3..5b699dd 100644 (file)
@@ -38,7 +38,9 @@ GCM_IV_SIZE = 8
 # defined in rfc3526
 # tuple structure is (p, g, key_len)
 DH = {
-    '2048MODPgr': (long_converter("""
+    "2048MODPgr": (
+        long_converter(
+            """
     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
@@ -49,9 +51,14 @@ DH = {
     670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
     E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
     DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
-    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
-
-    '3072MODPgr': (long_converter("""
+    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
+        ),
+        2,
+        256,
+    ),
+    "3072MODPgr": (
+        long_converter(
+            """
     FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
     29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
     EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
@@ -67,7 +74,11 @@ DH = {
     ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
     F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
     BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
-    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
+    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
+        ),
+        2,
+        384,
+    ),
 }
 
 
@@ -79,7 +90,7 @@ class CryptoAlgo(object):
         if self.cipher is not None:
             self.bs = self.cipher.block_size // 8
 
-            if self.name == 'AES-GCM-16ICV':
+            if self.name == "AES-GCM-16ICV":
                 self.iv_len = GCM_IV_SIZE
             else:
                 self.iv_len = self.bs
@@ -87,14 +98,16 @@ class CryptoAlgo(object):
     def encrypt(self, data, key, aad=None):
         iv = os.urandom(self.iv_len)
         if aad is None:
-            encryptor = Cipher(self.cipher(key), self.mode(iv),
-                               default_backend()).encryptor()
+            encryptor = Cipher(
+                self.cipher(key), self.mode(iv), default_backend()
+            ).encryptor()
             return iv + encryptor.update(data) + encryptor.finalize()
         else:
             salt = key[-SALT_SIZE:]
             nonce = salt + iv
-            encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
-                               default_backend()).encryptor()
+            encryptor = Cipher(
+                self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
+            ).encryptor()
             encryptor.authenticate_additional_data(aad)
             data = encryptor.update(data) + encryptor.finalize()
             data += encryptor.tag[:GCM_ICV_SIZE]
@@ -102,26 +115,26 @@ class CryptoAlgo(object):
 
     def decrypt(self, data, key, aad=None, icv=None):
         if aad is None:
-            iv = data[:self.iv_len]
-            ct = data[self.iv_len:]
-            decryptor = Cipher(algorithms.AES(key),
-                               self.mode(iv),
-                               default_backend()).decryptor()
+            iv = data[: self.iv_len]
+            ct = data[self.iv_len :]
+            decryptor = Cipher(
+                algorithms.AES(key), self.mode(iv), default_backend()
+            ).decryptor()
             return decryptor.update(ct) + decryptor.finalize()
         else:
             salt = key[-SALT_SIZE:]
             nonce = salt + data[:GCM_IV_SIZE]
             ct = data[GCM_IV_SIZE:]
             key = key[:-SALT_SIZE]
-            decryptor = Cipher(algorithms.AES(key),
-                               self.mode(nonce, icv, len(icv)),
-                               default_backend()).decryptor()
+            decryptor = Cipher(
+                algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
+            ).decryptor()
             decryptor.authenticate_additional_data(aad)
             return decryptor.update(ct) + decryptor.finalize()
 
     def pad(self, data):
         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
-        data = data + b'\x00' * (pad_len - 1)
+        data = data + b"\x00" * (pad_len - 1)
         return data + bytes([pad_len - 1])
 
 
@@ -135,36 +148,34 @@ class AuthAlgo(object):
 
 
 CRYPTO_ALGOS = {
-    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
-    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
-    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
-                                mode=modes.GCM),
+    "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
+    "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
+    "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
 }
 
 AUTH_ALGOS = {
-    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
-    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
-    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
-    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
-    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
+    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
+    "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
+    "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
+    "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
+    "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
 }
 
 PRF_ALGOS = {
-    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
-    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
-                                  hashes.SHA256, 32),
+    "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
+    "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
 }
 
 CRYPTO_IDS = {
-    12: 'AES-CBC',
-    20: 'AES-GCM-16ICV',
+    12: "AES-CBC",
+    20: "AES-GCM-16ICV",
 }
 
 INTEG_IDS = {
-    2: 'HMAC-SHA1-96',
-    12: 'SHA2-256-128',
-    13: 'SHA2-384-192',
-    14: 'SHA2-512-256',
+    2: "HMAC-SHA1-96",
+    12: "SHA2-256-128",
+    13: "SHA2-384-192",
+    14: "SHA2-512-256",
 }
 
 
@@ -182,11 +193,24 @@ class IKEv2ChildSA(object):
 
 
 class IKEv2SA(object):
-    def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
-                 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
-                 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
-                 auth_method='shared-key', priv_key=None, i_natt=False,
-                 r_natt=False, udp_encap=False):
+    def __init__(
+        self,
+        test,
+        is_initiator=True,
+        i_id=None,
+        r_id=None,
+        spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
+        id_type="fqdn",
+        nonce=None,
+        auth_data=None,
+        local_ts=None,
+        remote_ts=None,
+        auth_method="shared-key",
+        priv_key=None,
+        i_natt=False,
+        r_natt=False,
+        udp_encap=False,
+    ):
         self.udp_encap = udp_encap
         self.i_natt = i_natt
         self.r_natt = r_natt
@@ -211,15 +235,14 @@ class IKEv2SA(object):
             self.id_type = id_type
         self.auth_method = auth_method
         if self.is_initiator:
-            self.rspi = 8 * b'\x00'
+            self.rspi = 8 * b"\x00"
             self.ispi = spi
             self.i_nonce = nonce
         else:
             self.rspi = spi
-            self.ispi = 8 * b'\x00'
+            self.ispi = 8 * b"\x00"
             self.r_nonce = nonce
-        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
-                          self.is_initiator)]
+        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
 
     def new_msg_id(self):
         self.msg_id += 1
@@ -245,13 +268,14 @@ class IKEv2SA(object):
         priv = self.dh_private_key
         peer = self.peer_dh_pub_key
         p, g, l = self.ike_group
-        return pow(int.from_bytes(peer, 'big'),
-                   int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
+        return pow(
+            int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
+        ).to_bytes(l, "big")
 
     def generate_dh_data(self):
         # generate DH keys
         if self.ike_dh not in DH:
-            raise NotImplementedError('%s not in DH group' % self.ike_dh)
+            raise NotImplementedError("%s not in DH group" % self.ike_dh)
 
         if self.dh_params is None:
             dhg = DH[self.ike_dh]
@@ -261,13 +285,13 @@ class IKEv2SA(object):
         priv = self.dh_params.generate_private_key()
         pub = priv.public_key()
         x = priv.private_numbers().x
-        self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
+        self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
         y = pub.public_numbers().y
 
         if self.is_initiator:
-            self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
+            self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
         else:
-            self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
+            self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
 
     def complete_dh_data(self):
         self.dh_shared_secret = self.compute_secret()
@@ -281,41 +305,39 @@ class IKEv2SA(object):
         integ_key_len = self.esp_integ_alg.key_len
         salt_len = 0 if integ_key_len else 4
 
-        l = (integ_key_len * 2 +
-             encr_key_len * 2 +
-             salt_len * 2)
+        l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
         keymat = self.calc_prfplus(prf, self.sk_d, s, l)
 
         pos = 0
-        c.sk_ei = keymat[pos:pos+encr_key_len]
+        c.sk_ei = keymat[pos : pos + encr_key_len]
         pos += encr_key_len
 
         if integ_key_len:
-            c.sk_ai = keymat[pos:pos+integ_key_len]
+            c.sk_ai = keymat[pos : pos + integ_key_len]
             pos += integ_key_len
         else:
-            c.salt_ei = keymat[pos:pos+salt_len]
+            c.salt_ei = keymat[pos : pos + salt_len]
             pos += salt_len
 
-        c.sk_er = keymat[pos:pos+encr_key_len]
+        c.sk_er = keymat[pos : pos + encr_key_len]
         pos += encr_key_len
 
         if integ_key_len:
-            c.sk_ar = keymat[pos:pos+integ_key_len]
+            c.sk_ar = keymat[pos : pos + integ_key_len]
             pos += integ_key_len
         else:
-            c.salt_er = keymat[pos:pos+salt_len]
+            c.salt_er = keymat[pos : pos + salt_len]
             pos += salt_len
 
     def calc_prfplus(self, prf, key, seed, length):
-        r = b''
+        r = b""
         t = None
         x = 1
         while len(r) < length and x < 255:
             if t is not None:
                 s = t
             else:
-                s = b''
+                s = b""
             s = s + seed + bytes([x])
             t = self.calc_prf(prf, key, s)
             r = r + t
@@ -348,30 +370,32 @@ class IKEv2SA(object):
         else:
             salt_size = 0
 
-        l = (prf_key_trunc +
-             integ_key_len * 2 +
-             encr_key_len * 2 +
-             tr_prf_key_len * 2 +
-             salt_size * 2)
+        l = (
+            prf_key_trunc
+            + integ_key_len * 2
+            + encr_key_len * 2
+            + tr_prf_key_len * 2
+            + salt_size * 2
+        )
         keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
 
         pos = 0
-        self.sk_d = keymat[:pos+prf_key_trunc]
+        self.sk_d = keymat[: pos + prf_key_trunc]
         pos += prf_key_trunc
 
-        self.sk_ai = keymat[pos:pos+integ_key_len]
+        self.sk_ai = keymat[pos : pos + integ_key_len]
         pos += integ_key_len
-        self.sk_ar = keymat[pos:pos+integ_key_len]
+        self.sk_ar = keymat[pos : pos + integ_key_len]
         pos += integ_key_len
 
-        self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
+        self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
         pos += encr_key_len + salt_size
-        self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
+        self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
         pos += encr_key_len + salt_size
 
-        self.sk_pi = keymat[pos:pos+tr_prf_key_len]
+        self.sk_pi = keymat[pos : pos + tr_prf_key_len]
         pos += tr_prf_key_len
-        self.sk_pr = keymat[pos:pos+tr_prf_key_len]
+        self.sk_pr = keymat[pos : pos + tr_prf_key_len]
 
     def generate_authmsg(self, prf, packet):
         if self.is_initiator:
@@ -393,14 +417,15 @@ class IKEv2SA(object):
         else:
             packet = self.init_resp_packet
         authmsg = self.generate_authmsg(prf, raw(packet))
-        if self.auth_method == 'shared-key':
+        if self.auth_method == "shared-key":
             psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
             self.auth_data = self.calc_prf(prf, psk, authmsg)
-        elif self.auth_method == 'rsa-sig':
-            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
-                                                hashes.SHA1())
+        elif self.auth_method == "rsa-sig":
+            self.auth_data = self.priv_key.sign(
+                authmsg, padding.PKCS1v15(), hashes.SHA1()
+            )
         else:
-            raise TypeError('unknown auth method type!')
+            raise TypeError("unknown auth method type!")
 
     def encrypt(self, data, aad=None):
         data = self.ike_crypto_alg.pad(data)
@@ -431,7 +456,7 @@ class IKEv2SA(object):
         return self.sk_ei
 
     def concat(self, alg, key_len):
-        return alg + '-' + str(key_len * 8)
+        return alg + "-" + str(key_len * 8)
 
     @property
     def vpp_ike_cypto_alg(self):
@@ -445,8 +470,9 @@ class IKEv2SA(object):
         integ_trunc = self.ike_integ_alg.trunc_len
         exp_hmac = ikemsg[-integ_trunc:]
         data = ikemsg[:-integ_trunc]
-        computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
-                                          self.peer_authkey, data)
+        computed_hmac = self.compute_hmac(
+            self.ike_integ_alg.mod(), self.peer_authkey, data
+        )
         self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
 
     def compute_hmac(self, integ, key, data):
@@ -459,7 +485,7 @@ class IKEv2SA(object):
 
     def hmac_and_decrypt(self, ike):
         ep = ike[ikev2.IKEv2_payload_Encrypted]
-        if self.ike_crypto == 'AES-GCM-16ICV':
+        if self.ike_crypto == "AES-GCM-16ICV":
             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
             ct = ep.load[:-GCM_ICV_SIZE]
             tag = ep.load[-GCM_ICV_SIZE:]
@@ -473,26 +499,26 @@ class IKEv2SA(object):
             plain = self.decrypt(ct)
         # remove padding
         pad_len = plain[-1]
-        return plain[:-pad_len - 1]
+        return plain[: -pad_len - 1]
 
     def build_ts_addr(self, ts, version):
-        return {'starting_address_v' + version: ts['start_addr'],
-                'ending_address_v' + version: ts['end_addr']}
+        return {
+            "starting_address_v" + version: ts["start_addr"],
+            "ending_address_v" + version: ts["end_addr"],
+        }
 
     def generate_ts(self, is_ip4):
         c = self.child_sas[0]
-        ts_data = {'IP_protocol_ID': 0,
-                   'start_port': 0,
-                   'end_port': 0xffff}
+        ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
         if is_ip4:
-            ts_data.update(self.build_ts_addr(c.local_ts, '4'))
+            ts_data.update(self.build_ts_addr(c.local_ts, "4"))
             ts1 = ikev2.IPv4TrafficSelector(**ts_data)
-            ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
+            ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
             ts2 = ikev2.IPv4TrafficSelector(**ts_data)
         else:
-            ts_data.update(self.build_ts_addr(c.local_ts, '6'))
+            ts_data.update(self.build_ts_addr(c.local_ts, "6"))
             ts1 = ikev2.IPv6TrafficSelector(**ts_data)
-            ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
+            ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
             ts2 = ikev2.IPv6TrafficSelector(**ts_data)
 
         if self.is_initiator:
@@ -501,18 +527,18 @@ class IKEv2SA(object):
 
     def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
         if crypto not in CRYPTO_ALGOS:
-            raise TypeError('unsupported encryption algo %r' % crypto)
+            raise TypeError("unsupported encryption algo %r" % crypto)
         self.ike_crypto = crypto
         self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
         self.ike_crypto_key_len = crypto_key_len
 
         if integ not in AUTH_ALGOS:
-            raise TypeError('unsupported auth algo %r' % integ)
-        self.ike_integ = None if integ == 'NULL' else integ
+            raise TypeError("unsupported auth algo %r" % integ)
+        self.ike_integ = None if integ == "NULL" else integ
         self.ike_integ_alg = AUTH_ALGOS[integ]
 
         if prf not in PRF_ALGOS:
-            raise TypeError('unsupported prf algo %r' % prf)
+            raise TypeError("unsupported prf algo %r" % prf)
         self.ike_prf = prf
         self.ike_prf_alg = PRF_ALGOS[prf]
         self.ike_dh = dh
@@ -521,20 +547,20 @@ class IKEv2SA(object):
     def set_esp_props(self, crypto, crypto_key_len, integ):
         self.esp_crypto_key_len = crypto_key_len
         if crypto not in CRYPTO_ALGOS:
-            raise TypeError('unsupported encryption algo %r' % crypto)
+            raise TypeError("unsupported encryption algo %r" % crypto)
         self.esp_crypto = crypto
         self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
 
         if integ not in AUTH_ALGOS:
-            raise TypeError('unsupported auth algo %r' % integ)
-        self.esp_integ = None if integ == 'NULL' else integ
+            raise TypeError("unsupported auth algo %r" % integ)
+        self.esp_integ = None if integ == "NULL" else integ
         self.esp_integ_alg = AUTH_ALGOS[integ]
 
     def crypto_attr(self, key_len):
-        if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
-            return (0x800e << 16 | key_len << 3, 12)
+        if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
+            return (0x800E << 16 | key_len << 3, 12)
         else:
-            raise Exception('unsupported attribute type')
+            raise Exception("unsupported attribute type")
 
     def ike_crypto_attr(self):
         return self.crypto_attr(self.ike_crypto_key_len)
@@ -545,19 +571,20 @@ class IKEv2SA(object):
     def compute_nat_sha1(self, ip, port, rspi=None):
         if rspi is None:
             rspi = self.rspi
-        data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
+        data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
         digest.update(data)
         return digest.finalize()
 
 
 class IkePeer(VppTestCase):
-    """ common class for initiator and responder """
+    """common class for initiator and responder"""
 
     @classmethod
     def setUpClass(cls):
         import scapy.contrib.ikev2 as _ikev2
-        globals()['ikev2'] = _ikev2
+
+        globals()["ikev2"] = _ikev2
         super(IkePeer, cls).setUpClass()
         cls.create_pg_interfaces(range(2))
         for i in cls.pg_interfaces:
@@ -591,36 +618,46 @@ class IkePeer(VppTestCase):
         self.assertIsNotNone(self.p.query_vpp_config())
         if self.sa.is_initiator:
             self.sa.generate_dh_data()
-        self.vapi.cli('ikev2 set logging level 4')
-        self.vapi.cli('event-lo clear')
+        self.vapi.cli("ikev2 set logging level 4")
+        self.vapi.cli("event-lo clear")
 
-    def assert_counter(self, count, name, version='ip4'):
-        node_name = '/err/ikev2-%s/' % version + name
+    def assert_counter(self, count, name, version="ip4"):
+        node_name = "/err/ikev2-%s/" % version + name
         self.assertEqual(count, self.statistics.get_err_counter(node_name))
 
     def create_rekey_request(self):
         sa, first_payload = self.generate_auth_payload(is_rekey=True)
         header = ikev2.IKEv2(
-                init_SPI=self.sa.ispi,
-                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
-                flags='Initiator', exch_type='CREATE_CHILD_SA')
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            id=self.sa.new_msg_id(),
+            flags="Initiator",
+            exch_type="CREATE_CHILD_SA",
+        )
 
         ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
-        return self.create_packet(self.pg0, ike_msg, self.sa.sport,
-                                  self.sa.dport, self.sa.natt, self.ip6)
+        return self.create_packet(
+            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
 
     def create_empty_request(self):
-        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
-                             id=self.sa.new_msg_id(), flags='Initiator',
-                             exch_type='INFORMATIONAL',
-                             next_payload='Encrypted')
-
-        msg = self.encrypt_ike_msg(header, b'', None)
-        return self.create_packet(self.pg0, msg, self.sa.sport,
-                                  self.sa.dport, self.sa.natt, self.ip6)
-
-    def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
-                      use_ip6=False):
+        header = ikev2.IKEv2(
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            id=self.sa.new_msg_id(),
+            flags="Initiator",
+            exch_type="INFORMATIONAL",
+            next_payload="Encrypted",
+        )
+
+        msg = self.encrypt_ike_msg(header, b"", None)
+        return self.create_packet(
+            self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
+
+    def create_packet(
+        self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
+    ):
         if use_ip6:
             src_ip = src_if.remote_ip6
             dst_ip = src_if.local_ip6
@@ -629,12 +666,14 @@ class IkePeer(VppTestCase):
             src_ip = src_if.remote_ip4
             dst_ip = src_if.local_ip4
             ip_layer = IP
-        res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
-               ip_layer(src=src_ip, dst=dst_ip) /
-               UDP(sport=sport, dport=dport))
+        res = (
+            Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+            / ip_layer(src=src_ip, dst=dst_ip)
+            / UDP(sport=sport, dport=dport)
+        )
         if natt:
             # insert non ESP marker
-            res = res / Raw(b'\x00' * 4)
+            res = res / Raw(b"\x00" * 4)
         return res / msg
 
     def verify_udp(self, udp):
@@ -651,7 +690,7 @@ class IkePeer(VppTestCase):
             esp = packet[ESP]
             ih = self.verify_and_remove_non_esp_marker(esp)
         self.assertEqual(ih.version, 0x20)
-        self.assertNotIn('Version', ih.flags)
+        self.assertNotIn("Version", ih.flags)
         return ih
 
     def verify_and_remove_non_esp_marker(self, packet):
@@ -659,26 +698,32 @@ class IkePeer(VppTestCase):
             # if we are in nat traversal mode check for non esp marker
             # and remove it
             data = raw(packet)
-            self.assertEqual(data[:4], b'\x00' * 4)
+            self.assertEqual(data[:4], b"\x00" * 4)
             return ikev2.IKEv2(data[4:])
         else:
             return packet
 
     def encrypt_ike_msg(self, header, plain, first_payload):
-        if self.sa.ike_crypto == 'AES-GCM-16ICV':
+        if self.sa.ike_crypto == "AES-GCM-16ICV":
             data = self.sa.ike_crypto_alg.pad(raw(plain))
-            plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
-                len(ikev2.IKEv2_payload_Encrypted())
+            plen = (
+                len(data)
+                + GCM_IV_SIZE
+                + GCM_ICV_SIZE
+                + len(ikev2.IKEv2_payload_Encrypted())
+            )
             tlen = plen + len(ikev2.IKEv2())
 
             # prepare aad data
-            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
-                                                 length=plen)
+            sk_p = ikev2.IKEv2_payload_Encrypted(
+                next_payload=first_payload, length=plen
+            )
             header.length = tlen
             res = header / sk_p
             encr = self.sa.encrypt(raw(plain), raw(res))
-            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
-                                                 length=plen, load=encr)
+            sk_p = ikev2.IKEv2_payload_Encrypted(
+                next_payload=first_payload, length=plen, load=encr
+            )
             res = header / sk_p
         else:
             encr = self.sa.encrypt(raw(plain))
@@ -686,16 +731,18 @@ class IkePeer(VppTestCase):
             plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
             tlen = plen + len(ikev2.IKEv2())
 
-            sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
-                                                 length=plen, load=encr)
+            sk_p = ikev2.IKEv2_payload_Encrypted(
+                next_payload=first_payload, length=plen, load=encr
+            )
             header.length = tlen
             res = header / sk_p
 
             integ_data = raw(res)
-            hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
-                                             self.sa.my_authkey, integ_data)
+            hmac_data = self.sa.compute_hmac(
+                self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
+            )
             res = res / Raw(hmac_data[:trunc_len])
-        assert(len(res) == tlen)
+        assert len(res) == tlen
         return res
 
     def verify_udp_encap(self, ipsec_sa):
@@ -747,37 +794,37 @@ class IkePeer(VppTestCase):
         # verify crypto keys
         self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
         self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
-        self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
-        self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
+        self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
+        self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
 
         # verify integ keys
         if vpp_integ_alg:
             self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
             self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
-            self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
-            self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
+            self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
+            self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
         else:
-            self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
-            self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
+            self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
+            self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
 
     def verify_keymat(self, api_keys, keys, name):
         km = getattr(keys, name)
         api_km = getattr(api_keys, name)
-        api_km_len = getattr(api_keys, name + '_len')
+        api_km_len = getattr(api_keys, name + "_len")
         self.assertEqual(len(km), api_km_len)
         self.assertEqual(km, api_km[:api_km_len])
 
     def verify_id(self, api_id, exp_id):
         self.assertEqual(api_id.type, IDType.value(exp_id.type))
         self.assertEqual(api_id.data_len, exp_id.data_len)
-        self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
+        self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
 
     def verify_ike_sas(self):
         r = self.vapi.ikev2_sa_dump()
         self.assertEqual(len(r), 1)
         sa = r[0].sa
-        self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
-        self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
+        self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
+        self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
         if self.ip6:
             if self.sa.is_initiator:
                 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
@@ -792,55 +839,53 @@ class IkePeer(VppTestCase):
             else:
                 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
                 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
-        self.verify_keymat(sa.keys, self.sa, 'sk_d')
-        self.verify_keymat(sa.keys, self.sa, 'sk_ai')
-        self.verify_keymat(sa.keys, self.sa, 'sk_ar')
-        self.verify_keymat(sa.keys, self.sa, 'sk_ei')
-        self.verify_keymat(sa.keys, self.sa, 'sk_er')
-        self.verify_keymat(sa.keys, self.sa, 'sk_pi')
-        self.verify_keymat(sa.keys, self.sa, 'sk_pr')
+        self.verify_keymat(sa.keys, self.sa, "sk_d")
+        self.verify_keymat(sa.keys, self.sa, "sk_ai")
+        self.verify_keymat(sa.keys, self.sa, "sk_ar")
+        self.verify_keymat(sa.keys, self.sa, "sk_ei")
+        self.verify_keymat(sa.keys, self.sa, "sk_er")
+        self.verify_keymat(sa.keys, self.sa, "sk_pi")
+        self.verify_keymat(sa.keys, self.sa, "sk_pr")
 
         self.assertEqual(sa.i_id.type, self.sa.id_type)
         self.assertEqual(sa.r_id.type, self.sa.id_type)
         self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
         self.assertEqual(sa.r_id.data_len, len(self.idr))
-        self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
-        self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.idr)
+        self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
+        self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
 
         r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
         self.assertEqual(len(r), 1)
         csa = r[0].child_sa
         self.assertEqual(csa.sa_index, sa.sa_index)
         c = self.sa.child_sas[0]
-        if hasattr(c, 'sk_ai'):
-            self.verify_keymat(csa.keys, c, 'sk_ai')
-            self.verify_keymat(csa.keys, c, 'sk_ar')
-        self.verify_keymat(csa.keys, c, 'sk_ei')
-        self.verify_keymat(csa.keys, c, 'sk_er')
-        self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
-        self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
+        if hasattr(c, "sk_ai"):
+            self.verify_keymat(csa.keys, c, "sk_ai")
+            self.verify_keymat(csa.keys, c, "sk_ar")
+        self.verify_keymat(csa.keys, c, "sk_ei")
+        self.verify_keymat(csa.keys, c, "sk_er")
+        self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
+        self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
         tsi = tsi[0]
         tsr = tsr[0]
         r = self.vapi.ikev2_traffic_selector_dump(
-                is_initiator=True, sa_index=sa.sa_index,
-                child_sa_index=csa.child_sa_index)
+            is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+        )
         self.assertEqual(len(r), 1)
         ts = r[0].ts
         self.verify_ts(r[0].ts, tsi[0], True)
 
         r = self.vapi.ikev2_traffic_selector_dump(
-                is_initiator=False, sa_index=sa.sa_index,
-                child_sa_index=csa.child_sa_index)
+            is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+        )
         self.assertEqual(len(r), 1)
         self.verify_ts(r[0].ts, tsr[0], False)
 
-        n = self.vapi.ikev2_nonce_get(is_initiator=True,
-                                      sa_index=sa.sa_index)
+        n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
         self.verify_nonce(n, self.sa.i_nonce)
-        n = self.vapi.ikev2_nonce_get(is_initiator=False,
-                                      sa_index=sa.sa_index)
+        n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
         self.verify_nonce(n, self.sa.r_nonce)
 
     def verify_nonce(self, api_nonce, nonce):
@@ -854,61 +899,65 @@ class IkePeer(VppTestCase):
             self.assertFalse(api_ts.is_local)
 
         if self.p.ts_is_ip4:
-            self.assertEqual(api_ts.start_addr,
-                             IPv4Address(ts.starting_address_v4))
-            self.assertEqual(api_ts.end_addr,
-                             IPv4Address(ts.ending_address_v4))
+            self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
+            self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
         else:
-            self.assertEqual(api_ts.start_addr,
-                             IPv6Address(ts.starting_address_v6))
-            self.assertEqual(api_ts.end_addr,
-                             IPv6Address(ts.ending_address_v6))
+            self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
+            self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
         self.assertEqual(api_ts.start_port, ts.start_port)
         self.assertEqual(api_ts.end_port, ts.end_port)
         self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
 
 
 class TemplateInitiator(IkePeer):
-    """ initiator test template """
+    """initiator test template"""
 
     def initiate_del_sa_from_initiator(self):
-        ispi = int.from_bytes(self.sa.ispi, 'little')
+        ispi = int.from_bytes(self.sa.ispi, "little")
         self.pg0.enable_capture()
         self.pg_start()
         self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
         capture = self.pg0.get_capture(1)
         ih = self.get_ike_header(capture[0])
-        self.assertNotIn('Response', ih.flags)
-        self.assertIn('Initiator', ih.flags)
+        self.assertNotIn("Response", ih.flags)
+        self.assertIn("Initiator", ih.flags)
         self.assertEqual(ih.init_SPI, self.sa.ispi)
         self.assertEqual(ih.resp_SPI, self.sa.rspi)
         plain = self.sa.hmac_and_decrypt(ih)
         d = ikev2.IKEv2_payload_Delete(plain)
         self.assertEqual(d.proto, 1)  # proto=IKEv2
-        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
-                             flags='Response', exch_type='INFORMATIONAL',
-                             id=ih.id, next_payload='Encrypted')
-        resp = self.encrypt_ike_msg(header, b'', None)
+        header = ikev2.IKEv2(
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            flags="Response",
+            exch_type="INFORMATIONAL",
+            id=ih.id,
+            next_payload="Encrypted",
+        )
+        resp = self.encrypt_ike_msg(header, b"", None)
         self.send_and_assert_no_replies(self.pg0, resp)
 
     def verify_del_sa(self, packet):
         ih = self.get_ike_header(packet)
         self.assertEqual(ih.id, self.sa.msg_id)
         self.assertEqual(ih.exch_type, 37)  # exchange informational
-        self.assertIn('Response', ih.flags)
-        self.assertIn('Initiator', ih.flags)
+        self.assertIn("Response", ih.flags)
+        self.assertIn("Initiator", ih.flags)
         plain = self.sa.hmac_and_decrypt(ih)
-        self.assertEqual(plain, b'')
+        self.assertEqual(plain, b"")
 
     def initiate_del_sa_from_responder(self):
-        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
-                             exch_type='INFORMATIONAL',
-                             id=self.sa.new_msg_id())
-        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
-        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
-        packet = self.create_packet(self.pg0, ike_msg,
-                                    self.sa.sport, self.sa.dport,
-                                    self.sa.natt, self.ip6)
+        header = ikev2.IKEv2(
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            exch_type="INFORMATIONAL",
+            id=self.sa.new_msg_id(),
+        )
+        del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
+        ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
+        packet = self.create_packet(
+            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
@@ -935,26 +984,28 @@ class TemplateInitiator(IkePeer):
         s = self.find_notify_payload(packet, 16388)
         self.assertIsNotNone(s)
         src_sha = self.sa.compute_nat_sha1(
-                inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
+            inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
+        )
         self.assertEqual(s.load, src_sha)
 
         # NAT_DETECTION_DESTINATION_IP
         s = self.find_notify_payload(packet, 16389)
         self.assertIsNotNone(s)
         dst_sha = self.sa.compute_nat_sha1(
-                inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
+            inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
+        )
         self.assertEqual(s.load, dst_sha)
 
     def verify_sa_init_request(self, packet):
         udp = packet[UDP]
         self.sa.dport = udp.sport
         ih = packet[ikev2.IKEv2]
-        self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
+        self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
         self.assertEqual(ih.exch_type, 34)  # SA_INIT
         self.sa.ispi = ih.init_SPI
-        self.assertEqual(ih.resp_SPI, 8 * b'\x00')
-        self.assertIn('Initiator', ih.flags)
-        self.assertNotIn('Response', ih.flags)
+        self.assertEqual(ih.resp_SPI, 8 * b"\x00")
+        self.assertIn("Initiator", ih.flags)
+        self.assertNotIn("Response", ih.flags)
         self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
         self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
 
@@ -962,20 +1013,23 @@ class TemplateInitiator(IkePeer):
         self.assertEqual(prop.proto, 1)  # proto = ikev2
         self.assertEqual(prop.proposal, 1)
         self.assertEqual(prop.trans[0].transform_type, 1)  # encryption
-        self.assertEqual(prop.trans[0].transform_id,
-                         self.p.ike_transforms['crypto_alg'])
+        self.assertEqual(
+            prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
+        )
         self.assertEqual(prop.trans[1].transform_type, 2)  # prf
         self.assertEqual(prop.trans[1].transform_id, 5)  # "hmac-sha2-256"
         self.assertEqual(prop.trans[2].transform_type, 4)  # dh
-        self.assertEqual(prop.trans[2].transform_id,
-                         self.p.ike_transforms['dh_group'])
+        self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
 
         self.verify_nat_detection(packet)
         self.sa.set_ike_props(
-                    crypto='AES-GCM-16ICV', crypto_key_len=32,
-                    integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
-        self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
-                              integ='SHA2-256-128')
+            crypto="AES-GCM-16ICV",
+            crypto_key_len=32,
+            integ="NULL",
+            prf="PRF_HMAC_SHA2_256",
+            dh="3072MODPgr",
+        )
+        self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
         self.sa.generate_dh_data()
         self.sa.complete_dh_data()
         self.sa.calc_keys()
@@ -995,8 +1049,8 @@ class TemplateInitiator(IkePeer):
         self.assertEqual(ih.resp_SPI, self.sa.rspi)
         self.assertEqual(ih.init_SPI, self.sa.ispi)
         self.assertEqual(ih.exch_type, 35)  # IKE_AUTH
-        self.assertIn('Initiator', ih.flags)
-        self.assertNotIn('Response', ih.flags)
+        self.assertIn("Initiator", ih.flags)
+        self.assertNotIn("Response", ih.flags)
 
         udp = packet[UDP]
         self.verify_udp(udp)
@@ -1013,48 +1067,67 @@ class TemplateInitiator(IkePeer):
         prop = idi[ikev2.IKEv2_payload_Proposal]
         c = self.sa.child_sas[0]
         c.ispi = prop.SPI
-        self.update_esp_transforms(
-                prop[ikev2.IKEv2_payload_Transform], self.sa)
+        self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
 
     def send_init_response(self):
         tr_attr = self.sa.ike_crypto_attr()
-        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
-                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
-                 key_length=tr_attr[0]) /
-                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
-                 transform_id=self.sa.ike_integ) /
-                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
-                 transform_id=self.sa.ike_prf_alg.name) /
-                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
-                 transform_id=self.sa.ike_dh))
-        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
-                 trans_nb=4, trans=trans))
+        trans = (
+            ikev2.IKEv2_payload_Transform(
+                transform_type="Encryption",
+                transform_id=self.sa.ike_crypto,
+                length=tr_attr[1],
+                key_length=tr_attr[0],
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="Integrity", transform_id=self.sa.ike_integ
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="GroupDesc", transform_id=self.sa.ike_dh
+            )
+        )
+        props = ikev2.IKEv2_payload_Proposal(
+            proposal=1, proto="IKEv2", trans_nb=4, trans=trans
+        )
 
         src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
         if self.sa.natt:
-            dst_address = b'\x0a\x0a\x0a\x0a'
+            dst_address = b"\x0a\x0a\x0a\x0a"
         else:
             dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
         dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
 
         self.sa.init_resp_packet = (
-            ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
-                        exch_type='IKE_SA_INIT', flags='Response') /
-            ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
-            ikev2.IKEv2_payload_KE(next_payload='Nonce',
-                                   group=self.sa.ike_dh,
-                                   load=self.sa.my_dh_pub_key) /
-            ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
-                                      next_payload='Notify') /
-            ikev2.IKEv2_payload_Notify(
-                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
-                    next_payload='Notify') / ikev2.IKEv2_payload_Notify(
-                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
-
-        ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
-                                     self.sa.sport, self.sa.dport,
-                                     False, self.ip6)
+            ikev2.IKEv2(
+                init_SPI=self.sa.ispi,
+                resp_SPI=self.sa.rspi,
+                exch_type="IKE_SA_INIT",
+                flags="Response",
+            )
+            / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
+            / ikev2.IKEv2_payload_KE(
+                next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
+            )
+            / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
+            / ikev2.IKEv2_payload_Notify(
+                type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
+            )
+            / ikev2.IKEv2_payload_Notify(
+                type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
+            )
+        )
+
+        ike_msg = self.create_packet(
+            self.pg0,
+            self.sa.init_resp_packet,
+            self.sa.sport,
+            self.sa.dport,
+            False,
+            self.ip6,
+        )
         self.pg_send(self.pg0, ike_msg)
         capture = self.pg0.get_capture(1)
         self.verify_sa_auth_req(capture[0])
@@ -1070,47 +1143,64 @@ class TemplateInitiator(IkePeer):
 
     def send_auth_response(self):
         tr_attr = self.sa.esp_crypto_attr()
-        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
-                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
-                 key_length=tr_attr[0]) /
-                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
-                 transform_id=self.sa.esp_integ) /
-                 ikev2.IKEv2_payload_Transform(
-                 transform_type='Extended Sequence Number',
-                 transform_id='No ESN') /
-                 ikev2.IKEv2_payload_Transform(
-                 transform_type='Extended Sequence Number',
-                 transform_id='ESN'))
+        trans = (
+            ikev2.IKEv2_payload_Transform(
+                transform_type="Encryption",
+                transform_id=self.sa.esp_crypto,
+                length=tr_attr[1],
+                key_length=tr_attr[0],
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="Integrity", transform_id=self.sa.esp_integ
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="Extended Sequence Number", transform_id="No ESN"
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="Extended Sequence Number", transform_id="ESN"
+            )
+        )
 
         c = self.sa.child_sas[0]
-        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
-                 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
+        props = ikev2.IKEv2_payload_Proposal(
+            proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
+        )
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
-        plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
-                 IDtype=self.sa.id_type, load=self.sa.i_id) /
-                 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
-                 IDtype=self.sa.id_type, load=self.sa.r_id) /
-                 ikev2.IKEv2_payload_AUTH(next_payload='SA',
-                 auth_type=AuthMethod.value(self.sa.auth_method),
-                 load=self.sa.auth_data) /
-                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
-                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
-                 number_of_TSs=len(tsi),
-                 traffic_selector=tsi) /
-                 ikev2.IKEv2_payload_TSr(next_payload='Notify',
-                 number_of_TSs=len(tsr),
-                 traffic_selector=tsr) /
-                 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
+        plain = (
+            ikev2.IKEv2_payload_IDi(
+                next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
+            )
+            / ikev2.IKEv2_payload_IDr(
+                next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
+            )
+            / ikev2.IKEv2_payload_AUTH(
+                next_payload="SA",
+                auth_type=AuthMethod.value(self.sa.auth_method),
+                load=self.sa.auth_data,
+            )
+            / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
+            / ikev2.IKEv2_payload_TSi(
+                next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
+            )
+            / ikev2.IKEv2_payload_TSr(
+                next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
+            )
+            / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
+        )
 
         header = ikev2.IKEv2(
-                init_SPI=self.sa.ispi,
-                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
-                flags='Response', exch_type='IKE_AUTH')
-
-        ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
-        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
-                                    self.sa.dport, self.sa.natt, self.ip6)
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            id=self.sa.new_msg_id(),
+            flags="Response",
+            exch_type="IKE_AUTH",
+        )
+
+        ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
+        packet = self.create_packet(
+            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
         self.pg_send(self.pg0, packet)
 
     def test_initiator(self):
@@ -1122,51 +1212,58 @@ class TemplateInitiator(IkePeer):
 
 
 class TemplateResponder(IkePeer):
-    """ responder test template """
+    """responder test template"""
 
     def initiate_del_sa_from_responder(self):
         self.pg0.enable_capture()
         self.pg_start()
-        self.vapi.ikev2_initiate_del_ike_sa(
-                ispi=int.from_bytes(self.sa.ispi, 'little'))
+        self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
         capture = self.pg0.get_capture(1)
         ih = self.get_ike_header(capture[0])
-        self.assertNotIn('Response', ih.flags)
-        self.assertNotIn('Initiator', ih.flags)
+        self.assertNotIn("Response", ih.flags)
+        self.assertNotIn("Initiator", ih.flags)
         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
         plain = self.sa.hmac_and_decrypt(ih)
         d = ikev2.IKEv2_payload_Delete(plain)
         self.assertEqual(d.proto, 1)  # proto=IKEv2
         self.assertEqual(ih.init_SPI, self.sa.ispi)
         self.assertEqual(ih.resp_SPI, self.sa.rspi)
-        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
-                             flags='Initiator+Response',
-                             exch_type='INFORMATIONAL',
-                             id=ih.id, next_payload='Encrypted')
-        resp = self.encrypt_ike_msg(header, b'', None)
+        header = ikev2.IKEv2(
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            flags="Initiator+Response",
+            exch_type="INFORMATIONAL",
+            id=ih.id,
+            next_payload="Encrypted",
+        )
+        resp = self.encrypt_ike_msg(header, b"", None)
         self.send_and_assert_no_replies(self.pg0, resp)
 
     def verify_del_sa(self, packet):
         ih = self.get_ike_header(packet)
         self.assertEqual(ih.id, self.sa.msg_id)
         self.assertEqual(ih.exch_type, 37)  # exchange informational
-        self.assertIn('Response', ih.flags)
-        self.assertNotIn('Initiator', ih.flags)
+        self.assertIn("Response", ih.flags)
+        self.assertNotIn("Initiator", ih.flags)
         self.assertEqual(ih.next_payload, 46)  # Encrypted
         self.assertEqual(ih.init_SPI, self.sa.ispi)
         self.assertEqual(ih.resp_SPI, self.sa.rspi)
         plain = self.sa.hmac_and_decrypt(ih)
-        self.assertEqual(plain, b'')
+        self.assertEqual(plain, b"")
 
     def initiate_del_sa_from_initiator(self):
-        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
-                             flags='Initiator', exch_type='INFORMATIONAL',
-                             id=self.sa.new_msg_id())
-        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
-        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
-        packet = self.create_packet(self.pg0, ike_msg,
-                                    self.sa.sport, self.sa.dport,
-                                    self.sa.natt, self.ip6)
+        header = ikev2.IKEv2(
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            flags="Initiator",
+            exch_type="INFORMATIONAL",
+            id=self.sa.new_msg_id(),
+        )
+        del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
+        ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
+        packet = self.create_packet(
+            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
@@ -1175,56 +1272,72 @@ class TemplateResponder(IkePeer):
 
     def send_sa_init_req(self):
         tr_attr = self.sa.ike_crypto_attr()
-        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
-                 transform_id=self.sa.ike_crypto, length=tr_attr[1],
-                 key_length=tr_attr[0]) /
-                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
-                 transform_id=self.sa.ike_integ) /
-                 ikev2.IKEv2_payload_Transform(transform_type='PRF',
-                 transform_id=self.sa.ike_prf_alg.name) /
-                 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
-                 transform_id=self.sa.ike_dh))
-
-        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
-                 trans_nb=4, trans=trans))
-
-        next_payload = None if self.ip6 else 'Notify'
+        trans = (
+            ikev2.IKEv2_payload_Transform(
+                transform_type="Encryption",
+                transform_id=self.sa.ike_crypto,
+                length=tr_attr[1],
+                key_length=tr_attr[0],
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="Integrity", transform_id=self.sa.ike_integ
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="GroupDesc", transform_id=self.sa.ike_dh
+            )
+        )
+
+        props = ikev2.IKEv2_payload_Proposal(
+            proposal=1, proto="IKEv2", trans_nb=4, trans=trans
+        )
+
+        next_payload = None if self.ip6 else "Notify"
 
         self.sa.init_req_packet = (
-                ikev2.IKEv2(init_SPI=self.sa.ispi,
-                            flags='Initiator', exch_type='IKE_SA_INIT') /
-                ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
-                ikev2.IKEv2_payload_KE(next_payload='Nonce',
-                                       group=self.sa.ike_dh,
-                                       load=self.sa.my_dh_pub_key) /
-                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
-                                          load=self.sa.i_nonce))
+            ikev2.IKEv2(
+                init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
+            )
+            / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
+            / ikev2.IKEv2_payload_KE(
+                next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
+            )
+            / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
+        )
 
         if not self.ip6:
             if self.sa.i_natt:
-                src_address = b'\x0a\x0a\x0a\x01'
+                src_address = b"\x0a\x0a\x0a\x01"
             else:
                 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
 
             if self.sa.r_natt:
-                dst_address = b'\x0a\x0a\x0a\x0a'
+                dst_address = b"\x0a\x0a\x0a\x0a"
             else:
                 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
 
             src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
             dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
             nat_src_detection = ikev2.IKEv2_payload_Notify(
-                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
-                    next_payload='Notify')
+                type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
+            )
             nat_dst_detection = ikev2.IKEv2_payload_Notify(
-                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
-            self.sa.init_req_packet = (self.sa.init_req_packet /
-                                       nat_src_detection /
-                                       nat_dst_detection)
-
-        ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
-                                     self.sa.sport, self.sa.dport,
-                                     self.sa.natt, self.ip6)
+                type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
+            )
+            self.sa.init_req_packet = (
+                self.sa.init_req_packet / nat_src_detection / nat_dst_detection
+            )
+
+        ike_msg = self.create_packet(
+            self.pg0,
+            self.sa.init_req_packet,
+            self.sa.sport,
+            self.sa.dport,
+            self.sa.natt,
+            self.ip6,
+        )
         self.pg0.add_stream(ike_msg)
         self.pg0.enable_capture()
         self.pg_start()
@@ -1233,65 +1346,83 @@ class TemplateResponder(IkePeer):
 
     def generate_auth_payload(self, last_payload=None, is_rekey=False):
         tr_attr = self.sa.esp_crypto_attr()
-        last_payload = last_payload or 'Notify'
-        trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
-                 transform_id=self.sa.esp_crypto, length=tr_attr[1],
-                 key_length=tr_attr[0]) /
-                 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
-                 transform_id=self.sa.esp_integ) /
-                 ikev2.IKEv2_payload_Transform(
-                 transform_type='Extended Sequence Number',
-                 transform_id='No ESN') /
-                 ikev2.IKEv2_payload_Transform(
-                 transform_type='Extended Sequence Number',
-                 transform_id='ESN'))
+        last_payload = last_payload or "Notify"
+        trans = (
+            ikev2.IKEv2_payload_Transform(
+                transform_type="Encryption",
+                transform_id=self.sa.esp_crypto,
+                length=tr_attr[1],
+                key_length=tr_attr[0],
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="Integrity", transform_id=self.sa.esp_integ
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="Extended Sequence Number", transform_id="No ESN"
+            )
+            / ikev2.IKEv2_payload_Transform(
+                transform_type="Extended Sequence Number", transform_id="ESN"
+            )
+        )
 
         c = self.sa.child_sas[0]
-        props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
-                 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
+        props = ikev2.IKEv2_payload_Proposal(
+            proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
+        )
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
-        plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
-                 auth_type=AuthMethod.value(self.sa.auth_method),
-                 load=self.sa.auth_data) /
-                 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
-                 ikev2.IKEv2_payload_TSi(next_payload='TSr',
-                 number_of_TSs=len(tsi), traffic_selector=tsi) /
-                 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
-                 number_of_TSs=len(tsr), traffic_selector=tsr))
+        plain = (
+            ikev2.IKEv2_payload_AUTH(
+                next_payload="SA",
+                auth_type=AuthMethod.value(self.sa.auth_method),
+                load=self.sa.auth_data,
+            )
+            / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
+            / ikev2.IKEv2_payload_TSi(
+                next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
+            )
+            / ikev2.IKEv2_payload_TSr(
+                next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
+            )
+        )
 
         if is_rekey:
-            first_payload = 'Nonce'
-            plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
-                     next_payload='SA') / plain /
-                     ikev2.IKEv2_payload_Notify(type='REKEY_SA',
-                     proto='ESP', SPI=c.ispi))
+            first_payload = "Nonce"
+            plain = (
+                ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
+                / plain
+                / ikev2.IKEv2_payload_Notify(type="REKEY_SA", proto="ESP", SPI=c.ispi)
+            )
         else:
-            first_payload = 'IDi'
+            first_payload = "IDi"
             if self.no_idr_auth:
-                ids = ikev2.IKEv2_payload_IDi(next_payload='AUTH',
-                                              IDtype=self.sa.id_type,
-                                              load=self.sa.i_id)
+                ids = ikev2.IKEv2_payload_IDi(
+                    next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
+                )
             else:
-                ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
-                       IDtype=self.sa.id_type, load=self.sa.i_id) /
-                       ikev2.IKEv2_payload_IDr(next_payload='AUTH',
-                       IDtype=self.sa.id_type, load=self.sa.r_id))
+                ids = ikev2.IKEv2_payload_IDi(
+                    next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
+                ) / ikev2.IKEv2_payload_IDr(
+                    next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
+                )
             plain = ids / plain
         return plain, first_payload
 
     def send_sa_auth(self):
-        plain, first_payload = self.generate_auth_payload(
-                    last_payload='Notify')
-        plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
+        plain, first_payload = self.generate_auth_payload(last_payload="Notify")
+        plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
         header = ikev2.IKEv2(
-                init_SPI=self.sa.ispi,
-                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
-                flags='Initiator', exch_type='IKE_AUTH')
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            id=self.sa.new_msg_id(),
+            flags="Initiator",
+            exch_type="IKE_AUTH",
+        )
 
         ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
-        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
-                                    self.sa.dport, self.sa.natt, self.ip6)
+        packet = self.create_packet(
+            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
@@ -1303,7 +1434,7 @@ class TemplateResponder(IkePeer):
 
         self.assertEqual(ih.id, self.sa.msg_id)
         self.assertEqual(ih.exch_type, 34)
-        self.assertIn('Response', ih.flags)
+        self.assertIn("Response", ih.flags)
         self.assertEqual(ih.init_SPI, self.sa.ispi)
         self.assertNotEqual(ih.resp_SPI, 0)
         self.sa.rspi = ih.resp_SPI
@@ -1331,12 +1462,12 @@ class TemplateResponder(IkePeer):
         self.sa.child_sas[0].rspi = prop.SPI
         self.sa.calc_child_keys()
 
-    IKE_NODE_SUFFIX = 'ip4'
+    IKE_NODE_SUFFIX = "ip4"
 
     def verify_counters(self):
-        self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
-        self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
-        self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
+        self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
+        self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
+        self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
 
         r = self.vapi.ikev2_sa_dump()
         s = r[0].sa.stats
@@ -1356,59 +1487,60 @@ class Ikev2Params(object):
         ec = VppEnum.vl_api_ipsec_crypto_alg_t
         ei = VppEnum.vl_api_ipsec_integ_alg_t
         self.vpp_enums = {
-                'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
-                'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
-                'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
-                'AES-GCM-16ICV-128':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
-                'AES-GCM-16ICV-192':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
-                'AES-GCM-16ICV-256':  ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
-
-                'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
-                'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
-                'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
-                'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
-
-        dpd_disabled = True if 'dpd_disabled' not in params else\
-            params['dpd_disabled']
+            "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
+            "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
+            "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
+            "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
+            "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
+            "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
+            "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
+            "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
+            "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
+            "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
+        }
+
+        dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
         if dpd_disabled:
-            self.vapi.cli('ikev2 dpd disable')
-        self.del_sa_from_responder = False if 'del_sa_from_responder'\
-            not in params else params['del_sa_from_responder']
-        i_natt = False if 'i_natt' not in params else params['i_natt']
-        r_natt = False if 'r_natt' not in params else params['r_natt']
-        self.p = Profile(self, 'pr1')
-        self.ip6 = False if 'ip6' not in params else params['ip6']
-
-        if 'auth' in params and params['auth'] == 'rsa-sig':
-            auth_method = 'rsa-sig'
+            self.vapi.cli("ikev2 dpd disable")
+        self.del_sa_from_responder = (
+            False
+            if "del_sa_from_responder" not in params
+            else params["del_sa_from_responder"]
+        )
+        i_natt = False if "i_natt" not in params else params["i_natt"]
+        r_natt = False if "r_natt" not in params else params["r_natt"]
+        self.p = Profile(self, "pr1")
+        self.ip6 = False if "ip6" not in params else params["ip6"]
+
+        if "auth" in params and params["auth"] == "rsa-sig":
+            auth_method = "rsa-sig"
             work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
-            self.vapi.ikev2_set_local_key(
-                    key_file=work_dir + params['server-key'])
-
-            client_file = work_dir + params['client-cert']
-            server_pem = open(work_dir + params['server-cert']).read()
-            client_priv = open(work_dir + params['client-key']).read()
-            client_priv = load_pem_private_key(str.encode(client_priv), None,
-                                               default_backend())
+            self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
+
+            client_file = work_dir + params["client-cert"]
+            server_pem = open(work_dir + params["server-cert"]).read()
+            client_priv = open(work_dir + params["client-key"]).read()
+            client_priv = load_pem_private_key(
+                str.encode(client_priv), None, default_backend()
+            )
             self.peer_cert = x509.load_pem_x509_certificate(
-                    str.encode(server_pem),
-                    default_backend())
-            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
+                str.encode(server_pem), default_backend()
+            )
+            self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
             auth_data = None
         else:
-            auth_data = b'$3cr3tpa$$w0rd'
-            self.p.add_auth(method='shared-key', data=auth_data)
-            auth_method = 'shared-key'
+            auth_data = b"$3cr3tpa$$w0rd"
+            self.p.add_auth(method="shared-key", data=auth_data)
+            auth_method = "shared-key"
             client_priv = None
 
-        is_init = True if 'is_initiator' not in params else\
-            params['is_initiator']
-        self.no_idr_auth = params.get('no_idr_in_auth', False)
+        is_init = True if "is_initiator" not in params else params["is_initiator"]
+        self.no_idr_auth = params.get("no_idr_in_auth", False)
 
-        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
-        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
-        r_id = self.idr = idr['data']
-        i_id = self.idi = idi['data']
+        idr = {"id_type": "fqdn", "data": b"vpp.home"}
+        idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
+        r_id = self.idr = idr["data"]
+        i_id = self.idi = idi["data"]
         if is_init:
             # scapy is initiator, VPP is responder
             self.p.add_local_id(**idr)
@@ -1421,70 +1553,90 @@ class Ikev2Params(object):
             if not self.no_idr_auth:
                 self.p.add_remote_id(**idr)
 
-        loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
-            'loc_ts' not in params else params['loc_ts']
-        rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
-            'rem_ts' not in params else params['rem_ts']
+        loc_ts = (
+            {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
+            if "loc_ts" not in params
+            else params["loc_ts"]
+        )
+        rem_ts = (
+            {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
+            if "rem_ts" not in params
+            else params["rem_ts"]
+        )
         self.p.add_local_ts(**loc_ts)
         self.p.add_remote_ts(**rem_ts)
-        if 'responder' in params:
-            self.p.add_responder(params['responder'])
-        if 'ike_transforms' in params:
-            self.p.add_ike_transforms(params['ike_transforms'])
-        if 'esp_transforms' in params:
-            self.p.add_esp_transforms(params['esp_transforms'])
-
-        udp_encap = False if 'udp_encap' not in params else\
-            params['udp_encap']
+        if "responder" in params:
+            self.p.add_responder(params["responder"])
+        if "ike_transforms" in params:
+            self.p.add_ike_transforms(params["ike_transforms"])
+        if "esp_transforms" in params:
+            self.p.add_esp_transforms(params["esp_transforms"])
+
+        udp_encap = False if "udp_encap" not in params else params["udp_encap"]
         if udp_encap:
             self.p.set_udp_encap(True)
 
-        if 'responder_hostname' in params:
-            hn = params['responder_hostname']
+        if "responder_hostname" in params:
+            hn = params["responder_hostname"]
             self.p.add_responder_hostname(hn)
 
             # configure static dns record
             self.vapi.dns_name_server_add_del(
-                is_ip6=0, is_add=1,
-                server_address=IPv4Address(u'8.8.8.8').packed)
+                is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
+            )
             self.vapi.dns_enable_disable(enable=1)
 
-            cmd = "dns cache add {} {}".format(hn['hostname'],
-                                               self.pg0.remote_ip4)
+            cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
             self.vapi.cli(cmd)
 
-        self.sa = IKEv2SA(self, i_id=i_id, r_id=r_id,
-                          is_initiator=is_init,
-                          id_type=self.p.local_id['id_type'],
-                          i_natt=i_natt, r_natt=r_natt,
-                          priv_key=client_priv, auth_method=auth_method,
-                          nonce=params.get('nonce'),
-                          auth_data=auth_data, udp_encap=udp_encap,
-                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
+        self.sa = IKEv2SA(
+            self,
+            i_id=i_id,
+            r_id=r_id,
+            is_initiator=is_init,
+            id_type=self.p.local_id["id_type"],
+            i_natt=i_natt,
+            r_natt=r_natt,
+            priv_key=client_priv,
+            auth_method=auth_method,
+            nonce=params.get("nonce"),
+            auth_data=auth_data,
+            udp_encap=udp_encap,
+            local_ts=self.p.remote_ts,
+            remote_ts=self.p.local_ts,
+        )
 
         if is_init:
-            ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
-                params['ike-crypto']
-            ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
-                params['ike-integ']
-            ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
-                params['ike-dh']
-
-            esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
-                params['esp-crypto']
-            esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
-                params['esp-integ']
+            ike_crypto = (
+                ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
+            )
+            ike_integ = (
+                "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
+            )
+            ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
+
+            esp_crypto = (
+                ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
+            )
+            esp_integ = (
+                "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
+            )
 
             self.sa.set_ike_props(
-                    crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
-                    integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
+                crypto=ike_crypto[0],
+                crypto_key_len=ike_crypto[1],
+                integ=ike_integ,
+                prf="PRF_HMAC_SHA2_256",
+                dh=ike_dh,
+            )
             self.sa.set_esp_props(
-                    crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
-                    integ=esp_integ)
+                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
+            )
 
 
 class TestApi(VppTestCase):
-    """ Test IKEV2 API """
+    """Test IKEV2 API"""
+
     @classmethod
     def setUpClass(cls):
         super(TestApi, cls).setUpClass()
@@ -1501,241 +1653,249 @@ class TestApi(VppTestCase):
         self.assertEqual(len(r), 0)
 
     def configure_profile(self, cfg):
-        p = Profile(self, cfg['name'])
-        p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
-        p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
-        p.add_local_ts(**cfg['loc_ts'])
-        p.add_remote_ts(**cfg['rem_ts'])
-        p.add_responder(cfg['responder'])
-        p.add_ike_transforms(cfg['ike_ts'])
-        p.add_esp_transforms(cfg['esp_ts'])
-        p.add_auth(**cfg['auth'])
-        p.set_udp_encap(cfg['udp_encap'])
-        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
-        if 'lifetime_data' in cfg:
-            p.set_lifetime_data(cfg['lifetime_data'])
-        if 'tun_itf' in cfg:
-            p.set_tunnel_interface(cfg['tun_itf'])
-        if 'natt_disabled' in cfg and cfg['natt_disabled']:
+        p = Profile(self, cfg["name"])
+        p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
+        p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
+        p.add_local_ts(**cfg["loc_ts"])
+        p.add_remote_ts(**cfg["rem_ts"])
+        p.add_responder(cfg["responder"])
+        p.add_ike_transforms(cfg["ike_ts"])
+        p.add_esp_transforms(cfg["esp_ts"])
+        p.add_auth(**cfg["auth"])
+        p.set_udp_encap(cfg["udp_encap"])
+        p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
+        if "lifetime_data" in cfg:
+            p.set_lifetime_data(cfg["lifetime_data"])
+        if "tun_itf" in cfg:
+            p.set_tunnel_interface(cfg["tun_itf"])
+        if "natt_disabled" in cfg and cfg["natt_disabled"]:
             p.disable_natt()
         p.add_vpp_config()
         return p
 
     def test_profile_api(self):
-        """ test profile dump API """
+        """test profile dump API"""
         loc_ts4 = {
-                    'proto': 8,
-                    'start_port': 1,
-                    'end_port': 19,
-                    'start_addr': '3.3.3.2',
-                    'end_addr': '3.3.3.3',
-                }
+            "proto": 8,
+            "start_port": 1,
+            "end_port": 19,
+            "start_addr": "3.3.3.2",
+            "end_addr": "3.3.3.3",
+        }
         rem_ts4 = {
-                    'proto': 9,
-                    'start_port': 10,
-                    'end_port': 119,
-                    'start_addr': '4.5.76.80',
-                    'end_addr': '2.3.4.6',
-                }
+            "proto": 9,
+            "start_port": 10,
+            "end_port": 119,
+            "start_addr": "4.5.76.80",
+            "end_addr": "2.3.4.6",
+        }
 
         loc_ts6 = {
-                    'proto': 8,
-                    'start_port': 1,
-                    'end_port': 19,
-                    'start_addr': 'ab::1',
-                    'end_addr': 'ab::4',
-                }
+            "proto": 8,
+            "start_port": 1,
+            "end_port": 19,
+            "start_addr": "ab::1",
+            "end_addr": "ab::4",
+        }
         rem_ts6 = {
-                    'proto': 9,
-                    'start_port': 10,
-                    'end_port': 119,
-                    'start_addr': 'cd::12',
-                    'end_addr': 'cd::13',
-                }
+            "proto": 9,
+            "start_port": 10,
+            "end_port": 119,
+            "start_addr": "cd::12",
+            "end_addr": "cd::13",
+        }
 
         conf = {
-            'p1': {
-                'name': 'p1',
-                'natt_disabled': True,
-                'loc_id': ('fqdn', b'vpp.home'),
-                'rem_id': ('fqdn', b'roadwarrior.example.com'),
-                'loc_ts': loc_ts4,
-                'rem_ts': rem_ts4,
-                'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
-                'ike_ts': {
-                        'crypto_alg': 20,
-                        'crypto_key_size': 32,
-                        'integ_alg': 0,
-                        'dh_group': 1},
-                'esp_ts': {
-                        'crypto_alg': 13,
-                        'crypto_key_size': 24,
-                        'integ_alg': 2},
-                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
-                'udp_encap': True,
-                'ipsec_over_udp_port': 4501,
-                'lifetime_data': {
-                    'lifetime': 123,
-                    'lifetime_maxdata': 20192,
-                    'lifetime_jitter': 9,
-                    'handover': 132},
+            "p1": {
+                "name": "p1",
+                "natt_disabled": True,
+                "loc_id": ("fqdn", b"vpp.home"),
+                "rem_id": ("fqdn", b"roadwarrior.example.com"),
+                "loc_ts": loc_ts4,
+                "rem_ts": rem_ts4,
+                "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
+                "ike_ts": {
+                    "crypto_alg": 20,
+                    "crypto_key_size": 32,
+                    "integ_alg": 0,
+                    "dh_group": 1,
+                },
+                "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
+                "auth": {"method": "shared-key", "data": b"sharedkeydata"},
+                "udp_encap": True,
+                "ipsec_over_udp_port": 4501,
+                "lifetime_data": {
+                    "lifetime": 123,
+                    "lifetime_maxdata": 20192,
+                    "lifetime_jitter": 9,
+                    "handover": 132,
+                },
+            },
+            "p2": {
+                "name": "p2",
+                "loc_id": ("ip4-addr", b"192.168.2.1"),
+                "rem_id": ("ip6-addr", b"abcd::1"),
+                "loc_ts": loc_ts6,
+                "rem_ts": rem_ts6,
+                "responder": {"sw_if_index": 4, "addr": "def::10"},
+                "ike_ts": {
+                    "crypto_alg": 12,
+                    "crypto_key_size": 16,
+                    "integ_alg": 3,
+                    "dh_group": 3,
+                },
+                "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
+                "auth": {"method": "shared-key", "data": b"sharedkeydata"},
+                "udp_encap": False,
+                "ipsec_over_udp_port": 4600,
+                "tun_itf": 0,
             },
-            'p2': {
-                'name': 'p2',
-                'loc_id': ('ip4-addr', b'192.168.2.1'),
-                'rem_id': ('ip6-addr', b'abcd::1'),
-                'loc_ts': loc_ts6,
-                'rem_ts': rem_ts6,
-                'responder': {'sw_if_index': 4, 'addr': 'def::10'},
-                'ike_ts': {
-                        'crypto_alg': 12,
-                        'crypto_key_size': 16,
-                        'integ_alg': 3,
-                        'dh_group': 3},
-                'esp_ts': {
-                        'crypto_alg': 9,
-                        'crypto_key_size': 24,
-                        'integ_alg': 4},
-                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
-                'udp_encap': False,
-                'ipsec_over_udp_port': 4600,
-                'tun_itf': 0}
         }
-        self.p1 = self.configure_profile(conf['p1'])
-        self.p2 = self.configure_profile(conf['p2'])
+        self.p1 = self.configure_profile(conf["p1"])
+        self.p2 = self.configure_profile(conf["p2"])
 
         r = self.vapi.ikev2_profile_dump()
         self.assertEqual(len(r), 2)
-        self.verify_profile(r[0].profile, conf['p1'])
-        self.verify_profile(r[1].profile, conf['p2'])
+        self.verify_profile(r[0].profile, conf["p1"])
+        self.verify_profile(r[1].profile, conf["p2"])
 
     def verify_id(self, api_id, cfg_id):
         self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
-        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
+        self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
 
     def verify_ts(self, api_ts, cfg_ts):
-        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
-        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
-        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
-        self.assertEqual(api_ts.start_addr,
-                         ip_address(text_type(cfg_ts['start_addr'])))
-        self.assertEqual(api_ts.end_addr,
-                         ip_address(text_type(cfg_ts['end_addr'])))
+        self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
+        self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
+        self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
+        self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
+        self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
 
     def verify_responder(self, api_r, cfg_r):
-        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
-        self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
+        self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
+        self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
 
     def verify_transforms(self, api_ts, cfg_ts):
-        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
-        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
-        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
+        self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
+        self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
+        self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
 
     def verify_ike_transforms(self, api_ts, cfg_ts):
         self.verify_transforms(api_ts, cfg_ts)
-        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
+        self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
 
     def verify_esp_transforms(self, api_ts, cfg_ts):
         self.verify_transforms(api_ts, cfg_ts)
 
     def verify_auth(self, api_auth, cfg_auth):
-        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
-        self.assertEqual(api_auth.data, cfg_auth['data'])
-        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
+        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
+        self.assertEqual(api_auth.data, cfg_auth["data"])
+        self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
 
     def verify_lifetime_data(self, p, ld):
-        self.assertEqual(p.lifetime, ld['lifetime'])
-        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
-        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
-        self.assertEqual(p.handover, ld['handover'])
+        self.assertEqual(p.lifetime, ld["lifetime"])
+        self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
+        self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
+        self.assertEqual(p.handover, ld["handover"])
 
     def verify_profile(self, ap, cp):
-        self.assertEqual(ap.name, cp['name'])
-        self.assertEqual(ap.udp_encap, cp['udp_encap'])
-        self.verify_id(ap.loc_id, cp['loc_id'])
-        self.verify_id(ap.rem_id, cp['rem_id'])
-        self.verify_ts(ap.loc_ts, cp['loc_ts'])
-        self.verify_ts(ap.rem_ts, cp['rem_ts'])
-        self.verify_responder(ap.responder, cp['responder'])
-        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
-        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
-        self.verify_auth(ap.auth, cp['auth'])
-        natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
+        self.assertEqual(ap.name, cp["name"])
+        self.assertEqual(ap.udp_encap, cp["udp_encap"])
+        self.verify_id(ap.loc_id, cp["loc_id"])
+        self.verify_id(ap.rem_id, cp["rem_id"])
+        self.verify_ts(ap.loc_ts, cp["loc_ts"])
+        self.verify_ts(ap.rem_ts, cp["rem_ts"])
+        self.verify_responder(ap.responder, cp["responder"])
+        self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
+        self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
+        self.verify_auth(ap.auth, cp["auth"])
+        natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
         self.assertTrue(natt_dis == ap.natt_disabled)
 
-        if 'lifetime_data' in cp:
-            self.verify_lifetime_data(ap, cp['lifetime_data'])
-        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
-        if 'tun_itf' in cp:
-            self.assertEqual(ap.tun_itf, cp['tun_itf'])
+        if "lifetime_data" in cp:
+            self.verify_lifetime_data(ap, cp["lifetime_data"])
+        self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
+        if "tun_itf" in cp:
+            self.assertEqual(ap.tun_itf, cp["tun_itf"])
         else:
-            self.assertEqual(ap.tun_itf, 0xffffffff)
+            self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
 
 
 @tag_fixme_vpp_workers
 class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
-    """ test responder - responder behind NAT """
+    """test responder - responder behind NAT"""
 
-    IKE_NODE_SUFFIX = 'ip4-natt'
+    IKE_NODE_SUFFIX = "ip4-natt"
 
     def config_tc(self):
-        self.config_params({'r_natt': True})
+        self.config_params({"r_natt": True})
 
 
 @tag_fixme_vpp_workers
 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
-    """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
+    """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
 
     def config_tc(self):
-        self.config_params({
-            'i_natt': True,
-            'is_initiator': False,  # seen from test case perspective
-                                    # thus vpp is initiator
-            'responder': {'sw_if_index': self.pg0.sw_if_index,
-                           'addr': self.pg0.remote_ip4},
-            'ike-crypto': ('AES-GCM-16ICV', 32),
-            'ike-integ': 'NULL',
-            'ike-dh': '3072MODPgr',
-            'ike_transforms': {
-                'crypto_alg': 20,  # "aes-gcm-16"
-                'crypto_key_size': 256,
-                'dh_group': 15,  # "modp-3072"
-            },
-            'esp_transforms': {
-                'crypto_alg': 12,  # "aes-cbc"
-                'crypto_key_size': 256,
-                # "hmac-sha2-256-128"
-                'integ_alg': 12}})
+        self.config_params(
+            {
+                "i_natt": True,
+                "is_initiator": False,  # seen from test case perspective
+                # thus vpp is initiator
+                "responder": {
+                    "sw_if_index": self.pg0.sw_if_index,
+                    "addr": self.pg0.remote_ip4,
+                },
+                "ike-crypto": ("AES-GCM-16ICV", 32),
+                "ike-integ": "NULL",
+                "ike-dh": "3072MODPgr",
+                "ike_transforms": {
+                    "crypto_alg": 20,  # "aes-gcm-16"
+                    "crypto_key_size": 256,
+                    "dh_group": 15,  # "modp-3072"
+                },
+                "esp_transforms": {
+                    "crypto_alg": 12,  # "aes-cbc"
+                    "crypto_key_size": 256,
+                    # "hmac-sha2-256-128"
+                    "integ_alg": 12,
+                },
+            }
+        )
 
 
 @tag_fixme_vpp_workers
 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
-    """ test ikev2 initiator - pre shared key auth """
+    """test ikev2 initiator - pre shared key auth"""
 
     def config_tc(self):
-        self.config_params({
-            'is_initiator': False,  # seen from test case perspective
-                                    # thus vpp is initiator
-            'ike-crypto': ('AES-GCM-16ICV', 32),
-            'ike-integ': 'NULL',
-            'ike-dh': '3072MODPgr',
-            'ike_transforms': {
-                'crypto_alg': 20,  # "aes-gcm-16"
-                'crypto_key_size': 256,
-                'dh_group': 15,  # "modp-3072"
-            },
-            'esp_transforms': {
-                'crypto_alg': 12,  # "aes-cbc"
-                'crypto_key_size': 256,
-                # "hmac-sha2-256-128"
-                'integ_alg': 12},
-            'responder_hostname': {'hostname': 'vpp.responder.org',
-                                   'sw_if_index': self.pg0.sw_if_index}})
+        self.config_params(
+            {
+                "is_initiator": False,  # seen from test case perspective
+                # thus vpp is initiator
+                "ike-crypto": ("AES-GCM-16ICV", 32),
+                "ike-integ": "NULL",
+                "ike-dh": "3072MODPgr",
+                "ike_transforms": {
+                    "crypto_alg": 20,  # "aes-gcm-16"
+                    "crypto_key_size": 256,
+                    "dh_group": 15,  # "modp-3072"
+                },
+                "esp_transforms": {
+                    "crypto_alg": 12,  # "aes-cbc"
+                    "crypto_key_size": 256,
+                    # "hmac-sha2-256-128"
+                    "integ_alg": 12,
+                },
+                "responder_hostname": {
+                    "hostname": "vpp.responder.org",
+                    "sw_if_index": self.pg0.sw_if_index,
+                },
+            }
+        )
 
 
 @tag_fixme_vpp_workers
 class TestInitiatorRequestWindowSize(TestInitiatorPsk):
-    """ test initiator - request window size (1) """
+    """test initiator - request window size (1)"""
 
     def rekey_respond(self, req, update_child_sa_data):
         ih = self.get_ike_header(req)
@@ -1749,19 +1909,25 @@ class TestInitiatorRequestWindowSize(TestInitiatorPsk):
             self.sa.child_sas[0].rspi = prop.SPI
             self.sa.calc_child_keys()
 
-        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
-                             flags='Response', exch_type=36,
-                             id=ih.id, next_payload='Encrypted')
-        resp = self.encrypt_ike_msg(header, sa, 'SA')
-        packet = self.create_packet(self.pg0, resp, self.sa.sport,
-                                    self.sa.dport, self.sa.natt, self.ip6)
+        header = ikev2.IKEv2(
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            flags="Response",
+            exch_type=36,
+            id=ih.id,
+            next_payload="Encrypted",
+        )
+        resp = self.encrypt_ike_msg(header, sa, "SA")
+        packet = self.create_packet(
+            self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
         self.send_and_assert_no_replies(self.pg0, packet)
 
     def test_initiator(self):
         super(TestInitiatorRequestWindowSize, self).test_initiator()
         self.pg0.enable_capture()
         self.pg_start()
-        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+        ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
         capture = self.pg0.get_capture(2)
@@ -1777,18 +1943,18 @@ class TestInitiatorRequestWindowSize(TestInitiatorPsk):
 
 @tag_fixme_vpp_workers
 class TestInitiatorRekey(TestInitiatorPsk):
-    """ test ikev2 initiator - rekey """
+    """test ikev2 initiator - rekey"""
 
     def rekey_from_initiator(self):
-        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+        ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
         self.pg0.enable_capture()
         self.pg_start()
         self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
         capture = self.pg0.get_capture(1)
         ih = self.get_ike_header(capture[0])
         self.assertEqual(ih.exch_type, 36)  # CHILD_SA
-        self.assertNotIn('Response', ih.flags)
-        self.assertIn('Initiator', ih.flags)
+        self.assertNotIn("Response", ih.flags)
+        self.assertIn("Initiator", ih.flags)
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
         prop = sa[ikev2.IKEv2_payload_Proposal]
@@ -1798,12 +1964,18 @@ class TestInitiatorRekey(TestInitiatorPsk):
         self.sa.child_sas[0].ispi = prop.SPI
         self.sa.child_sas[0].rspi = prop.SPI
         self.sa.calc_child_keys()
-        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
-                             flags='Response', exch_type=36,
-                             id=ih.id, next_payload='Encrypted')
-        resp = self.encrypt_ike_msg(header, sa, 'SA')
-        packet = self.create_packet(self.pg0, resp, self.sa.sport,
-                                    self.sa.dport, self.sa.natt, self.ip6)
+        header = ikev2.IKEv2(
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            flags="Response",
+            exch_type=36,
+            id=ih.id,
+            next_payload="Encrypted",
+        )
+        resp = self.encrypt_ike_msg(header, sa, "SA")
+        packet = self.create_packet(
+            self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
         self.send_and_assert_no_replies(self.pg0, packet)
 
     def test_initiator(self):
@@ -1815,45 +1987,51 @@ class TestInitiatorRekey(TestInitiatorPsk):
 
 @tag_fixme_vpp_workers
 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
-    """ test ikev2 initiator - delete IKE SA from responder """
+    """test ikev2 initiator - delete IKE SA from responder"""
 
     def config_tc(self):
-        self.config_params({
-            'del_sa_from_responder': True,
-            'is_initiator': False,  # seen from test case perspective
-                                    # thus vpp is initiator
-            'responder': {'sw_if_index': self.pg0.sw_if_index,
-                           'addr': self.pg0.remote_ip4},
-            'ike-crypto': ('AES-GCM-16ICV', 32),
-            'ike-integ': 'NULL',
-            'ike-dh': '3072MODPgr',
-            'ike_transforms': {
-                'crypto_alg': 20,  # "aes-gcm-16"
-                'crypto_key_size': 256,
-                'dh_group': 15,  # "modp-3072"
-            },
-            'esp_transforms': {
-                'crypto_alg': 12,  # "aes-cbc"
-                'crypto_key_size': 256,
-                # "hmac-sha2-256-128"
-                'integ_alg': 12},
-            'no_idr_in_auth': True})
+        self.config_params(
+            {
+                "del_sa_from_responder": True,
+                "is_initiator": False,  # seen from test case perspective
+                # thus vpp is initiator
+                "responder": {
+                    "sw_if_index": self.pg0.sw_if_index,
+                    "addr": self.pg0.remote_ip4,
+                },
+                "ike-crypto": ("AES-GCM-16ICV", 32),
+                "ike-integ": "NULL",
+                "ike-dh": "3072MODPgr",
+                "ike_transforms": {
+                    "crypto_alg": 20,  # "aes-gcm-16"
+                    "crypto_key_size": 256,
+                    "dh_group": 15,  # "modp-3072"
+                },
+                "esp_transforms": {
+                    "crypto_alg": 12,  # "aes-cbc"
+                    "crypto_key_size": 256,
+                    # "hmac-sha2-256-128"
+                    "integ_alg": 12,
+                },
+                "no_idr_in_auth": True,
+            }
+        )
 
 
 @tag_fixme_vpp_workers
 class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
-    """ test ikev2 responder - initiator behind NAT """
+    """test ikev2 responder - initiator behind NAT"""
 
-    IKE_NODE_SUFFIX = 'ip4-natt'
+    IKE_NODE_SUFFIX = "ip4-natt"
 
     def config_tc(self):
-        self.config_params(
-                {'i_natt': True})
+        self.config_params({"i_natt": True})
 
 
 @tag_fixme_vpp_workers
 class TestResponderPsk(TemplateResponder, Ikev2Params):
-    """ test ikev2 responder - pre shared key auth """
+    """test ikev2 responder - pre shared key auth"""
+
     def config_tc(self):
         self.config_params()
 
@@ -1863,8 +2041,9 @@ class TestResponderDpd(TestResponderPsk):
     """
     Dead peer detection test
     """
+
     def config_tc(self):
-        self.config_params({'dpd_disabled': False})
+        self.config_params({"dpd_disabled": False})
 
     def tearDown(self):
         pass
@@ -1879,7 +2058,7 @@ class TestResponderDpd(TestResponderPsk):
         ih = self.get_ike_header(capture[0])
         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
         plain = self.sa.hmac_and_decrypt(ih)
-        self.assertEqual(plain, b'')
+        self.assertEqual(plain, b"")
         # wait for SA expiration
         time.sleep(3)
         ike_sas = self.vapi.ikev2_sa_dump()
@@ -1890,7 +2069,7 @@ class TestResponderDpd(TestResponderPsk):
 
 @tag_fixme_vpp_workers
 class TestResponderRekey(TestResponderPsk):
-    """ test ikev2 responder - rekey """
+    """test ikev2 responder - rekey"""
 
     def rekey_from_initiator(self):
         packet = self.create_rekey_request()
@@ -1912,18 +2091,19 @@ class TestResponderRekey(TestResponderPsk):
         self.sa.calc_child_keys()
         self.verify_ike_sas()
         self.verify_ipsec_sas(is_rekey=True)
-        self.assert_counter(1, 'rekey_req', 'ip4')
+        self.assert_counter(1, "rekey_req", "ip4")
         r = self.vapi.ikev2_sa_dump()
         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
 
 
 class TestResponderVrf(TestResponderPsk, Ikev2Params):
-    """ test ikev2 responder - non-default table id """
+    """test ikev2 responder - non-default table id"""
 
     @classmethod
     def setUpClass(cls):
         import scapy.contrib.ikev2 as _ikev2
-        globals()['ikev2'] = _ikev2
+
+        globals()["ikev2"] = _ikev2
         super(IkePeer, cls).setUpClass()
         cls.create_pg_interfaces(range(1))
         cls.vapi.cli("ip table add 1")
@@ -1936,7 +2116,7 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params):
             i.resolve_ndp()
 
     def config_tc(self):
-        self.config_params({'dpd_disabled': False})
+        self.config_params({"dpd_disabled": False})
 
     def test_responder(self):
         self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
@@ -1947,53 +2127,67 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params):
         ih = self.get_ike_header(capture[0])
         self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
         plain = self.sa.hmac_and_decrypt(ih)
-        self.assertEqual(plain, b'')
+        self.assertEqual(plain, b"")
 
 
 @tag_fixme_vpp_workers
 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
-    """ test ikev2 responder - cert based auth """
+    """test ikev2 responder - cert based auth"""
+
     def config_tc(self):
-        self.config_params({
-            'udp_encap': True,
-            'auth': 'rsa-sig',
-            'server-key': 'server-key.pem',
-            'client-key': 'client-key.pem',
-            'client-cert': 'client-cert.pem',
-            'server-cert': 'server-cert.pem'})
+        self.config_params(
+            {
+                "udp_encap": True,
+                "auth": "rsa-sig",
+                "server-key": "server-key.pem",
+                "client-key": "client-key.pem",
+                "client-cert": "client-cert.pem",
+                "server-cert": "server-cert.pem",
+            }
+        )
 
 
 @tag_fixme_vpp_workers
-class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
-        (TemplateResponder, Ikev2Params):
+class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
+    TemplateResponder, Ikev2Params
+):
     """
     IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
     """
+
     def config_tc(self):
-        self.config_params({
-            'ike-crypto': ('AES-CBC', 16),
-            'ike-integ': 'SHA2-256-128',
-            'esp-crypto': ('AES-CBC', 24),
-            'esp-integ': 'SHA2-384-192',
-            'ike-dh': '2048MODPgr',
-            'nonce': os.urandom(256),
-            'no_idr_in_auth': True})
+        self.config_params(
+            {
+                "ike-crypto": ("AES-CBC", 16),
+                "ike-integ": "SHA2-256-128",
+                "esp-crypto": ("AES-CBC", 24),
+                "esp-integ": "SHA2-384-192",
+                "ike-dh": "2048MODPgr",
+                "nonce": os.urandom(256),
+                "no_idr_in_auth": True,
+            }
+        )
 
 
 @tag_fixme_vpp_workers
-class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
-        (TemplateResponder, Ikev2Params):
+class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
+    TemplateResponder, Ikev2Params
+):
 
     """
     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
     """
+
     def config_tc(self):
-        self.config_params({
-            'ike-crypto': ('AES-CBC', 32),
-            'ike-integ': 'SHA2-256-128',
-            'esp-crypto': ('AES-GCM-16ICV', 32),
-            'esp-integ': 'NULL',
-            'ike-dh': '3072MODPgr'})
+        self.config_params(
+            {
+                "ike-crypto": ("AES-CBC", 32),
+                "ike-integ": "SHA2-256-128",
+                "esp-crypto": ("AES-GCM-16ICV", 32),
+                "esp-integ": "NULL",
+                "ike-dh": "3072MODPgr",
+            }
+        )
 
 
 @tag_fixme_vpp_workers
@@ -2002,20 +2196,21 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
     IKE:AES_GCM_16_256
     """
 
-    IKE_NODE_SUFFIX = 'ip6'
+    IKE_NODE_SUFFIX = "ip6"
 
     def config_tc(self):
-        self.config_params({
-            'del_sa_from_responder': True,
-            'ip6': True,
-            'natt': True,
-            'ike-crypto': ('AES-GCM-16ICV', 32),
-            'ike-integ': 'NULL',
-            'ike-dh': '2048MODPgr',
-            'loc_ts': {'start_addr': 'ab:cd::0',
-                       'end_addr': 'ab:cd::10'},
-            'rem_ts': {'start_addr': '11::0',
-                       'end_addr': '11::100'}})
+        self.config_params(
+            {
+                "del_sa_from_responder": True,
+                "ip6": True,
+                "natt": True,
+                "ike-crypto": ("AES-GCM-16ICV", 32),
+                "ike-integ": "NULL",
+                "ike-dh": "2048MODPgr",
+                "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
+                "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
+            }
+        )
 
 
 @tag_fixme_vpp_workers
@@ -2033,8 +2228,8 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
         ih = self.get_ike_header(capture[0])
         self.assertEqual(ih.id, self.sa.msg_id)
         plain = self.sa.hmac_and_decrypt(ih)
-        self.assertEqual(plain, b'')
-        self.assert_counter(1, 'keepalive', 'ip4')
+        self.assertEqual(plain, b"")
+        self.assert_counter(1, "keepalive", "ip4")
         r = self.vapi.ikev2_sa_dump()
         self.assertEqual(1, r[0].sa.stats.n_keepalives)
 
@@ -2044,7 +2239,7 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
 
 
 class TestMalformedMessages(TemplateResponder, Ikev2Params):
-    """ malformed packet test """
+    """malformed packet test"""
 
     def tearDown(self):
         pass
@@ -2053,23 +2248,26 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params):
         self.config_params()
 
     def create_ike_init_msg(self, length=None, payload=None):
-        msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
-                          flags='Initiator', exch_type='IKE_SA_INIT')
+        msg = ikev2.IKEv2(
+            length=length,
+            init_SPI="\x11" * 8,
+            flags="Initiator",
+            exch_type="IKE_SA_INIT",
+        )
         if payload is not None:
             msg /= payload
-        return self.create_packet(self.pg0, msg, self.sa.sport,
-                                  self.sa.dport)
+        return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
 
     def verify_bad_packet_length(self):
-        ike_msg = self.create_ike_init_msg(length=0xdead)
+        ike_msg = self.create_ike_init_msg(length=0xDEAD)
         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
-        self.assert_counter(self.pkt_count, 'bad_length')
+        self.assert_counter(self.pkt_count, "bad_length")
 
     def verify_bad_sa_payload_length(self):
-        p = ikev2.IKEv2_payload_SA(length=0xdead)
+        p = ikev2.IKEv2_payload_SA(length=0xDEAD)
         ike_msg = self.create_ike_init_msg(payload=p)
         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
-        self.assert_counter(self.pkt_count, 'malformed_packet')
+        self.assert_counter(self.pkt_count, "malformed_packet")
 
     def test_responder(self):
         self.pkt_count = 254
@@ -2077,5 +2275,5 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params):
         self.verify_bad_sa_payload_length()
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)