Packets recieved on VLAN-0 map to the main interface
[vpp.git] / test / test_snat.py
index ace1723..0eceaab 100644 (file)
@@ -9,6 +9,7 @@ from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from scapy.layers.l2 import Ether, ARP
 from scapy.data import IP_PROTOS
+from scapy.packet import bind_layers
 from util import ppp
 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
 from time import sleep
@@ -312,8 +313,10 @@ class TestSNAT(MethodHolder):
             cls.icmp_id_in = 6305
             cls.icmp_id_out = 6305
             cls.snat_addr = '10.0.0.3'
+            cls.ipfix_src_port = 4739
+            cls.ipfix_domain_id = 1
 
-            cls.create_pg_interfaces(range(8))
+            cls.create_pg_interfaces(range(9))
             cls.interfaces = list(cls.pg_interfaces[0:4])
 
             for i in cls.interfaces:
@@ -321,7 +324,7 @@ class TestSNAT(MethodHolder):
                 i.config_ip4()
                 i.resolve_arp()
 
-            cls.pg0.generate_remote_hosts(2)
+            cls.pg0.generate_remote_hosts(3)
             cls.pg0.configure_ipv4_neighbors()
 
             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
@@ -344,6 +347,7 @@ class TestSNAT(MethodHolder):
                 i.resolve_arp()
 
             cls.pg7.admin_up()
+            cls.pg8.admin_up()
 
         except Exception:
             super(TestSNAT, cls).tearDownClass()
@@ -353,6 +357,26 @@ class TestSNAT(MethodHolder):
         """
         Clear SNAT configuration.
         """
+        # I found no elegant way to do this
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index,
+                                   is_add=0)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index,
+                                   is_add=0)
+
+        for intf in [self.pg7, self.pg8]:
+            neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
+            for n in neighbors:
+                self.vapi.ip_neighbor_add_del(intf.sw_if_index,
+                                              n.mac_address,
+                                              n.ip_address,
+                                              is_add=0)
+
         if self.pg7.has_ip4_config:
             self.pg7.unconfig_ip4()
 
@@ -360,7 +384,10 @@ class TestSNAT(MethodHolder):
         for intf in interfaces:
             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
 
-        self.vapi.snat_ipfix(enable=0)
+        self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
+                             domain_id=self.ipfix_domain_id)
+        self.ipfix_src_port = 4739
+        self.ipfix_domain_id = 1
 
         interfaces = self.vapi.snat_interface_dump()
         for intf in interfaces:
@@ -989,7 +1016,7 @@ class TestSNAT(MethodHolder):
                              self.icmp_id_in])
 
     def test_hairpinning(self):
-        """ SNAT hairpinning """
+        """ SNAT hairpinning - 1:1 NAT with port"""
 
         host = self.pg0.remote_hosts[0]
         server = self.pg0.remote_hosts[1]
@@ -1048,6 +1075,168 @@ class TestSNAT(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:"), p)
             raise
 
+    def test_hairpinning2(self):
+        """ SNAT hairpinning - 1:1 NAT"""
+
+        server1_nat_ip = "10.0.0.10"
+        server2_nat_ip = "10.0.0.11"
+        host = self.pg0.remote_hosts[0]
+        server1 = self.pg0.remote_hosts[1]
+        server2 = self.pg0.remote_hosts[2]
+        server_tcp_port = 22
+        server_udp_port = 20
+
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+                                                 is_inside=0)
+
+        # add static mapping for servers
+        self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
+        self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
+
+        # host to server1
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             UDP(sport=self.udp_port_in, dport=server_udp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=host.ip4, dst=server1_nat_ip) /
+             ICMP(id=self.icmp_id_in, type='echo-request'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, self.snat_addr)
+                self.assertEqual(packet[IP].dst, server1.ip4)
+                if packet.haslayer(TCP):
+                    self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].dport, server_tcp_port)
+                    self.tcp_port_out = packet[TCP].sport
+                elif packet.haslayer(UDP):
+                    self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].dport, server_udp_port)
+                    self.udp_port_out = packet[UDP].sport
+                else:
+                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
+                    self.icmp_id_out = packet[ICMP].id
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server1 to host
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             UDP(sport=server_udp_port, dport=self.udp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=self.snat_addr) /
+             ICMP(id=self.icmp_id_out, type='echo-reply'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server1_nat_ip)
+                self.assertEqual(packet[IP].dst, host.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].sport, server_tcp_port)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].sport, server_udp_port)
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server2 to server1
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             UDP(sport=self.udp_port_in, dport=server_udp_port))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server2.ip4, dst=server1_nat_ip) /
+             ICMP(id=self.icmp_id_in, type='echo-request'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server2_nat_ip)
+                self.assertEqual(packet[IP].dst, server1.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].dport, server_tcp_port)
+                    self.tcp_port_out = packet[TCP].sport
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].dport, server_udp_port)
+                    self.udp_port_out = packet[UDP].sport
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+                    self.icmp_id_out = packet[ICMP].id
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # server1 to server2
+        pkts = []
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             UDP(sport=server_udp_port, dport=self.udp_port_out))
+        pkts.append(p)
+        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+             IP(src=server1.ip4, dst=server2_nat_ip) /
+             ICMP(id=self.icmp_id_out, type='echo-reply'))
+        pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, server1_nat_ip)
+                self.assertEqual(packet[IP].dst, server2.ip4)
+                if packet.haslayer(TCP):
+                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+                    self.assertEqual(packet[TCP].sport, server_tcp_port)
+                elif packet.haslayer(UDP):
+                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
+                    self.assertEqual(packet[UDP].sport, server_udp_port)
+                else:
+                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
     def test_max_translations_per_user(self):
         """ MAX translations per user - recycle the least recently used """
 
