Add support for 464XLAT NAT44 mode (VPP-1045)
[vpp.git] / test / test_nat.py
index aeeb5aa..4ced0af 100644 (file)
@@ -134,30 +134,34 @@ class MethodHolder(VppTestCase):
             self.assertEqual(new['ICMPv6EchoReply'].cksum,
                              pkt['ICMPv6EchoReply'].cksum)
 
-    def create_stream_in(self, in_if, out_if, ttl=64):
+    def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
         """
         Create packet stream for inside network
 
         :param in_if: Inside interface
         :param out_if: Outside interface
+        :param dst_ip: Destination address
         :param ttl: TTL of generated packets
         """
+        if dst_ip is None:
+            dst_ip = out_if.remote_ip4
+
         pkts = []
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
-             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
              TCP(sport=self.tcp_port_in, dport=20))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
-             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
              UDP(sport=self.udp_port_in, dport=20))
         pkts.append(p)
 
         # ICMP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
-             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+             IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
              ICMP(id=self.icmp_id_in, type='echo-request'))
         pkts.append(p)
 
@@ -206,6 +210,48 @@ class MethodHolder(VppTestCase):
             pref_n[15] = ip4_n[3]
         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
 
+    def extract_ip4(self, ip6, plen):
+        """
+        Extract IPv4 address embedded in IPv6 addresses
+
+        :param ip6: IPv6 address
+        :param plen: IPv6 prefix length
+        :returns: extracted IPv4 address
+        """
+        ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
+        ip4_n = [None] * 4
+        if plen == 32:
+            ip4_n[0] = ip6_n[4]
+            ip4_n[1] = ip6_n[5]
+            ip4_n[2] = ip6_n[6]
+            ip4_n[3] = ip6_n[7]
+        elif plen == 40:
+            ip4_n[0] = ip6_n[5]
+            ip4_n[1] = ip6_n[6]
+            ip4_n[2] = ip6_n[7]
+            ip4_n[3] = ip6_n[9]
+        elif plen == 48:
+            ip4_n[0] = ip6_n[6]
+            ip4_n[1] = ip6_n[7]
+            ip4_n[2] = ip6_n[9]
+            ip4_n[3] = ip6_n[10]
+        elif plen == 56:
+            ip4_n[0] = ip6_n[7]
+            ip4_n[1] = ip6_n[9]
+            ip4_n[2] = ip6_n[10]
+            ip4_n[3] = ip6_n[11]
+        elif plen == 64:
+            ip4_n[0] = ip6_n[9]
+            ip4_n[1] = ip6_n[10]
+            ip4_n[2] = ip6_n[11]
+            ip4_n[3] = ip6_n[12]
+        elif plen == 96:
+            ip4_n[0] = ip6_n[12]
+            ip4_n[1] = ip6_n[13]
+            ip4_n[2] = ip6_n[14]
+            ip4_n[3] = ip6_n[15]
+        return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
+
     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
         """
         Create IPv6 packet stream for inside network
@@ -284,8 +330,37 @@ class MethodHolder(VppTestCase):
 
         return pkts
 
+    def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
+        """
+        Create packet stream for outside network
+
+        :param out_if: Outside interface
+        :param dst_ip: Destination IP address (Default use global NAT address)
+        :param hl: HL of generated packets
+        """
+        pkts = []
+        # TCP
+        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
+             TCP(dport=self.tcp_port_out, sport=20))
+        pkts.append(p)
+
+        # UDP
+        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
+             UDP(dport=self.udp_port_out, sport=20))
+        pkts.append(p)
+
+        # ICMP
+        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+             IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
+             ICMPv6EchoReply(id=self.icmp_id_out))
+        pkts.append(p)
+
+        return pkts
+
     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
-                           packet_num=3, dst_ip=None):
+                           packet_num=3, dst_ip=None, is_ip6=False):
         """
         Verify captured packets on outside network
 
@@ -294,16 +369,24 @@ class MethodHolder(VppTestCase):
         :param same_port: Sorce port number is not translated (Default False)
         :param packet_num: Expected number of packets (Default 3)
         :param dst_ip: Destination IP address (Default do not verify)
