cnat: Prepare extended snat policies
[vpp.git] / src / plugins / cnat / test / test_cnat.py
index 18e3baa..ff4c440 100644 (file)
@@ -3,12 +3,17 @@
 import unittest
 
 from framework import VppTestCase, VppTestRunner
-from vpp_ip import DpoProto
+from vpp_ip import DpoProto, INVALID_INDEX
+from itertools import product
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet import IP, UDP, TCP, ICMP
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
+import struct
 
 from ipaddress import ip_address, ip_network, \
     IPv4Address, IPv6Address, IPv4Network, IPv6Network
@@ -19,25 +24,38 @@ from vpp_papi import VppEnum
 N_PKTS = 15
 
 
-def find_cnat_translation(test, id):
-    ts = test.vapi.cnat_translation_dump()
-    for t in ts:
-        if id == t.translation.id:
-            return True
-    return False
-
-
 class Ep(object):
     """ CNat endpoint """
 
-    def __init__(self, ip, port, l4p=TCP):
+    def __init__(self, ip=None, port=0, l4p=TCP,
+                 sw_if_index=INVALID_INDEX, is_v6=False):
         self.ip = ip
+        if ip is None:
+            self.ip = "::" if is_v6 else "0.0.0.0"
         self.port = port
         self.l4p = l4p
+        self.sw_if_index = sw_if_index
+        if is_v6:
+            self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
+        else:
+            self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
 
     def encode(self):
         return {'addr': self.ip,
-                'port': self.port}
+                'port': self.port,
+                'sw_if_index': self.sw_if_index,
+                'if_af': self.if_af}
+
+    @classmethod
+    def from_pg(cls, pg, is_v6=False):
+        if pg is None:
+            return cls(is_v6=is_v6)
+        else:
+            return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
+
+    @property
+    def isV6(self):
+        return ":" in self.ip
 
     def __str__(self):
         return ("%s:%d" % (self.ip, self.port))
@@ -69,6 +87,9 @@ class VppCNatTranslation(VppObject):
         for path in self.paths:
             self.encoded_paths.append(path.encode())
 
+    def __str__(self):
+        return ("%s %s %s" % (self.vip, self.iproto, self.paths))
+
     @property
     def vl4_proto(self):
         ip_proto = VppEnum.vl_api_ip_proto_t
@@ -77,9 +98,6 @@ class VppCNatTranslation(VppObject):
             TCP: ip_proto.IP_API_PROTO_TCP,
         }[self.iproto]
 
-    def delete(self):
-        r = self._test.vapi.cnat_translation_del(id=self.id)
-
     def add_vpp_config(self):
         r = self._test.vapi.cnat_translation_update(
             {'vip': self.vip.encode(),
@@ -103,10 +121,13 @@ class VppCNatTranslation(VppObject):
         self._test.registry.register(self, self._test.logger)
 
     def remove_vpp_config(self):
-        self._test.vapi.cnat_translation_del(self.id)
+        self._test.vapi.cnat_translation_del(id=self.id)
 
     def query_vpp_config(self):
-        return find_cnat_translation(self._test, self.id)
+        for t in self._test.vapi.cnat_translation_dump():
+            if self.id == t.translation.id:
+                return t.translation
+        return None
 
     def object_id(self):
         return ("cnat-translation-%s" % (self.vip))
@@ -116,39 +137,14 @@ class VppCNatTranslation(VppObject):
         return c[0][self.id]
 
 
-class VppCNATSourceNat(VppObject):
-
-    def __init__(self, test, address, exclude_subnets=[]):
-        self._test = test
-        self.address = address
-        self.exclude_subnets = exclude_subnets
-
-    def add_vpp_config(self):
-        a = ip_address(self.address)
-        if 4 == a.version:
-            self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
-        else:
-            self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
-        for subnet in self.exclude_subnets:
-            self.cnat_exclude_subnet(subnet, True)
-
-    def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
-        add = 1 if isAdd else 0
-        self._test.vapi.cnat_add_del_snat_prefix(
-                prefix=exclude_subnet, is_add=add)
-
-    def query_vpp_config(self):
-        return False
-
-    def remove_vpp_config(self):
-        return False
-
-
 class TestCNatTranslation(VppTestCase):
     """ CNat Translation """
     extra_vpp_punt_config = ["cnat", "{",
+                             "session-db-buckets", "64",
+                             "session-cleanup-timeout", "0.1",
                              "session-max-age", "1",
-                             "tcp-max-age", "1", "}"]
+                             "tcp-max-age", "1",
+                             "scanner", "off", "}"]
 
     @classmethod
     def setUpClass(cls):
@@ -177,10 +173,10 @@ class TestCNatTranslation(VppTestCase):
             i.admin_down()
         super(TestCNatTranslation, self).tearDown()
 
-    def cnat_create_translation(self, vip, nbr, isV6=False):
-        ip_v = "ip6" if isV6 else "ip4"
+    def cnat_create_translation(self, vip, nbr):
+        ip_v = "ip6" if vip.isV6 else "ip4"
         dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
-        sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
+        sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
         t1 = VppCNatTranslation(
             self, vip.l4p, vip,
             [EpTuple(sep, dep), EpTuple(sep, dep)])
@@ -208,6 +204,7 @@ class TestCNatTranslation(VppTestCase):
                 rxs = self.send_and_expect(self.pg0,
                                            p1 * N_PKTS,
                                            self.pg1)
+                self.logger.info(self.vapi.cli("show trace max 1"))
 
                 for rx in rxs:
                     self.assert_packet_checksums_valid(rx)
@@ -274,11 +271,6 @@ class TestCNatTranslation(VppTestCase):
                                            p1 * N_PKTS,
                                            self.pg0)
 
