P2P Ethernet - API
[vpp.git] / test / test_snat.py
index f90d906..64fa305 100644 (file)
@@ -7,8 +7,10 @@ import struct
 from framework import VppTestCase, VppTestRunner, running_extended_tests
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from framework import VppTestCase, VppTestRunner, running_extended_tests
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
 from scapy.layers.l2 import Ether, ARP
 from scapy.data import IP_PROTOS
 from scapy.layers.l2 import Ether, ARP
 from scapy.data import IP_PROTOS
+from scapy.packet import bind_layers
 from util import ppp
 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
 from time import sleep
 from util import ppp
 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
 from time import sleep
@@ -36,13 +38,13 @@ class MethodHolder(VppTestCase):
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
-             TCP(sport=self.tcp_port_in))
+             TCP(sport=self.tcp_port_in, dport=20))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
-             UDP(sport=self.udp_port_in))
+             UDP(sport=self.udp_port_in, dport=20))
         pkts.append(p)
 
         # ICMP
         pkts.append(p)
 
         # ICMP
@@ -53,6 +55,36 @@ class MethodHolder(VppTestCase):
 
         return pkts
 
 
         return pkts
 
+    def create_stream_in_ip6(self, in_if, out_if, hlim=64):
+        """
+        Create IPv6 packet stream for inside network
+
+        :param in_if: Inside interface
+        :param out_if: Outside interface
+        :param ttl: Hop Limit of generated packets
+        """
+        pkts = []
+        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
+        # TCP
+        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+             TCP(sport=self.tcp_port_in, dport=20))
+        pkts.append(p)
+
+        # UDP
+        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+             UDP(sport=self.udp_port_in, dport=20))
+        pkts.append(p)
+
+        # ICMP
+        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+             ICMPv6EchoRequest(id=self.icmp_id_in))
+        pkts.append(p)
+
+        return pkts
+
     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
         """
         Create packet stream for outside network
     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
         """
         Create packet stream for outside network
@@ -67,13 +99,13 @@ class MethodHolder(VppTestCase):
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
-             TCP(dport=self.tcp_port_out))
+             TCP(dport=self.tcp_port_out, sport=20))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
-             UDP(dport=self.udp_port_out))
+             UDP(dport=self.udp_port_out, sport=20))
         pkts.append(p)
 
         # ICMP
         pkts.append(p)
 
         # ICMP
@@ -85,7 +117,7 @@ class MethodHolder(VppTestCase):
         return pkts
 
     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
         return pkts
 
     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
-                           packet_num=3):
+                           packet_num=3, dst_ip=None):
         """
         Verify captured packets on outside network
 
         """
         Verify captured packets on outside network
 
@@ -93,6 +125,7 @@ class MethodHolder(VppTestCase):
         :param nat_ip: Translated IP address (Default use global SNAT address)
         :param same_port: Sorce port number is not translated (Default False)
         :param packet_num: Expected number of packets (Default 3)
         :param nat_ip: Translated IP address (Default use global SNAT address)
         :param same_port: Sorce port number is not translated (Default False)
         :param packet_num: Expected number of packets (Default 3)
+        :param dst_ip: Destination IP address (Default do not verify)
         """
         if nat_ip is None:
             nat_ip = self.snat_addr
         """
         if nat_ip is None:
             nat_ip = self.snat_addr
@@ -100,6 +133,8 @@ class MethodHolder(VppTestCase):
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
+                if dst_ip is not None:
+                    self.assertEqual(packet[IP].dst, dst_ip)
                 if packet.haslayer(TCP):
                     if same_port:
                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                 if packet.haslayer(TCP):
                     if same_port:
                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
@@ -148,6 +183,32 @@ class MethodHolder(VppTestCase):
                                       "(inside network):", packet))
                 raise
 
                                       "(inside network):", packet))
                 raise
 
