tests: ikev2: add nat traversal & cert based auth test 63/27663/4
authorFilip Tehlar <ftehlar@cisco.com>
Tue, 23 Jun 2020 20:35:58 +0000 (20:35 +0000)
committerBenoît Ganne <bganne@cisco.com>
Tue, 30 Jun 2020 08:15:01 +0000 (08:15 +0000)
Type: test

Change-Id: I3e8e451c5deaf04f519a471369370c383d9cda3b
Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
src/plugins/ikev2/ikev2.c
src/plugins/ikev2/test/certs/client-cert.pem [new file with mode: 0644]
src/plugins/ikev2/test/certs/client-key.pem [new file with mode: 0644]
src/plugins/ikev2/test/certs/server-cert.pem [new file with mode: 0644]
src/plugins/ikev2/test/certs/server-key.pem [new file with mode: 0644]
src/plugins/ikev2/test/test_ikev2.py

index 964281c..00e925c 100644 (file)
@@ -3364,9 +3364,9 @@ ikev2_set_profile_auth (vlib_main_t * vm, u8 * name, u8 auth_method,
       vec_add1 (p->auth.data, 0);
       if (p->auth.key)
        EVP_PKEY_free (p->auth.key);
-      p->auth.key = ikev2_load_cert_file (auth_data);
+      p->auth.key = ikev2_load_cert_file (p->auth.data);
       if (p->auth.key == NULL)
-       return clib_error_return (0, "load cert '%s' failed", auth_data);
+       return clib_error_return (0, "load cert '%s' failed", p->auth.data);
     }
 
   return 0;