-        self.assertEqual(t1.get_stats()['packets'],
-                         N_PKTS *
-                         len(sports) *
-                         len(self.pg0.remote_hosts))
-
     def cnat_test_translation_update(self, t1, sports, isV6=False):
         ip_v = "ip6" if isV6 else "ip4"
         ip_class = IPv6 if isV6 else IP
@@ -338,7 +330,7 @@ class TestCNatTranslation(VppTestCase):
 
         trs = []
         for nbr, vip in enumerate(vips):
-            trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6))
+            trs.append(self.cnat_create_translation(vip, nbr))
 
         self.logger.info(self.vapi.cli("sh cnat client"))
         self.logger.info(self.vapi.cli("sh cnat translation"))
@@ -358,7 +350,7 @@ class TestCNatTranslation(VppTestCase):
             self.logger.info(self.vapi.cli("sh cnat session verbose"))
 
         #
-        # turn the scanner back on and wait untill the sessions
+        # turn the scanner back on and wait until the sessions
         # all disapper
         #
         self.vapi.cli("test cnat scanner on")
@@ -369,8 +361,10 @@ class TestCNatTranslation(VppTestCase):
             n_tries += 1
             sessions = self.vapi.cnat_session_dump()
             self.sleep(2)
+            self.logger.info(self.vapi.cli("show cnat session verbose"))
 
         self.assertTrue(n_tries < 100)
+        self.vapi.cli("test cnat scanner off")
 
         #
         # load some flows again and purge
@@ -389,12 +383,115 @@ class TestCNatTranslation(VppTestCase):
                                          self.pg2)
 
         for tr in trs:
-            tr.delete()
+            tr.remove_vpp_config()
 
         self.assertTrue(self.vapi.cnat_session_dump())
         self.vapi.cnat_session_purge()
         self.assertFalse(self.vapi.cnat_session_dump())
 