@@ -1120,6 +1309,10 @@ class TestSNAT(MethodHolder):
 
     def test_ipfix_nat44_sess(self):
         """ S-NAT IPFIX logging NAT44 session created/delted """
+        self.ipfix_domain_id = 10
+        self.ipfix_src_port = 20202
+        colector_port = 30303
+        bind_layers(UDP, IPFIX, dport=30303)
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
@@ -1127,8 +1320,10 @@ class TestSNAT(MethodHolder):
         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
-                                     template_interval=10)
-        self.vapi.snat_ipfix()
+                                     template_interval=10,
+                                     collector_port=colector_port)
+        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+                             src_port=self.ipfix_src_port)
 
         pkts = self.create_stream_in(self.pg0, self.pg1)
         self.pg0.add_stream(pkts)
@@ -1143,6 +1338,12 @@ class TestSNAT(MethodHolder):
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
+            self.assertEqual(p[IP].src, self.pg3.local_ip4)
+            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+            self.assertEqual(p[UDP].dport, colector_port)
+            self.assertEqual(p[IPFIX].observationDomainID,
+                             self.ipfix_domain_id)
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
@@ -1160,7 +1361,8 @@ class TestSNAT(MethodHolder):
                                      src_address=self.pg3.local_ip4n,
                                      path_mtu=512,
                                      template_interval=10)
-        self.vapi.snat_ipfix()
+        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+                             src_port=self.ipfix_src_port)
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
@@ -1175,6 +1377,12 @@ class TestSNAT(MethodHolder):
         # first load template
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
+            self.assertEqual(p[IP].src, self.pg3.local_ip4)
+            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+            self.assertEqual(p[UDP].dport, 4739)
+            self.assertEqual(p[IPFIX].observationDomainID,
+                             self.ipfix_domain_id)
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
@@ -1311,6 +1519,145 @@ class TestSNAT(MethodHolder):
         capture = self.pg2.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip1)
 
+    def test_dynamic_ipless_interfaces(self):
+        """ SNAT interfaces without configured ip dynamic map """
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8, self.snat_addr)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+    def test_static_ipless_interfaces(self):
+        """ SNAT 1:1 NAT interfaces without configured ip """
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture, self.snat_addr, True)
+
+    def test_static_with_port_ipless_interfaces(self):
+        """ SNAT 1:1 NAT with port interfaces without configured ip """
+
+        self.tcp_port_out = 30606
+        self.udp_port_out = 30607
+        self.icmp_id_out = 30608
+
+        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+                                      self.pg7.remote_mac,
+                                      self.pg7.remote_ip4n,
+                                      is_static=1)
+        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+                                      self.pg8.remote_mac,
+                                      self.pg8.remote_ip4n,
+                                      is_static=1)
+
+        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg7.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg7.sw_if_index)
+        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+                                   dst_address_length=32,
+                                   next_hop_address=self.pg8.remote_ip4n,
+                                   next_hop_sw_if_index=self.pg8.sw_if_index)
+
+        self.snat_add_address(self.snat_addr)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.tcp_port_in, self.tcp_port_out,
+                                     proto=IP_PROTOS.tcp)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.udp_port_in, self.udp_port_out,
+                                     proto=IP_PROTOS.udp)
+        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+                                     self.icmp_id_in, self.icmp_id_out,
+                                     proto=IP_PROTOS.icmp)
+        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+                                                 is_inside=0)
+
+        # out2in
+        pkts = self.create_stream_out(self.pg8)
+        self.pg8.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg7.get_capture(len(pkts))
+        self.verify_capture_in(capture, self.pg7)
+
+        # in2out
+        pkts = self.create_stream_in(self.pg7, self.pg8)
+        self.pg7.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg8.get_capture(len(pkts))
+        self.verify_capture_out(capture)
+
     def tearDown(self):
         super(TestSNAT, self).tearDown()
         if not self.vpp_dead:
