ikev2: add option to disable NAT traversal
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
index 4f64b56..d065d46 100644 (file)
@@ -1,4 +1,5 @@
 import os
+from socket import inet_pton
 from cryptography import x509
 from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives import hashes, hmac
@@ -112,9 +113,7 @@ class CryptoAlgo(object):
                                self.mode(nonce, icv, len(icv)),
                                default_backend()).decryptor()
             decryptor.authenticate_additional_data(aad)
-            pt = decryptor.update(ct) + decryptor.finalize()
-            pad_len = pt[-1] + 1
-            return pt[:-pad_len]
+            return decryptor.update(ct) + decryptor.finalize()
 
     def pad(self, data):
         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
@@ -152,10 +151,28 @@ PRF_ALGOS = {
                                   hashes.SHA256, 32),
 }
 
+CRYPTO_IDS = {
+    12: 'AES-CBC',
+    20: 'AES-GCM-16ICV',
+}
+
+INTEG_IDS = {
+    2: 'HMAC-SHA1-96',
+    12: 'SHA2-256-128',
+    13: 'SHA2-384-192',
+    14: 'SHA2-512-256',
+}
+
 
 class IKEv2ChildSA(object):
-    def __init__(self, local_ts, remote_ts, spi=None):
-        self.spi = spi or os.urandom(4)
+    def __init__(self, local_ts, remote_ts, is_initiator):
+        spi = os.urandom(4)
+        if is_initiator:
+            self.ispi = spi
+            self.rspi = None
+        else:
+            self.rspi = spi
+            self.ispi = None
         self.local_ts = local_ts
         self.remote_ts = remote_ts
 
@@ -194,7 +211,8 @@ class IKEv2SA(object):
             self.rspi = spi
             self.ispi = 8 * b'\x00'
             self.r_nonce = nonce
-        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
+        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
+                          self.is_initiator)]
 
     def new_msg_id(self):
         self.msg_id += 1
@@ -434,14 +452,17 @@ class IKEv2SA(object):
             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
             ct = ep.load[:-GCM_ICV_SIZE]
             tag = ep.load[-GCM_ICV_SIZE:]
-            return self.decrypt(ct, raw(ike)[:aad_len], tag)
+            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
         else:
             self.verify_hmac(raw(ike))
             integ_trunc = self.ike_integ_alg.trunc_len
 
             # remove ICV and decrypt payload
             ct = ep.load[:-integ_trunc]
-            return self.decrypt(ct)
+            plain = self.decrypt(ct)
+        # remove padding
+        pad_len = plain[-1]
+        return plain[:-pad_len - 1]
 
     def build_ts_addr(self, ts, version):
         return {'starting_address_v' + version: ts['start_addr'],
@@ -510,8 +531,10 @@ class IKEv2SA(object):
     def esp_crypto_attr(self):
         return self.crypto_attr(self.esp_crypto_key_len)
 
-    def compute_nat_sha1(self, ip, port):
-        data = self.ispi + self.rspi + ip + (port).to_bytes(2, 'big')
+    def compute_nat_sha1(self, ip, port, rspi=None):
+        if rspi is None:
+            rspi = self.rspi
+        data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
         digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
         digest.update(data)
         return digest.finalize()
@@ -537,14 +560,29 @@ class IkePeer(VppTestCase):
     def tearDownClass(cls):
         super(IkePeer, cls).tearDownClass()
 
+    def tearDown(self):
+        super(IkePeer, self).tearDown()
+        if self.del_sa_from_responder:
+            self.initiate_del_sa_from_responder()
+        else:
+            self.initiate_del_sa_from_initiator()
+        r = self.vapi.ikev2_sa_dump()
+        self.assertEqual(len(r), 0)
+        sas = self.vapi.ipsec_sa_dump()
+        self.assertEqual(len(sas), 0)
+        self.p.remove_vpp_config()
+        self.assertIsNone(self.p.query_vpp_config())
+
     def setUp(self):
         super(IkePeer, self).setUp()
         self.config_tc()
         self.p.add_vpp_config()
         self.assertIsNotNone(self.p.query_vpp_config())
-        self.sa.generate_dh_data()
+        if self.sa.is_initiator:
+            self.sa.generate_dh_data()
         self.vapi.cli('ikev2 set logging level 4')
         self.vapi.cli('event-lo clear')
+        self.vapi.cli('ikev2 dpd disable')
 
     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
                       use_ip6=False):