diff --git a/src/plugins/ikev2/test/certs/client-cert.pem b/src/plugins/ikev2/test/certs/client-cert.pem
new file mode 100644 (file)
index 0000000..b2ee80a
--- /dev/null
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE-----
+MIICzDCCAbSgAwIBAgIIZkIcEDbLZVUwDQYJKoZIhvcNAQELBQAwJjEkMCIGA1UE
+AxMbcm9hZHdhcnJpb3IudnBuLmV4YW1wbGUuY29tMB4XDTE5MDkwMjExMzMwMloX
+DTIyMDkwMTExMzMwMlowJjEkMCIGA1UEAxMbcm9hZHdhcnJpb3IudnBuLmV4YW1w
+bGUuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyLJ+1cTRwYiP
+19l6yEJYf9Oh22u62i+deenCv1zQf/XNX47NMqar1Nx6YOXIWBBL70gql2sh7t8C
+U5/3Aw9J+mvpk5t7Rw2/vpTRf/X8UncD4wpRpSA/rg5Jd/xR/wkCjS4CgvtQavwg
+aLJ2bqMZJDLcj+oFKzPpx2MnUrvPc1lqiep4cIXF0E3qg2UwiEWP7O/6E7CJgt+9
+TXVjK6+foOboHEaW3S0xTVjivqwozotSUDGDLG4h7f3YZGkMmgC2EwyIIA6DFl0m
+Ryg/RMAzqh1wUATWBqnVo3P0gcgF5FQS8a35w4GkIT1Th/zodNQ2/sLq2/WG8kyJ
+mpk6X52e7wIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQBtxCXCijOJlXFIay3iPiXA
+9s9A9HaNdppJsC0nu/xnxOsvJslra8940nQz0gdJSqiOZ0EkKczH35ezrARvpvY4
+kmk2+WiG2mbm+aTPv+HuyCKYRWh+75H9XdLEIvDhEtVIp8ghrGcwD+Ul9KTWaHeF
+obEYPTF5j/xshu2sIHcC43boC9kYF5faPLuCY1c2i/yJz8BmjUvjzbtSCMAXyMRf
+iEEdTv0mddTyTdw21OX6YKOZPCqevDGvlzjqLQT+k5idUMzisUGqD6b7iDAJOK6R
+oE43bK7xrawOntXLniQfE7EW4rl2VxbYDIBv4nwgAaKLr0odjYnYAswTDdvga/vW
+-----END CERTIFICATE-----
diff --git a/src/plugins/ikev2/test/certs/client-key.pem b/src/plugins/ikev2/test/certs/client-key.pem
new file mode 100644 (file)
index 0000000..dacc931
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEAyLJ+1cTRwYiP19l6yEJYf9Oh22u62i+deenCv1zQf/XNX47N
+Mqar1Nx6YOXIWBBL70gql2sh7t8CU5/3Aw9J+mvpk5t7Rw2/vpTRf/X8UncD4wpR
+pSA/rg5Jd/xR/wkCjS4CgvtQavwgaLJ2bqMZJDLcj+oFKzPpx2MnUrvPc1lqiep4
+cIXF0E3qg2UwiEWP7O/6E7CJgt+9TXVjK6+foOboHEaW3S0xTVjivqwozotSUDGD
+LG4h7f3YZGkMmgC2EwyIIA6DFl0mRyg/RMAzqh1wUATWBqnVo3P0gcgF5FQS8a35
+w4GkIT1Th/zodNQ2/sLq2/WG8kyJmpk6X52e7wIDAQABAoIBAFzagGYErosc1Hgo
+DW9ziqBxWJQ87nEd0HzkEw8YMPrSzVblqbhCdoOAEjqcdSmROKCXQeVWUpfCo7/u
+5YiL3U03+hQpvkpsaQcSsS7Drx60I586WfXMysmLoKAbpBAbqguJeu4Auf8VcSUf
+VD3xxaMCyRroScXoOcPe8wWLEPqrU/M4McmiG7cw5w/GAFR6GBlbjX0IQ0HHIn9n
+K1ukXREfWwKczIOXFFzcDfEQupVPAHdZDi+iL93Hzkw89JDQ2WdYCdMbXace3CtL
+1niIgKcfvSgYQznumD6R/MIuhdqcDEOi2REcEv+JaTbzQXBCEaAFIgV/6Apu8FEM
+pMxvmbkCgYEA/GxPIUep2Q4hIcmHB9dUQBB93oy1UBLzoJ5ItACoQ6VHXSrF5N3k
+Ppl5kg6s9mNWLjAXMet03ripv2KiqIvUS4EExFzVgRkDNYkDkl9dKgxL25vsSqCY
+LyRfrlHmiXJ3Lyby1JnhmXSqfOjF9JDb7gmNVLLMqQI23aAdLZWhoEsCgYEAy4qL
+p0BU2/z5pIrH3DGRa4ZlzrDrd6DA1uGqnQ84SfYhtzf8hxHawkqAECGgHWu1j5oM
+itiZJtcS3dAPe8YO8sm97eZ8ykdXZYiJ5FaKQxukLA3TEH4Ol6+dsq+/YElJYiKy
+mEzQem3JvJ92x0gmb3UHo7KwK3eLUY3ZUiw/vW0CgYEA0YAvOMbawCyK8Rjq+mYM
+JdPKNACCp3/jNkbIvqThAqvVjAzpDCfhvNbyYHfPs/sEVvdQ+PycFhIbyJ1btRnA
+zB270CHgjfItiKw8C6scjr5/4zgJKHMbe9zrkKmm9iLLqpXf6vYAmLWMnnkveukT
+r/+7jjWLySXuVYkDTocnCvsCgYEAo+8Ca8K0ljo6Fvxv3DlQFno4pTB5dfBTYL6A
+297WRzumDBtbmK4Ys0OtwVBT1V5a7T31vB6Xu2CeIWjiD56Fi6ZRArsQ5xpumahO
+50mMMmCg6Zp41F+zARjAupoyWSqtoAyyK1gy0Wh7gyAsWnUR/9MSRBY3LcsqupS0
+5CofbEECgYEAluhNF+oGIv7aLP3nXl8wgeYnRG8u4q4B7UAVq51BhC6oYhgG3X+d
+KEhboYaSiD2SEaNc1v9NmeT7r0rADx0xrwbe8J1WgtlAH5lzIIxX6TW4njOmkDBO
+Ys8H7NBi2UwQpsveo7o8NIm5p6WEhhjlyk6hme0UIAKGU2abqisKj6A=
+-----END RSA PRIVATE KEY-----
diff --git a/src/plugins/ikev2/test/certs/server-cert.pem b/src/plugins/ikev2/test/certs/server-cert.pem
new file mode 100644 (file)
index 0000000..212b7fa
--- /dev/null
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE-----
+MIICpjCCAY6gAwIBAgIILBeRA+MsN/IwDQYJKoZIhvcNAQELBQAwEzERMA8GA1UE
+AxMIdnBwLmhvbWUwHhcNMTkwOTAyMTEzMjUzWhcNMjIwOTAxMTEzMjUzWjATMREw
+DwYDVQQDEwh2cHAuaG9tZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
+AOA2L3Ua3oRI0gMQsVI+gllxxHo5XXU/8gCKYdqDaXEKJ8pvnfGXmUTyWuaVtBhK
+yXHubdW0Z5a4t8G0yzhOeUxgYlpA0r4qHdXl2tyaWRRVm25mdnQz0H2c6juqYS1c
+eYJpfB2kK3TZ4ewH2Wh5fHBi4dFB77uHEaVZiPIKHlu9/0Aqtk/r5ql7B1YJlSGM
+qE81dPXsD0wb0obaCfgjaYUSVOUf4RLFsO+UtnvNSdo/qEQoCETjoLEU+uQA3Nrn
+GNcOsLxkmrYMv0GfaUEi8BqowOVeqQcyn4JZFX1wR3GCNVxFh7WCLCgGIZIWXV6M
+B5HTuYKfNaGDVi6MIKqq5PcCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEABPxPf7v+
+KV7butzwl3H1mfbUj0hnT4WBcOyq3aAeOWnJhOlkXSBiGngwu0Nt9Hw/w8gepnHw
+1hAHzX2RKKxlJGtTGzWZrXX9k8BHtqqByFqF8H+ZOUCsGxMhwkGaCEbfMaGsBt0c
+RqLrF1C43oQ+9k5oFCeyFXUt6zWVPqc5iuCDLPxjDS6xviMv8GsitWjCnB2ge6d9
+PoXdhD6vQWT3ntteUD+L/z4zVkpeJeYbRWVJKOqkP0Byi1sKIFCr0oWrQDjGLCuj
+TV92lBt/1iY57GpJUwpizY0UUu8QiMOb5TDbbcr4cVaASolS09QnY7YnZO0rfevR
+QuN4Ct7SNzm16g==
+-----END CERTIFICATE-----
diff --git a/src/plugins/ikev2/test/certs/server-key.pem b/src/plugins/ikev2/test/certs/server-key.pem
new file mode 100644 (file)
index 0000000..dc5db8d
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpgIBAAKCAQEA4DYvdRrehEjSAxCxUj6CWXHEejlddT/yAIph2oNpcQonym+d
+8ZeZRPJa5pW0GErJce5t1bRnlri3wbTLOE55TGBiWkDSviod1eXa3JpZFFWbbmZ2
+dDPQfZzqO6phLVx5gml8HaQrdNnh7AfZaHl8cGLh0UHvu4cRpVmI8goeW73/QCq2
+T+vmqXsHVgmVIYyoTzV09ewPTBvShtoJ+CNphRJU5R/hEsWw75S2e81J2j+oRCgI
+ROOgsRT65ADc2ucY1w6wvGSatgy/QZ9pQSLwGqjA5V6pBzKfglkVfXBHcYI1XEWH
+tYIsKAYhkhZdXowHkdO5gp81oYNWLowgqqrk9wIDAQABAoIBAQDYLcaCPb7wgxni
+lLSz3MtnnFbZlffzdg1K0iJZQr1mnWkSOZ5q80V3gcl9bDgx9+HVNRO3qnL6ku2y
+GjSa/Kbdqk0KQlgvvIH5296zNyrOfWAZCTgLAG50vtflYXjC+Ne7kyN1nPW9e9rl
+7geEQSZnYh/i2JViFXBBIlv1KyZsQq3lW6QmSMpKD/qaNqmmzsGnefCGX0miRYzL
+D5o/SLbcFg60f+FYFNtmU4tqH/hkPXQV/d2s/SmPiFmN1Jdu19whW+mbCj/Unv9A
+sF6Zsz+dTtsFXTc9O2ciREUMa3mrO3Jnefe544LMcLiby4UNRjd+xUin99SgwQBK
+AolK/b0ZAoGBAPbinF6ZIuW53LqXWTc3nB6G8jnJ2Sys7e1R+wGPUYywaMUVxJBE
+ZWO1WO1/10xI14Pm1xlK+pheJ4eaoGQGXQkwaKn/yWjr7MLuItPPYLWLck73/mLU
+PS10YZIkOchTDTqCXfAJwPWFm4kPf2Vw3d4OJ5SS8t6J1XHyPVRdnOqjAoGBAOh9
+R6YWYoTIib6hmnITT2ok+Z9T/QILtsDZ9STQWAGCHufSs2No+XPT7/5j6qtBkEwu
+7bONQQmZ8Z1hGdPnDX+xorZkC2iypvXl3fkDieKxFosgLMJOUaPtiMS68PFdJJD8
+aZZA517EZqU+1KBcDtUC92RhEGsBqnGVKZbeq/WdAoGBAOoM91UvNEg5Jaq3JNsW
+GylNoYFrfDnAT2d3GtDlv9fyvcBp3IkwbjZSi9XMJp2pSLdBNpZB4MRmLm9+BaQN
+7FFz1SWk2ppfX3uFKvtGVFaHtnxq2n1y2/SlqoJTWXwyUA4x3UBc3tNkMImFaa8P
+iyaioJ9XmGukm/eEQOQkQ0ilAoGBAInW/UvzqaWgYEqt9av81vDPy1Azs6Yq28er
+cjTJceQzCa/YtJMUAhIybfAdvVycGrmQeU4jSZQvuZjdoTqJmu9Cmn4ZAsqoYWUZ
+TGn/Nxk4dW03PlJVrQHMK3K4g3IXDz8G6HM1N/hv6Yu/nMEW05RnldXl1p4a9rsw
+/N9+VeXhAoGBAJHAfZrTUyCMYOBSIJS15rCOchN85tlXVbtEvH803N140wH2T8Qw
+GdiYcTKc8LmWWhrXvFMVdZAE+pyRZkPVU/8Mo6lu1zMTvc+FyNbqfcp3Z4KiOp06
+OUVCvjZicY0neZ53pCaMqo/VqnI35wVvPNdpnFLdimNMJCJpJipn/35i
+-----END RSA PRIVATE KEY-----
index d2d82ba..f0053fd 100644 (file)
@@ -1,17 +1,20 @@
 import os
+from cryptography import x509
 from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives import hashes, hmac
-from cryptography.hazmat.primitives.asymmetric import dh
+from cryptography.hazmat.primitives.asymmetric import dh, padding
+from cryptography.hazmat.primitives.serialization import load_pem_private_key
 from cryptography.hazmat.primitives.ciphers import (
     Cipher,
     algorithms,
     modes,
 )
+from scapy.layers.ipsec import ESP
 from scapy.layers.inet import IP, UDP, Ether
 from scapy.packet import raw, Raw
 from scapy.utils import long_converter
 from framework import VppTestCase, VppTestRunner
-from vpp_ikev2 import Profile, IDType
+from vpp_ikev2 import Profile, IDType, AuthMethod
 
 
 KEY_PAD = b"Key Pad for IKEv2"
@@ -100,9 +103,17 @@ class IKEv2SA(object):
     def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
                  i_id=None, r_id=None, id_type='fqdn', nonce=None,
                  auth_data=None, local_ts=None, remote_ts=None,
-                 auth_method='shared-key'):
+                 auth_method='shared-key', priv_key=None, natt=False):
+        self.natt = natt
+        if natt:
+            self.sport = 4500
+            self.dport = 4500
+        else:
+            self.sport = 500
+            self.dport = 500
         self.dh_params = None
         self.test = test