@@ -1332,9 +1679,9 @@ class TestDeterministicNAT(MethodHolder):
 
         try:
             cls.tcp_port_in = 6303
-            cls.tcp_port_out = 6303
+            cls.tcp_external_port = 6303
             cls.udp_port_in = 6304
-            cls.udp_port_out = 6304
+            cls.udp_external_port = 6304
             cls.icmp_id_in = 6305
             cls.snat_addr = '10.0.0.3'
 
@@ -1365,13 +1712,13 @@ class TestDeterministicNAT(MethodHolder):
         # TCP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
-             TCP(sport=self.tcp_port_in, dport=self.tcp_port_out))
+             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
-             UDP(sport=self.udp_port_in, dport=self.udp_port_out))
+             UDP(sport=self.udp_port_in, dport=self.udp_external_port))
         pkts.append(p)
 
         # ICMP
@@ -1396,13 +1743,13 @@ class TestDeterministicNAT(MethodHolder):
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
-             TCP(dport=self.tcp_external_port, sport=self.tcp_port_out))
+             TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
         pkts.append(p)
 
         # UDP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
-             UDP(dport=self.udp_external_port, sport=self.udp_port_out))
+             UDP(dport=self.udp_port_out, sport=self.udp_external_port))
         pkts.append(p)
 
         # ICMP
@@ -1429,9 +1776,9 @@ class TestDeterministicNAT(MethodHolder):
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
                 if packet.haslayer(TCP):
-                    self.tcp_external_port = packet[TCP].sport
+                    self.tcp_port_out = packet[TCP].sport
                 elif packet.haslayer(UDP):
-                    self.udp_external_port = packet[UDP].sport
+                    self.udp_port_out = packet[UDP].sport
                 else:
                     self.icmp_external_id = packet[ICMP].id
             except:
@@ -1450,19 +1797,19 @@ class TestDeterministicNAT(MethodHolder):
             # SYN packet in->out
             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="S"))
             in_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
             self.pg_start()
             capture = out_if.get_capture(1)
             p = capture[0]
-            self.tcp_external_port = p[TCP].sport
+            self.tcp_port_out = p[TCP].sport
 
             # SYN + ACK packet out->in
             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="SA"))
             out_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1472,7 +1819,7 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet in->out
             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="A"))
             in_if.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1553,12 +1900,6 @@ class TestDeterministicNAT(MethodHolder):
         """ CGNAT translation test (TCP, UDP, ICMP) """
 
         nat_ip = "10.0.0.10"
-        self.tcp_port_out = 6303
-        self.udp_port_out = 6304
-        self.icmp_id_in = 6305
-        self.tcp_external_port = 7303
-        self.udp_external_port = 7304
-        self.icmp_id_out = 7305
 
         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                                    32,
@@ -1584,12 +1925,36 @@ class TestDeterministicNAT(MethodHolder):
         capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
+        # session dump test
+        sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
+        self.assertEqual(len(sessions), 3)
+
+        # TCP session
+        s = sessions[0]
+        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+        self.assertEqual(s.in_port, self.tcp_port_in)
+        self.assertEqual(s.out_port, self.tcp_port_out)
+        self.assertEqual(s.ext_port, self.tcp_external_port)
+
+        # UDP session
+        s = sessions[1]
+        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+        self.assertEqual(s.in_port, self.udp_port_in)
+        self.assertEqual(s.out_port, self.udp_port_out)
+        self.assertEqual(s.ext_port, self.udp_external_port)
+
+        # ICMP session
+        s = sessions[2]
+        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+        self.assertEqual(s.in_port, self.icmp_id_in)
+        self.assertEqual(s.out_port, self.icmp_external_id)
+
     def test_multiple_users(self):
         """ CGNAT multiple users """
 
         nat_ip = "10.0.0.10"
         port_in = 80
-        port_out = 6303
+        external_port = 6303
 
         host0 = self.pg0.remote_hosts[0]
         host1 = self.pg0.remote_hosts[1]
@@ -1605,7 +1970,7 @@ class TestDeterministicNAT(MethodHolder):
         # host0 to out
         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
-             TCP(sport=port_in, dport=port_out))
+             TCP(sport=port_in, dport=external_port))
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1616,8 +1981,8 @@ class TestDeterministicNAT(MethodHolder):
             tcp = p[TCP]
             self.assertEqual(ip.src, nat_ip)
             self.assertEqual(ip.dst, self.pg1.remote_ip4)