+    def test_icmp(self):
+        vips = [
+            Ep("30.0.0.1", 5555),
+            Ep("30.0.0.2", 5554),
+            Ep("30.0.0.2", 5553, UDP),
+            Ep("30::1", 6666),
+            Ep("30::2", 5553, UDP),
+        ]
+        sport = 1234
+
+        self.pg0.generate_remote_hosts(len(vips))
+        self.pg0.configure_ipv6_neighbors()
+        self.pg0.configure_ipv4_neighbors()
+
+        self.pg1.generate_remote_hosts(len(vips))
+        self.pg1.configure_ipv6_neighbors()
+        self.pg1.configure_ipv4_neighbors()
+
+        self.vapi.cli("test cnat scanner off")
+        trs = []
+        for nbr, vip in enumerate(vips):
+            trs.append(self.cnat_create_translation(vip, nbr))
+
+        self.logger.info(self.vapi.cli("sh cnat client"))
+        self.logger.info(self.vapi.cli("sh cnat translation"))
+
+        for nbr, vip in enumerate(vips):
+            if vip.isV6:
+                client_addr = self.pg0.remote_hosts[0].ip6
+                remote_addr = self.pg1.remote_hosts[nbr].ip6
+                remote2_addr = self.pg2.remote_hosts[0].ip6
+            else:
+                client_addr = self.pg0.remote_hosts[0].ip4
+                remote_addr = self.pg1.remote_hosts[nbr].ip4
+                remote2_addr = self.pg2.remote_hosts[0].ip4
+            IP46 = IPv6 if vip.isV6 else IP
+            # from client to vip
+            p1 = (Ether(dst=self.pg0.local_mac,
+                        src=self.pg0.remote_hosts[0].mac) /
+                  IP46(src=client_addr, dst=vip.ip) /
+                  vip.l4p(sport=sport, dport=vip.port) /
+                  Raw())
+
+            rxs = self.send_and_expect(self.pg0,
+                                       p1 * N_PKTS,
+                                       self.pg1)
+
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].dst, remote_addr)
+                self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
+                self.assertEqual(rx[IP46].src, client_addr)
+                self.assertEqual(rx[vip.l4p].sport, sport)
+
+            InnerIP = rxs[0][IP46]
+
+            ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
+            ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
+            # from vip to client, ICMP error
+            p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+                  IP46(src=remote_addr, dst=client_addr) /
+                  ICMPelem / InnerIP)
+
+            rxs = self.send_and_expect(self.pg1,
+                                       p1 * N_PKTS,
+                                       self.pg0)
+
+            TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
+            IP46error = IPerror6 if vip.isV6 else IPerror
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].src, vip.ip)
+                self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+                self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
+                self.assertEqual(rx[ICMP46][IP46error]
+                                 [TCPUDPError].sport, sport)
+                self.assertEqual(rx[ICMP46][IP46error]
+                                 [TCPUDPError].dport, vip.port)
+
+            # from other remote to client, ICMP error
+            # outside shouldn't be NAT-ed
+            p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+                  IP46(src=remote2_addr, dst=client_addr) /
+                  ICMPelem / InnerIP)
+
+            rxs = self.send_and_expect(self.pg1,
+                                       p1 * N_PKTS,
+                                       self.pg0)
+
+            TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
+            IP46error = IPerror6 if vip.isV6 else IPerror
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].src, remote2_addr)
+                self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+                self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
+                self.assertEqual(rx[ICMP46][IP46error]
+                                 [TCPUDPError].sport, sport)
+                self.assertEqual(rx[ICMP46][IP46error]
+                                 [TCPUDPError].dport, vip.port)
+
+        self.vapi.cnat_session_purge()
+
     def test_cnat6(self):
         # """ CNat Translation ipv6 """
         vips = [
@@ -430,8 +527,10 @@ class TestCNatTranslation(VppTestCase):
 class TestCNatSourceNAT(VppTestCase):
     """ CNat Source NAT """
     extra_vpp_punt_config = ["cnat", "{",
+                             "session-cleanup-timeout", "0.1",
                              "session-max-age", "1",
-                             "tcp-max-age", "1", "}"]
+                             "tcp-max-age", "1",
+                             "scanner", "off", "}"]
 
     @classmethod
     def setUpClass(cls):
@@ -453,144 +552,424 @@ class TestCNatSourceNAT(VppTestCase):
             i.config_ip6()
             i.resolve_ndp()
 
