Add special Twice-NAT feature (VPP-1221)
[vpp.git] / test / test_nat.py
index 7c12619..47f3b8c 100644 (file)
@@ -922,6 +922,9 @@ class TestNAT44(MethodHolder):
             cls.pg0.generate_remote_hosts(3)
             cls.pg0.configure_ipv4_neighbors()
 
+            cls.pg1.generate_remote_hosts(1)
+            cls.pg1.configure_ipv4_neighbors()
+
             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
             cls.vapi.ip_table_add_del(10, is_add=1)
             cls.vapi.ip_table_add_del(20, is_add=1)
@@ -1029,6 +1032,7 @@ class TestNAT44(MethodHolder):
                 vrf_id=sm.vrf_id,
                 protocol=sm.protocol,
                 twice_nat=sm.twice_nat,
+                self_twice_nat=sm.self_twice_nat,
                 out2in_only=sm.out2in_only,
                 tag=sm.tag,
                 external_sw_if_index=sm.external_sw_if_index,
@@ -1042,6 +1046,7 @@ class TestNAT44(MethodHolder):
                 lb_sm.protocol,
                 vrf_id=lb_sm.vrf_id,
                 twice_nat=lb_sm.twice_nat,
+                self_twice_nat=lb_sm.self_twice_nat,
                 out2in_only=lb_sm.out2in_only,
                 tag=lb_sm.tag,
                 is_add=0,
@@ -1072,7 +1077,8 @@ class TestNAT44(MethodHolder):
     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                                  local_port=0, external_port=0, vrf_id=0,
                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
-                                 proto=0, twice_nat=0, out2in_only=0, tag=""):
+                                 proto=0, twice_nat=0, self_twice_nat=0,
+                                 out2in_only=0, tag=""):
         """
         Add/delete NAT44 static mapping
 
@@ -1085,6 +1091,9 @@ class TestNAT44(MethodHolder):
         :param external_sw_if_index: External interface instead of IP address
         :param proto: IP protocol (Mandatory if port specified)
         :param twice_nat: 1 if translate external host address and port
+        :param self_twice_nat: 1 if translate external host address and port
+                               whenever external host address equals
+                               local address of internal host
         :param out2in_only: if 1 rule is matching only out2in direction
         :param tag: Opaque string tag
         """
@@ -1103,6 +1112,7 @@ class TestNAT44(MethodHolder):
             vrf_id,
             proto,
             twice_nat,
+            self_twice_nat,
             out2in_only,
             tag,
             is_add)
@@ -3844,56 +3854,128 @@ class TestNAT44(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-    def test_twice_nat(self):
-        """ Twice NAT44 """
+    def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
+                         client_id=None):
         twice_nat_addr = '10.0.1.3'
+
         port_in = 8080
+        if lb:
+            if not same_pg:
+                port_in1 = port_in
+                port_in2 = port_in
+            else:
+                port_in1 = port_in+1
+                port_in2 = port_in+2
+
         port_out = 80
         eh_port_out = 4567
-        eh_port_in = 0
+
+        server1 = self.pg0.remote_hosts[0]
+        server2 = self.pg0.remote_hosts[1]
+        if lb and same_pg:
+            server2 = server1
+        if not lb:
+            server = server1
+
+        pg0 = self.pg0
+        if same_pg:
+            pg1 = self.pg0
+        else:
+            pg1 = self.pg1
+
+        eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
+                        client_id == 1)
+
         self.nat44_add_address(self.nat_addr)
         self.nat44_add_address(twice_nat_addr, twice_nat=1)
-        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
-                                      port_in, port_out, proto=IP_PROTOS.tcp,
-                                      twice_nat=1)
-        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
-        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+        if not lb:
+            self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
+                                          port_in, port_out,
+                                          proto=IP_PROTOS.tcp,
+                                          twice_nat=int(not self_twice_nat),
+                                          self_twice_nat=int(self_twice_nat))
+        else:
+            locals = [{'addr': server1.ip4n,
+                       'port': port_in1,
+                       'probability': 50},
+                      {'addr': server2.ip4n,
+                       'port': port_in2,
+                       'probability': 50}]
+            out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
+            self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
+                                                      port_out,
+                                                      IP_PROTOS.tcp,
+                                                      twice_nat=int(
+                                                          not self_twice_nat),
+                                                      self_twice_nat=int(
+                                                          self_twice_nat),
+                                                      local_num=len(locals),
+                                                      locals=locals)
+        self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
                                                   is_inside=0)
 
-        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
-             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+        if same_pg:
+            if not lb:
+                client = server
+            else:
+                assert client_id is not None
+                if client_id == 1:
+                    client = self.pg0.remote_hosts[0]
+                elif client_id == 2:
+                    client = self.pg0.remote_hosts[1]
+        else:
+            client = pg1.remote_hosts[0]
+        p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
+             IP(src=client.ip4, dst=self.nat_addr) /
              TCP(sport=eh_port_out, dport=port_out))
-        self.pg1.add_stream(p)
+        pg1.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture(1)
+        capture = pg0.get_capture(1)
         p = capture[0]
         try:
             ip = p[IP]
             tcp = p[TCP]