@@ -577,6 +615,7 @@ class IkePeer(VppTestCase):
             esp = packet[ESP]
             ih = self.verify_and_remove_non_esp_marker(esp)
         self.assertEqual(ih.version, 0x20)
+        self.assertNotIn('Version', ih.flags)
         return ih
 
     def verify_and_remove_non_esp_marker(self, packet):
@@ -623,16 +662,30 @@ class IkePeer(VppTestCase):
         assert(len(res) == tlen)
         return res
 
-    def verify_ipsec_sas(self):
+    def verify_ipsec_sas(self, is_rekey=False):
         sas = self.vapi.ipsec_sa_dump()
-        self.assertEqual(len(sas), 2)
+        if is_rekey:
+            # after rekey there is a short period of time in which old
+            # inbound SA is still present
+            sa_count = 3
+        else:
+            sa_count = 2
+        self.assertEqual(len(sas), sa_count)
         e = VppEnum.vl_api_ipsec_sad_flags_t
         if self.sa.is_initiator:
-            sa0 = sas[0].entry
-            sa1 = sas[1].entry
+            if is_rekey:
+                sa0 = sas[0].entry
+                sa1 = sas[2].entry
+            else:
+                sa0 = sas[0].entry
+                sa1 = sas[1].entry
         else:
-            sa1 = sas[0].entry
-            sa0 = sas[1].entry
+            if is_rekey:
+                sa0 = sas[2].entry
+                sa1 = sas[0].entry
+            else:
+                sa1 = sas[0].entry
+                sa0 = sas[1].entry
 
         c = self.sa.child_sas[0]
 
@@ -720,6 +773,8 @@ class IkePeer(VppTestCase):
             self.verify_keymat(csa.keys, c, 'sk_ar')
         self.verify_keymat(csa.keys, c, 'sk_ei')
         self.verify_keymat(csa.keys, c, 'sk_er')
+        self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
+        self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
         tsi = tsi[0]
@@ -772,8 +827,79 @@ class IkePeer(VppTestCase):
 class TemplateInitiator(IkePeer):
     """ initiator test template """
 