+        self.pg0.configure_ipv6_neighbors()
+        self.pg0.configure_ipv4_neighbors()
+        self.pg1.generate_remote_hosts(2)
+        self.pg1.configure_ipv4_neighbors()
+        self.pg1.configure_ipv6_neighbors()
+
+        self.vapi.cnat_set_snat_addresses(
+            snat_ip4=self.pg2.remote_hosts[0].ip4,
+            snat_ip6=self.pg2.remote_hosts[0].ip6,
+            sw_if_index=INVALID_INDEX)
+        self.vapi.feature_enable_disable(
+            enable=1,
+            arc_name="ip6-unicast",
+            feature_name="cnat-snat-ip6",
+            sw_if_index=self.pg0.sw_if_index)
+        self.vapi.feature_enable_disable(
+            enable=1,
+            arc_name="ip4-unicast",
+            feature_name="cnat-snat-ip4",
+            sw_if_index=self.pg0.sw_if_index)
+
+        policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
+        self.vapi.cnat_set_snat_policy(
+            policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
+        for i in self.pg_interfaces:
+            self.vapi.cnat_snat_policy_add_del_if(
+                sw_if_index=i.sw_if_index, is_add=1,
+                table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
+            self.vapi.cnat_snat_policy_add_del_if(
+                sw_if_index=i.sw_if_index, is_add=1,
+                table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
+
     def tearDown(self):
+        self.vapi.cnat_session_purge()
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
             i.admin_down()
         super(TestCNatSourceNAT, self).tearDown()
 
-    def cnat_create_translation(self, srcNatAddr, interface, isV6=False):
-        t1 = VppCNATSourceNat(self, srcNatAddr)
-        t1.add_vpp_config()
-        cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
-        cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
-        self.vapi.feature_enable_disable(
-            enable=1,
-            arc_name=cnat_arc_name,
-            feature_name=cnat_feature_name,
-            sw_if_index=interface.sw_if_index)
+    def test_snat_v6(self):
+        # """ CNat Source Nat v6 """
+        self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
+        self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
+        self.sourcenat_test_icmp_err_conf(isV6=True)
+        self.sourcenat_test_icmp_echo6_conf()
 
-        return t1
+    def test_snat_v4(self):
+        # """ CNat Source Nat v4 """
+        self.sourcenat_test_tcp_udp_conf(TCP)
+        self.sourcenat_test_tcp_udp_conf(UDP)
+        self.sourcenat_test_icmp_err_conf()
+        self.sourcenat_test_icmp_echo4_conf()
 
-    def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
-        ip_v = "ip6" if isV6 else "ip4"
-        ip_class = IPv6 if isV6 else IP
-        sports = [1234, 1235, 1236]
-        dports = [6661, 6662, 6663]
+    def sourcenat_test_icmp_echo6_conf(self):
+        sports = [1234, 1235]
+        dports = [6661, 6662]
 
-        self.pg0.generate_remote_hosts(1)
-        self.pg0.configure_ipv4_neighbors()
-        self.pg0.configure_ipv6_neighbors()
-        self.pg1.generate_remote_hosts(len(sports))
-        self.pg1.configure_ipv4_neighbors()
-        self.pg1.configure_ipv6_neighbors()
+        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+            client_addr = self.pg0.remote_hosts[0].ip6
+            remote_addr = self.pg1.remote_hosts[nbr].ip6
+            src_nat_addr = self.pg2.remote_hosts[0].ip6
 
-        self.vapi.cli("test cnat scanner on")
-        t1 = self.cnat_create_translation(srcNatAddr, self.pg0)
+            # ping from pods to outside network
+            p1 = (
+                Ether(dst=self.pg0.local_mac,
+                      src=self.pg0.remote_hosts[0].mac) /
+                IPv6(src=client_addr, dst=remote_addr) /
+                ICMPv6EchoRequest(id=0xfeed) /
+                Raw())
+
+            rxs = self.send_and_expect(
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
+
+            for rx in rxs:
+                self.assertEqual(rx[IPv6].src, src_nat_addr)
+                self.assert_packet_checksums_valid(rx)
+
+            received_id = rx[0][ICMPv6EchoRequest].id
+            # ping reply from outside to pods
+            p2 = (
+                Ether(dst=self.pg1.local_mac,
+                      src=self.pg1.remote_hosts[nbr].mac) /
+                IPv6(src=remote_addr, dst=src_nat_addr) /
+                ICMPv6EchoReply(id=received_id))
+            rxs = self.send_and_expect(
+                self.pg1,
+                p2 * N_PKTS,
+                self.pg0)
+
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IPv6].src, remote_addr)
+                self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
+
+    def sourcenat_test_icmp_echo4_conf(self):
+        sports = [1234, 1235]
+        dports = [6661, 6662]
+
+        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+            IP46 = IP
+            client_addr = self.pg0.remote_hosts[0].ip4
+            remote_addr = self.pg1.remote_hosts[nbr].ip4
+            src_nat_addr = self.pg2.remote_hosts[0].ip4
+
+            # ping from pods to outside network
+            p1 = (
+                Ether(dst=self.pg0.local_mac,
+                      src=self.pg0.remote_hosts[0].mac) /
+                IP46(src=client_addr, dst=remote_addr) /
+                ICMP(type=8, id=0xfeed) /
+                Raw())
+
+            rxs = self.send_and_expect(
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
+
+            for rx in rxs:
+                self.assertEqual(rx[IP46].src, src_nat_addr)
+                self.assert_packet_checksums_valid(rx)
+
+            received_id = rx[0][ICMP].id
+            # ping reply from outside to pods
+            p2 = (
+                Ether(dst=self.pg1.local_mac,
+                      src=self.pg1.remote_hosts[nbr].mac) /
+                IP46(src=remote_addr, dst=src_nat_addr) /
+                ICMP(type=0, id=received_id))
+            rxs = self.send_and_expect(
+                self.pg1,
+                p2 * N_PKTS,
+                self.pg0)
+
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].src, remote_addr)
+                self.assertEqual(rx[ICMP].id, 0xfeed)
+
+    def sourcenat_test_icmp_err_conf(self, isV6=False):
+        sports = [1234, 1235]
+        dports = [6661, 6662]
 
         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+            if isV6:
+                IP46 = IPv6
+                client_addr = self.pg0.remote_hosts[0].ip6
+                remote_addr = self.pg1.remote_hosts[nbr].ip6
+                src_nat_addr = self.pg2.remote_hosts[0].ip6
+                ICMP46 = ICMPv6DestUnreach
+                ICMPelem = ICMPv6DestUnreach(code=1)
+                IP46error = IPerror6
+            else:
+                IP46 = IP
+                client_addr = self.pg0.remote_hosts[0].ip4
+                remote_addr = self.pg1.remote_hosts[nbr].ip4
+                src_nat_addr = self.pg2.remote_hosts[0].ip4
+                IP46error = IPerror
+                ICMP46 = ICMP
+                ICMPelem = ICMP(type=11)
+
             # from pods to outside network
             p1 = (
-                Ether(
-                    dst=self.pg0.local_mac,
-                    src=self.pg0.remote_hosts[0].mac) /
-                ip_class(
-                    src=getattr(self.pg0.remote_hosts[0], ip_v),
-                    dst=getattr(remote_host, ip_v)) /
+                Ether(dst=self.pg0.local_mac,
+                      src=self.pg0.remote_hosts[0].mac) /
+                IP46(src=client_addr, dst=remote_addr) /
+                TCP(sport=sports[nbr], dport=dports[nbr]) /
+                Raw())
+
+            rxs = self.send_and_expect(
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].dst, remote_addr)
+                self.assertEqual(rx[TCP].dport, dports[nbr])
+                self.assertEqual(rx[IP46].src, src_nat_addr)
+                sport = rx[TCP].sport
+
+            InnerIP = rxs[0][IP46]
+            # from outside to pods, ICMP error
+            p2 = (
+                Ether(dst=self.pg1.local_mac,
+                      src=self.pg1.remote_hosts[nbr].mac) /
+                IP46(src=remote_addr, dst=src_nat_addr) /
+                ICMPelem / InnerIP)
+
+            rxs = self.send_and_expect(
+                self.pg1,
+                p2 * N_PKTS,
+                self.pg0)
+
+            for rx in rxs:
+                self.assert_packet_checksums_valid(rx)
+                self.assertEqual(rx[IP46].src, remote_addr)
+                self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+                self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
+                self.assertEqual(rx[ICMP46][IP46error]
+                                 [TCPerror].sport, sports[nbr])
+                self.assertEqual(rx[ICMP46][IP46error]
+                                 [TCPerror].dport, dports[nbr])
+
+    def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
+        sports = [1234, 1235]
+        dports = [6661, 6662]
+
+        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+            if isV6:
+                IP46 = IPv6
+                client_addr = self.pg0.remote_hosts[0].ip6
+                remote_addr = self.pg1.remote_hosts[nbr].ip6
+                src_nat_addr = self.pg2.remote_hosts[0].ip6
+                exclude_prefix = ip_network(
+                    "%s/100" % remote_addr, strict=False)
+            else:
+                IP46 = IP
+                client_addr = self.pg0.remote_hosts[0].ip4
+                remote_addr = self.pg1.remote_hosts[nbr].ip4
+                src_nat_addr = self.pg2.remote_hosts[0].ip4
+                exclude_prefix = ip_network(
+                    "%s/16" % remote_addr, strict=False)
+            # from pods to outside network
+            p1 = (
+                Ether(dst=self.pg0.local_mac,
+                      src=self.pg0.remote_hosts[0].mac) /
+                IP46(src=client_addr, dst=remote_addr) /
                 l4p(sport=sports[nbr], dport=dports[nbr]) /
                 Raw())
 
