One armed NAT (VPP-1035)
[vpp.git] / test / test_nat.py
index 73e9e21..2afa1dd 100644 (file)
@@ -537,7 +537,7 @@ class TestNAT44(MethodHolder):
             cls.ipfix_src_port = 4739
             cls.ipfix_domain_id = 1
 
-            cls.create_pg_interfaces(range(9))
+            cls.create_pg_interfaces(range(10))
             cls.interfaces = list(cls.pg_interfaces[0:4])
 
             for i in cls.interfaces:
@@ -572,6 +572,18 @@ class TestNAT44(MethodHolder):
             cls.pg7.admin_up()
             cls.pg8.admin_up()
 
+            cls.pg9.generate_remote_hosts(2)
+            cls.pg9.config_ip4()
+            ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
+            cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
+                                                  ip_addr_n,
+                                                  24)
+            cls.pg9.admin_up()
+            cls.pg9.resolve_arp()
+            cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
+            cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
+            cls.pg9.resolve_arp()
+
         except Exception:
             super(TestNAT44, cls).tearDownClass()
             raise
@@ -614,6 +626,10 @@ class TestNAT44(MethodHolder):
 
         interfaces = self.vapi.nat44_interface_dump()
         for intf in interfaces:
+            if intf.is_inside > 1:
+                self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
+                                                          0,
+                                                          is_add=0)
             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                       intf.is_inside,
                                                       is_add=0)
@@ -2377,6 +2393,62 @@ class TestNAT44(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:"), p)
             raise
 
+    def test_one_armed_nat44(self):
+        """ One armed NAT44 """
+        remote_host = self.pg9.remote_hosts[0]
+        local_host = self.pg9.remote_hosts[1]
+        external_port = 0
+
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
+                                                  is_inside=0)
+
+        # in2out
+        p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
+             IP(src=local_host.ip4, dst=remote_host.ip4) /
+             TCP(sport=12345, dport=80))
+        self.pg9.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg9.get_capture(1)
+        p = capture[0]
+        try:
+            ip = p[IP]
+            tcp = p[TCP]
+            self.assertEqual(ip.src, self.nat_addr)
+            self.assertEqual(ip.dst, remote_host.ip4)
+            self.assertNotEqual(tcp.sport, 12345)
+            external_port = tcp.sport
+            self.assertEqual(tcp.dport, 80)
+            self.check_tcp_checksum(p)
+            self.check_ip_checksum(p)
+        except:
+            self.logger.error(ppp("Unexpected or invalid packet:", p))
+            raise
+
+        # out2in
+        p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
+             IP(src=remote_host.ip4, dst=self.nat_addr) /
+             TCP(sport=80, dport=external_port))
+        self.pg9.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg9.get_capture(1)
+        p = capture[0]
+        try:
+            ip = p[IP]
+            tcp = p[TCP]
+            self.assertEqual(ip.src, remote_host.ip4)
+            self.assertEqual(ip.dst, local_host.ip4)
+            self.assertEqual(tcp.sport, 80)
+            self.assertEqual(tcp.dport, 12345)
+            self.check_tcp_checksum(p)
+            self.check_ip_checksum(p)
+        except:
+            self.logger.error(ppp("Unexpected or invalid packet:", p))
+            raise
+
     def tearDown(self):
         super(TestNAT44, self).tearDown()
         if not self.vpp_dead:
@@ -3048,7 +3120,7 @@ class TestNAT64(MethodHolder):
             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
                                                    cls.vrf1_nat_addr)
 
-            cls.create_pg_interfaces(range(3))
+            cls.create_pg_interfaces(range(4))
             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
             cls.ip6_interfaces.append(cls.pg_interfaces[2])
             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
@@ -3069,6 +3141,12 @@ class TestNAT64(MethodHolder):
                 i.config_ip4()
                 i.resolve_arp()
 
+            cls.pg3.admin_up()
+            cls.pg3.config_ip4()
+            cls.pg3.resolve_arp()
+            cls.pg3.config_ip6()
+            cls.pg3.configure_ipv6_neighbors()
+
         except Exception:
             super(TestNAT64, cls).tearDownClass()
             raise
@@ -3763,6 +3841,62 @@ class TestNAT64(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", packet))
             raise
 
+    def test_one_armed_nat64(self):
+        """ One armed NAT64 """
+        external_port = 0
+        remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
+                                           '64:ff9b::',
+                                           96)
+
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
+
+        # in2out
+        p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
+             IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
+             TCP(sport=12345, dport=80))
+        self.pg3.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg3.get_capture(1)
+        p = capture[0]
+        try:
+            ip = p[IP]
+            tcp = p[TCP]
+            self.assertEqual(ip.src, self.nat_addr)
+            self.assertEqual(ip.dst, self.pg3.remote_ip4)
+            self.assertNotEqual(tcp.sport, 12345)
+            external_port = tcp.sport
+            self.assertEqual(tcp.dport, 80)
+            self.check_tcp_checksum(p)
+            self.check_ip_checksum(p)
+        except:
+            self.logger.error(ppp("Unexpected or invalid packet:", p))
+            raise
+
+        # out2in
+        p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
+             IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
+             TCP(sport=80, dport=external_port))
+        self.pg3.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg3.get_capture(1)
+        p = capture[0]
+        try:
+            ip = p[IPv6]
+            tcp = p[TCP]
+            self.assertEqual(ip.src, remote_host_ip6)
+            self.assertEqual(ip.dst, self.pg3.remote_ip6)
+            self.assertEqual(tcp.sport, 80)
+            self.assertEqual(tcp.dport, 12345)
+            self.check_tcp_checksum(p)
+        except:
+            self.logger.error(ppp("Unexpected or invalid packet:", p))
+            raise
+
     def nat64_get_ses_num(self):
         """
         Return number of active NAT64 sessions.
@@ -3778,6 +3912,10 @@ class TestNAT64(MethodHolder):
 
         interfaces = self.vapi.nat64_interface_dump()
         for intf in interfaces:
+            if intf.is_inside > 1:
+                self.vapi.nat64_add_del_interface(intf.sw_if_index,
+                                                  0,
+                                                  is_add=0)
             self.vapi.nat64_add_del_interface(intf.sw_if_index,
                                               intf.is_inside,
                                               is_add=0)