cnat: Prepare extended snat policies
[vpp.git] / src / plugins / cnat / test / test_cnat.py
index 34cd8b5..ff4c440 100644 (file)
@@ -3,13 +3,15 @@
 import unittest
 
 from framework import VppTestCase, VppTestRunner
-from vpp_ip import DpoProto
+from vpp_ip import DpoProto, INVALID_INDEX
+from itertools import product
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP, TCP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
 
 import struct
 
@@ -22,25 +24,34 @@ from vpp_papi import VppEnum
 N_PKTS = 15
 
 
-def find_cnat_translation(test, id):
-    ts = test.vapi.cnat_translation_dump()
-    for t in ts:
-        if id == t.translation.id:
-            return True
-    return False
-
-
 class Ep(object):
     """ CNat endpoint """
 
-    def __init__(self, ip, port, l4p=TCP):
+    def __init__(self, ip=None, port=0, l4p=TCP,
+                 sw_if_index=INVALID_INDEX, is_v6=False):
         self.ip = ip
+        if ip is None:
+            self.ip = "::" if is_v6 else "0.0.0.0"
         self.port = port
         self.l4p = l4p
+        self.sw_if_index = sw_if_index
+        if is_v6:
+            self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
+        else:
+            self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
 
     def encode(self):
         return {'addr': self.ip,
-                'port': self.port}
+                'port': self.port,
+                'sw_if_index': self.sw_if_index,
+                'if_af': self.if_af}
+
+    @classmethod
+    def from_pg(cls, pg, is_v6=False):
+        if pg is None:
+            return cls(is_v6=is_v6)
+        else:
+            return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
 
     @property
     def isV6(self):
@@ -76,6 +87,9 @@ class VppCNatTranslation(VppObject):
         for path in self.paths:
             self.encoded_paths.append(path.encode())
 
+    def __str__(self):
+        return ("%s %s %s" % (self.vip, self.iproto, self.paths))
+
     @property
     def vl4_proto(self):
         ip_proto = VppEnum.vl_api_ip_proto_t
@@ -84,9 +98,6 @@ class VppCNatTranslation(VppObject):
             TCP: ip_proto.IP_API_PROTO_TCP,
         }[self.iproto]
 
-    def delete(self):
-        r = self._test.vapi.cnat_translation_del(id=self.id)
-
     def add_vpp_config(self):
         r = self._test.vapi.cnat_translation_update(
             {'vip': self.vip.encode(),
@@ -110,10 +121,13 @@ class VppCNatTranslation(VppObject):
         self._test.registry.register(self, self._test.logger)
 
     def remove_vpp_config(self):
-        self._test.vapi.cnat_translation_del(self.id)
+        self._test.vapi.cnat_translation_del(id=self.id)
 
     def query_vpp_config(self):
-        return find_cnat_translation(self._test, self.id)
+        for t in self._test.vapi.cnat_translation_dump():
+            if self.id == t.translation.id:
+                return t.translation
+        return None
 
     def object_id(self):
         return ("cnat-translation-%s" % (self.vip))
@@ -123,34 +137,6 @@ class VppCNatTranslation(VppObject):
         return c[0][self.id]
 
 
-class VppCNATSourceNat(VppObject):
-
-    def __init__(self, test, address, exclude_subnets=[]):
-        self._test = test
-        self.address = address
-        self.exclude_subnets = exclude_subnets
-
-    def add_vpp_config(self):
-        a = ip_address(self.address)
-        if 4 == a.version:
-            self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
-        else:
-            self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
-        for subnet in self.exclude_subnets:
-            self.cnat_exclude_subnet(subnet, True)
-
-    def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
-        add = 1 if isAdd else 0
-        self._test.vapi.cnat_add_del_snat_prefix(
-            prefix=exclude_subnet, is_add=add)
-
-    def query_vpp_config(self):
-        return False
-
-    def remove_vpp_config(self):
-        return False
-
-
 class TestCNatTranslation(VppTestCase):
     """ CNat Translation """
     extra_vpp_punt_config = ["cnat", "{",
@@ -218,6 +204,7 @@ class TestCNatTranslation(VppTestCase):
                 rxs = self.send_and_expect(self.pg0,
                                            p1 * N_PKTS,
                                            self.pg1)
+                self.logger.info(self.vapi.cli("show trace max 1"))
 
                 for rx in rxs:
                     self.assert_packet_checksums_valid(rx)
@@ -284,11 +271,6 @@ class TestCNatTranslation(VppTestCase):
                                            p1 * N_PKTS,
                                            self.pg0)
 
-        self.assertEqual(t1.get_stats()['packets'],
-                         N_PKTS *
-                         len(sports) *
-                         len(self.pg0.remote_hosts))
-
     def cnat_test_translation_update(self, t1, sports, isV6=False):
         ip_v = "ip6" if isV6 else "ip4"
         ip_class = IPv6 if isV6 else IP
@@ -379,7 +361,7 @@ class TestCNatTranslation(VppTestCase):
             n_tries += 1
             sessions = self.vapi.cnat_session_dump()
             self.sleep(2)
-            print(self.vapi.cli("show cnat session verbose"))
+            self.logger.info(self.vapi.cli("show cnat session verbose"))
 
         self.assertTrue(n_tries < 100)
         self.vapi.cli("test cnat scanner off")
@@ -401,7 +383,7 @@ class TestCNatTranslation(VppTestCase):
                                          self.pg2)
 
         for tr in trs:
-            tr.delete()
+            tr.remove_vpp_config()
 
         self.assertTrue(self.vapi.cnat_session_dump())
         self.vapi.cnat_session_purge()
@@ -545,8 +527,10 @@ class TestCNatTranslation(VppTestCase):
 class TestCNatSourceNAT(VppTestCase):
     """ CNat Source NAT """
     extra_vpp_punt_config = ["cnat", "{",
+                             "session-cleanup-timeout", "0.1",
                              "session-max-age", "1",
-                             "tcp-max-age", "1", "}"]
+                             "tcp-max-age", "1",
+                             "scanner", "off", "}"]
 
     @classmethod
     def setUpClass(cls):