-    def tearDown(self):
-        super(TemplateInitiator, self).tearDown()
+    def initiate_del_sa_from_initiator(self):
+        ispi = int.from_bytes(self.sa.ispi, 'little')
+        self.pg0.enable_capture()
+        self.pg_start()
+        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertNotIn('Response', ih.flags)
+        self.assertIn('Initiator', ih.flags)
+        self.assertEqual(ih.init_SPI, self.sa.ispi)
+        self.assertEqual(ih.resp_SPI, self.sa.rspi)
+        plain = self.sa.hmac_and_decrypt(ih)
+        d = ikev2.IKEv2_payload_Delete(plain)
+        self.assertEqual(d.proto, 1)  # proto=IKEv2
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             flags='Response', exch_type='INFORMATIONAL',
+                             id=ih.id, next_payload='Encrypted')
+        resp = self.encrypt_ike_msg(header, b'', None)
+        self.send_and_assert_no_replies(self.pg0, resp)
+
+    def verify_del_sa(self, packet):
+        ih = self.get_ike_header(packet)
+        self.assertEqual(ih.id, self.sa.msg_id)
+        self.assertEqual(ih.exch_type, 37)  # exchange informational
+        self.assertIn('Response', ih.flags)
+        self.assertIn('Initiator', ih.flags)
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+
+    def initiate_del_sa_from_responder(self):
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             exch_type='INFORMATIONAL',
+                             id=self.sa.new_msg_id())
+        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+        packet = self.create_packet(self.pg0, ike_msg,
+                                    self.sa.sport, self.sa.dport,
+                                    self.sa.natt, self.ip6)
+        self.pg0.add_stream(packet)
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        self.verify_del_sa(capture[0])
+
+    @staticmethod
+    def find_notify_payload(packet, notify_type):
+        n = packet[ikev2.IKEv2_payload_Notify]
+        while n is not None:
+            if n.type == notify_type:
+                return n
+            n = n.payload
+        return None
+
+    def verify_nat_detection(self, packet):
+        if self.ip6:
+            iph = packet[IPv6]
+        else:
+            iph = packet[IP]
+        udp = packet[UDP]
+
+        # NAT_DETECTION_SOURCE_IP
+        s = self.find_notify_payload(packet, 16388)
+        self.assertIsNotNone(s)
+        src_sha = self.sa.compute_nat_sha1(
+                inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
+        self.assertEqual(s.load, src_sha)
+
+        # NAT_DETECTION_DESTINATION_IP
+        s = self.find_notify_payload(packet, 16389)
+        self.assertIsNotNone(s)
+        dst_sha = self.sa.compute_nat_sha1(
+                inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
+        self.assertEqual(s.load, dst_sha)
 
     def verify_sa_init_request(self, packet):
         ih = packet[ikev2.IKEv2]
@@ -798,9 +924,24 @@ class TemplateInitiator(IkePeer):
         self.assertEqual(prop.trans[2].transform_id,
                          self.p.ike_transforms['dh_group'])
 
+        self.verify_nat_detection(packet)
+        self.sa.set_ike_props(
+                    crypto='AES-GCM-16ICV', crypto_key_len=32,
+                    integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
+        self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
+                              integ='SHA2-256-128')
+        self.sa.generate_dh_data()
         self.sa.complete_dh_data()
         self.sa.calc_keys()
 
+    def update_esp_transforms(self, trans, sa):
+        while trans:
+            if trans.transform_type == 1:  # ecryption
+                sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
+            elif trans.transform_type == 3:  # integrity
+                sa.esp_integ = INTEG_IDS[trans.transform_id]
+            trans = trans.payload
+
     def verify_sa_auth_req(self, packet):
         ih = self.get_ike_header(packet)
         self.assertEqual(ih.resp_SPI, self.sa.rspi)
@@ -818,6 +959,11 @@ class TemplateInitiator(IkePeer):
         idr = ikev2.IKEv2_payload_IDr(idi.payload)
         self.assertEqual(idi.load, self.sa.i_id)
         self.assertEqual(idr.load, self.sa.r_id)
+        prop = idi[ikev2.IKEv2_payload_Proposal]
+        c = self.sa.child_sas[0]
+        c.ispi = prop.SPI
+        self.update_esp_transforms(
+                prop[ikev2.IKEv2_payload_Transform], self.sa)
 
     def send_init_response(self):
         tr_attr = self.sa.ike_crypto_attr()
@@ -871,8 +1017,9 @@ class TemplateInitiator(IkePeer):
                  transform_type='Extended Sequence Number',
                  transform_id='ESN'))
 
+        c = self.sa.child_sas[0]
         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
-                 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
+                 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
         plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
@@ -912,22 +1059,41 @@ class TemplateInitiator(IkePeer):
 class TemplateResponder(IkePeer):
     """ responder test template """
 
-    def tearDown(self):
-        super(TemplateResponder, self).tearDown()
-        if self.sa.is_initiator:
-            self.initiate_del_sa()
-            r = self.vapi.ikev2_sa_dump()
-            self.assertEqual(len(r), 0)
-
-        self.p.remove_vpp_config()
-        self.assertIsNone(self.p.query_vpp_config())
+    def initiate_del_sa_from_responder(self):
+        self.pg0.enable_capture()
+        self.pg_start()
+        self.vapi.ikev2_initiate_del_ike_sa(
+                ispi=int.from_bytes(self.sa.ispi, 'little'))
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertNotIn('Response', ih.flags)
+        self.assertNotIn('Initiator', ih.flags)
+        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
+        plain = self.sa.hmac_and_decrypt(ih)
+        d = ikev2.IKEv2_payload_Delete(plain)
+        self.assertEqual(d.proto, 1)  # proto=IKEv2
+        self.assertEqual(ih.init_SPI, self.sa.ispi)
+        self.assertEqual(ih.resp_SPI, self.sa.rspi)
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             flags='Initiator+Response',
+                             exch_type='INFORMATIONAL',
+                             id=ih.id, next_payload='Encrypted')
+        resp = self.encrypt_ike_msg(header, b'', None)
+        self.send_and_assert_no_replies(self.pg0, resp)
 
     def verify_del_sa(self, packet):
         ih = self.get_ike_header(packet)
         self.assertEqual(ih.id, self.sa.msg_id)
         self.assertEqual(ih.exch_type, 37)  # exchange informational
