NAT64: custom prefix
[vpp.git] / test / test_snat.py
index 7544582..d826877 100644 (file)
@@ -157,16 +157,65 @@ class MethodHolder(VppTestCase):
 
         return pkts
 
-    def create_stream_in_ip6(self, in_if, out_if, hlim=64):
+    def compose_ip6(self, ip4, pref, plen):
+        """
+        Compose IPv4-embedded IPv6 addresses
+
+        :param ip4: IPv4 address
+        :param pref: IPv6 prefix
+        :param plen: IPv6 prefix length
+        :returns: IPv4-embedded IPv6 addresses
+        """
+        pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
+        ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
+        if plen == 32:
+            pref_n[4] = ip4_n[0]
+            pref_n[5] = ip4_n[1]
+            pref_n[6] = ip4_n[2]
+            pref_n[7] = ip4_n[3]
+        elif plen == 40:
+            pref_n[5] = ip4_n[0]
+            pref_n[6] = ip4_n[1]
+            pref_n[7] = ip4_n[2]
+            pref_n[9] = ip4_n[3]
+        elif plen == 48:
+            pref_n[6] = ip4_n[0]
+            pref_n[7] = ip4_n[1]
+            pref_n[9] = ip4_n[2]
+            pref_n[10] = ip4_n[3]
+        elif plen == 56:
+            pref_n[7] = ip4_n[0]
+            pref_n[9] = ip4_n[1]
+            pref_n[10] = ip4_n[2]
+            pref_n[11] = ip4_n[3]
+        elif plen == 64:
+            pref_n[9] = ip4_n[0]
+            pref_n[10] = ip4_n[1]
+            pref_n[11] = ip4_n[2]
+            pref_n[12] = ip4_n[3]
+        elif plen == 96:
+            pref_n[12] = ip4_n[0]
+            pref_n[13] = ip4_n[1]
+            pref_n[14] = ip4_n[2]
+            pref_n[15] = ip4_n[3]
+        return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+
+    def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
         """
         Create IPv6 packet stream for inside network
 
         :param in_if: Inside interface
         :param out_if: Outside interface
         :param ttl: Hop Limit of generated packets
+        :param pref: NAT64 prefix
+        :param plen: NAT64 prefix length
         """
         pkts = []
-        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
+        if pref is None:
+            dst = ''.join(['64:ff9b::', out_if.remote_ip4])
+        else:
+            dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
+
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
@@ -3101,6 +3150,84 @@ class TestNAT64(MethodHolder):
                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
                 raise
 
+    def test_prefix(self):
+        """ NAT64 Network-Specific Prefix """
+
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+        self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
+                                                self.vrf1_nat_addr_n,
+                                                vrf_id=self.vrf1_id)
+        self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
+
+        # Add global prefix
+        global_pref64 = "2001:db8::"
+        global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
+        global_pref64_len = 32
+        self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
+
+        prefix = self.vapi.nat64_prefix_dump()
+        self.assertEqual(len(prefix), 1)
+        self.assertEqual(prefix[0].prefix, global_pref64_n)
+        self.assertEqual(prefix[0].prefix_len, global_pref64_len)
+        self.assertEqual(prefix[0].vrf_id, 0)
+
+        # Add tenant specific prefix
+        vrf1_pref64 = "2001:db8:122:300::"
+        vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
+        vrf1_pref64_len = 56
+        self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
+                                       vrf1_pref64_len,
+                                       vrf_id=self.vrf1_id)
+        prefix = self.vapi.nat64_prefix_dump()
+        self.assertEqual(len(prefix), 2)
+
+        # Global prefix
+        pkts = self.create_stream_in_ip6(self.pg0,
+                                         self.pg1,
+                                         pref=global_pref64,
+                                         plen=global_pref64_len)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(len(pkts))
+        self.verify_capture_out(capture, nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4)
+
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        dst_ip = self.compose_ip6(self.pg1.remote_ip4,
+                                  global_pref64,
+                                  global_pref64_len)
+        self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
+
+        # Tenant specific prefix
+        pkts = self.create_stream_in_ip6(self.pg2,
+                                         self.pg1,
+                                         pref=vrf1_pref64,
+                                         plen=vrf1_pref64_len)
+        self.pg2.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(len(pkts))
+        self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
+                                dst_ip=self.pg1.remote_ip4)
+
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg2.get_capture(len(pkts))
+        dst_ip = self.compose_ip6(self.pg1.remote_ip4,
+                                  vrf1_pref64,
+                                  vrf1_pref64_len)
+        self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
+
     def nat64_get_ses_num(self):
         """
         Return number of active NAT64 sessions.
@@ -3166,11 +3293,19 @@ class TestNAT64(MethodHolder):
                                                     vrf_id=addr.vrf_id,
                                                     is_add=0)
 
+        prefixes = self.vapi.nat64_prefix_dump()
+        for prefix in prefixes:
+            self.vapi.nat64_add_del_prefix(prefix.prefix,
+                                           prefix.prefix_len,
+                                           vrf_id=prefix.vrf_id,
+                                           is_add=0)
+
     def tearDown(self):
         super(TestNAT64, self).tearDown()
         if not self.vpp_dead:
             self.logger.info(self.vapi.cli("show nat64 pool"))
             self.logger.info(self.vapi.cli("show nat64 interfaces"))
+            self.logger.info(self.vapi.cli("show nat64 prefix"))
             self.logger.info(self.vapi.cli("show nat64 bib tcp"))
             self.logger.info(self.vapi.cli("show nat64 bib udp"))
             self.logger.info(self.vapi.cli("show nat64 bib icmp"))