@@ -568,49 +552,230 @@ class TestCNatSourceNAT(VppTestCase):
             i.config_ip6()
             i.resolve_ndp()
 
+        self.pg0.configure_ipv6_neighbors()
+        self.pg0.configure_ipv4_neighbors()
+        self.pg1.generate_remote_hosts(2)
+        self.pg1.configure_ipv4_neighbors()
+        self.pg1.configure_ipv6_neighbors()
+
+        self.vapi.cnat_set_snat_addresses(
+            snat_ip4=self.pg2.remote_hosts[0].ip4,
+            snat_ip6=self.pg2.remote_hosts[0].ip6,
+            sw_if_index=INVALID_INDEX)
+        self.vapi.feature_enable_disable(
+            enable=1,
+            arc_name="ip6-unicast",
+            feature_name="cnat-snat-ip6",
+            sw_if_index=self.pg0.sw_if_index)
+        self.vapi.feature_enable_disable(
+            enable=1,
+            arc_name="ip4-unicast",
+            feature_name="cnat-snat-ip4",
+            sw_if_index=self.pg0.sw_if_index)
+
+        policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
+        self.vapi.cnat_set_snat_policy(
+            policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
+        for i in self.pg_interfaces:
+            self.vapi.cnat_snat_policy_add_del_if(
+                sw_if_index=i.sw_if_index, is_add=1,
+                table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
+            self.vapi.cnat_snat_policy_add_del_if(
+                sw_if_index=i.sw_if_index, is_add=1,
+                table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
+
     def tearDown(self):
+        self.vapi.cnat_session_purge()
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
             i.admin_down()
         super(TestCNatSourceNAT, self).tearDown()
 
-    def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
-        t1 = VppCNATSourceNat(self, srcNatAddr)
-        t1.add_vpp_config()
-        cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
-        cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
-        self.vapi.feature_enable_disable(
-            enable=1,
-            arc_name=cnat_arc_name,
-            feature_name=cnat_feature_name,
-            sw_if_index=interface.sw_if_index)
+    def test_snat_v6(self):
+        # """ CNat Source Nat v6 """
+        self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
+        self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
+        self.sourcenat_test_icmp_err_conf(isV6=True)
+        self.sourcenat_test_icmp_echo6_conf()
 
-        return t1
+    def test_snat_v4(self):
+        # """ CNat Source Nat v4 """
+        self.sourcenat_test_tcp_udp_conf(TCP)
+        self.sourcenat_test_tcp_udp_conf(UDP)
+        self.sourcenat_test_icmp_err_conf()
+        self.sourcenat_test_icmp_echo4_conf()
 
-    def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
-        ip_v = "ip6" if isV6 else "ip4"
-        IP46 = IPv6 if isV6 else IP
-        sports = [1234, 1235, 1236]
-        dports = [6661, 6662, 6663]
+    def sourcenat_test_icmp_echo6_conf(self):
+        sports = [1234, 1235]
+        dports = [6661, 6662]
 
-        self.pg0.generate_remote_hosts(1)
-        self.pg0.configure_ipv4_neighbors()
-        self.pg0.configure_ipv6_neighbors()
-        self.pg1.generate_remote_hosts(len(sports))
-        self.pg1.configure_ipv4_neighbors()
-        self.pg1.configure_ipv6_neighbors()
+        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+            client_addr = self.pg0.remote_hosts[0].ip6
+            remote_addr = self.pg1.remote_hosts[nbr].ip6
+            src_nat_addr = self.pg2.remote_hosts[0].ip6
 
-        self.vapi.cli("test cnat scanner on")
-        t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)
+            # ping from pods to outside network
+            p1 = (
+                Ether(dst=self.pg0.local_mac,
+                      src=self.pg0.remote_hosts[0].mac) /
+                IPv6(src=client_addr, dst=remote_addr) /
+                ICMPv6EchoRequest(id=0xfeed) /
+                Raw())
+
+            rxs = self.send_and_expect(
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
+
+            for rx in rxs:
+                self.assertEqual(rx[IPv6].src, src_nat_addr)
+                self.assert_packet_checksums_valid(rx)
+
+            received_id = rx[0][ICMPv6EchoRequest].id
+            # ping reply from outside to pods
+            p2 = (
+                Ether(dst=self.pg1.local_mac,
+                      src=self.pg1.remote_hosts[nbr].mac) /
+                IPv6(src=remote_addr, dst=src_nat_addr) /
+                ICMPv6EchoReply(id=received_id))
+            rxs = self.send_and_expect(
+                self.pg1,
+                p2 * N_PKTS,
+                self.pg0)
+
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IPv6].src, remote_addr)
+                self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
+
+    def sourcenat_test_icmp_echo4_conf(self):
+        sports = [1234, 1235]
+        dports = [6661, 6662]
+
+        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+            IP46 = IP
+            client_addr = self.pg0.remote_hosts[0].ip4
+            remote_addr = self.pg1.remote_hosts[nbr].ip4
+            src_nat_addr = self.pg2.remote_hosts[0].ip4
+
+            # ping from pods to outside network
+            p1 = (
+                Ether(dst=self.pg0.local_mac,
+                      src=self.pg0.remote_hosts[0].mac) /
+                IP46(src=client_addr, dst=remote_addr) /
+                ICMP(type=8, id=0xfeed) /
+                Raw())
+
+            rxs = self.send_and_expect(
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
+
+            for rx in rxs:
+                self.assertEqual(rx[IP46].src, src_nat_addr)
+                self.assert_packet_checksums_valid(rx)
+
+            received_id = rx[0][ICMP].id
+            # ping reply from outside to pods
+            p2 = (
+                Ether(dst=self.pg1.local_mac,
+                      src=self.pg1.remote_hosts[nbr].mac) /
+                IP46(src=remote_addr, dst=src_nat_addr) /
+                ICMP(type=0, id=received_id))
+            rxs = self.send_and_expect(
+                self.pg1,
+                p2 * N_PKTS,
+                self.pg0)
+
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].src, remote_addr)
+                self.assertEqual(rx[ICMP].id, 0xfeed)
+
+    def sourcenat_test_icmp_err_conf(self, isV6=False):
+        sports = [1234, 1235]
+        dports = [6661, 6662]
+
+        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+            if isV6:
+                IP46 = IPv6
+                client_addr = self.pg0.remote_hosts[0].ip6
+                remote_addr = self.pg1.remote_hosts[nbr].ip6
+                src_nat_addr = self.pg2.remote_hosts[0].ip6
+                ICMP46 = ICMPv6DestUnreach
+                ICMPelem = ICMPv6DestUnreach(code=1)
+                IP46error = IPerror6
+            else:
+                IP46 = IP
+                client_addr = self.pg0.remote_hosts[0].ip4
+                remote_addr = self.pg1.remote_hosts[nbr].ip4
+                src_nat_addr = self.pg2.remote_hosts[0].ip4
+                IP46error = IPerror
+                ICMP46 = ICMP
+                ICMPelem = ICMP(type=11)
+
+            # from pods to outside network
+            p1 = (
+                Ether(dst=self.pg0.local_mac,
+                      src=self.pg0.remote_hosts[0].mac) /
+                IP46(src=client_addr, dst=remote_addr) /
+                TCP(sport=sports[nbr], dport=dports[nbr]) /
+                Raw())
+
+            rxs = self.send_and_expect(
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].dst, remote_addr)
+                self.assertEqual(rx[TCP].dport, dports[nbr])
+                self.assertEqual(rx[IP46].src, src_nat_addr)
+                sport = rx[TCP].sport
+
+            InnerIP = rxs[0][IP46]
+            # from outside to pods, ICMP error
+            p2 = (
+                Ether(dst=self.pg1.local_mac,
+                      src=self.pg1.remote_hosts[nbr].mac) /
+                IP46(src=remote_addr, dst=src_nat_addr) /
+                ICMPelem / InnerIP)
+
+            rxs = self.send_and_expect(
+                self.pg1,
+                p2 * N_PKTS,
+                self.pg0)
+
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].src, remote_addr)
+                self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+                self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
+                self.assertEqual(rx[ICMP46][IP46error]
+                                 [TCPerror].sport, sports[nbr])
+                self.assertEqual(rx[ICMP46][IP46error]
+                                 [TCPerror].dport, dports[nbr])
+
+    def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
+        sports = [1234, 1235]
+        dports = [6661, 6662]
 
         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
             if isV6:
+                IP46 = IPv6
                 client_addr = self.pg0.remote_hosts[0].ip6
                 remote_addr = self.pg1.remote_hosts[nbr].ip6
+                src_nat_addr = self.pg2.remote_hosts[0].ip6
+                exclude_prefix = ip_network(
+                    "%s/100" % remote_addr, strict=False)
             else:
+                IP46 = IP
                 client_addr = self.pg0.remote_hosts[0].ip4
                 remote_addr = self.pg1.remote_hosts[nbr].ip4
+                src_nat_addr = self.pg2.remote_hosts[0].ip4
+                exclude_prefix = ip_network(
+                    "%s/16" % remote_addr, strict=False)
             # from pods to outside network
             p1 = (
                 Ether(dst=self.pg0.local_mac,
@@ -619,22 +784,25 @@ class TestCNatSourceNAT(VppTestCase):
                 l4p(sport=sports[nbr], dport=dports[nbr]) /
                 Raw())
 
+            self.vapi.cli("trace add pg-input 1")
             rxs = self.send_and_expect(
                 self.pg0,
                 p1 * N_PKTS,
                 self.pg1)
+            self.logger.info(self.vapi.cli("show trace max 1"))
+
             for rx in rxs:
                 self.assert_packet_checksums_valid(rx)
                 self.assertEqual(rx[IP46].dst, remote_addr)
                 self.assertEqual(rx[l4p].dport, dports[nbr])
-                self.assertEqual(rx[IP46].src, srcNatAddr)
+                self.assertEqual(rx[IP46].src, src_nat_addr)
                 sport = rx[l4p].sport
 
             # from outside to pods
             p2 = (
                 Ether(dst=self.pg1.local_mac,
                       src=self.pg1.remote_hosts[nbr].mac) /
-                IP46(src=remote_addr, dst=srcNatAddr) /
+                IP46(src=remote_addr, dst=src_nat_addr) /
                 l4p(sport=dports[nbr], dport=sport) /
                 Raw())
 
@@ -651,11 +819,8 @@ class TestCNatSourceNAT(VppTestCase):
                 self.assertEqual(rx[IP46].src, remote_addr)
 
             # add remote host to exclude list
-            subnet_mask = 100 if isV6 else 16
-            subnet = "%s/%d" % (remote_addr, subnet_mask)
-            exclude_subnet = ip_network(subnet, strict=False)
-
-            t1.cnat_exclude_subnet(exclude_subnet)
+            self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+                prefix=exclude_prefix, is_add=1)
             self.vapi.cnat_session_purge()
 
             rxs = self.send_and_expect(
@@ -669,7 +834,8 @@ class TestCNatSourceNAT(VppTestCase):
                 self.assertEqual(rx[IP46].src, client_addr)
 
             # remove remote host from exclude list
-            t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
+            self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+                prefix=exclude_prefix, is_add=0)
             self.vapi.cnat_session_purge()
 
             rxs = self.send_and_expect(
@@ -681,17 +847,128 @@ class TestCNatSourceNAT(VppTestCase):
                 self.assert_packet_checksums_valid(rx)
                 self.assertEqual(rx[IP46].dst, remote_addr)
                 self.assertEqual(rx[l4p].dport, dports[nbr])
-                self.assertEqual(rx[IP46].src, srcNatAddr)
+                self.assertEqual(rx[IP46].src, src_nat_addr)
+
+            self.vapi.cnat_session_purge()
+
+
+class TestCNatDHCP(VppTestCase):
+    """ CNat Translation """
+    extra_vpp_punt_config = ["cnat", "{",
+                             "session-db-buckets", "64",
+                             "session-cleanup-timeout", "0.1",
+                             "session-max-age", "1",
+                             "tcp-max-age", "1",
+                             "scanner", "off", "}"]
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestCNatDHCP, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestCNatDHCP, cls).tearDownClass()
+
+    def tearDown(self):
+        for i in self.pg_interfaces:
+            i.admin_down()
+        super(TestCNatDHCP, self).tearDown()
+
+    def create_translation(self, vip_pg, *args, is_v6=False):
+        vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
+        paths = []
+        for (src_pg, dst_pg) in args:
+            paths.append(EpTuple(
+                Ep.from_pg(src_pg, is_v6=is_v6),
+                Ep.from_pg(dst_pg, is_v6=is_v6)
+            ))
+        t1 = VppCNatTranslation(self, TCP, vip, paths)
+        t1.add_vpp_config()
+        return t1
 