+        self.assertIn('Response', ih.flags)
+        self.assertNotIn('Initiator', ih.flags)
+        self.assertEqual(ih.next_payload, 46)  # Encrypted
+        self.assertEqual(ih.init_SPI, self.sa.ispi)
+        self.assertEqual(ih.resp_SPI, self.sa.rspi)
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
 
-    def initiate_del_sa(self):
+    def initiate_del_sa_from_initiator(self):
         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                              flags='Initiator', exch_type='INFORMATIONAL',
                              id=self.sa.new_msg_id())
@@ -957,11 +1123,6 @@ class TemplateResponder(IkePeer):
         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                  trans_nb=4, trans=trans))
 
-        if behind_nat:
-            next_payload = 'Notify'
-        else:
-            next_payload = None
-
         self.sa.init_req_packet = (
                 ikev2.IKEv2(init_SPI=self.sa.ispi,
                             flags='Initiator', exch_type='IKE_SA_INIT') /
@@ -969,19 +1130,21 @@ class TemplateResponder(IkePeer):
                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                        group=self.sa.ike_dh,
                                        load=self.sa.my_dh_pub_key) /
-                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
+                ikev2.IKEv2_payload_Nonce(next_payload='Notify',
                                           load=self.sa.i_nonce))
 
         if behind_nat:
             src_address = b'\x0a\x0a\x0a\x01'
         else:
-            src_address = bytes(self.pg0.local_ip4, 'ascii')
+            src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
 
         src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
-        dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
-                                           self.sa.sport)
+        dst_nat = self.sa.compute_nat_sha1(
+                inet_pton(socket.AF_INET, self.pg0.local_ip4),
+                self.sa.dport)
         nat_src_detection = ikev2.IKEv2_payload_Notify(
-                type='NAT_DETECTION_SOURCE_IP', load=src_nat)
+                type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+                next_payload='Notify')
         nat_dst_detection = ikev2.IKEv2_payload_Notify(
                 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
         self.sa.init_req_packet = (self.sa.init_req_packet /
@@ -997,8 +1160,9 @@ class TemplateResponder(IkePeer):
         capture = self.pg0.get_capture(1)
         self.verify_sa_init(capture[0])
 
-    def send_sa_auth(self):
+    def generate_auth_payload(self, last_payload=None, is_rekey=False):
         tr_attr = self.sa.esp_crypto_attr()
+        last_payload = last_payload or 'Notify'
         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                  transform_id=self.sa.esp_crypto, length=tr_attr[1],
                  key_length=tr_attr[0]) /
@@ -1011,32 +1175,45 @@ class TemplateResponder(IkePeer):
                  transform_type='Extended Sequence Number',
                  transform_id='ESN'))
 
+        c = self.sa.child_sas[0]
         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
-                 SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
+                 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
-        plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
-                 IDtype=self.sa.id_type, load=self.sa.i_id) /
-                 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
-                 IDtype=self.sa.id_type, load=self.sa.r_id) /
-                 ikev2.IKEv2_payload_AUTH(next_payload='SA',
+        plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
                  auth_type=AuthMethod.value(self.sa.auth_method),
                  load=self.sa.auth_data) /
                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
-                 number_of_TSs=len(tsi),
-                 traffic_selector=tsi) /
-                 ikev2.IKEv2_payload_TSr(next_payload='Notify',
-                 number_of_TSs=len(tsr),
-                 traffic_selector=tsr) /
-                 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
+                 number_of_TSs=len(tsi), traffic_selector=tsi) /
+                 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
+                 number_of_TSs=len(tsr), traffic_selector=tsr))
+
+        if is_rekey:
+            first_payload = 'Nonce'
+            plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
+                     next_payload='SA') / plain /
+                     ikev2.IKEv2_payload_Notify(type='REKEY_SA',
+                     proto='ESP', SPI=c.ispi))
+        else:
+            first_payload = 'IDi'
+            ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
+                   IDtype=self.sa.id_type, load=self.sa.i_id) /
+                   ikev2.IKEv2_payload_IDr(next_payload='AUTH',
+                   IDtype=self.sa.id_type, load=self.sa.r_id))
+            plain = ids / plain
+        return plain, first_payload
 