+    def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
+        """
+        Verify captured IPv6 packets on inside network
+
+        :param capture: Captured packets
+        :param src_ip: Source IP
+        :param dst_ip: Destination IP address
+        :param packet_num: Expected number of packets (Default 3)
+        """
+        self.assertEqual(packet_num, len(capture))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IPv6].src, src_ip)
+                self.assertEqual(packet[IPv6].dst, dst_ip)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
+                else:
+                    self.assertEqual(packet[ICMPv6EchoReply].id,
+                                     self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet "
+                                      "(inside network):", packet))
+                raise
+
     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
         """
         Verify captured packet that don't have to be translated
     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
         """
         Verify captured packet that don't have to be translated
@@ -312,8 +373,10 @@ class TestSNAT(MethodHolder):
             cls.icmp_id_in = 6305
             cls.icmp_id_out = 6305
             cls.snat_addr = '10.0.0.3'
             cls.icmp_id_in = 6305
             cls.icmp_id_out = 6305
             cls.snat_addr = '10.0.0.3'
+            cls.ipfix_src_port = 4739
+            cls.ipfix_domain_id = 1
 
 
-            cls.create_pg_interfaces(range(8))
+            cls.create_pg_interfaces(range(9))
             cls.interfaces = list(cls.pg_interfaces[0:4])
 
             for i in cls.interfaces:
             cls.interfaces = list(cls.pg_interfaces[0:4])
 
             for i in cls.interfaces:
@@ -321,7 +384,7 @@ class TestSNAT(MethodHolder):
                 i.config_ip4()
                 i.resolve_arp()
 
                 i.config_ip4()
                 i.resolve_arp()
 
-            cls.pg0.generate_remote_hosts(2)
+            cls.pg0.generate_remote_hosts(3)
             cls.pg0.configure_ipv4_neighbors()
 
             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
             cls.pg0.configure_ipv4_neighbors()
 
             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
@@ -344,6 +407,7 @@ class TestSNAT(MethodHolder):
                 i.resolve_arp()
 
             cls.pg7.admin_up()
                 i.resolve_arp()
 
             cls.pg7.admin_up()
+            cls.pg8.admin_up()
 
         except Exception:
             super(TestSNAT, cls).tearDownClass()
 
         except Exception:
             super(TestSNAT, cls).tearDownClass()
@@ -353,6 +417,26 @@ class TestSNAT(MethodHolder):
         """
         Clear SNAT configuration.
         """
         """
         Clear SNAT configuration.
         """
+        # I found no elegant way to do this
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index,
+                                   is_add=0)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index,
+                                   is_add=0)
+
+        for intf in [self.pg7, self.pg8]:
+            neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
+            for n in neighbors:
+                self.vapi.ip_neighbor_add_del(intf.sw_if_index,
+                                              n.mac_address,
+                                              n.ip_address,
+                                              is_add=0)
+
         if self.pg7.has_ip4_config:
             self.pg7.unconfig_ip4()
 
         if self.pg7.has_ip4_config:
             self.pg7.unconfig_ip4()
 
@@ -360,7 +444,10 @@ class TestSNAT(MethodHolder):
         for intf in interfaces:
             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
 
         for intf in interfaces:
             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
 
-        self.vapi.snat_ipfix(enable=0)
+        self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
+                             domain_id=self.ipfix_domain_id)
+        self.ipfix_src_port = 4739
+        self.ipfix_domain_id = 1
 
         interfaces = self.vapi.snat_interface_dump()
         for intf in interfaces:
 
         interfaces = self.vapi.snat_interface_dump()
         for intf in interfaces:
@@ -989,7 +1076,7 @@ class TestSNAT(MethodHolder):
                              self.icmp_id_in])
 
     def test_hairpinning(self):
                              self.icmp_id_in])
 
     def test_hairpinning(self):
-        """ SNAT hairpinning """
+        """ SNAT hairpinning - 1:1 NAT with port"""
 
         host = self.pg0.remote_hosts[0]
         server = self.pg0.remote_hosts[1]
 
         host = self.pg0.remote_hosts[0]
         server = self.pg0.remote_hosts[1]
@@ -1048,6 +1135,168 @@ class TestSNAT(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:"), p)
             raise
 
             self.logger.error(ppp("Unexpected or invalid packet:"), p)
             raise
 
+    def test_hairpinning2(self):
+        """ SNAT hairpinning - 1:1 NAT"""
+
+        server1_nat_ip = "10.0.0.10"
+        server2_nat_ip = "10.0.0.11"
+        host = self.pg0.remote_hosts[0]
+        server1 = self.pg0.remote_hosts[1]
+        server2 = self.pg0.remote_hosts[2]
+        server_tcp_port = 22
+        server_udp_port = 20
+
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+                                                 is_inside=0)
+
+        # add static mapping for servers
+        self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
+        self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
+
+        # host to server1
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             UDP(sport=self.udp_port_in, dport=server_udp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             ICMP(id=self.icmp_id_in, type='echo-request'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, self.snat_addr)
+                self.assertEqual(packet[IP].dst, server1.ip4)
+                if packet.haslayer(TCP):
+                    self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].dport, server_tcp_port)
+                    self.tcp_port_out = packet[TCP].sport
+                elif packet.haslayer(UDP):
+                    self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].dport, server_udp_port)
+                    self.udp_port_out = packet[UDP].sport
+                else:
+                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
+                    self.icmp_id_out = packet[ICMP].id
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server1 to host
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             UDP(sport=server_udp_port, dport=self.udp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             ICMP(id=self.icmp_id_out, type='echo-reply'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server1_nat_ip)
+                self.assertEqual(packet[IP].dst, host.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].sport, server_tcp_port)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].sport, server_udp_port)
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server2 to server1
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             UDP(sport=self.udp_port_in, dport=server_udp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             ICMP(id=self.icmp_id_in, type='echo-request'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server2_nat_ip)
+                self.assertEqual(packet[IP].dst, server1.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].dport, server_tcp_port)
+                    self.tcp_port_out = packet[TCP].sport
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].dport, server_udp_port)
+                    self.udp_port_out = packet[UDP].sport
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+                    self.icmp_id_out = packet[ICMP].id
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server1 to server2
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             UDP(sport=server_udp_port, dport=self.udp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             ICMP(id=self.icmp_id_out, type='echo-reply'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server1_nat_ip)
+                self.assertEqual(packet[IP].dst, server2.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].sport, server_tcp_port)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].sport, server_udp_port)
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
     def test_max_translations_per_user(self):
         """ MAX translations per user - recycle the least recently used """
 
     def test_max_translations_per_user(self):
         """ MAX translations per user - recycle the least recently used """
 
@@ -1120,6 +1369,10 @@ class TestSNAT(MethodHolder):
 
     def test_ipfix_nat44_sess(self):
         """ S-NAT IPFIX logging NAT44 session created/delted """
 
     def test_ipfix_nat44_sess(self):
         """ S-NAT IPFIX logging NAT44 session created/delted """
+        self.ipfix_domain_id = 10
+        self.ipfix_src_port = 20202
+        colector_port = 30303
+        bind_layers(UDP, IPFIX, dport=30303)
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
@@ -1127,8 +1380,10 @@ class TestSNAT(MethodHolder):
         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
-                                     template_interval=10)
-        self.vapi.snat_ipfix()
+                                     template_interval=10,
+                                     collector_port=colector_port)
+        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+                             src_port=self.ipfix_src_port)
 
         pkts = self.create_stream_in(self.pg0, self.pg1)
         self.pg0.add_stream(pkts)
 
         pkts = self.create_stream_in(self.pg0, self.pg1)
         self.pg0.add_stream(pkts)
@@ -1143,6 +1398,12 @@ class TestSNAT(MethodHolder):
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
+            self.assertEqual(p[IP].src, self.pg3.local_ip4)
+            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+            self.assertEqual(p[UDP].dport, colector_port)
+            self.assertEqual(p[IPFIX].observationDomainID,
+                             self.ipfix_domain_id)
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
@@ -1160,7 +1421,8 @@ class TestSNAT(MethodHolder):
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
                                      template_interval=10)
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
                                      template_interval=10)