+        self.priv_key = priv_key
         self.is_initiator = is_initiator
         nonce = nonce or os.urandom(32)
         self.auth_data = auth_data
@@ -248,8 +259,14 @@ class IKEv2SA(object):
     def auth_init(self):
         prf = self.ike_prf_alg.mod()
         authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
-        psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
-        self.auth_data = self.calc_prf(prf, psk, authmsg)
+        if self.auth_method == 'shared-key':
+            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
+            self.auth_data = self.calc_prf(prf, psk, authmsg)
+        elif self.auth_method == 'rsa-sig':
+            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
+                                                hashes.SHA1())
+        else:
+            raise TypeError('unknown auth method type!')
 
     def encrypt(self, data):
         data = self.ike_crypto_alg.pad(data)
@@ -359,15 +376,21 @@ class IKEv2SA(object):
     def esp_crypto_attr(self):
         return self.crypto_attr(self.esp_crypto_key_len)
 
+    def compute_nat_sha1(self, ip, port):
+        data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
+        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
+        digest.update(data)
+        return digest.finalize()
 
-class TestResponder(VppTestCase):
+
+class TemplateResponder(VppTestCase):
     """ responder test """
 
     @classmethod
     def setUpClass(cls):
         import scapy.contrib.ikev2 as _ikev2
         globals()['ikev2'] = _ikev2