+    def send_sa_auth(self):
+        plain, first_payload = self.generate_auth_payload(
+                    last_payload='Notify')
+        plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
         header = ikev2.IKEv2(
                 init_SPI=self.sa.ispi,
                 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
                 flags='Initiator', exch_type='IKE_AUTH')
 
-        ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
+        ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
         packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
                                     self.sa.dport, self.sa.natt, self.ip6)
         self.pg0.add_stream(packet)
@@ -1050,7 +1227,7 @@ class TemplateResponder(IkePeer):
 
         self.assertEqual(ih.id, self.sa.msg_id)
         self.assertEqual(ih.exch_type, 34)
-        self.assertTrue('Response' in ih.flags)
+        self.assertIn('Response', ih.flags)
         self.assertEqual(ih.init_SPI, self.sa.ispi)
         self.assertNotEqual(ih.resp_SPI, 0)
         self.sa.rspi = ih.resp_SPI
@@ -1072,6 +1249,10 @@ class TemplateResponder(IkePeer):
         self.verify_udp(udp)
         self.assertEqual(ike.id, self.sa.msg_id)
         plain = self.sa.hmac_and_decrypt(ike)
+        idr = ikev2.IKEv2_payload_IDr(plain)
+        prop = idr[ikev2.IKEv2_payload_Proposal]
+        self.assertEqual(prop.SPIsize, 4)
+        self.sa.child_sas[0].rspi = prop.SPI
         self.sa.calc_child_keys()
 
     def test_responder(self):
@@ -1098,6 +1279,8 @@ class Ikev2Params(object):
                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
 
+        self.del_sa_from_responder = False if 'del_sa_from_responder'\
+            not in params else params['del_sa_from_responder']
         is_natt = 'natt' in params and params['natt'] or False
         self.p = Profile(self, 'pr1')
         self.ip6 = False if 'ip6' not in params else params['ip6']
@@ -1156,24 +1339,25 @@ class Ikev2Params(object):
                           auth_data=auth_data,
                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
 