-            self.assertEqual(tcp.dport, port_out)
-            external_port0 = tcp.sport
+            self.assertEqual(tcp.dport, external_port)
+            port_out0 = tcp.sport
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
@@ -1625,7 +1990,7 @@ class TestDeterministicNAT(MethodHolder):
         # host1 to out
         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
-             TCP(sport=port_in, dport=port_out))
+             TCP(sport=port_in, dport=external_port))
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1636,8 +2001,8 @@ class TestDeterministicNAT(MethodHolder):
             tcp = p[TCP]
             self.assertEqual(ip.src, nat_ip)
             self.assertEqual(ip.dst, self.pg1.remote_ip4)
-            self.assertEqual(tcp.dport, port_out)
-            external_port1 = tcp.sport
+            self.assertEqual(tcp.dport, external_port)
+            port_out1 = tcp.sport
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
@@ -1649,7 +2014,7 @@ class TestDeterministicNAT(MethodHolder):
         # out to host0
         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
-             TCP(sport=port_out, dport=external_port0))
+             TCP(sport=external_port, dport=port_out0))
         self.pg1.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1661,7 +2026,7 @@ class TestDeterministicNAT(MethodHolder):
             self.assertEqual(ip.src, self.pg1.remote_ip4)
             self.assertEqual(ip.dst, host0.ip4)
             self.assertEqual(tcp.dport, port_in)
-            self.assertEqual(tcp.sport, port_out)
+            self.assertEqual(tcp.sport, external_port)
         except:
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
@@ -1669,7 +2034,7 @@ class TestDeterministicNAT(MethodHolder):
         # out to host1
         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
-             TCP(sport=port_out, dport=external_port1))
+             TCP(sport=external_port, dport=port_out1))
         self.pg1.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1681,11 +2046,26 @@ class TestDeterministicNAT(MethodHolder):
             self.assertEqual(ip.src, self.pg1.remote_ip4)
             self.assertEqual(ip.dst, host1.ip4)
             self.assertEqual(tcp.dport, port_in)
-            self.assertEqual(tcp.sport, port_out)
+            self.assertEqual(tcp.sport, external_port)
         except:
             self.logger.error(ppp("Unexpected or invalid packet", p))
             raise
 
+        # session close api test
+        self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
+                                             port_out1,
+                                             self.pg1.remote_ip4n,
+                                             external_port)
+        dms = self.vapi.snat_det_map_dump()
+        self.assertEqual(dms[0].ses_num, 1)
+
+        self.vapi.snat_det_close_session_in(host0.ip4n,
+                                            port_in,
+                                            self.pg1.remote_ip4n,
+                                            external_port)
+        dms = self.vapi.snat_det_map_dump()
+        self.assertEqual(dms[0].ses_num, 0)
+
     def test_tcp_session_close_detection_in(self):
         """ CGNAT TCP session close initiated from inside network """
         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
@@ -1703,7 +2083,7 @@ class TestDeterministicNAT(MethodHolder):
             # FIN packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="F"))
             self.pg0.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1715,14 +2095,14 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="A"))
             pkts.append(p)
 
             # FIN packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="F"))
             pkts.append(p)
 
@@ -1734,7 +2114,7 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="A"))
             self.pg0.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1765,7 +2145,7 @@ class TestDeterministicNAT(MethodHolder):
             # FIN packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="F"))
             self.pg1.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1777,14 +2157,14 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="A"))
             pkts.append(p)
 
             # ACK packet in -> out
             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-                 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                      flags="F"))
             pkts.append(p)
 
@@ -1796,7 +2176,7 @@ class TestDeterministicNAT(MethodHolder):
             # ACK packet out -> in
             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
-                 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                      flags="A"))
             self.pg1.add_stream(p)
             self.pg_enable_capture(self.pg_interfaces)
@@ -1862,12 +2242,24 @@ class TestDeterministicNAT(MethodHolder):
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
-             UDP(sport=3000, dport=3000))
+             UDP(sport=3001, dport=3002))
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.assert_nothing_captured()
 
+        # verify ICMP error packet
+        capture = self.pg0.get_capture(1)
+        p = capture[0]
+        self.assertTrue(p.haslayer(ICMP))
+        icmp = p[ICMP]
+        self.assertEqual(icmp.type, 3)
+        self.assertEqual(icmp.code, 1)
+        self.assertTrue(icmp.haslayer(IPerror))
+        inner_ip = icmp[IPerror]
+        self.assertEqual(inner_ip[UDPerror].sport, 3001)
+        self.assertEqual(inner_ip[UDPerror].dport, 3002)
+
         dms = self.vapi.snat_det_map_dump()
 
         self.assertEqual(1000, dms[0].ses_num)