ikev2: test responder behind NAT 27/30327/2
authorFilip Tehlar <ftehlar@cisco.com>
Fri, 4 Dec 2020 17:38:11 +0000 (17:38 +0000)
committerBeno�t Ganne <bganne@cisco.com>
Wed, 9 Dec 2020 08:12:09 +0000 (08:12 +0000)
Type: test
Ticket: VPP-1903

Change-Id: I7fab6931833d6e253b7b921172825387302d8f70
Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
src/plugins/ikev2/test/test_ikev2.py

index 3a1ab91..3db3a05 100644 (file)
@@ -183,11 +183,12 @@ class IKEv2SA(object):
     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
-                 auth_method='shared-key', priv_key=None, natt=False,
-                 udp_encap=False):
+                 auth_method='shared-key', priv_key=None, i_natt=False,
+                 r_natt=False, udp_encap=False):
         self.udp_encap = udp_encap
-        self.natt = natt
-        if natt:
+        self.i_natt = i_natt
+        self.r_natt = r_natt
+        if i_natt or r_natt:
             self.sport = 4500
             self.dport = 4500
         else:
@@ -234,6 +235,10 @@ class IKEv2SA(object):
             return self.r_dh_data
         return self.i_dh_data
 
+    @property
+    def natt(self):
+        return self.i_natt or self.r_natt
+
     def compute_secret(self):
         priv = self.dh_private_key
         peer = self.peer_dh_pub_key
@@ -1159,7 +1164,7 @@ class TemplateResponder(IkePeer):
         capture = self.pg0.get_capture(1)
         self.verify_del_sa(capture[0])
 
-    def send_sa_init_req(self, behind_nat=False):
+    def send_sa_init_req(self):
         tr_attr = self.sa.ike_crypto_attr()
         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
@@ -1174,6 +1179,8 @@ class TemplateResponder(IkePeer):
         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                  trans_nb=4, trans=trans))
 
+        next_payload = None if self.ip6 else 'Notify'
+
         self.sa.init_req_packet = (
                 ikev2.IKEv2(init_SPI=self.sa.ispi,
                             flags='Initiator', exch_type='IKE_SA_INIT') /
@@ -1181,26 +1188,30 @@ class TemplateResponder(IkePeer):
                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                        group=self.sa.ike_dh,
                                        load=self.sa.my_dh_pub_key) /
-                ikev2.IKEv2_payload_Nonce(next_payload='Notify',
+                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
                                           load=self.sa.i_nonce))
 
-        if behind_nat:
-            src_address = b'\x0a\x0a\x0a\x01'
-        else:
-            src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+        if not self.ip6:
+            if self.sa.i_natt:
+                src_address = b'\x0a\x0a\x0a\x01'
+            else:
+                src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
 
-        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
-        dst_nat = self.sa.compute_nat_sha1(
-                inet_pton(socket.AF_INET, self.pg0.local_ip4),
-                self.sa.dport)
-        nat_src_detection = ikev2.IKEv2_payload_Notify(
-                type='NAT_DETECTION_SOURCE_IP', load=src_nat,
-                next_payload='Notify')
-        nat_dst_detection = ikev2.IKEv2_payload_Notify(
-                type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
-        self.sa.init_req_packet = (self.sa.init_req_packet /
-                                   nat_src_detection /
-                                   nat_dst_detection)
+            if self.sa.r_natt:
+                dst_address = b'\x0a\x0a\x0a\x0a'
+            else:
+                dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+
+            src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+            dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+            nat_src_detection = ikev2.IKEv2_payload_Notify(
+                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+                    next_payload='Notify')
+            nat_dst_detection = ikev2.IKEv2_payload_Notify(
+                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
+            self.sa.init_req_packet = (self.sa.init_req_packet /
+                                       nat_src_detection /
+                                       nat_dst_detection)
 
         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
                                      self.sa.sport, self.sa.dport,
@@ -1307,7 +1318,7 @@ class TemplateResponder(IkePeer):
         self.sa.calc_child_keys()
 
     def test_responder(self):
-        self.send_sa_init_req(self.sa.natt)
+        self.send_sa_init_req()
         self.send_sa_auth()
         self.verify_ipsec_sas()
         self.verify_ike_sas()
@@ -1336,7 +1347,8 @@ class Ikev2Params(object):
             self.vapi.cli('ikev2 dpd disable')
         self.del_sa_from_responder = False if 'del_sa_from_responder'\
             not in params else params['del_sa_from_responder']
-        is_natt = 'natt' in params and params['natt'] or False
+        i_natt = False if 'i_natt' not in params else params['i_natt']
+        r_natt = False if 'r_natt' not in params else params['r_natt']
         self.p = Profile(self, 'pr1')
         self.ip6 = False if 'ip6' not in params else params['ip6']
 
@@ -1394,7 +1406,8 @@ class Ikev2Params(object):
 
         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                           is_initiator=is_init,
-                          id_type=self.p.local_id['id_type'], natt=is_natt,
+                          id_type=self.p.local_id['id_type'],
+                          i_natt=i_natt, r_natt=r_natt,
                           priv_key=client_priv, auth_method=auth_method,
                           auth_data=auth_data, udp_encap=udp_encap,
                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
@@ -1608,12 +1621,19 @@ class TestApi(VppTestCase):
             self.assertEqual(ap.tun_itf, 0xffffffff)
 
 
+class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
+    """ test responder - responder behind NAT """
+
+    def config_tc(self):
+        self.config_params({'r_natt': True})
+
+
 class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
     """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
 
     def config_tc(self):
         self.config_params({
-            'natt': True,
+            'i_natt': True,
             'is_initiator': False,  # seen from test case perspective
                                     # thus vpp is initiator
             'responder': {'sw_if_index': self.pg0.sw_if_index,
@@ -1760,11 +1780,11 @@ class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
                 'integ_alg': 12}})
 
 
-class TestResponderNATT(TemplateResponder, Ikev2Params):
-    """ test ikev2 responder - nat traversal """
+class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
+    """ test ikev2 responder - initiator behind NAT """
     def config_tc(self):
         self.config_params(
-                {'natt': True})
+                {'i_natt': True})
 
 
 class TestResponderPsk(TemplateResponder, Ikev2Params):