-    def test_cnat6_sourcenat(self):
-        # """ CNat Source Nat ipv6 """
-        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
-        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)
+    def make_addr(self, sw_if_index, i, is_v6):
+        if is_v6:
+            return "fd01:%x::%u" % (sw_if_index, i + 1)
+        else:
+            return "172.16.%u.%u" % (sw_if_index, i)
 
-    def test_cnat4_sourcenat(self):
-        # """ CNat Source Nat ipv4 """
-        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
-        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
+    def make_prefix(self, sw_if_index, i, is_v6):
+        if is_v6:
+            return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
+        else:
+            return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
+
+    def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
+        qt1 = tr.query_vpp_config()
+        self.assertEqual(str(qt1.vip.addr), self.make_addr(
+            vip_pg.sw_if_index, i, is_v6))
+        for (src_pg, dst_pg), path in zip(args, qt1.paths):
+            if src_pg:
+                self.assertEqual(str(path.src_ep.addr), self.make_addr(
+                    src_pg.sw_if_index, i, is_v6))
+            if dst_pg:
+                self.assertEqual(str(path.dst_ep.addr), self.make_addr(
+                    dst_pg.sw_if_index, i, is_v6))
+
+    def config_ips(self, rng, is_add=1, is_v6=False):
+        for pg, i in product(self.pg_interfaces, rng):
+            self.vapi.sw_interface_add_del_address(
+                sw_if_index=pg.sw_if_index,
+                prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
+                is_add=is_add)
+
+    def test_dhcp_v4(self):
+        self.create_pg_interfaces(range(5))
+        for i in self.pg_interfaces:
+            i.admin_up()
+        pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
+        t1 = self.create_translation(*pglist)
+        self.config_ips([0])
+        self.check_resolved(t1, *pglist)
+        self.config_ips([1])
+        self.config_ips([0], is_add=0)
+        self.check_resolved(t1, *pglist, i=1)
+        self.config_ips([1], is_add=0)
+        t1.remove_vpp_config()
+
+    def test_dhcp_v6(self):
+        self.create_pg_interfaces(range(5))
+        for i in self.pg_interfaces:
+            i.admin_up()
+        pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
+        t1 = self.create_translation(*pglist, is_v6=True)
+        self.config_ips([0], is_v6=True)
+        self.check_resolved(t1, *pglist, is_v6=True)
+        self.config_ips([1], is_v6=True)
+        self.config_ips([0], is_add=0, is_v6=True)
+        self.check_resolved(t1, *pglist, i=1, is_v6=True)
+        self.config_ips([1], is_add=0, is_v6=True)
+        t1.remove_vpp_config()
+
+    def test_dhcp_snat(self):
+        self.create_pg_interfaces(range(1))
+        for i in self.pg_interfaces:
+            i.admin_up()
+        self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
+        self.config_ips([0], is_v6=False)
+        self.config_ips([0], is_v6=True)
+        r = self.vapi.cnat_get_snat_addresses()
+        self.assertEqual(str(r.snat_ip4), self.make_addr(
+            self.pg0.sw_if_index, 0, False))
+        self.assertEqual(str(r.snat_ip6), self.make_addr(
+            self.pg0.sw_if_index, 0, True))
+        self.config_ips([1], is_v6=False)
+        self.config_ips([1], is_v6=True)
+        self.config_ips([0], is_add=0, is_v6=False)
+        self.config_ips([0], is_add=0, is_v6=True)
+        r = self.vapi.cnat_get_snat_addresses()
+        self.assertEqual(str(r.snat_ip4), self.make_addr(
+            self.pg0.sw_if_index, 1, False))
+        self.assertEqual(str(r.snat_ip6), self.make_addr(
+            self.pg0.sw_if_index, 1, True))
+        self.config_ips([1], is_add=0, is_v6=False)
+        self.config_ips([1], is_add=0, is_v6=True)
+        self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
 
 
 if __name__ == '__main__':