-        super(TestResponder, cls).setUpClass()
+        super(TemplateResponder, cls).setUpClass()
         cls.create_pg_interfaces(range(2))
         for i in cls.pg_interfaces:
             i.admin_up()
@@ -376,40 +399,24 @@ class TestResponder(VppTestCase):
 
     @classmethod
     def tearDownClass(cls):
-        super(TestResponder, cls).tearDownClass()
+        super(TemplateResponder, cls).tearDownClass()
 
     def setUp(self):
-        super(TestResponder, self).setUp()
+        super(TemplateResponder, self).setUp()
         self.config_tc()
-
-    def config_tc(self):
-        self.p = Profile(self, 'pr1')
-        self.p.add_auth(method='shared-key', data=b'$3cr3tpa$$w0rd')
-        self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
-        self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
-        self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
-        self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
         self.p.add_vpp_config()
-
-        self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
-                          r_id=self.p.local_id['data'],
-                          is_initiator=True, auth_data=self.p.auth['data'],
-                          id_type=self.p.local_id['id_type'],
-                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
-
-        self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
-                              integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
-                              dh='2048MODPgr')
-        self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
-                              integ='HMAC-SHA1-96')
         self.sa.generate_dh_data()
 
-    def create_ike_msg(self, src_if, msg, sport=500, dport=500):
-        return (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
-                IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
-                UDP(sport=sport, dport=dport) / msg)
+    def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
+        res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+               IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
+               UDP(sport=sport, dport=dport))
+        if natt:
+            # insert non ESP marker
+            res = res / Raw(b'\x00' * 4)
+        return res / msg
 
-    def send_sa_init(self):
+    def send_sa_init(self, behind_nat=False):
         tr_attr = self.sa.ike_crypto_attr()
         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
@@ -424,6 +431,11 @@ class TestResponder(VppTestCase):
         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                  trans_nb=4, trans=trans))
 
+        if behind_nat:
+            next_payload = 'Notify'
+        else:
+            next_payload = None
+
         self.sa.init_req_packet = (
                 ikev2.IKEv2(init_SPI=self.sa.ispi,
                             flags='Initiator', exch_type='IKE_SA_INIT') /
@@ -431,9 +443,20 @@ class TestResponder(VppTestCase):
                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                        group=self.sa.ike_dh,
                                        load=self.sa.dh_pub_key()) /
-                ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce))
-
-        ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet)
+                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
+                                          load=self.sa.i_nonce))
+
+        if behind_nat:
+            src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
+                                               self.sa.sport)
+            nat_detection = ikev2.IKEv2_payload_Notify(
+                    type='NAT_DETECTION_SOURCE_IP',
+                    load=src_nat)
+            self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
+
+        ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
+                                      self.sa.sport, self.sa.dport,
+                                      self.sa.natt)
         self.pg0.add_stream(ike_msg)
         self.pg0.enable_capture()
         self.pg_start()
@@ -463,7 +486,8 @@ class TestResponder(VppTestCase):
                  ikev2.IKEv2_payload_IDr(next_payload='AUTH',
                  IDtype=self.sa.id_type, load=self.sa.r_id) /
                  ikev2.IKEv2_payload_AUTH(next_payload='SA',
-                 auth_type=2, load=self.sa.auth_data) /
+                 auth_type=AuthMethod.value(self.sa.auth_method),
+                 load=self.sa.auth_data) /
                  ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
                  ikev2.IKEv2_payload_TSi(next_payload='TSr',
                  number_of_TSs=len(tsi),
@@ -490,15 +514,27 @@ class TestResponder(VppTestCase):
         sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
         assert(len(sa_auth) == tlen)
 
-        packet = self.create_ike_msg(self.pg0, sa_auth)
+        packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
+                                     self.sa.dport, self.sa.natt)
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
         capture = self.pg0.get_capture(1)
         self.verify_sa_auth(capture[0])
 