-            self.assertEqual(ip.dst, self.pg0.remote_ip4)
-            self.assertEqual(ip.src, twice_nat_addr)
-            self.assertEqual(tcp.dport, port_in)
-            self.assertNotEqual(tcp.sport, eh_port_out)
+            if lb:
+                if ip.dst == server1.ip4:
+                    server = server1
+                    port_in = port_in1
+                else:
+                    server = server2
+                    port_in = port_in2
+            self.assertEqual(ip.dst, server.ip4)
+            if lb and same_pg:
+                self.assertIn(tcp.dport, [port_in1, port_in2])
+            else:
+                self.assertEqual(tcp.dport, port_in)
+            if eh_translate:
+                self.assertEqual(ip.src, twice_nat_addr)
+                self.assertNotEqual(tcp.sport, eh_port_out)
+            else:
+                self.assertEqual(ip.src, client.ip4)
+                self.assertEqual(tcp.sport, eh_port_out)
+            eh_addr_in = ip.src
             eh_port_in = tcp.sport
+            saved_port_in = tcp.dport
             self.check_tcp_checksum(p)
             self.check_ip_checksum(p)
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
-             IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
-             TCP(sport=port_in, dport=eh_port_in))
-        self.pg0.add_stream(p)
+        p = (Ether(src=server.mac, dst=pg0.local_mac) /
+             IP(src=server.ip4, dst=eh_addr_in) /
+             TCP(sport=saved_port_in, dport=eh_port_in))
+        pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture(1)
+        capture = pg1.get_capture(1)
         p = capture[0]
         try:
             ip = p[IP]
             tcp = p[TCP]
-            self.assertEqual(ip.dst, self.pg1.remote_ip4)
+            self.assertEqual(ip.dst, client.ip4)
             self.assertEqual(ip.src, self.nat_addr)
             self.assertEqual(tcp.dport, eh_port_out)
             self.assertEqual(tcp.sport, port_out)
@@ -3903,84 +3985,31 @@ class TestNAT44(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-    def test_twice_nat_lb(self):
-        """ Twice NAT44 local service load balancing """
-        external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
-        twice_nat_addr = '10.0.1.3'
-        local_port = 8080
-        external_port = 80
-        eh_port_out = 4567
-        eh_port_in = 0
-        server1 = self.pg0.remote_hosts[0]
-        server2 = self.pg0.remote_hosts[1]
+    def test_twice_nat(self):
+        """ Twice NAT44 """
+        self.twice_nat_common()
 
-        locals = [{'addr': server1.ip4n,
-                   'port': local_port,
-                   'probability': 50},
-                  {'addr': server2.ip4n,
-                   'port': local_port,
-                   'probability': 50}]
+    def test_self_twice_nat_positive(self):
+        """ Self Twice NAT44 (positive test) """
+        self.twice_nat_common(self_twice_nat=True, same_pg=True)
 
-        self.nat44_add_address(self.nat_addr)
-        self.nat44_add_address(twice_nat_addr, twice_nat=1)
+    def test_self_twice_nat_negative(self):
+        """ Self Twice NAT44 (negative test) """
+        self.twice_nat_common(self_twice_nat=True)
 
-        self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
-                                                  external_port,
-                                                  IP_PROTOS.tcp,
-                                                  twice_nat=1,
-                                                  local_num=len(locals),
-                                                  locals=locals)
-        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
-        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
-                                                  is_inside=0)
+    def test_twice_nat_lb(self):
+        """ Twice NAT44 local service load balancing """
+        self.twice_nat_common(lb=True)
 
-        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
-             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
-             TCP(sport=eh_port_out, dport=external_port))
-        self.pg1.add_stream(p)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        capture = self.pg0.get_capture(1)
-        p = capture[0]
-        server = None
-        try:
-            ip = p[IP]
-            tcp = p[TCP]
-            self.assertEqual(ip.src, twice_nat_addr)
-            self.assertIn(ip.dst, [server1.ip4, server2.ip4])
-            if ip.dst == server1.ip4:
-                server = server1
-            else:
-                server = server2
-            self.assertNotEqual(tcp.sport, eh_port_out)
-            eh_port_in = tcp.sport
-            self.assertEqual(tcp.dport, local_port)
-            self.check_tcp_checksum(p)
-            self.check_ip_checksum(p)
-        except:
-            self.logger.error(ppp("Unexpected or invalid packet:", p))
-            raise
+    def test_self_twice_nat_lb_positive(self):
+        """ Self Twice NAT44 local service load balancing (positive test) """
+        self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
+                              client_id=1)
 
-        p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
-             IP(src=server.ip4, dst=twice_nat_addr) /
-             TCP(sport=local_port, dport=eh_port_in))
-        self.pg0.add_stream(p)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        capture = self.pg1.get_capture(1)
-        p = capture[0]
-        try:
-            ip = p[IP]
-            tcp = p[TCP]
-            self.assertEqual(ip.src, self.nat_addr)
-            self.assertEqual(ip.dst, self.pg1.remote_ip4)
-            self.assertEqual(tcp.sport, external_port)
-            self.assertEqual(tcp.dport, eh_port_out)
-            self.check_tcp_checksum(p)
-            self.check_ip_checksum(p)
-        except:
-            self.logger.error(ppp("Unexpected or invalid packet:", p))
-            raise
+    def test_self_twice_nat_lb_negative(self):
+        """ Self Twice NAT44 local service load balancing (negative test) """
+        self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
+                              client_id=2)
 
     def test_twice_nat_interface_addr(self):
         """ Acquire twice NAT44 addresses from interface """