+            self.vapi.cli("trace add pg-input 1")
             rxs = self.send_and_expect(
-                                self.pg0,
-                                p1 * N_PKTS,
-                                self.pg1)
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
+            self.logger.info(self.vapi.cli("show trace max 1"))
+
             for rx in rxs:
                 self.assert_packet_checksums_valid(rx)
-                self.assertEqual(
-                    rx[ip_class].dst,
-                    getattr(remote_host, ip_v))
+                self.assertEqual(rx[IP46].dst, remote_addr)
                 self.assertEqual(rx[l4p].dport, dports[nbr])
-                self.assertEqual(
-                    rx[ip_class].src,
-                    srcNatAddr)
+                self.assertEqual(rx[IP46].src, src_nat_addr)
                 sport = rx[l4p].sport
 
             # from outside to pods
             p2 = (
-                Ether(
-                    dst=self.pg1.local_mac,
-                    src=self.pg1.remote_hosts[nbr].mac) /
-                ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) /
+                Ether(dst=self.pg1.local_mac,
+                      src=self.pg1.remote_hosts[nbr].mac) /
+                IP46(src=remote_addr, dst=src_nat_addr) /
                 l4p(sport=dports[nbr], dport=sport) /
                 Raw())
 
             rxs = self.send_and_expect(
-                                    self.pg1,
-                                    p2 * N_PKTS,
-                                    self.pg0)
+                self.pg1,
+                p2 * N_PKTS,
+                self.pg0)
 
             for rx in rxs:
                 self.assert_packet_checksums_valid(rx)