+        :param is_ip6: If L3 protocol is IPv6 (Default False)
         """
+        if is_ip6:
+            IP46 = IPv6
+            ICMP46 = ICMPv6EchoRequest
+        else:
+            IP46 = IP
+            ICMP46 = ICMP
         if nat_ip is None:
             nat_ip = self.nat_addr
         self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
-                self.check_ip_checksum(packet)
-                self.assertEqual(packet[IP].src, nat_ip)
+                if not is_ip6:
+                    self.check_ip_checksum(packet)
+                self.assertEqual(packet[IP46].src, nat_ip)
                 if dst_ip is not None:
-                    self.assertEqual(packet[IP].dst, dst_ip)
+                    self.assertEqual(packet[IP46].dst, dst_ip)
                 if packet.haslayer(TCP):
                     if same_port:
                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
@@ -321,16 +404,33 @@ class MethodHolder(VppTestCase):
                     self.udp_port_out = packet[UDP].sport
                 else:
                     if same_port:
-                        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+                        self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
                     else:
-                        self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
-                    self.icmp_id_out = packet[ICMP].id
-                    self.check_icmp_checksum(packet)
+                        self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
+                    self.icmp_id_out = packet[ICMP46].id
+                    if is_ip6:
+                        self.check_icmpv6_checksum(packet)
+                    else:
+                        self.check_icmp_checksum(packet)
             except:
                 self.logger.error(ppp("Unexpected or invalid packet "
                                       "(outside network):", packet))
                 raise
 
+    def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
+                               packet_num=3, dst_ip=None):
+        """
+        Verify captured packets on outside network
+
+        :param capture: Captured packets
+        :param nat_ip: Translated IP address
+        :param same_port: Sorce port number is not translated (Default False)
+        :param packet_num: Expected number of packets (Default 3)
+        :param dst_ip: Destination IP address (Default do not verify)
+        """
+        return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
+                                       dst_ip, True)
+
     def verify_capture_in(self, capture, in_if, packet_num=3):
         """
         Verify captured packets on inside network
@@ -3105,6 +3205,123 @@ class TestNAT44(MethodHolder):
             self.clear_nat44()
 
 
+class TestNAT44Out2InDPO(MethodHolder):
+    """ NAT44 Test Cases using out2in DPO """
+
+    @classmethod
+    def setUpConstants(cls):
+        super(TestNAT44Out2InDPO, cls).setUpConstants()
+        cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestNAT44Out2InDPO, cls).setUpClass()
+
+        try:
+            cls.tcp_port_in = 6303
+            cls.tcp_port_out = 6303
+            cls.udp_port_in = 6304
+            cls.udp_port_out = 6304
+            cls.icmp_id_in = 6305
+            cls.icmp_id_out = 6305
+            cls.nat_addr = '10.0.0.3'
+            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+            cls.dst_ip4 = '192.168.70.1'
+
+            cls.create_pg_interfaces(range(2))
+
+            cls.pg0.admin_up()
+            cls.pg0.config_ip4()
+            cls.pg0.resolve_arp()
+
+            cls.pg1.admin_up()
+            cls.pg1.config_ip6()
+            cls.pg1.resolve_ndp()
+
+            cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
+                                      dst_address_length=0,
+                                      next_hop_address=cls.pg1.remote_ip6n,
+                                      next_hop_sw_if_index=cls.pg1.sw_if_index)
+
+        except Exception:
+            super(TestNAT44Out2InDPO, cls).tearDownClass()
+            raise
+
+    def configure_xlat(self):
+        self.dst_ip6_pfx = '1:2:3::'
+        self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
+                                              self.dst_ip6_pfx)
+        self.dst_ip6_pfx_len = 96
+        self.src_ip6_pfx = '4:5:6::'
+        self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
+                                              self.src_ip6_pfx)
+        self.src_ip6_pfx_len = 96
+        self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
+                                 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
+                                 '\x00\x00\x00\x00', 0, is_translation=1,
+                                 is_rfc6052=1)
+
+    def test_464xlat_ce(self):
+        """ Test 464XLAT CE with NAT44 """
+
+        self.configure_xlat()
+
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
+
+        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
+                                       self.dst_ip6_pfx_len)
+        out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
+                                       self.src_ip6_pfx_len)
+
+        try:
+            pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
+            self.pg0.add_stream(pkts)
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            capture = self.pg1.get_capture(len(pkts))
+            self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
+                                        dst_ip=out_src_ip6)
+
+            pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
+                                              out_dst_ip6)
+            self.pg1.add_stream(pkts)
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            capture = self.pg0.get_capture(len(pkts))
+            self.verify_capture_in(capture, self.pg0)
+        finally:
+            self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
+                                                      is_add=0)
+            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
+                                                  self.nat_addr_n, is_add=0)
+
+    def test_464xlat_ce_no_nat(self):
+        """ Test 464XLAT CE without NAT44 """
+
+        self.configure_xlat()
+
+        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
+                                       self.dst_ip6_pfx_len)
+        out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
+                                       self.src_ip6_pfx_len)
+
+        pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(len(pkts))
+        self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
+                                    nat_ip=out_dst_ip6, same_port=True)
+
+        pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg0)
+
+
 class TestDeterministicNAT(MethodHolder):
     """ Deterministic NAT Test Cases """