+    def get_ike_header(self, packet):
+        try:
+            ih = packet[ikev2.IKEv2]
+        except IndexError as e:
+            # this is a workaround for getting IKEv2 layer as both ikev2 and
+            # ipsec register for port 4500
+            esp = packet[ESP]
+            ih = self.verify_and_remove_non_esp_marker(esp)
+        return ih
+
     def verify_sa_init(self, packet):
-        ih = packet[ikev2.IKEv2]
+        ih = self.get_ike_header(packet)
+
         self.assertEqual(ih.exch_type, 34)
         self.assertTrue('Response' in ih.flags)
         self.assertEqual(ih.init_SPI, self.sa.ispi)
@@ -515,13 +551,24 @@ class TestResponder(VppTestCase):
         self.sa.calc_keys()
         self.sa.auth_init()
 
+    def verify_and_remove_non_esp_marker(self, packet):
+        if self.sa.natt:
+            # if we are in nat traversal mode check for non esp marker
+            # and remove it
+            data = raw(packet)
+            self.assertEqual(data[:4], b'\x00' * 4)
+            return ikev2.IKEv2(data[4:])
+        else:
+            return packet
+
+    def verify_udp(self, udp):
+        self.assertEqual(udp.sport, self.sa.sport)
+        self.assertEqual(udp.dport, self.sa.dport)
+
     def verify_sa_auth(self, packet):
-        try:
-            ike = packet[ikev2.IKEv2]
-            ep = packet[ikev2.IKEv2_payload_Encrypted]
-        except KeyError as e:
-            self.logger.error("unexpected reply: no IKEv2/Encrypt payload!")
-            raise
+        ike = self.get_ike_header(packet)
+        udp = packet[UDP]
+        self.verify_udp(udp)
         plain = self.sa.hmac_and_decrypt(ike)
         self.sa.calc_child_keys()
 
@@ -545,10 +592,79 @@ class TestResponder(VppTestCase):
         self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
 
     def test_responder(self):
-        self.send_sa_init()
+        self.send_sa_init(self.sa.natt)
         self.send_sa_auth()
         self.verify_child_sas()
 
 
+class Ikev2Params(object):
+    def config_params(self, params={}):
+        is_natt = 'natt' in params and params['natt'] or False
+        self.p = Profile(self, 'pr1')
+
+        if 'auth' in params and params['auth'] == 'rsa-sig':
+            auth_method = 'rsa-sig'
+            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
+            self.vapi.ikev2_set_local_key(
+                    key_file=work_dir + params['server-key'])
+
+            client_file = work_dir + params['client-cert']
+            server_pem = open(work_dir + params['server-cert']).read()
+            client_priv = open(work_dir + params['client-key']).read()
+            client_priv = load_pem_private_key(str.encode(client_priv), None,
+                                               default_backend())
+            self.peer_cert = x509.load_pem_x509_certificate(
+                    str.encode(server_pem),
+                    default_backend())
+            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
+            auth_data = None
+        else:
+            auth_data = b'$3cr3tpa$$w0rd'
+            self.p.add_auth(method='shared-key', data=auth_data)
+            auth_method = 'shared-key'
+            client_priv = None
+
+        self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
+        self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
+        self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
+        self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
+
+        self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
+                          r_id=self.p.local_id['data'],
+                          id_type=self.p.local_id['id_type'], natt=is_natt,
+                          priv_key=client_priv, auth_method=auth_method,
+                          auth_data=auth_data,
+                          local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
+
+        self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
+                              integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
+                              dh='2048MODPgr')
+        self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
+                              integ='HMAC-SHA1-96')
+
+
+class TestResponderNATT(TemplateResponder, Ikev2Params):
+    """ test ikev2 responder - nat traversal """
+    def config_tc(self):
+        self.config_params(
+                {'natt': True})
+
+
+class TestResponderPsk(TemplateResponder, Ikev2Params):
+    """ test ikev2 responder - pre shared key auth """
+    def config_tc(self):
+        self.config_params()
+
+
+class TestResponderRsaSign(TemplateResponder, Ikev2Params):
+    """ test ikev2 responder - cert based auth """
+    def config_tc(self):
+        self.config_params({
+            'auth': 'rsa-sig',
+            'server-key': 'server-key.pem',
+            'client-key': 'client-key.pem',
+            'client-cert': 'client-cert.pem',
+            'server-cert': 'server-cert.pem'})
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)