-                self.assertEqual(
-                    rx[ip_class].dst,
-                    getattr(self.pg0.remote_hosts[0], ip_v))
+                self.assertEqual(rx[IP46].dst, client_addr)
                 self.assertEqual(rx[l4p].dport, sports[nbr])
                 self.assertEqual(rx[l4p].sport, dports[nbr])
-                self.assertEqual(
-                    rx[ip_class].src,
-                    getattr(remote_host, ip_v))
+                self.assertEqual(rx[IP46].src, remote_addr)
 
             # add remote host to exclude list
-            subnet_mask = 100 if isV6 else 16
-            subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask)
-            exclude_subnet = ip_network(subnet, strict=False)
-
-            t1.cnat_exclude_subnet(exclude_subnet)
+            self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+                prefix=exclude_prefix, is_add=1)
             self.vapi.cnat_session_purge()
 
             rxs = self.send_and_expect(
-                                self.pg0,
-                                p1 * N_PKTS,
-                                self.pg1)
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
             for rx in rxs:
                 self.assert_packet_checksums_valid(rx)
-                self.assertEqual(
-                    rx[ip_class].dst,
-                    getattr(remote_host, ip_v))
+                self.assertEqual(rx[IP46].dst, remote_addr)
                 self.assertEqual(rx[l4p].dport, dports[nbr])
-                self.assertEqual(
-                    rx[ip_class].src,
-                    getattr(self.pg0.remote_hosts[0], ip_v))
+                self.assertEqual(rx[IP46].src, client_addr)
 
             # remove remote host from exclude list
-            t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
+            self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+                prefix=exclude_prefix, is_add=0)
             self.vapi.cnat_session_purge()
 
             rxs = self.send_and_expect(
-                    self.pg0,
-                    p1 * N_PKTS,
-                    self.pg1)
+                self.pg0,
+                p1 * N_PKTS,
+                self.pg1)
 
             for rx in rxs:
                 self.assert_packet_checksums_valid(rx)
-                self.assertEqual(
-                    rx[ip_class].dst,
-                    getattr(remote_host, ip_v))
+                self.assertEqual(rx[IP46].dst, remote_addr)
                 self.assertEqual(rx[l4p].dport, dports[nbr])
