NAT64: multi-thread support (VPP-891)
[vpp.git] / test / test_nat.py
index b01fe07..3616329 100644 (file)
@@ -704,8 +704,6 @@ class TestNAT44(MethodHolder):
             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
             cls.pg9.resolve_arp()
 
-            random.seed()
-
         except Exception:
             super(TestNAT44, cls).tearDownClass()
             raise
@@ -785,6 +783,17 @@ class TestNAT44(MethodHolder):
                 local_num=0,
                 locals=[])
 
+        identity_mappings = self.vapi.nat44_identity_mapping_dump()
+        for id_m in identity_mappings:
+            self.vapi.nat44_add_del_identity_mapping(
+                addr_only=id_m.addr_only,
+                ip=id_m.ip_address,
+                port=id_m.port,
+                sw_if_index=id_m.sw_if_index,
+                vrf_id=id_m.vrf_id,
+                protocol=id_m.protocol,
+                is_add=0)
+
         adresses = self.vapi.nat44_address_dump()
         for addr in adresses:
             self.vapi.nat44_add_del_address_range(addr.ip_address,
@@ -1192,6 +1201,35 @@ class TestNAT44(MethodHolder):
         self.pg_start()
         self.pg3.assert_nothing_captured()
 
+    def test_identity_nat(self):
+        """ Identity NAT """
+
+        self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+
+        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+             IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
+             TCP(sport=12345, dport=56789))
+        self.pg1.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        p = capture[0]
+        try:
+            ip = p[IP]
+            tcp = p[TCP]
+            self.assertEqual(ip.dst, self.pg0.remote_ip4)
+            self.assertEqual(ip.src, self.pg1.remote_ip4)
+            self.assertEqual(tcp.dport, 56789)
+            self.assertEqual(tcp.sport, 12345)
+            self.check_tcp_checksum(p)
+            self.check_ip_checksum(p)
+        except:
+            self.logger.error(ppp("Unexpected or invalid packet:", p))
+            raise
+
     def test_static_lb(self):
         """ NAT44 local service load balancing """
         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
@@ -1787,6 +1825,38 @@ class TestNAT44(MethodHolder):
         static_mappings = self.vapi.nat44_static_mapping_dump()
         self.assertEqual(0, len(static_mappings))
 
+    def test_interface_addr_identity_nat(self):
+        """ Identity NAT with addresses from interface """
+
+        port = 53053
+        self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
+        self.vapi.nat44_add_del_identity_mapping(
+            sw_if_index=self.pg7.sw_if_index,
+            port=port,
+            protocol=IP_PROTOS.tcp,
+            addr_only=0)
+
+        # identity mappings with external interface
+        identity_mappings = self.vapi.nat44_identity_mapping_dump()
+        self.assertEqual(1, len(identity_mappings))
+        self.assertEqual(self.pg7.sw_if_index,
+                         identity_mappings[0].sw_if_index)
+
+        # configure interface address and check identity mappings
+        self.pg7.config_ip4()
+        identity_mappings = self.vapi.nat44_identity_mapping_dump()
+        self.assertEqual(1, len(identity_mappings))
+        self.assertEqual(identity_mappings[0].ip_address,
+                         self.pg7.local_ip4n)
+        self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
+        self.assertEqual(port, identity_mappings[0].port)
+        self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
+
+        # remove interface address and check identity mappings
+        self.pg7.unconfig_ip4()
+        identity_mappings = self.vapi.nat44_identity_mapping_dump()
+        self.assertEqual(0, len(identity_mappings))
+
     def test_ipfix_nat44_sess(self):
         """ IPFIX logging NAT44 session created/delted """
         self.ipfix_domain_id = 10
@@ -1986,7 +2056,7 @@ class TestNAT44(MethodHolder):
         nat_ip2 = "10.0.0.11"
 
         self.nat44_add_address(nat_ip1)
-        self.nat44_add_address(nat_ip2)
+        self.nat44_add_address(nat_ip2, vrf_id=99)
         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
@@ -2762,7 +2832,7 @@ class TestNAT44(MethodHolder):
         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                   is_inside=0)
-        self.vapi.cli("nat44 addr-port-assignment-alg map-e psid 10 "
+        self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
                       "psid-offset 6 psid-len 6")
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
@@ -2792,7 +2862,7 @@ class TestNAT44(MethodHolder):
         if not self.vpp_dead:
             self.logger.info(self.vapi.cli("show nat44 verbose"))
             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
-            self.vapi.cli("nat44 addr-port-assignment-alg default")
+            self.vapi.cli("nat addr-port-assignment-alg default")
             self.clear_nat44()
 
 
@@ -4608,6 +4678,20 @@ class TestDSlite(MethodHolder):
         self.check_ip_checksum(capture)
         self.check_icmp_checksum(capture)
 
+        # ping DS-Lite AFTR tunnel endpoint address
+        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+             IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
+             ICMPv6EchoRequest())
+        self.pg1.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(1)
+        self.assertEqual(1, len(capture))
+        capture = capture[0]
+        self.assertEqual(capture[IPv6].src, aftr_ip6)
+        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
+        self.assertTrue(capture.haslayer(ICMPv6EchoReply))
+
     def tearDown(self):
         super(TestDSlite, self).tearDown()
         if not self.vpp_dead: