LISP-GPE: add test CLI for NSH
[vpp.git] / test / test_snat.py
index b455e62..c2f9280 100644 (file)
@@ -7,8 +7,11 @@ import struct
 from framework import VppTestCase, VppTestRunner, running_extended_tests
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from framework import VppTestCase, VppTestRunner, running_extended_tests
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
 from scapy.layers.l2 import Ether, ARP
 from scapy.data import IP_PROTOS
 from scapy.layers.l2 import Ether, ARP
 from scapy.data import IP_PROTOS
+from scapy.packet import bind_layers
 from util import ppp
 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
 from time import sleep
 from util import ppp
 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
 from time import sleep
@@ -24,6 +27,17 @@ class MethodHolder(VppTestCase):
     def tearDown(self):
         super(MethodHolder, self).tearDown()
 
     def tearDown(self):
         super(MethodHolder, self).tearDown()
 
+    def check_tcp_checksum(self, pkt):
+        """
+        Check TCP checksum in IP packet
+
+        :param pkt: Packet to check TCP checksum
+        """
+        new = pkt.__class__(str(pkt))
+        del new['TCP'].chksum
+        new = new.__class__(str(new))
+        self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
+
     def create_stream_in(self, in_if, out_if, ttl=64):
         """
         Create packet stream for inside network
     def create_stream_in(self, in_if, out_if, ttl=64):
         """
         Create packet stream for inside network
@@ -36,13 +50,13 @@ class MethodHolder(VppTestCase):
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
-             TCP(sport=self.tcp_port_in))
+             TCP(sport=self.tcp_port_in, dport=20))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
-             UDP(sport=self.udp_port_in))
+             UDP(sport=self.udp_port_in, dport=20))
         pkts.append(p)
 
         # ICMP
         pkts.append(p)
 
         # ICMP
@@ -53,6 +67,36 @@ class MethodHolder(VppTestCase):
 
         return pkts
 
 
         return pkts
 
+    def create_stream_in_ip6(self, in_if, out_if, hlim=64):
+        """
+        Create IPv6 packet stream for inside network
+
+        :param in_if: Inside interface
+        :param out_if: Outside interface
+        :param ttl: Hop Limit of generated packets
+        """
+        pkts = []
+        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
+        # TCP
+        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+             TCP(sport=self.tcp_port_in, dport=20))
+        pkts.append(p)
+
+        # UDP
+        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+             UDP(sport=self.udp_port_in, dport=20))
+        pkts.append(p)
+
+        # ICMP
+        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+             ICMPv6EchoRequest(id=self.icmp_id_in))
+        pkts.append(p)
+
+        return pkts
+
     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
         """
         Create packet stream for outside network
     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
         """
         Create packet stream for outside network
@@ -67,13 +111,13 @@ class MethodHolder(VppTestCase):
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
-             TCP(dport=self.tcp_port_out))
+             TCP(dport=self.tcp_port_out, sport=20))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
-             UDP(dport=self.udp_port_out))
+             UDP(dport=self.udp_port_out, sport=20))
         pkts.append(p)
 
         # ICMP
         pkts.append(p)
 
         # ICMP
@@ -85,7 +129,7 @@ class MethodHolder(VppTestCase):
         return pkts
 
     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
         return pkts
 
     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
-                           packet_num=3):
+                           packet_num=3, dst_ip=None):
         """
         Verify captured packets on outside network
 
         """
         Verify captured packets on outside network
 
@@ -93,6 +137,7 @@ class MethodHolder(VppTestCase):
         :param nat_ip: Translated IP address (Default use global SNAT address)
         :param same_port: Sorce port number is not translated (Default False)
         :param packet_num: Expected number of packets (Default 3)
         :param nat_ip: Translated IP address (Default use global SNAT address)
         :param same_port: Sorce port number is not translated (Default False)
         :param packet_num: Expected number of packets (Default 3)
+        :param dst_ip: Destination IP address (Default do not verify)
         """
         if nat_ip is None:
             nat_ip = self.snat_addr
         """
         if nat_ip is None:
             nat_ip = self.snat_addr
@@ -100,6 +145,8 @@ class MethodHolder(VppTestCase):
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
+                if dst_ip is not None:
+                    self.assertEqual(packet[IP].dst, dst_ip)
                 if packet.haslayer(TCP):
                     if same_port:
                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                 if packet.haslayer(TCP):
                     if same_port:
                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
@@ -148,6 +195,32 @@ class MethodHolder(VppTestCase):
                                       "(inside network):", packet))
                 raise
 
                                       "(inside network):", packet))
                 raise
 
+    def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
+        """
+        Verify captured IPv6 packets on inside network
+
+        :param capture: Captured packets
+        :param src_ip: Source IP
+        :param dst_ip: Destination IP address
+        :param packet_num: Expected number of packets (Default 3)
+        """
+        self.assertEqual(packet_num, len(capture))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IPv6].src, src_ip)
+                self.assertEqual(packet[IPv6].dst, dst_ip)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
+                else:
+                    self.assertEqual(packet[ICMPv6EchoReply].id,
+                                     self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet "
+                                      "(inside network):", packet))
+                raise
+
     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
         """
         Verify captured packet that don't have to be translated
     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
         """
         Verify captured packet that don't have to be translated
@@ -312,8 +385,10 @@ class TestSNAT(MethodHolder):
             cls.icmp_id_in = 6305
             cls.icmp_id_out = 6305
             cls.snat_addr = '10.0.0.3'
             cls.icmp_id_in = 6305
             cls.icmp_id_out = 6305
             cls.snat_addr = '10.0.0.3'
+            cls.ipfix_src_port = 4739
+            cls.ipfix_domain_id = 1
 
 
-            cls.create_pg_interfaces(range(8))
+            cls.create_pg_interfaces(range(9))
             cls.interfaces = list(cls.pg_interfaces[0:4])
 
             for i in cls.interfaces:
             cls.interfaces = list(cls.pg_interfaces[0:4])
 
             for i in cls.interfaces:
@@ -321,7 +396,7 @@ class TestSNAT(MethodHolder):
                 i.config_ip4()
                 i.resolve_arp()
 
                 i.config_ip4()
                 i.resolve_arp()
 
-            cls.pg0.generate_remote_hosts(2)
+            cls.pg0.generate_remote_hosts(3)
             cls.pg0.configure_ipv4_neighbors()
 
             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
             cls.pg0.configure_ipv4_neighbors()
 
             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
@@ -344,6 +419,7 @@ class TestSNAT(MethodHolder):
                 i.resolve_arp()
 
             cls.pg7.admin_up()
                 i.resolve_arp()
 
             cls.pg7.admin_up()
+            cls.pg8.admin_up()
 
         except Exception:
             super(TestSNAT, cls).tearDownClass()
 
         except Exception:
             super(TestSNAT, cls).tearDownClass()
@@ -353,6 +429,26 @@ class TestSNAT(MethodHolder):
         """
         Clear SNAT configuration.
         """
         """
         Clear SNAT configuration.
         """
+        # I found no elegant way to do this
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index,
+                                   is_add=0)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index,
+                                   is_add=0)
+
+        for intf in [self.pg7, self.pg8]:
+            neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
+            for n in neighbors:
+                self.vapi.ip_neighbor_add_del(intf.sw_if_index,
+                                              n.mac_address,
+                                              n.ip_address,
+                                              is_add=0)
+
         if self.pg7.has_ip4_config:
             self.pg7.unconfig_ip4()
 
         if self.pg7.has_ip4_config:
             self.pg7.unconfig_ip4()
 
@@ -360,7 +456,10 @@ class TestSNAT(MethodHolder):
         for intf in interfaces:
             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
 
         for intf in interfaces:
             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
 
-        self.vapi.snat_ipfix(enable=0)
+        self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
+                             domain_id=self.ipfix_domain_id)
+        self.ipfix_src_port = 4739
+        self.ipfix_domain_id = 1
 
         interfaces = self.vapi.snat_interface_dump()
         for intf in interfaces:
 
         interfaces = self.vapi.snat_interface_dump()
         for intf in interfaces:
@@ -560,7 +659,7 @@ class TestSNAT(MethodHolder):
         self.verify_capture_out_with_icmp_errors(capture)
 
     def test_ping_out_interface_from_outside(self):
         self.verify_capture_out_with_icmp_errors(capture)
 
     def test_ping_out_interface_from_outside(self):
-        """ Ping SNAT out interface from outside """
+        """ Ping SNAT out interface from outside network """
 
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
 
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
@@ -587,6 +686,36 @@ class TestSNAT(MethodHolder):
                                   "(outside network):", packet))
             raise
 
                                   "(outside network):", packet))
             raise
 
+    def test_ping_internal_host_from_outside(self):
+        """ Ping internal host from outside network """
+
+        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+                                                 is_inside=0)
+
+        # out2in
+        pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+               IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
+               ICMP(id=self.icmp_id_out, type='echo-request'))
+        self.pg1.add_stream(pkt)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        self.verify_capture_in(capture, self.pg0, packet_num=1)
+        self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
+
+        # in2out
+        pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+               IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
+               ICMP(id=self.icmp_id_in, type='echo-reply'))
+        self.pg0.add_stream(pkt)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(1)
+        self.verify_capture_out(capture, same_port=True, packet_num=1)
+        self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
+
     def test_static_in(self):
         """ SNAT 1:1 NAT initialized from inside network """
 
     def test_static_in(self):
         """ SNAT 1:1 NAT initialized from inside network """
 
@@ -959,7 +1088,7 @@ class TestSNAT(MethodHolder):
                              self.icmp_id_in])
 
     def test_hairpinning(self):
                              self.icmp_id_in])
 
     def test_hairpinning(self):
-        """ SNAT hairpinning """
+        """ SNAT hairpinning - 1:1 NAT with port"""
 
         host = self.pg0.remote_hosts[0]
         server = self.pg0.remote_hosts[1]
 
         host = self.pg0.remote_hosts[0]
         server = self.pg0.remote_hosts[1]
@@ -993,6 +1122,7 @@ class TestSNAT(MethodHolder):
             self.assertEqual(ip.dst, server.ip4)
             self.assertNotEqual(tcp.sport, host_in_port)
             self.assertEqual(tcp.dport, server_in_port)
             self.assertEqual(ip.dst, server.ip4)
             self.assertNotEqual(tcp.sport, host_in_port)
             self.assertEqual(tcp.dport, server_in_port)
+            self.check_tcp_checksum(p)
             host_out_port = tcp.sport
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             host_out_port = tcp.sport
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
@@ -1014,10 +1144,177 @@ class TestSNAT(MethodHolder):
             self.assertEqual(ip.dst, host.ip4)
             self.assertEqual(tcp.sport, server_out_port)
             self.assertEqual(tcp.dport, host_in_port)
             self.assertEqual(ip.dst, host.ip4)
             self.assertEqual(tcp.sport, server_out_port)
             self.assertEqual(tcp.dport, host_in_port)
+            self.check_tcp_checksum(p)
         except:
             self.logger.error(ppp("Unexpected or invalid packet:"), p)
             raise
 
         except:
             self.logger.error(ppp("Unexpected or invalid packet:"), p)
             raise
 
+    def test_hairpinning2(self):
+        """ SNAT hairpinning - 1:1 NAT"""
+
+        server1_nat_ip = "10.0.0.10"
+        server2_nat_ip = "10.0.0.11"
+        host = self.pg0.remote_hosts[0]
+        server1 = self.pg0.remote_hosts[1]
+        server2 = self.pg0.remote_hosts[2]
+        server_tcp_port = 22
+        server_udp_port = 20
+
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+                                                 is_inside=0)
+
+        # add static mapping for servers
+        self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
+        self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
+
+        # host to server1
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             UDP(sport=self.udp_port_in, dport=server_udp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             ICMP(id=self.icmp_id_in, type='echo-request'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, self.snat_addr)
+                self.assertEqual(packet[IP].dst, server1.ip4)
+                if packet.haslayer(TCP):
+                    self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].dport, server_tcp_port)
+                    self.tcp_port_out = packet[TCP].sport
+                    self.check_tcp_checksum(packet)
+                elif packet.haslayer(UDP):
+                    self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].dport, server_udp_port)
+                    self.udp_port_out = packet[UDP].sport
+                else:
+                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
+                    self.icmp_id_out = packet[ICMP].id
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server1 to host
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             UDP(sport=server_udp_port, dport=self.udp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             ICMP(id=self.icmp_id_out, type='echo-reply'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server1_nat_ip)
+                self.assertEqual(packet[IP].dst, host.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].sport, server_tcp_port)
+                    self.check_tcp_checksum(packet)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].sport, server_udp_port)
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server2 to server1
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             UDP(sport=self.udp_port_in, dport=server_udp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             ICMP(id=self.icmp_id_in, type='echo-request'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server2_nat_ip)
+                self.assertEqual(packet[IP].dst, server1.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].dport, server_tcp_port)
+                    self.tcp_port_out = packet[TCP].sport
+                    self.check_tcp_checksum(packet)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].dport, server_udp_port)
+                    self.udp_port_out = packet[UDP].sport
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+                    self.icmp_id_out = packet[ICMP].id
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server1 to server2
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             UDP(sport=server_udp_port, dport=self.udp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             ICMP(id=self.icmp_id_out, type='echo-reply'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server1_nat_ip)
+                self.assertEqual(packet[IP].dst, server2.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].sport, server_tcp_port)
+                    self.check_tcp_checksum(packet)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].sport, server_udp_port)
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
     def test_max_translations_per_user(self):
         """ MAX translations per user - recycle the least recently used """
 
     def test_max_translations_per_user(self):
         """ MAX translations per user - recycle the least recently used """
 
@@ -1090,6 +1387,10 @@ class TestSNAT(MethodHolder):
 
     def test_ipfix_nat44_sess(self):
         """ S-NAT IPFIX logging NAT44 session created/delted """
 
     def test_ipfix_nat44_sess(self):
         """ S-NAT IPFIX logging NAT44 session created/delted """
+        self.ipfix_domain_id = 10
+        self.ipfix_src_port = 20202
+        colector_port = 30303
+        bind_layers(UDP, IPFIX, dport=30303)
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
@@ -1097,8 +1398,10 @@ class TestSNAT(MethodHolder):
         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
-                                     template_interval=10)
-        self.vapi.snat_ipfix()
+                                     template_interval=10,
+                                     collector_port=colector_port)
+        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+                             src_port=self.ipfix_src_port)
 
         pkts = self.create_stream_in(self.pg0, self.pg1)
         self.pg0.add_stream(pkts)
 
         pkts = self.create_stream_in(self.pg0, self.pg1)
         self.pg0.add_stream(pkts)
@@ -1113,6 +1416,12 @@ class TestSNAT(MethodHolder):
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
+            self.assertEqual(p[IP].src, self.pg3.local_ip4)
+            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+            self.assertEqual(p[UDP].dport, colector_port)
+            self.assertEqual(p[IPFIX].observationDomainID,
+                             self.ipfix_domain_id)
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
@@ -1130,7 +1439,8 @@ class TestSNAT(MethodHolder):
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
                                      template_interval=10)
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
                                      template_interval=10)
-        self.vapi.snat_ipfix()
+        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+                             src_port=self.ipfix_src_port)
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
@@ -1145,6 +1455,12 @@ class TestSNAT(MethodHolder):
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
+            self.assertEqual(p[IP].src, self.pg3.local_ip4)
+            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+            self.assertEqual(p[UDP].dport, 4739)
+            self.assertEqual(p[IPFIX].observationDomainID,
+                             self.ipfix_domain_id)
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
@@ -1281,6 +1597,145 @@ class TestSNAT(MethodHolder):
         capture = self.pg2.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip1)
 
         capture = self.pg2.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip1)
 
+    def test_dynamic_ipless_interfaces(self):
+        """ SNAT interfaces without configured ip dynamic map """
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8, self.snat_addr)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+    def test_static_ipless_interfaces(self):
+        """ SNAT 1:1 NAT interfaces without configured ip """
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture, self.snat_addr, True)
+
+    def test_static_with_port_ipless_interfaces(self):
+        """ SNAT 1:1 NAT with port interfaces without configured ip """
+
+        self.tcp_port_out = 30606
+        self.udp_port_out = 30607
+        self.icmp_id_out = 30608
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_address(self.snat_addr)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.tcp_port_in, self.tcp_port_out,
+                                     proto=IP_PROTOS.tcp)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.udp_port_in, self.udp_port_out,
+                                     proto=IP_PROTOS.udp)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.icmp_id_in, self.icmp_id_out,
+                                     proto=IP_PROTOS.icmp)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture)
+
     def tearDown(self):
         super(TestSNAT, self).tearDown()
         if not self.vpp_dead:
     def tearDown(self):
         super(TestSNAT, self).tearDown()
         if not self.vpp_dead:
@@ -1302,13 +1757,13 @@ class TestDeterministicNAT(MethodHolder):
 
         try:
             cls.tcp_port_in = 6303
 
         try:
             cls.tcp_port_in = 6303
-            cls.tcp_port_out = 6303
+            cls.tcp_external_port = 6303
             cls.udp_port_in = 6304
             cls.udp_port_in = 6304
-            cls.udp_port_out = 6304
+            cls.udp_external_port = 6304
             cls.icmp_id_in = 6305
             cls.snat_addr = '10.0.0.3'
 
             cls.icmp_id_in = 6305
             cls.snat_addr = '10.0.0.3'
 
-            cls.create_pg_interfaces(range(2))
+            cls.create_pg_interfaces(range(3))
             cls.interfaces = list(cls.pg_interfaces)
 
             for i in cls.interfaces:
             cls.interfaces = list(cls.pg_interfaces)
 
             for i in cls.interfaces:
@@ -1335,13 +1790,13 @@ class TestDeterministicNAT(MethodHolder):
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
-             TCP(sport=self.tcp_port_in, dport=self.tcp_port_out))
+             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
-             UDP(sport=self.udp_port_in, dport=self.udp_port_out))
+             UDP(sport=self.udp_port_in, dport=self.udp_external_port))
         pkts.append(p)
 
         # ICMP
         pkts.append(p)
 
         # ICMP
@@ -1366,13 +1821,13 @@ class TestDeterministicNAT(MethodHolder):
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
-             TCP(dport=self.tcp_external_port, sport=self.tcp_port_out))
+             TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
-             UDP(dport=self.udp_external_port, sport=self.udp_port_out))
+             UDP(dport=self.udp_port_out, sport=self.udp_external_port))
         pkts.append(p)
 
         # ICMP
         pkts.append(p)
 
         # ICMP
@@ -1399,9 +1854,9 @@ class TestDeterministicNAT(MethodHolder):
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
                 if packet.haslayer(TCP):
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
                 if packet.haslayer(TCP):
-                    self.tcp_external_port = packet[TCP].sport
+                    self.tcp_port_out = packet[TCP].sport
                 elif packet.haslayer(UDP):
                 elif packet.haslayer(UDP):
-                    self.udp_external_port = packet[UDP].sport
+                    self.udp_port_out = packet[UDP].sport
                 else:
                     self.icmp_external_id = packet[ICMP].id
             except:
                 else:
                     self.icmp_external_id = packet[ICMP].id
             except:
@@ -1420,19 +1875,19 @@ class TestDeterministicNAT(MethodHolder):
             # SYN packet in->out
             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             # SYN packet in->out
             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="S"))
             in_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
             self.pg_start()
             capture = out_if.get_capture(1)
             p = capture[0]
                      flags="S"))
             in_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
             self.pg_start()
             capture = out_if.get_capture(1)
             p = capture[0]
-            self.tcp_external_port = p[TCP].sport
+            self.tcp_port_out = p[TCP].sport
 
             # SYN + ACK packet out->in
             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
 
             # SYN + ACK packet out->in
             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="SA"))
             out_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
                      flags="SA"))
             out_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1442,7 +1897,7 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet in->out
             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             # ACK packet in->out
             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="A"))
             in_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
                      flags="A"))
             in_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1453,6 +1908,21 @@ class TestDeterministicNAT(MethodHolder):
             self.logger.error("TCP 3 way handshake failed")
             raise
 
             self.logger.error("TCP 3 way handshake failed")
             raise
 
+    def verify_ipfix_max_entries_per_user(self, data):
+        """
+        Verify IPFIX maximum entries per user exceeded event
+
+        :param data: Decoded IPFIX data records
+        """
+        self.assertEqual(1, len(data))
+        record = data[0]
+        # natEvent
+        self.assertEqual(ord(record[230]), 13)
+        # natQuotaExceededEvent
+        self.assertEqual('\x03\x00\x00\x00', record[466])
+        # sourceIPv4Address
+        self.assertEqual(self.pg0.remote_ip4n, record[8])
+
     def test_deterministic_mode(self):
         """ S-NAT run deterministic mode """
         in_addr = '172.16.255.0'
     def test_deterministic_mode(self):
         """ S-NAT run deterministic mode """
         in_addr = '172.16.255.0'
@@ -1508,12 +1978,6 @@ class TestDeterministicNAT(MethodHolder):
         """ CGNAT translation test (TCP, UDP, ICMP) """
 
         nat_ip = "10.0.0.10"
         """ CGNAT translation test (TCP, UDP, ICMP) """
 
         nat_ip = "10.0.0.10"
-        self.tcp_port_out = 6303
-        self.udp_port_out = 6304
-        self.icmp_id_in = 6305
-        self.tcp_external_port = 7303
-        self.udp_external_port = 7304
-        self.icmp_id_out = 7305
 
         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                                    32,
 
         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                                    32,
@@ -1539,12 +2003,36 @@ class TestDeterministicNAT(MethodHolder):
         capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
         capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
+        # session dump test
+        sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
+        self.assertEqual(len(sessions), 3)
+
+        # TCP session
+        s = sessions[0]
+        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+        self.assertEqual(s.in_port, self.tcp_port_in)
+        self.assertEqual(s.out_port, self.tcp_port_out)
+        self.assertEqual(s.ext_port, self.tcp_external_port)
+
+        # UDP session
+        s = sessions[1]
+        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+        self.assertEqual(s.in_port, self.udp_port_in)
+        self.assertEqual(s.out_port, self.udp_port_out)
+        self.assertEqual(s.ext_port, self.udp_external_port)
+
+        # ICMP session
+        s = sessions[2]
+        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+        self.assertEqual(s.in_port, self.icmp_id_in)
+        self.assertEqual(s.out_port, self.icmp_external_id)
+
     def test_multiple_users(self):
         """ CGNAT multiple users """
 
         nat_ip = "10.0.0.10"
         port_in = 80
     def test_multiple_users(self):
         """ CGNAT multiple users """
 
         nat_ip = "10.0.0.10"
         port_in = 80
-        port_out = 6303
+        external_port = 6303
 
         host0 = self.pg0.remote_hosts[0]
         host1 = self.pg0.remote_hosts[1]
 
         host0 = self.pg0.remote_hosts[0]
         host1 = self.pg0.remote_hosts[1]
@@ -1560,7 +2048,7 @@ class TestDeterministicNAT(MethodHolder):
         # host0 to out
         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
         # host0 to out
         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
-             TCP(sport=port_in, dport=port_out))
+             TCP(sport=port_in, dport=external_port))
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1571,8 +2059,8 @@ class TestDeterministicNAT(MethodHolder):
             tcp = p[TCP]
             self.assertEqual(ip.src, nat_ip)
             self.assertEqual(ip.dst, self.pg1.remote_ip4)
             tcp = p[TCP]
             self.assertEqual(ip.src, nat_ip)
             self.assertEqual(ip.dst, self.pg1.remote_ip4)
-            self.assertEqual(tcp.dport, port_out)
-            external_port0 = tcp.sport
+            self.assertEqual(tcp.dport, external_port)
+            port_out0 = tcp.sport
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
@@ -1580,7 +2068,7 @@ class TestDeterministicNAT(MethodHolder):
         # host1 to out
         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
         # host1 to out
         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
-             TCP(sport=port_in, dport=port_out))
+             TCP(sport=port_in, dport=external_port))
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1591,8 +2079,8 @@ class TestDeterministicNAT(MethodHolder):
             tcp = p[TCP]
             self.assertEqual(ip.src, nat_ip)
             self.assertEqual(ip.dst, self.pg1.remote_ip4)
             tcp = p[TCP]
             self.assertEqual(ip.src, nat_ip)
             self.assertEqual(ip.dst, self.pg1.remote_ip4)
-            self.assertEqual(tcp.dport, port_out)
-            external_port1 = tcp.sport
+            self.assertEqual(tcp.dport, external_port)
+            port_out1 = tcp.sport
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
@@ -1604,7 +2092,7 @@ class TestDeterministicNAT(MethodHolder):
         # out to host0
         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         # out to host0
         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
-             TCP(sport=port_out, dport=external_port0))
+             TCP(sport=external_port, dport=port_out0))
         self.pg1.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         self.pg1.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1616,7 +2104,7 @@ class TestDeterministicNAT(MethodHolder):
             self.assertEqual(ip.src, self.pg1.remote_ip4)
             self.assertEqual(ip.dst, host0.ip4)
             self.assertEqual(tcp.dport, port_in)
             self.assertEqual(ip.src, self.pg1.remote_ip4)
             self.assertEqual(ip.dst, host0.ip4)
             self.assertEqual(tcp.dport, port_in)
-            self.assertEqual(tcp.sport, port_out)
+            self.assertEqual(tcp.sport, external_port)
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
@@ -1624,7 +2112,7 @@ class TestDeterministicNAT(MethodHolder):
         # out to host1
         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         # out to host1
         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
-             TCP(sport=port_out, dport=external_port1))
+             TCP(sport=external_port, dport=port_out1))
         self.pg1.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         self.pg1.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1636,11 +2124,26 @@ class TestDeterministicNAT(MethodHolder):
             self.assertEqual(ip.src, self.pg1.remote_ip4)
             self.assertEqual(ip.dst, host1.ip4)
             self.assertEqual(tcp.dport, port_in)
             self.assertEqual(ip.src, self.pg1.remote_ip4)
             self.assertEqual(ip.dst, host1.ip4)
             self.assertEqual(tcp.dport, port_in)
-            self.assertEqual(tcp.sport, port_out)
+            self.assertEqual(tcp.sport, external_port)
         except:
             self.logger.error(ppp("Unexpected or invalid packet", p))
             raise
 
         except:
             self.logger.error(ppp("Unexpected or invalid packet", p))
             raise
 
+        # session close api test
+        self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
+                                             port_out1,
+                                             self.pg1.remote_ip4n,
+                                             external_port)
+        dms = self.vapi.snat_det_map_dump()
+        self.assertEqual(dms[0].ses_num, 1)
+
+        self.vapi.snat_det_close_session_in(host0.ip4n,
+                                            port_in,
+                                            self.pg1.remote_ip4n,
+                                            external_port)
+        dms = self.vapi.snat_det_map_dump()
+        self.assertEqual(dms[0].ses_num, 0)
+
     def test_tcp_session_close_detection_in(self):
         """ CGNAT TCP session close initiated from inside network """
         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
     def test_tcp_session_close_detection_in(self):
         """ CGNAT TCP session close initiated from inside network """
         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
@@ -1658,7 +2161,7 @@ class TestDeterministicNAT(MethodHolder):
             # FIN packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             # FIN packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="F"))
             self.pg0.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
                      flags="F"))
             self.pg0.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1670,14 +2173,14 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
             # ACK packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="A"))
             pkts.append(p)
 
             # FIN packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                      flags="A"))
             pkts.append(p)
 
             # FIN packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="F"))
             pkts.append(p)
 
                      flags="F"))
             pkts.append(p)
 
@@ -1689,7 +2192,7 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="A"))
             self.pg0.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
                      flags="A"))
             self.pg0.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1720,7 +2223,7 @@ class TestDeterministicNAT(MethodHolder):
             # FIN packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
             # FIN packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="F"))
             self.pg1.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
                      flags="F"))
             self.pg1.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1732,14 +2235,14 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="A"))
             pkts.append(p)
 
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                      flags="A"))
             pkts.append(p)
 
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="F"))
             pkts.append(p)
 
                      flags="F"))
             pkts.append(p)
 
@@ -1751,7 +2254,7 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
             # ACK packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="A"))
             self.pg1.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
                      flags="A"))
             self.pg1.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1797,6 +2300,11 @@ class TestDeterministicNAT(MethodHolder):
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)
+        self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
+                                     src_address=self.pg2.local_ip4n,
+                                     path_mtu=512,
+                                     template_interval=10)
+        self.vapi.snat_ipfix()
 
         pkts = []
         for port in range(1025, 2025):
 
         pkts = []
         for port in range(1025, 2025):
@@ -1812,20 +2320,48 @@ class TestDeterministicNAT(MethodHolder):
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-             UDP(sport=3000, dport=3000))
+             UDP(sport=3001, dport=3002))
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.assert_nothing_captured()
 
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.assert_nothing_captured()
 
+        # verify ICMP error packet
+        capture = self.pg0.get_capture(1)
+        p = capture[0]
+        self.assertTrue(p.haslayer(ICMP))
+        icmp = p[ICMP]
+        self.assertEqual(icmp.type, 3)
+        self.assertEqual(icmp.code, 1)
+        self.assertTrue(icmp.haslayer(IPerror))
+        inner_ip = icmp[IPerror]
+        self.assertEqual(inner_ip[UDPerror].sport, 3001)
+        self.assertEqual(inner_ip[UDPerror].dport, 3002)
+
         dms = self.vapi.snat_det_map_dump()
 
         self.assertEqual(1000, dms[0].ses_num)
 
         dms = self.vapi.snat_det_map_dump()
 
         self.assertEqual(1000, dms[0].ses_num)
 
+        # verify IPFIX logging
+        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
+        capture = self.pg2.get_capture(2)
+        ipfix = IPFIXDecoder()
+        # first load template
+        for p in capture:
+            self.assertTrue(p.haslayer(IPFIX))
+            if p.haslayer(Template):
+                ipfix.add_template(p.getlayer(Template))
+        # verify events in data set
+        for p in capture:
+            if p.haslayer(Data):
+                data = ipfix.decode_data_set(p.getlayer(Set))
+                self.verify_ipfix_max_entries_per_user(data)
+
     def clear_snat(self):
         """
         Clear SNAT configuration.
         """
     def clear_snat(self):
         """
         Clear SNAT configuration.
         """
+        self.vapi.snat_ipfix(enable=0)
         self.vapi.snat_det_set_timeouts()
         deterministic_mappings = self.vapi.snat_det_map_dump()
         for dsm in deterministic_mappings:
         self.vapi.snat_det_set_timeouts()
         deterministic_mappings = self.vapi.snat_det_map_dump()
         for dsm in deterministic_mappings:
@@ -1847,5 +2383,441 @@ class TestDeterministicNAT(MethodHolder):
             self.logger.info(self.vapi.cli("show snat detail"))
             self.clear_snat()
 
             self.logger.info(self.vapi.cli("show snat detail"))
             self.clear_snat()
 
+
+class TestNAT64(MethodHolder):
+    """ NAT64 Test Cases """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestNAT64, cls).setUpClass()
+
+        try:
+            cls.tcp_port_in = 6303
+            cls.tcp_port_out = 6303
+            cls.udp_port_in = 6304
+            cls.udp_port_out = 6304
+            cls.icmp_id_in = 6305
+            cls.icmp_id_out = 6305
+            cls.nat_addr = '10.0.0.3'
+            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+
+            cls.create_pg_interfaces(range(2))
+            cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
+            cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
+
+            for i in cls.ip6_interfaces:
+                i.admin_up()
+                i.config_ip6()
+                i.resolve_ndp()
+
+            for i in cls.ip4_interfaces:
+                i.admin_up()
+                i.config_ip4()
+                i.resolve_arp()
+
+        except Exception:
+            super(TestNAT64, cls).tearDownClass()
+            raise
+
+    def test_pool(self):
+        """ Add/delete address to NAT64 pool """
+        nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
+
+        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
+
+        addresses = self.vapi.nat64_pool_addr_dump()
+        self.assertEqual(len(addresses), 1)
+        self.assertEqual(addresses[0].address, nat_addr)
+
+        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
+
+        addresses = self.vapi.nat64_pool_addr_dump()
+        self.assertEqual(len(addresses), 0)
+
+    def test_interface(self):
+        """ Enable/disable NAT64 feature on the interface """
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+        interfaces = self.vapi.nat64_interface_dump()
+        self.assertEqual(len(interfaces), 2)
+        pg0_found = False
+        pg1_found = False
+        for intf in interfaces:
+            if intf.sw_if_index == self.pg0.sw_if_index:
+                self.assertEqual(intf.is_inside, 1)
+                pg0_found = True
+            elif intf.sw_if_index == self.pg1.sw_if_index:
+                self.assertEqual(intf.is_inside, 0)
+                pg1_found = True
+        self.assertTrue(pg0_found)
+        self.assertTrue(pg1_found)
+
+        features = self.vapi.cli("show interface features pg0")
+        self.assertNotEqual(features.find('nat64-in2out'), -1)
+        features = self.vapi.cli("show interface features pg1")
+        self.assertNotEqual(features.find('nat64-out2in'), -1)
+
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
+
+        interfaces = self.vapi.nat64_interface_dump()
+        self.assertEqual(len(interfaces), 0)
+
+    def test_static_bib(self):
+        """ Add/delete static BIB entry """
+        in_addr = socket.inet_pton(socket.AF_INET6,
+                                   '2001:db8:85a3::8a2e:370:7334')
+        out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
+        in_port = 1234
+        out_port = 5678
+        proto = IP_PROTOS.tcp
+
+        self.vapi.nat64_add_del_static_bib(in_addr,
+                                           out_addr,
+                                           in_port,
+                                           out_port,
+                                           proto)
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+        static_bib_num = 0
+        for bibe in bib:
+            if bibe.is_static:
+                static_bib_num += 1
+                self.assertEqual(bibe.i_addr, in_addr)
+                self.assertEqual(bibe.o_addr, out_addr)
+                self.assertEqual(bibe.i_port, in_port)
+                self.assertEqual(bibe.o_port, out_port)
+        self.assertEqual(static_bib_num, 1)
+
+        self.vapi.nat64_add_del_static_bib(in_addr,
+                                           out_addr,
+                                           in_port,
+                                           out_port,
+                                           proto,
+                                           is_add=0)
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+        static_bib_num = 0
+        for bibe in bib:
+            if bibe.is_static:
+                static_bib_num += 1
+        self.assertEqual(static_bib_num, 0)
+
+    def test_set_timeouts(self):
+        """ Set NAT64 timeouts """
+        # verify default values
+        timeouts = self.vapi.nat64_get_timeouts()
+        self.assertEqual(timeouts.udp, 300)
+        self.assertEqual(timeouts.icmp, 60)
+        self.assertEqual(timeouts.tcp_trans, 240)
+        self.assertEqual(timeouts.tcp_est, 7440)
+        self.assertEqual(timeouts.tcp_incoming_syn, 6)
+
+        # set and verify custom values
+        self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
+                                     tcp_est=7450, tcp_incoming_syn=10)
+        timeouts = self.vapi.nat64_get_timeouts()
+        self.assertEqual(timeouts.udp, 200)
+        self.assertEqual(timeouts.icmp, 30)
+        self.assertEqual(timeouts.tcp_trans, 250)
+        self.assertEqual(timeouts.tcp_est, 7450)
+        self.assertEqual(timeouts.tcp_incoming_syn, 10)
+
+    def test_dynamic(self):
+        """ NAT64 dynamic translation test """
+        self.tcp_port_in = 6303
+        self.udp_port_in = 6304
+        self.icmp_id_in = 6305
+
+        ses_num_start = self.nat64_get_ses_num()
+
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+        # in2out
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(3)
+        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+        # in2out
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(3)
+        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+        ses_num_end = self.nat64_get_ses_num()
+
+        self.assertEqual(ses_num_end - ses_num_start, 3)
+
+    def test_static(self):
+        """ NAT64 static translation test """
+        self.tcp_port_in = 60303
+        self.udp_port_in = 60304
+        self.icmp_id_in = 60305
+        self.tcp_port_out = 60303
+        self.udp_port_out = 60304
+        self.icmp_id_out = 60305
+
+        ses_num_start = self.nat64_get_ses_num()
+
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+                                           self.nat_addr_n,
+                                           self.tcp_port_in,
+                                           self.tcp_port_out,
+                                           IP_PROTOS.tcp)
+        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+                                           self.nat_addr_n,
+                                           self.udp_port_in,
+                                           self.udp_port_out,
+                                           IP_PROTOS.udp)
+        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+                                           self.nat_addr_n,
+                                           self.icmp_id_in,
+                                           self.icmp_id_out,
+                                           IP_PROTOS.icmp)
+
+        # in2out
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4, same_port=True)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(3)
+        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+        ses_num_end = self.nat64_get_ses_num()
+
+        self.assertEqual(ses_num_end - ses_num_start, 3)
+
+    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+    def test_session_timeout(self):
+        """ NAT64 session timeout """
+        self.icmp_id_in = 1234
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+        self.vapi.nat64_set_timeouts(icmp=5)
+
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+
+        ses_num_before_timeout = self.nat64_get_ses_num()
+
+        sleep(15)
+
+        # ICMP session after timeout
+        ses_num_after_timeout = self.nat64_get_ses_num()
+        self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
+
+    def test_icmp_error(self):
+        """ NAT64 ICMP Error message translation """
+        self.tcp_port_in = 6303
+        self.udp_port_in = 6304
+        self.icmp_id_in = 6305
+
+        ses_num_start = self.nat64_get_ses_num()
+
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+        # send some packets to create sessions
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture_ip4 = self.pg1.get_capture(len(pkts))
+        self.verify_capture_out(capture_ip4, packet_num=3,
+                                nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4)
+
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture_ip6 = self.pg0.get_capture(len(pkts))
+        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+        self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
+                                   self.pg0.remote_ip6)
+
+        # in2out
+        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+                IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
+                ICMPv6DestUnreach(code=1) /
+                packet[IPv6] for packet in capture_ip6]
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, self.nat_addr)
+                self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+                self.assertEqual(packet[ICMP].type, 3)
+                self.assertEqual(packet[ICMP].code, 13)
+                inner = packet[IPerror]
+                self.assertEqual(inner.src, self.pg1.remote_ip4)
+                self.assertEqual(inner.dst, self.nat_addr)
+                if inner.haslayer(TCPerror):
+                    self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
+                elif inner.haslayer(UDPerror):
+                    self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
+                else:
+                    self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # out2in
+        pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+                IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+                ICMP(type=3, code=13) /
+                packet[IP] for packet in capture_ip4]
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IPv6].src, ip.src)
+                self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
+                icmp = packet[ICMPv6DestUnreach]
+                self.assertEqual(icmp.code, 1)
+                inner = icmp[IPerror6]
+                self.assertEqual(inner.src, self.pg0.remote_ip6)
+                self.assertEqual(inner.dst, ip.src)
+                if inner.haslayer(TCPerror):
+                    self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
+                elif inner.haslayer(UDPerror):
+                    self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
+                else:
+                    self.assertEqual(inner[ICMPv6EchoRequest].id,
+                                     self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+    def nat64_get_ses_num(self):
+        """
+        Return number of active NAT64 sessions.
+        """
+        ses_num = 0
+        st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
+        ses_num += len(st)
+        st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
+        ses_num += len(st)
+        st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
+        ses_num += len(st)
+        return ses_num
+
+    def clear_nat64(self):
+        """
+        Clear NAT64 configuration.
+        """
+        self.vapi.nat64_set_timeouts()
+
+        interfaces = self.vapi.nat64_interface_dump()
+        for intf in interfaces:
+            self.vapi.nat64_add_del_interface(intf.sw_if_index,
+                                              intf.is_inside,
+                                              is_add=0)
+
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+        for bibe in bib:
+            if bibe.is_static:
+                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+                                                   bibe.o_addr,
+                                                   bibe.i_port,
+                                                   bibe.o_port,
+                                                   bibe.proto,
+                                                   bibe.vrf_id,
+                                                   is_add=0)
+
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
+        for bibe in bib:
+            if bibe.is_static:
+                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+                                                   bibe.o_addr,
+                                                   bibe.i_port,
+                                                   bibe.o_port,
+                                                   bibe.proto,
+                                                   bibe.vrf_id,
+                                                   is_add=0)
+
+        bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
+        for bibe in bib:
+            if bibe.is_static:
+                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+                                                   bibe.o_addr,
+                                                   bibe.i_port,
+                                                   bibe.o_port,
+                                                   bibe.proto,
+                                                   bibe.vrf_id,
+                                                   is_add=0)
+
+        adresses = self.vapi.nat64_pool_addr_dump()
+        for addr in adresses:
+            self.vapi.nat64_add_del_pool_addr_range(addr.address,
+                                                    addr.address,
+                                                    is_add=0)
+
+    def tearDown(self):
+        super(TestNAT64, self).tearDown()
+        if not self.vpp_dead:
+            self.logger.info(self.vapi.cli("show nat64 pool"))
+            self.logger.info(self.vapi.cli("show nat64 interfaces"))
+            self.logger.info(self.vapi.cli("show nat64 bib tcp"))
+            self.logger.info(self.vapi.cli("show nat64 bib udp"))
+            self.logger.info(self.vapi.cli("show nat64 bib icmp"))
+            self.logger.info(self.vapi.cli("show nat64 session table tcp"))
+            self.logger.info(self.vapi.cli("show nat64 session table udp"))
+            self.logger.info(self.vapi.cli("show nat64 session table icmp"))
+            self.clear_nat64()
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)