-        self.vapi.snat_ipfix()
+        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+                             src_port=self.ipfix_src_port)
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
@@ -1175,6 +1437,12 @@ class TestSNAT(MethodHolder):
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
+            self.assertEqual(p[IP].src, self.pg3.local_ip4)
+            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+            self.assertEqual(p[UDP].dport, 4739)
+            self.assertEqual(p[IPFIX].observationDomainID,
+                             self.ipfix_domain_id)
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
@@ -1311,6 +1579,145 @@ class TestSNAT(MethodHolder):
         capture = self.pg2.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip1)
 
         capture = self.pg2.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip1)
 
+    def test_dynamic_ipless_interfaces(self):
+        """ SNAT interfaces without configured ip dynamic map """
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8, self.snat_addr)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+    def test_static_ipless_interfaces(self):
+        """ SNAT 1:1 NAT interfaces without configured ip """
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture, self.snat_addr, True)
+
+    def test_static_with_port_ipless_interfaces(self):
+        """ SNAT 1:1 NAT with port interfaces without configured ip """
+
+        self.tcp_port_out = 30606
+        self.udp_port_out = 30607
+        self.icmp_id_out = 30608
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_address(self.snat_addr)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.tcp_port_in, self.tcp_port_out,
+                                     proto=IP_PROTOS.tcp)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.udp_port_in, self.udp_port_out,
+                                     proto=IP_PROTOS.udp)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.icmp_id_in, self.icmp_id_out,
+                                     proto=IP_PROTOS.icmp)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture)
+
     def tearDown(self):
         super(TestSNAT, self).tearDown()
         if not self.vpp_dead:
     def tearDown(self):
         super(TestSNAT, self).tearDown()
         if not self.vpp_dead:
@@ -1895,12 +2302,24 @@ class TestDeterministicNAT(MethodHolder):
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-             UDP(sport=3000, dport=3000))
+             UDP(sport=3001, dport=3002))
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.assert_nothing_captured()
 
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.assert_nothing_captured()
 
+        # verify ICMP error packet
+        capture = self.pg0.get_capture(1)
+        p = capture[0]
+        self.assertTrue(p.haslayer(ICMP))
+        icmp = p[ICMP]
+        self.assertEqual(icmp.type, 3)
+        self.assertEqual(icmp.code, 1)
+        self.assertTrue(icmp.haslayer(IPerror))
+        inner_ip = icmp[IPerror]
+        self.assertEqual(inner_ip[UDPerror].sport, 3001)
+        self.assertEqual(inner_ip[UDPerror].dport, 3002)
+
         dms = self.vapi.snat_det_map_dump()
 
         self.assertEqual(1000, dms[0].ses_num)
         dms = self.vapi.snat_det_map_dump()
 
         self.assertEqual(1000, dms[0].ses_num)
@@ -1946,5 +2365,352 @@ class TestDeterministicNAT(MethodHolder):
             self.logger.info(self.vapi.cli("show snat detail"))
             self.clear_snat()
 
             self.logger.info(self.vapi.cli("show snat detail"))
             self.clear_snat()
 