-                self.assertEqual(
-                    rx[ip_class].src,
-                    srcNatAddr)
-
-    # def test_cnat6_sourcenat(self):
-    #     # """ CNat Source Nat ipv6 """
-    #     self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
-    #     self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)
-
-    def test_cnat4_sourcenat(self):
-        # """ CNat Source Nat ipv4 """
-        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
-        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
+                self.assertEqual(rx[IP46].src, src_nat_addr)
+
+            self.vapi.cnat_session_purge()
+
+
+class TestCNatDHCP(VppTestCase):
+    """ CNat Translation """
+    extra_vpp_punt_config = ["cnat", "{",
+                             "session-db-buckets", "64",
+                             "session-cleanup-timeout", "0.1",
+                             "session-max-age", "1",
+                             "tcp-max-age", "1",
+                             "scanner", "off", "}"]
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestCNatDHCP, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestCNatDHCP, cls).tearDownClass()
+
+    def tearDown(self):
+        for i in self.pg_interfaces:
+            i.admin_down()
+        super(TestCNatDHCP, self).tearDown()
+
+    def create_translation(self, vip_pg, *args, is_v6=False):
+        vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
+        paths = []
+        for (src_pg, dst_pg) in args:
+            paths.append(EpTuple(
+                Ep.from_pg(src_pg, is_v6=is_v6),
+                Ep.from_pg(dst_pg, is_v6=is_v6)
+            ))
+        t1 = VppCNatTranslation(self, TCP, vip, paths)
+        t1.add_vpp_config()
+        return t1
+
+    def make_addr(self, sw_if_index, i, is_v6):
+        if is_v6:
+            return "fd01:%x::%u" % (sw_if_index, i + 1)
+        else:
+            return "172.16.%u.%u" % (sw_if_index, i)
+
+    def make_prefix(self, sw_if_index, i, is_v6):
+        if is_v6:
+            return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
+        else:
+            return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
+
+    def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
+        qt1 = tr.query_vpp_config()
+        self.assertEqual(str(qt1.vip.addr), self.make_addr(
+            vip_pg.sw_if_index, i, is_v6))
+        for (src_pg, dst_pg), path in zip(args, qt1.paths):
+            if src_pg:
+                self.assertEqual(str(path.src_ep.addr), self.make_addr(
+                    src_pg.sw_if_index, i, is_v6))
+            if dst_pg:
+                self.assertEqual(str(path.dst_ep.addr), self.make_addr(
+                    dst_pg.sw_if_index, i, is_v6))
+
+    def config_ips(self, rng, is_add=1, is_v6=False):
+        for pg, i in product(self.pg_interfaces, rng):
+            self.vapi.sw_interface_add_del_address(
+                sw_if_index=pg.sw_if_index,
+                prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
+                is_add=is_add)
+
+    def test_dhcp_v4(self):
+        self.create_pg_interfaces(range(5))
+        for i in self.pg_interfaces:
+            i.admin_up()
+        pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
+        t1 = self.create_translation(*pglist)
+        self.config_ips([0])
+        self.check_resolved(t1, *pglist)
+        self.config_ips([1])
+        self.config_ips([0], is_add=0)
+        self.check_resolved(t1, *pglist, i=1)
+        self.config_ips([1], is_add=0)
+        t1.remove_vpp_config()
+
+    def test_dhcp_v6(self):
+        self.create_pg_interfaces(range(5))
+        for i in self.pg_interfaces:
+            i.admin_up()
+        pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
+        t1 = self.create_translation(*pglist, is_v6=True)
+        self.config_ips([0], is_v6=True)
+        self.check_resolved(t1, *pglist, is_v6=True)
+        self.config_ips([1], is_v6=True)
+        self.config_ips([0], is_add=0, is_v6=True)
+        self.check_resolved(t1, *pglist, i=1, is_v6=True)
+        self.config_ips([1], is_add=0, is_v6=True)
+        t1.remove_vpp_config()
+
+    def test_dhcp_snat(self):
+        self.create_pg_interfaces(range(1))
+        for i in self.pg_interfaces:
+            i.admin_up()
+        self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
+        self.config_ips([0], is_v6=False)
+        self.config_ips([0], is_v6=True)
+        r = self.vapi.cnat_get_snat_addresses()
+        self.assertEqual(str(r.snat_ip4), self.make_addr(
+            self.pg0.sw_if_index, 0, False))
+        self.assertEqual(str(r.snat_ip6), self.make_addr(
+            self.pg0.sw_if_index, 0, True))
+        self.config_ips([1], is_v6=False)
+        self.config_ips([1], is_v6=True)
+        self.config_ips([0], is_add=0, is_v6=False)
+        self.config_ips([0], is_add=0, is_v6=True)
+        r = self.vapi.cnat_get_snat_addresses()
+        self.assertEqual(str(r.snat_ip4), self.make_addr(
+            self.pg0.sw_if_index, 1, False))
+        self.assertEqual(str(r.snat_ip6), self.make_addr(
+            self.pg0.sw_if_index, 1, True))
+        self.config_ips([1], is_add=0, is_v6=False)
+        self.config_ips([1], is_add=0, is_v6=True)
+        self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)