-        ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
-            params['ike-crypto']
-        ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
-            params['ike-integ']
-        ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
-            params['ike-dh']
-
-        esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
-            params['esp-crypto']
-        esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
-            params['esp-integ']
-
-        self.sa.set_ike_props(
-                crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
-                integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
-        self.sa.set_esp_props(
-                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
-                integ=esp_integ)
+        if is_init:
+            ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
+                params['ike-crypto']
+            ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
+                params['ike-integ']
+            ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
+                params['ike-dh']
+
+            esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
+                params['esp-crypto']
+            esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
+                params['esp-integ']
+
+            self.sa.set_ike_props(
+                    crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
+                    integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
+            self.sa.set_esp_props(
+                    crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
+                    integ=esp_integ)
 
 
 class TestApi(VppTestCase):
@@ -1209,6 +1393,8 @@ class TestApi(VppTestCase):
             p.set_lifetime_data(cfg['lifetime_data'])
         if 'tun_itf' in cfg:
             p.set_tunnel_interface(cfg['tun_itf'])
+        if 'natt_disabled' in cfg and cfg['natt_disabled']:
+            p.disable_natt()
         p.add_vpp_config()
         return p
 
@@ -1247,6 +1433,7 @@ class TestApi(VppTestCase):
         conf = {
             'p1': {
                 'name': 'p1',
+                'natt_disabled': True,
                 'loc_id': ('fqdn', b'vpp.home'),
                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
                 'loc_ts': loc_ts4,
@@ -1350,6 +1537,9 @@ class TestApi(VppTestCase):
         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
         self.verify_auth(ap.auth, cp['auth'])
+        natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
+        self.assertTrue(natt_dis == ap.natt_disabled)
+
         if 'lifetime_data' in cp:
             self.verify_lifetime_data(ap, cp['lifetime_data'])
         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
@@ -1361,8 +1551,72 @@ class TestApi(VppTestCase):
 
 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
     """ test ikev2 initiator - pre shared key auth """
+
+    def config_tc(self):
+        self.config_params({
+            'is_initiator': False,  # seen from test case perspective
+                                    # thus vpp is initiator
+            'responder': {'sw_if_index': self.pg0.sw_if_index,
+                           'addr': self.pg0.remote_ip4},
+            'ike-crypto': ('AES-GCM-16ICV', 32),
+            'ike-integ': 'NULL',
+            'ike-dh': '3072MODPgr',
+            'ike_transforms': {
+                'crypto_alg': 20,  # "aes-gcm-16"
+                'crypto_key_size': 256,
+                'dh_group': 15,  # "modp-3072"
+            },
+            'esp_transforms': {
+                'crypto_alg': 12,  # "aes-cbc"
+                'crypto_key_size': 256,
+                # "hmac-sha2-256-128"
+                'integ_alg': 12}})
+
+
+class TestInitiatorRekey(TestInitiatorPsk):
+    """ test ikev2 initiator - rekey """
+
+    def rekey_from_initiator(self):
+        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+        self.pg0.enable_capture()
+        self.pg_start()
+        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.exch_type, 36)  # CHILD_SA
+        self.assertNotIn('Response', ih.flags)
+        self.assertIn('Initiator', ih.flags)
+        plain = self.sa.hmac_and_decrypt(ih)
+        sa = ikev2.IKEv2_payload_SA(plain)
+        prop = sa[ikev2.IKEv2_payload_Proposal]
+        nonce = sa[ikev2.IKEv2_payload_Nonce]
+        self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+        self.sa.r_nonce = self.sa.i_nonce
+        # update new responder SPI
+        self.sa.child_sas[0].ispi = prop.SPI
+        self.sa.child_sas[0].rspi = prop.SPI
+        self.sa.calc_child_keys()
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             flags='Response', exch_type=36,
+                             id=ih.id, next_payload='Encrypted')
+        resp = self.encrypt_ike_msg(header, sa, 'SA')
+        packet = self.create_packet(self.pg0, resp, self.sa.sport,
+                                    self.sa.dport, self.sa.natt, self.ip6)
+        self.send_and_assert_no_replies(self.pg0, packet)
+
+    def test_initiator(self):
+        super(TestInitiatorRekey, self).test_initiator()
+        self.rekey_from_initiator()
+        self.verify_ike_sas()
+        self.verify_ipsec_sas(is_rekey=True)
+
+
+class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
+    """ test ikev2 initiator - delete IKE SA from responder """
+
     def config_tc(self):
         self.config_params({
+            'del_sa_from_responder': True,
             'is_initiator': False,  # seen from test case perspective
                                     # thus vpp is initiator
             'responder': {'sw_if_index': self.pg0.sw_if_index,
@@ -1395,6 +1649,40 @@ class TestResponderPsk(TemplateResponder, Ikev2Params):
         self.config_params()
 
 
+class TestResponderRekey(TestResponderPsk):
+    """ test ikev2 responder - rekey """
+
+    def rekey_from_initiator(self):
+        sa, first_payload = self.generate_auth_payload(is_rekey=True)
+        header = ikev2.IKEv2(
+                init_SPI=self.sa.ispi,
+                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+                flags='Initiator', exch_type='CREATE_CHILD_SA')
+
+        ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
+        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
+                                    self.sa.dport, self.sa.natt, self.ip6)
+        self.pg0.add_stream(packet)
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        plain = self.sa.hmac_and_decrypt(ih)
+        sa = ikev2.IKEv2_payload_SA(plain)
+        prop = sa[ikev2.IKEv2_payload_Proposal]
+        nonce = sa[ikev2.IKEv2_payload_Nonce]
+        self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+        # update new responder SPI
+        self.sa.child_sas[0].rspi = prop.SPI
+
+    def test_responder(self):
+        super(TestResponderRekey, self).test_responder()
+        self.rekey_from_initiator()
+        self.sa.calc_child_keys()
+        self.verify_ike_sas()
+        self.verify_ipsec_sas(is_rekey=True)
+
+
 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
     """ test ikev2 responder - cert based auth """
     def config_tc(self):
@@ -1440,6 +1728,7 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
     """
     def config_tc(self):
         self.config_params({
+            'del_sa_from_responder': True,
             'ip6': True,
             'natt': True,
             'ike-crypto': ('AES-GCM-16ICV', 32),