+
+class TestNAT64(MethodHolder):
+    """ NAT64 Test Cases """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestNAT64, cls).setUpClass()
+
+        try:
+            cls.tcp_port_in = 6303
+            cls.tcp_port_out = 6303
+            cls.udp_port_in = 6304
+            cls.udp_port_out = 6304
+            cls.icmp_id_in = 6305
+            cls.icmp_id_out = 6305
+            cls.nat_addr = '10.0.0.3'
+            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+
+            cls.create_pg_interfaces(range(2))
+            cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
+            cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
+
+            for i in cls.ip6_interfaces:
+                i.admin_up()
+                i.config_ip6()
+                i.resolve_ndp()
+
+            for i in cls.ip4_interfaces:
+                i.admin_up()
+                i.config_ip4()
+                i.resolve_arp()
+
+        except Exception:
+            super(TestNAT64, cls).tearDownClass()
+            raise
+
+    def test_pool(self):
+        """ Add/delete address to NAT64 pool """
+        nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
+
+        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
+
+        addresses = self.vapi.nat64_pool_addr_dump()
+        self.assertEqual(len(addresses), 1)
+        self.assertEqual(addresses[0].address, nat_addr)
+
+        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
+
+        addresses = self.vapi.nat64_pool_addr_dump()
+        self.assertEqual(len(addresses), 0)
+
+    def test_interface(self):
+        """ Enable/disable NAT64 feature on the interface """
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+        interfaces = self.vapi.nat64_interface_dump()
+        self.assertEqual(len(interfaces), 2)
+        pg0_found = False
+        pg1_found = False
+        for intf in interfaces:
+            if intf.sw_if_index == self.pg0.sw_if_index:
+                self.assertEqual(intf.is_inside, 1)
+                pg0_found = True
+            elif intf.sw_if_index == self.pg1.sw_if_index:
+                self.assertEqual(intf.is_inside, 0)
+                pg1_found = True
+        self.assertTrue(pg0_found)
+        self.assertTrue(pg1_found)
+
+        features = self.vapi.cli("show interface features pg0")
+        self.assertNotEqual(features.find('nat64-in2out'), -1)
+        features = self.vapi.cli("show interface features pg1")
+        self.assertNotEqual(features.find('nat64-out2in'), -1)
+
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
+
+        interfaces = self.vapi.nat64_interface_dump()
+        self.assertEqual(len(interfaces), 0)
+
+    def test_static_bib(self):
+        """ Add/delete static BIB entry """
+        in_addr = socket.inet_pton(socket.AF_INET6,
+                                   '2001:db8:85a3::8a2e:370:7334')
+        out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
+        in_port = 1234
+        out_port = 5678
+        proto = IP_PROTOS.tcp
+
+        self.vapi.nat64_add_del_static_bib(in_addr,
+                                           out_addr,
+                                           in_port,
+                                           out_port,
+                                           proto)
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+        static_bib_num = 0
+        for bibe in bib:
+            if bibe.is_static:
+                static_bib_num += 1
+                self.assertEqual(bibe.i_addr, in_addr)
+                self.assertEqual(bibe.o_addr, out_addr)
+                self.assertEqual(bibe.i_port, in_port)
+                self.assertEqual(bibe.o_port, out_port)
+        self.assertEqual(static_bib_num, 1)
+
+        self.vapi.nat64_add_del_static_bib(in_addr,
+                                           out_addr,
+                                           in_port,
+                                           out_port,
+                                           proto,
+                                           is_add=0)
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+        static_bib_num = 0
+        for bibe in bib:
+            if bibe.is_static:
+                static_bib_num += 1
+        self.assertEqual(static_bib_num, 0)
+
+    def test_set_timeouts(self):
+        """ Set NAT64 timeouts """
+        # verify default values
+        timeouts = self.vapi.nat64_get_timeouts()
+        self.assertEqual(timeouts.udp, 300)
+        self.assertEqual(timeouts.icmp, 60)
+        self.assertEqual(timeouts.tcp_trans, 240)
+        self.assertEqual(timeouts.tcp_est, 7440)
+        self.assertEqual(timeouts.tcp_incoming_syn, 6)
+
+        # set and verify custom values
+        self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
+                                     tcp_est=7450, tcp_incoming_syn=10)
+        timeouts = self.vapi.nat64_get_timeouts()
+        self.assertEqual(timeouts.udp, 200)
+        self.assertEqual(timeouts.icmp, 30)
+        self.assertEqual(timeouts.tcp_trans, 250)
+        self.assertEqual(timeouts.tcp_est, 7450)
+        self.assertEqual(timeouts.tcp_incoming_syn, 10)
+
+    def test_dynamic(self):
+        """ NAT64 dynamic translation test """
+        self.tcp_port_in = 6303
+        self.udp_port_in = 6304
+        self.icmp_id_in = 6305
+
+        ses_num_start = self.nat64_get_ses_num()
+
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+        # in2out
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(3)
+        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+        # in2out
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(3)
+        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+        ses_num_end = self.nat64_get_ses_num()
+
+        self.assertEqual(ses_num_end - ses_num_start, 3)
+
+    def test_static(self):
+        """ NAT64 static translation test """
+        self.tcp_port_in = 60303
+        self.udp_port_in = 60304
+        self.icmp_id_in = 60305
+        self.tcp_port_out = 60303
+        self.udp_port_out = 60304
+        self.icmp_id_out = 60305
+
+        ses_num_start = self.nat64_get_ses_num()
+
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+                                           self.nat_addr_n,
+                                           self.tcp_port_in,
+                                           self.tcp_port_out,
+                                           IP_PROTOS.tcp)
+        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+                                           self.nat_addr_n,
+                                           self.udp_port_in,
+                                           self.udp_port_out,
+                                           IP_PROTOS.udp)
+        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+                                           self.nat_addr_n,
+                                           self.icmp_id_in,
+                                           self.icmp_id_out,
+                                           IP_PROTOS.icmp)
+
+        # in2out
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4, same_port=True)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(3)
+        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+        ses_num_end = self.nat64_get_ses_num()
+
+        self.assertEqual(ses_num_end - ses_num_start, 3)
+
+    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+    def test_session_timeout(self):
+        """ NAT64 session timeout """
+        self.icmp_id_in = 1234
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+        self.vapi.nat64_set_timeouts(icmp=5)
+
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+
+        ses_num_before_timeout = self.nat64_get_ses_num()
+
+        sleep(15)
+
+        # ICMP session after timeout
+        ses_num_after_timeout = self.nat64_get_ses_num()
+        self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
+
+    def nat64_get_ses_num(self):
+        """
+        Return number of active NAT64 sessions.
+        """
+        ses_num = 0
+        st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
+        ses_num += len(st)
+        st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
+        ses_num += len(st)
+        st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
+        ses_num += len(st)
+        return ses_num
+
+    def clear_nat64(self):
+        """
+        Clear NAT64 configuration.
+        """
+        self.vapi.nat64_set_timeouts()
+
+        interfaces = self.vapi.nat64_interface_dump()
+        for intf in interfaces:
+            self.vapi.nat64_add_del_interface(intf.sw_if_index,
+                                              intf.is_inside,
+                                              is_add=0)
+
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+        for bibe in bib:
+            if bibe.is_static:
+                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+                                                   bibe.o_addr,
+                                                   bibe.i_port,
+                                                   bibe.o_port,
+                                                   bibe.proto,
+                                                   bibe.vrf_id,
+                                                   is_add=0)
+
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
+        for bibe in bib:
+            if bibe.is_static:
+                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+                                                   bibe.o_addr,
+                                                   bibe.i_port,
+                                                   bibe.o_port,
+                                                   bibe.proto,
+                                                   bibe.vrf_id,
+                                                   is_add=0)
+
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
+        for bibe in bib:
+            if bibe.is_static:
+                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+                                                   bibe.o_addr,
+                                                   bibe.i_port,
+                                                   bibe.o_port,
+                                                   bibe.proto,
+                                                   bibe.vrf_id,
+                                                   is_add=0)
+
+        adresses = self.vapi.nat64_pool_addr_dump()
+        for addr in adresses:
+            self.vapi.nat64_add_del_pool_addr_range(addr.address,
+                                                    addr.address,
+                                                    is_add=0)
+
+    def tearDown(self):
+        super(TestNAT64, self).tearDown()
+        if not self.vpp_dead:
+            self.logger.info(self.vapi.cli("show nat64 pool"))
+            self.logger.info(self.vapi.cli("show nat64 interfaces"))
+            self.logger.info(self.vapi.cli("show nat64 bib tcp"))
+            self.logger.info(self.vapi.cli("show nat64 bib udp"))
+            self.logger.info(self.vapi.cli("show nat64 bib icmp"))
+            self.logger.info(self.vapi.cli("show nat64 session table tcp"))
+            self.logger.info(self.vapi.cli("show nat64 session table udp"))
+            self.logger.info(self.vapi.cli("show nat64 session table icmp"))
+            self.clear_nat64()
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)