cnat: improve tests 74/34174/3
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>
Wed, 20 Oct 2021 15:41:07 +0000 (17:41 +0200)
committerBeno�t Ganne <bganne@cisco.com>
Thu, 21 Oct 2021 14:29:57 +0000 (14:29 +0000)
Type: improvement

This is an attempt to make the cnat tests more
readable by hiding the scapy packet generation
under a common context concept.

This in order to add tests covering a wider range
of scenarios. As of now, only VIP, snat & DHCP
being covered

Change-Id: Ia84868984506bbb652fe974e9a6f54f7a3cc0bd9
Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
test/test_cnat.py

index ff4c440..198dd73 100644 (file)
@@ -22,106 +22,113 @@ from vpp_object import VppObject
 from vpp_papi import VppEnum
 
 N_PKTS = 15
+N_REMOTE_HOSTS = 3
 
+SRC = 0
+DST = 1
 
-class Ep(object):
-    """ CNat endpoint """
 
-    def __init__(self, ip=None, port=0, l4p=TCP,
-                 sw_if_index=INVALID_INDEX, is_v6=False):
-        self.ip = ip
-        if ip is None:
-            self.ip = "::" if is_v6 else "0.0.0.0"
-        self.port = port
-        self.l4p = l4p
-        self.sw_if_index = sw_if_index
-        if is_v6:
-            self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
-        else:
-            self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
+class CnatCommonTestCase(VppTestCase):
+    """ CNat common test class """
 
-    def encode(self):
-        return {'addr': self.ip,
-                'port': self.port,
-                'sw_if_index': self.sw_if_index,
-                'if_af': self.if_af}
+    #
+    # turn the scanner off whilst testing otherwise sessions
+    # will time out
+    #
+    extra_vpp_punt_config = ["cnat", "{",
+                             "session-db-buckets", "64",
+                             "session-cleanup-timeout", "0.1",
+                             "session-max-age", "1",
+                             "tcp-max-age", "1",
+                             "scanner", "off", "}"]
 
     @classmethod
-    def from_pg(cls, pg, is_v6=False):
-        if pg is None:
-            return cls(is_v6=is_v6)
-        else:
-            return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
-
-    @property
-    def isV6(self):
-        return ":" in self.ip
+    def setUpClass(cls):
+        super(CnatCommonTestCase, cls).setUpClass()
 
-    def __str__(self):
-        return ("%s:%d" % (self.ip, self.port))
+    @classmethod
+    def tearDownClass(cls):
+        super(CnatCommonTestCase, cls).tearDownClass()
 
 
-class EpTuple(object):
+class Endpoint(object):
     """ CNat endpoint """
 
-    def __init__(self, src, dst):
-        self.src = src
-        self.dst = dst
+    def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
+        self.port = port
+        self.is_v6 = is_v6
+        self.sw_if_index = INVALID_INDEX
+        if pg is not None and pgi is not None:
+            # pg interface specified and remote index
+            self.ip = self.get_ip46(pg.remote_hosts[pgi])
+        elif pg is not None:
+            self.ip = None
+            self.sw_if_index = pg.sw_if_index
+        elif ip is not None:
+            self.ip = ip
+        else:
+            self.ip = "::" if self.is_v6 else "0.0.0.0"
+
+    def get_ip46(self, obj):
+        if self.is_v6:
+            return obj.ip6
+        return obj.ip4
+
+    def udpate(self, **kwargs):
+        self.__init__(**kwargs)
+
+    def _vpp_if_af(self):
+        if self.is_v6:
+            return VppEnum.vl_api_address_family_t.ADDRESS_IP6
+        return VppEnum.vl_api_address_family_t.ADDRESS_IP4
 
     def encode(self):
-        return {'src_ep': self.src.encode(),
-                'dst_ep': self.dst.encode()}
+        return {'addr': self.ip,
+                'port': self.port,
+                'sw_if_index': self.sw_if_index,
+                'if_af': self._vpp_if_af()}
 
     def __str__(self):
-        return ("%s->%s" % (self.src, self.dst))
+        return ("%s:%d" % (self.ip, self.port))
 
 
-class VppCNatTranslation(VppObject):
+class Translation(VppObject):
 
     def __init__(self, test, iproto, vip, paths):
         self._test = test
         self.vip = vip
         self.iproto = iproto
         self.paths = paths
-        self.encoded_paths = []
-        for path in self.paths:
-            self.encoded_paths.append(path.encode())
+        self.id = None
 
     def __str__(self):
         return ("%s %s %s" % (self.vip, self.iproto, self.paths))
 
-    @property
-    def vl4_proto(self):
+    def _vl4_proto(self):
         ip_proto = VppEnum.vl_api_ip_proto_t
         return {
             UDP: ip_proto.IP_API_PROTO_UDP,
             TCP: ip_proto.IP_API_PROTO_TCP,
         }[self.iproto]
 
+    def _encoded_paths(self):
+        return [{'src_ep': src.encode(),
+                 'dst_ep': dst.encode()} for (src, dst) in self.paths]
+
     def add_vpp_config(self):
         r = self._test.vapi.cnat_translation_update(
             {'vip': self.vip.encode(),
-             'ip_proto': self.vl4_proto,
+             'ip_proto': self._vl4_proto(),
              'n_paths': len(self.paths),
-             'paths': self.encoded_paths})
+             'paths': self._encoded_paths()})
         self._test.registry.register(self, self._test.logger)
         self.id = r.id
-
-    def modify_vpp_config(self, paths):
-        self.paths = paths
-        self.encoded_paths = []
-        for path in self.paths:
-            self.encoded_paths.append(path.encode())
-
-        r = self._test.vapi.cnat_translation_update(
-            {'vip': self.vip.encode(),
-             'ip_proto': self.vl4_proto,
-             'n_paths': len(self.paths),
-             'paths': self.encoded_paths})
-        self._test.registry.register(self, self._test.logger)
+        return self
 
     def remove_vpp_config(self):
+        assert(self.id is not None)
         self._test.vapi.cnat_translation_del(id=self.id)
+        return self
 
     def query_vpp_config(self):
         for t in self._test.vapi.cnat_translation_dump():
@@ -129,22 +136,190 @@ class VppCNatTranslation(VppObject):
                 return t.translation
         return None
 
-    def object_id(self):
-        return ("cnat-translation-%s" % (self.vip))
 
-    def get_stats(self):
-        c = self._test.statistics.get_counter("/net/cnat-translation")
-        return c[0][self.id]
+class CnatTestContext(object):
+    """
+    Usage :
 
+    ctx = CnatTestContext(self, TCP, is_v6=True)
 
-class TestCNatTranslation(VppTestCase):
+    # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
+    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
+
+    # We expect this to be NATed as
+    # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
+    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
+
+    # After running cnat_expect, we can send back the received packet
+    # and expect it be 'unnated' so that we get the original packet
+    ctx.cnat_send_return().cnat_expect_return()
+
+    # same thing for ICMP errors
+    ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+    """
+
+    def __init__(self, test, L4PROTO, is_v6):
+        self.L4PROTO = L4PROTO
+        self.is_v6 = is_v6
+        self._test = test
+
+    def get_ip46(self, obj):
+        if self.is_v6:
+            return obj.ip6
+        return obj.ip4
+
+    @property
+    def IP46(self):
+        return IPv6 if self.is_v6 else IP
+
+    def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port,
+                  no_replies=False):
+        if isinstance(src_id, int):
+            self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
+        else:
+            self.dst_addr = src_id
+        if isinstance(dst_id, int):
+            self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
+        else:
+            self.dst_addr = dst_id
+        self.src_port = src_port  # also ICMP id
+        self.dst_port = dst_port  # also ICMP type
+
+        if self.L4PROTO in [TCP, UDP]:
+            l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
+        elif self.L4PROTO in [ICMP] and not self.is_v6:
+            l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
+        elif self.L4PROTO in [ICMP] and self.is_v6:
+            l4 = ICMPv6EchoRequest(id=self.src_port)
+        p1 = (Ether(src=src_pg.remote_mac,
+                    dst=src_pg.local_mac) /
+              self.IP46(src=self.src_addr, dst=self.dst_addr) /
+              l4 /
+              Raw())
+
+        if no_replies:
+            self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
+        else:
+            self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
+        self.expected_src_pg = src_pg
+        self.expected_dst_pg = dst_pg
+        return self
+
+    def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
+        if isinstance(src_id, int):
+            self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
+        else:
+            self.expect_src_addr = src_id
+        if isinstance(dst_id, int):
+            self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
+        else:
+            self.expect_dst_addr = dst_id
+        self.expect_src_port = src_port
+        self.expect_dst_port = dst_port
+
+        if self.expect_src_port is None:
+            if self.L4PROTO in [TCP, UDP]:
+                self.expect_src_port = self.rxs[0][self.L4PROTO].sport
+            elif self.L4PROTO in [ICMP] and not self.is_v6:
+                self.expect_src_port = self.rxs[0][self.L4PROTO].id
+            elif self.L4PROTO in [ICMP] and self.is_v6:
+                self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
+
+        for rx in self.rxs:
+            self._test.assert_packet_checksums_valid(rx)
+            self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
+            self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
+            if self.L4PROTO in [TCP, UDP]:
+                self._test.assertEqual(
+                    rx[self.L4PROTO].dport, self.expect_dst_port)
+                self._test.assertEqual(
+                    rx[self.L4PROTO].sport, self.expect_src_port)
+            elif self.L4PROTO in [ICMP] and not self.is_v6:
+                self._test.assertEqual(
+                    rx[self.L4PROTO].type, self.expect_dst_port)
+                self._test.assertEqual(
+                    rx[self.L4PROTO].id, self.expect_src_port)
+            elif self.L4PROTO in [ICMP] and self.is_v6:
+                self._test.assertEqual(
+                    rx[ICMPv6EchoRequest].id, self.expect_src_port)
+        return self
+
+    def cnat_send_return(self):
+        """This sends the return traffic"""
+        if self.L4PROTO in [TCP, UDP]:
+            l4 = self.L4PROTO(sport=self.expect_dst_port,
+                              dport=self.expect_src_port)
+        elif self.L4PROTO in [ICMP] and not self.is_v6:
+            # icmp type 0 if echo reply
+            l4 = self.L4PROTO(id=self.expect_src_port, type=0)
+        elif self.L4PROTO in [ICMP] and self.is_v6:
+            l4 = ICMPv6EchoReply(id=self.expect_src_port)
+        src_mac = self.expected_dst_pg.remote_mac
+        p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) /
+              self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
+              l4 /
+              Raw())
+
+        self.return_rxs = self._test.send_and_expect(
+            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
+        return self
+
+    def cnat_expect_return(self):
+        for rx in self.return_rxs:
+            self._test.assert_packet_checksums_valid(rx)
+            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
+            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
+            if self.L4PROTO in [TCP, UDP]:
+                self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
+                self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
+            elif self.L4PROTO in [ICMP] and not self.is_v6:
+                # icmp type 0 if echo reply
+                self._test.assertEqual(rx[self.L4PROTO].type, 0)
+                self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
+            elif self.L4PROTO in [ICMP] and self.is_v6:
+                self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
+        return self
+
+    def cnat_send_icmp_return_error(self):
+        """
+        This called after cnat_expect will send an icmp error
+        on the reverse path
+        """
+        ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
+        InnerIP = self.rxs[0][self.IP46]
+        p1 = (
+            Ether(src=self.expected_dst_pg.remote_mac,
+                  dst=self.expected_dst_pg.local_mac) /
+            self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
+            ICMPelem / InnerIP)
+        self.return_rxs = self._test.send_and_expect(
+            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
+        return self
+
+    def cnat_expect_icmp_error_return(self):
+        ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
+        IP46err = IPerror6 if self.is_v6 else IPerror
+        L4err = TCPerror if self.L4PROTO is TCP else UDPerror
+        for rx in self.return_rxs:
+            self._test.assert_packet_checksums_valid(rx)
+            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
+            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
+            self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
+            self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
+            self._test.assertEqual(
+                rx[ICMP46][IP46err][L4err].sport, self.src_port)
+            self._test.assertEqual(
+                rx[ICMP46][IP46err][L4err].dport, self.dst_port)
+        return self
+
+# -------------------------------------------------------------------
+# -------------------------------------------------------------------
+# -------------------------------------------------------------------
+# -------------------------------------------------------------------
+
+
+class TestCNatTranslation(CnatCommonTestCase):
     """ CNat Translation """
-    extra_vpp_punt_config = ["cnat", "{",
-                             "session-db-buckets", "64",
-                             "session-cleanup-timeout", "0.1",
-                             "session-max-age", "1",
-                             "tcp-max-age", "1",
-                             "scanner", "off", "}"]
 
     @classmethod
     def setUpClass(cls):
@@ -158,6 +333,8 @@ class TestCNatTranslation(VppTestCase):
         super(TestCNatTranslation, self).setUp()
 
         self.create_pg_interfaces(range(3))
+        self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
+        self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
 
         for i in self.pg_interfaces:
             i.admin_up()
@@ -165,188 +342,87 @@ class TestCNatTranslation(VppTestCase):
             i.resolve_arp()
             i.config_ip6()
             i.resolve_ndp()
+            i.configure_ipv4_neighbors()
+            i.configure_ipv6_neighbors()
 
     def tearDown(self):
+        for translation in self.translations:
+            translation.remove_vpp_config()
+
+        self.vapi.cnat_session_purge()
+        self.assertFalse(self.vapi.cnat_session_dump())
+
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
             i.admin_down()
         super(TestCNatTranslation, self).tearDown()
 
-    def cnat_create_translation(self, vip, nbr):
-        ip_v = "ip6" if vip.isV6 else "ip4"
-        dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
-        sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
-        t1 = VppCNatTranslation(
-            self, vip.l4p, vip,
-            [EpTuple(sep, dep), EpTuple(sep, dep)])
-        t1.add_vpp_config()
-        return t1
-
-    def cnat_test_translation(self, t1, nbr, sports, isV6=False):
-        ip_v = "ip6" if isV6 else "ip4"
-        ip_class = IPv6 if isV6 else IP
-        vip = t1.vip
+    def cnat_translation(self):
+        """ CNat Translation """
+        self.logger.info(self.vapi.cli("sh cnat client"))
+        self.logger.info(self.vapi.cli("sh cnat translation"))
+
+        for nbr, translation in enumerate(self.translations):
+            vip = translation.vip
 
-        #
-        # Flows
-        #
-        for src in self.pg0.remote_hosts:
-            for sport in sports:
+            #
+            # Test Flows to the VIP
+            #
+            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
+            for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
                 # from client to vip
-                p1 = (Ether(dst=self.pg0.local_mac,
-                            src=src.mac) /
-                      ip_class(src=getattr(src, ip_v), dst=vip.ip) /
-                      vip.l4p(sport=sport, dport=vip.port) /
-                      Raw())
-
-                self.vapi.cli("trace add pg-input 1")
-                rxs = self.send_and_expect(self.pg0,
-                                           p1 * N_PKTS,
-                                           self.pg1)
-                self.logger.info(self.vapi.cli("show trace max 1"))
-
-                for rx in rxs:
-                    self.assert_packet_checksums_valid(rx)
-                    self.assertEqual(
-                        rx[ip_class].dst,
-                        getattr(self.pg1.remote_hosts[nbr], ip_v))
-                    self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
-                    self.assertEqual(
-                        rx[ip_class].src,
-                        getattr(src, ip_v))
-                    self.assertEqual(rx[vip.l4p].sport, sport)
-
+                ctx.cnat_send(self.pg0, src_pgi, sport,
+                              self.pg1, vip.ip, vip.port)
+                dst_port = translation.paths[0][DST].port
+                ctx.cnat_expect(self.pg0, src_pgi, sport,
+                                self.pg1, nbr, dst_port)
                 # from vip to client
-                p1 = (Ether(dst=self.pg1.local_mac,
-                            src=self.pg1.remote_mac) /
-                      ip_class(src=getattr(
-                          self.pg1.remote_hosts[nbr],
-                          ip_v),
-                          dst=getattr(src, ip_v)) /
-                      vip.l4p(sport=4000 + nbr, dport=sport) /
-                      Raw())
-
-                rxs = self.send_and_expect(self.pg1,
-                                           p1 * N_PKTS,
-                                           self.pg0)
-
-                for rx in rxs:
-                    self.assert_packet_checksums_valid(rx)
-                    self.assertEqual(
-                        rx[ip_class].dst,
-                        getattr(src, ip_v))
-                    self.assertEqual(rx[vip.l4p].dport, sport)
-                    self.assertEqual(rx[ip_class].src, vip.ip)
-                    self.assertEqual(rx[vip.l4p].sport, vip.port)
+                ctx.cnat_send_return().cnat_expect_return()
 
                 #
                 # packets to the VIP that do not match a
                 # translation are dropped
                 #
-                p1 = (Ether(dst=self.pg0.local_mac,
-                            src=src.mac) /
-                      ip_class(src=getattr(src, ip_v), dst=vip.ip) /
-                      vip.l4p(sport=sport, dport=6666) /
-                      Raw())
-
-                self.send_and_assert_no_replies(self.pg0,
-                                                p1 * N_PKTS,
-                                                self.pg1)
+                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1,
+                              vip.ip, 6666, no_replies=True)
 
                 #
                 # packets from the VIP that do not match a
                 # session are forwarded
                 #
-                p1 = (Ether(dst=self.pg1.local_mac,
-                            src=self.pg1.remote_mac) /
-                      ip_class(src=getattr(
-                          self.pg1.remote_hosts[nbr],
-                          ip_v),
-                          dst=getattr(src, ip_v)) /
-                      vip.l4p(sport=6666, dport=sport) /
-                      Raw())
-
-                rxs = self.send_and_expect(self.pg1,
-                                           p1 * N_PKTS,
-                                           self.pg0)
-
-    def cnat_test_translation_update(self, t1, sports, isV6=False):
-        ip_v = "ip6" if isV6 else "ip4"
-        ip_class = IPv6 if isV6 else IP
-        vip = t1.vip
-
-        #
-        # modify the translation to use a different backend
-        #
-        dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
-        sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
-        t1.modify_vpp_config([EpTuple(sep, dep)])
-
-        #
-        # existing flows follow the old path
-        #
-        for src in self.pg0.remote_hosts:
-            for sport in sports:
-                # from client to vip
-                p1 = (Ether(dst=self.pg0.local_mac,
-                            src=src.mac) /
-                      ip_class(src=getattr(src, ip_v), dst=vip.ip) /
-                      vip.l4p(sport=sport, dport=vip.port) /
-                      Raw())
-
-                rxs = self.send_and_expect(self.pg0,
-                                           p1 * N_PKTS,
-                                           self.pg1)
-
-        #
-        # new flows go to the new backend
-        #
-        for src in self.pg0.remote_hosts:
-            p1 = (Ether(dst=self.pg0.local_mac,
-                        src=src.mac) /
-                  ip_class(src=getattr(src, ip_v), dst=vip.ip) /
-                  vip.l4p(sport=9999, dport=vip.port) /
-                  Raw())
-
-            rxs = self.send_and_expect(self.pg0,
-                                       p1 * N_PKTS,
-                                       self.pg2)
-
-    def cnat_translation(self, vips, isV6=False):
-        """ CNat Translation """
-
-        ip_class = IPv6 if isV6 else IP
-        ip_v = "ip6" if isV6 else "ip4"
-        sports = [1234, 1233]
-
-        #
-        # turn the scanner off whilst testing otherwise sessions
-        # will time out
-        #
-        self.vapi.cli("test cnat scanner off")
-
-        sessions = self.vapi.cnat_session_dump()
-
-        trs = []
-        for nbr, vip in enumerate(vips):
-            trs.append(self.cnat_create_translation(vip, nbr))
-
-        self.logger.info(self.vapi.cli("sh cnat client"))
-        self.logger.info(self.vapi.cli("sh cnat translation"))
+                ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
+                ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
+
+            #
+            # modify the translation to use a different backend
+            #
+            old_dst_port = translation.paths[0][DST].port
+            translation.paths[0][DST].udpate(
+                pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6)
+            translation.add_vpp_config()
+
+            #
+            # existing flows follow the old path
+            #
+            for src_pgi in range(N_REMOTE_HOSTS):
+                for sport in [1234, 1233]:
+                    # from client to vip
+                    ctx.cnat_send(self.pg0, src_pgi, sport,
+                                  self.pg1, vip.ip, vip.port)
+                    ctx.cnat_expect(self.pg0, src_pgi, sport,
+                                    self.pg1, nbr, old_dst_port)
+                    # from vip to client
+                    ctx.cnat_send_return().cnat_expect_return()
+
+            #
+            # new flows go to the new backend
+            #
+            for src_pgi in range(N_REMOTE_HOSTS):
+                ctx.cnat_send(self.pg0, src_pgi, 9999,
+                              self.pg2, vip.ip, vip.port)
+                ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
 
-        #
-        # translations
-        #
-        for nbr, vip in enumerate(vips):
-            self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
-            self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
-            if isV6:
-                self.logger.info(self.vapi.cli(
-                    "sh ip6 fib %s" % self.pg0.remote_ip6))
-            else:
-                self.logger.info(self.vapi.cli(
-                    "sh ip fib %s" % self.pg0.remote_ip4))
             self.logger.info(self.vapi.cli("sh cnat session verbose"))
 
         #
@@ -369,168 +445,114 @@ class TestCNatTranslation(VppTestCase):
         #
         # load some flows again and purge
         #
-        for vip in vips:
-            for src in self.pg0.remote_hosts:
-                for sport in sports:
+        for translation in self.translations:
+            vip = translation.vip
+            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
+            for src_pgi in range(N_REMOTE_HOSTS):
+                for sport in [1234, 1233]:
                     # from client to vip
-                    p1 = (Ether(dst=self.pg0.local_mac,
-                                src=src.mac) /
-                          ip_class(src=getattr(src, ip_v), dst=vip.ip) /
-                          vip.l4p(sport=sport, dport=vip.port) /
-                          Raw())
-                    self.send_and_expect(self.pg0,
-                                         p1 * N_PKTS,
-                                         self.pg2)
-
-        for tr in trs:
-            tr.remove_vpp_config()
-
-        self.assertTrue(self.vapi.cnat_session_dump())
-        self.vapi.cnat_session_purge()
-        self.assertFalse(self.vapi.cnat_session_dump())
+                    ctx.cnat_send(self.pg0, src_pgi, sport,
+                                  self.pg2, vip.ip, vip.port)
+                    ctx.cnat_expect(self.pg0, src_pgi,
+                                    sport, self.pg2, 0, 5000)
 
-    def test_icmp(self):
-        vips = [
-            Ep("30.0.0.1", 5555),
-            Ep("30.0.0.2", 5554),
-            Ep("30.0.0.2", 5553, UDP),
-            Ep("30::1", 6666),
-            Ep("30::2", 5553, UDP),
-        ]
-        sport = 1234
-
-        self.pg0.generate_remote_hosts(len(vips))
-        self.pg0.configure_ipv6_neighbors()
-        self.pg0.configure_ipv4_neighbors()
-
-        self.pg1.generate_remote_hosts(len(vips))
-        self.pg1.configure_ipv6_neighbors()
-        self.pg1.configure_ipv4_neighbors()
+    def _test_icmp(self):
 
-        self.vapi.cli("test cnat scanner off")
-        trs = []
-        for nbr, vip in enumerate(vips):
-            trs.append(self.cnat_create_translation(vip, nbr))
-
-        self.logger.info(self.vapi.cli("sh cnat client"))
-        self.logger.info(self.vapi.cli("sh cnat translation"))
-
-        for nbr, vip in enumerate(vips):
-            if vip.isV6:
-                client_addr = self.pg0.remote_hosts[0].ip6
-                remote_addr = self.pg1.remote_hosts[nbr].ip6
-                remote2_addr = self.pg2.remote_hosts[0].ip6
-            else:
-                client_addr = self.pg0.remote_hosts[0].ip4
-                remote_addr = self.pg1.remote_hosts[nbr].ip4
-                remote2_addr = self.pg2.remote_hosts[0].ip4
-            IP46 = IPv6 if vip.isV6 else IP
-            # from client to vip
-            p1 = (Ether(dst=self.pg0.local_mac,
-                        src=self.pg0.remote_hosts[0].mac) /
-                  IP46(src=client_addr, dst=vip.ip) /
-                  vip.l4p(sport=sport, dport=vip.port) /
-                  Raw())
-
-            rxs = self.send_and_expect(self.pg0,
-                                       p1 * N_PKTS,
-                                       self.pg1)
-
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].dst, remote_addr)
-                self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
-                self.assertEqual(rx[IP46].src, client_addr)
-                self.assertEqual(rx[vip.l4p].sport, sport)
-
-            InnerIP = rxs[0][IP46]
-
-            ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
-            ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
-            # from vip to client, ICMP error
-            p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
-                  IP46(src=remote_addr, dst=client_addr) /
-                  ICMPelem / InnerIP)
-
-            rxs = self.send_and_expect(self.pg1,
-                                       p1 * N_PKTS,
-                                       self.pg0)
-
-            TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
-            IP46error = IPerror6 if vip.isV6 else IPerror
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].src, vip.ip)
-                self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
-                self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
-                self.assertEqual(rx[ICMP46][IP46error]
-                                 [TCPUDPError].sport, sport)
-                self.assertEqual(rx[ICMP46][IP46error]
-                                 [TCPUDPError].dport, vip.port)
-
-            # from other remote to client, ICMP error
-            # outside shouldn't be NAT-ed
-            p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
-                  IP46(src=remote2_addr, dst=client_addr) /
-                  ICMPelem / InnerIP)
-
-            rxs = self.send_and_expect(self.pg1,
-                                       p1 * N_PKTS,
-                                       self.pg0)
-
-            TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
-            IP46error = IPerror6 if vip.isV6 else IPerror
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].src, remote2_addr)
-                self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
-                self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
-                self.assertEqual(rx[ICMP46][IP46error]
-                                 [TCPUDPError].sport, sport)
-                self.assertEqual(rx[ICMP46][IP46error]
-                                 [TCPUDPError].dport, vip.port)
-
-        self.vapi.cnat_session_purge()
+        #
+        # Testing ICMP
+        #
+        for nbr, translation in enumerate(self.translations):
+            vip = translation.vip
+            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
+
+            #
+            # NATing ICMP errors
+            #
+            ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
+            dst_port = translation.paths[0][DST].port
+            ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
+            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+
+            #
+            # ICMP errors with no VIP associated should not be
+            # modified
+            #
+            ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
+            dst_port = translation.paths[0][DST].port
+            ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
+            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+
+    def _make_translations_v4(self):
+        self.translations = []
+        self.translations.append(Translation(
+            self, TCP, Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
+            [(
+                Endpoint(is_v6=False),
+                Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
+            )]
+        ).add_vpp_config())
+        self.translations.append(Translation(
+            self, TCP, Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
+            [(
+                Endpoint(is_v6=False),
+                Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
+            )]
+        ).add_vpp_config())
+        self.translations.append(Translation(
+            self, UDP, Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
+            [(
+                Endpoint(is_v6=False),
+                Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
+            )]
+        ).add_vpp_config())
+
+    def _make_translations_v6(self):
+        self.translations = []
+        self.translations.append(Translation(
+            self, TCP, Endpoint(ip="30::1", port=5555, is_v6=True),
+            [(
+                Endpoint(is_v6=True),
+                Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
+            )]
+        ).add_vpp_config())
+        self.translations.append(Translation(
+            self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True),
+            [(
+                Endpoint(is_v6=True),
+                Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
+            )]
+        ).add_vpp_config())
+        self.translations.append(Translation(
+            self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True),
+            [(
+                Endpoint(is_v6=True),
+                Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
+            )]
+        ).add_vpp_config())
+
+    def test_icmp4(self):
+        # """ CNat Translation icmp v4 """
+        self._make_translations_v4()
+        self._test_icmp()
+
+    def test_icmp6(self):
+        # """ CNat Translation icmp v6 """
+        self._make_translations_v6()
+        self._test_icmp()
 
     def test_cnat6(self):
         # """ CNat Translation ipv6 """
-        vips = [
-            Ep("30::1", 5555),
-            Ep("30::2", 5554),
-            Ep("30::2", 5553, UDP),
-        ]
-
-        self.pg0.generate_remote_hosts(len(vips))
-        self.pg0.configure_ipv6_neighbors()
-        self.pg1.generate_remote_hosts(len(vips))
-        self.pg1.configure_ipv6_neighbors()
-
-        self.cnat_translation(vips, isV6=True)
+        self._make_translations_v6()
+        self.cnat_translation()
 
     def test_cnat4(self):
         # """ CNat Translation ipv4 """
+        self._make_translations_v4()
+        self.cnat_translation()
 
-        vips = [
-            Ep("30.0.0.1", 5555),
-            Ep("30.0.0.2", 5554),
-            Ep("30.0.0.2", 5553, UDP),
-        ]
-
-        self.pg0.generate_remote_hosts(len(vips))
-        self.pg0.configure_ipv4_neighbors()
-        self.pg1.generate_remote_hosts(len(vips))
-        self.pg1.configure_ipv4_neighbors()
 
-        self.cnat_translation(vips)
-
-
-class TestCNatSourceNAT(VppTestCase):
+class TestCNatSourceNAT(CnatCommonTestCase):
     """ CNat Source NAT """
-    extra_vpp_punt_config = ["cnat", "{",
-                             "session-cleanup-timeout", "0.1",
-                             "session-max-age", "1",
-                             "tcp-max-age", "1",
-                             "scanner", "off", "}"]
 
     @classmethod
     def setUpClass(cls):
@@ -540,35 +562,18 @@ class TestCNatSourceNAT(VppTestCase):
     def tearDownClass(cls):
         super(TestCNatSourceNAT, cls).tearDownClass()
 
-    def setUp(self):
-        super(TestCNatSourceNAT, self).setUp()
-
-        self.create_pg_interfaces(range(3))
-
-        for i in self.pg_interfaces:
-            i.admin_up()
-            i.config_ip4()
-            i.resolve_arp()
-            i.config_ip6()
-            i.resolve_ndp()
-
-        self.pg0.configure_ipv6_neighbors()
-        self.pg0.configure_ipv4_neighbors()
-        self.pg1.generate_remote_hosts(2)
-        self.pg1.configure_ipv4_neighbors()
-        self.pg1.configure_ipv6_neighbors()
-
+    def _enable_disable_snat(self, is_enable=True):
         self.vapi.cnat_set_snat_addresses(
             snat_ip4=self.pg2.remote_hosts[0].ip4,
             snat_ip6=self.pg2.remote_hosts[0].ip6,
             sw_if_index=INVALID_INDEX)
         self.vapi.feature_enable_disable(
-            enable=1,
+            enable=1 if is_enable else 0,
             arc_name="ip6-unicast",
             feature_name="cnat-snat-ip6",
             sw_if_index=self.pg0.sw_if_index)
         self.vapi.feature_enable_disable(
-            enable=1,
+            enable=1 if is_enable else 0,
             arc_name="ip4-unicast",
             feature_name="cnat-snat-ip4",
             sw_if_index=self.pg0.sw_if_index)
@@ -578,13 +583,32 @@ class TestCNatSourceNAT(VppTestCase):
             policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
         for i in self.pg_interfaces:
             self.vapi.cnat_snat_policy_add_del_if(
-                sw_if_index=i.sw_if_index, is_add=1,
+                sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
                 table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
             self.vapi.cnat_snat_policy_add_del_if(
-                sw_if_index=i.sw_if_index, is_add=1,
+                sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
                 table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
 
+    def setUp(self):
+        super(TestCNatSourceNAT, self).setUp()
+
+        self.create_pg_interfaces(range(3))
+        self.pg1.generate_remote_hosts(2)
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+            i.config_ip6()
+            i.resolve_ndp()
+            i.configure_ipv6_neighbors()
+            i.configure_ipv4_neighbors()
+
+        self._enable_disable_snat(is_enable=True)
+
     def tearDown(self):
+        self._enable_disable_snat(is_enable=True)
+
         self.vapi.cnat_session_purge()
         for i in self.pg_interfaces:
             i.unconfig_ip4()
@@ -594,272 +618,72 @@ class TestCNatSourceNAT(VppTestCase):
 
     def test_snat_v6(self):
         # """ CNat Source Nat v6 """
-        self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
-        self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
-        self.sourcenat_test_icmp_err_conf(isV6=True)
-        self.sourcenat_test_icmp_echo6_conf()
+        self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
+        self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
+        self.sourcenat_test_icmp_echo_conf(is_v6=True)
 
     def test_snat_v4(self):
         # """ CNat Source Nat v4 """
         self.sourcenat_test_tcp_udp_conf(TCP)
         self.sourcenat_test_tcp_udp_conf(UDP)
-        self.sourcenat_test_icmp_err_conf()
-        self.sourcenat_test_icmp_echo4_conf()
-
-    def sourcenat_test_icmp_echo6_conf(self):
-        sports = [1234, 1235]
-        dports = [6661, 6662]
-
-        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
-            client_addr = self.pg0.remote_hosts[0].ip6
-            remote_addr = self.pg1.remote_hosts[nbr].ip6
-            src_nat_addr = self.pg2.remote_hosts[0].ip6
-
-            # ping from pods to outside network
-            p1 = (
-                Ether(dst=self.pg0.local_mac,
-                      src=self.pg0.remote_hosts[0].mac) /
-                IPv6(src=client_addr, dst=remote_addr) /
-                ICMPv6EchoRequest(id=0xfeed) /
-                Raw())
-
-            rxs = self.send_and_expect(
-                self.pg0,
-                p1 * N_PKTS,
-                self.pg1)
-
-            for rx in rxs:
-                self.assertEqual(rx[IPv6].src, src_nat_addr)
-                self.assert_packet_checksums_valid(rx)
-
-            received_id = rx[0][ICMPv6EchoRequest].id
-            # ping reply from outside to pods
-            p2 = (
-                Ether(dst=self.pg1.local_mac,
-                      src=self.pg1.remote_hosts[nbr].mac) /
-                IPv6(src=remote_addr, dst=src_nat_addr) /
-                ICMPv6EchoReply(id=received_id))
-            rxs = self.send_and_expect(
-                self.pg1,
-                p2 * N_PKTS,
-                self.pg0)
-
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IPv6].src, remote_addr)
-                self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
-
-    def sourcenat_test_icmp_echo4_conf(self):
-        sports = [1234, 1235]
-        dports = [6661, 6662]
-
-        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
-            IP46 = IP
-            client_addr = self.pg0.remote_hosts[0].ip4
-            remote_addr = self.pg1.remote_hosts[nbr].ip4
-            src_nat_addr = self.pg2.remote_hosts[0].ip4
-
-            # ping from pods to outside network
-            p1 = (
-                Ether(dst=self.pg0.local_mac,
-                      src=self.pg0.remote_hosts[0].mac) /
-                IP46(src=client_addr, dst=remote_addr) /
-                ICMP(type=8, id=0xfeed) /
-                Raw())
-
-            rxs = self.send_and_expect(
-                self.pg0,
-                p1 * N_PKTS,
-                self.pg1)
-
-            for rx in rxs:
-                self.assertEqual(rx[IP46].src, src_nat_addr)
-                self.assert_packet_checksums_valid(rx)
-
-            received_id = rx[0][ICMP].id
-            # ping reply from outside to pods
-            p2 = (
-                Ether(dst=self.pg1.local_mac,
-                      src=self.pg1.remote_hosts[nbr].mac) /
-                IP46(src=remote_addr, dst=src_nat_addr) /
-                ICMP(type=0, id=received_id))
-            rxs = self.send_and_expect(
-                self.pg1,
-                p2 * N_PKTS,
-                self.pg0)
-
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].src, remote_addr)
-                self.assertEqual(rx[ICMP].id, 0xfeed)
-
-    def sourcenat_test_icmp_err_conf(self, isV6=False):
-        sports = [1234, 1235]
-        dports = [6661, 6662]
-
-        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
-            if isV6:
-                IP46 = IPv6
-                client_addr = self.pg0.remote_hosts[0].ip6
-                remote_addr = self.pg1.remote_hosts[nbr].ip6
-                src_nat_addr = self.pg2.remote_hosts[0].ip6
-                ICMP46 = ICMPv6DestUnreach
-                ICMPelem = ICMPv6DestUnreach(code=1)
-                IP46error = IPerror6
-            else:
-                IP46 = IP
-                client_addr = self.pg0.remote_hosts[0].ip4
-                remote_addr = self.pg1.remote_hosts[nbr].ip4
-                src_nat_addr = self.pg2.remote_hosts[0].ip4
-                IP46error = IPerror
-                ICMP46 = ICMP
-                ICMPelem = ICMP(type=11)
-
-            # from pods to outside network
-            p1 = (
-                Ether(dst=self.pg0.local_mac,
-                      src=self.pg0.remote_hosts[0].mac) /
-                IP46(src=client_addr, dst=remote_addr) /
-                TCP(sport=sports[nbr], dport=dports[nbr]) /
-                Raw())
-
-            rxs = self.send_and_expect(
-                self.pg0,
-                p1 * N_PKTS,
-                self.pg1)
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].dst, remote_addr)
-                self.assertEqual(rx[TCP].dport, dports[nbr])
-                self.assertEqual(rx[IP46].src, src_nat_addr)
-                sport = rx[TCP].sport
-
-            InnerIP = rxs[0][IP46]
-            # from outside to pods, ICMP error
-            p2 = (
-                Ether(dst=self.pg1.local_mac,
-                      src=self.pg1.remote_hosts[nbr].mac) /
-                IP46(src=remote_addr, dst=src_nat_addr) /
-                ICMPelem / InnerIP)
-
-            rxs = self.send_and_expect(
-                self.pg1,
-                p2 * N_PKTS,
-                self.pg0)
-
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].src, remote_addr)
-                self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
-                self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
-                self.assertEqual(rx[ICMP46][IP46error]
-                                 [TCPerror].sport, sports[nbr])
-                self.assertEqual(rx[ICMP46][IP46error]
-                                 [TCPerror].dport, dports[nbr])
-
-    def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
-        sports = [1234, 1235]
-        dports = [6661, 6662]
-
-        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
-            if isV6:
-                IP46 = IPv6
-                client_addr = self.pg0.remote_hosts[0].ip6
-                remote_addr = self.pg1.remote_hosts[nbr].ip6
-                src_nat_addr = self.pg2.remote_hosts[0].ip6
-                exclude_prefix = ip_network(
-                    "%s/100" % remote_addr, strict=False)
-            else:
-                IP46 = IP
-                client_addr = self.pg0.remote_hosts[0].ip4
-                remote_addr = self.pg1.remote_hosts[nbr].ip4
-                src_nat_addr = self.pg2.remote_hosts[0].ip4
-                exclude_prefix = ip_network(
-                    "%s/16" % remote_addr, strict=False)
-            # from pods to outside network
-            p1 = (
-                Ether(dst=self.pg0.local_mac,
-                      src=self.pg0.remote_hosts[0].mac) /
-                IP46(src=client_addr, dst=remote_addr) /
-                l4p(sport=sports[nbr], dport=dports[nbr]) /
-                Raw())
-
-            self.vapi.cli("trace add pg-input 1")
-            rxs = self.send_and_expect(
-                self.pg0,
-                p1 * N_PKTS,
-                self.pg1)
-            self.logger.info(self.vapi.cli("show trace max 1"))
-
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].dst, remote_addr)
-                self.assertEqual(rx[l4p].dport, dports[nbr])
-                self.assertEqual(rx[IP46].src, src_nat_addr)
-                sport = rx[l4p].sport
-
-            # from outside to pods
-            p2 = (
-                Ether(dst=self.pg1.local_mac,
-                      src=self.pg1.remote_hosts[nbr].mac) /
-                IP46(src=remote_addr, dst=src_nat_addr) /
-                l4p(sport=dports[nbr], dport=sport) /
-                Raw())
-
-            rxs = self.send_and_expect(
-                self.pg1,
-                p2 * N_PKTS,
-                self.pg0)
-
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].dst, client_addr)
-                self.assertEqual(rx[l4p].dport, sports[nbr])
-                self.assertEqual(rx[l4p].sport, dports[nbr])
-                self.assertEqual(rx[IP46].src, remote_addr)
-
-            # add remote host to exclude list
-            self.vapi.cnat_snat_policy_add_del_exclude_pfx(
-                prefix=exclude_prefix, is_add=1)
-            self.vapi.cnat_session_purge()
-
-            rxs = self.send_and_expect(
-                self.pg0,
-                p1 * N_PKTS,
-                self.pg1)
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].dst, remote_addr)
-                self.assertEqual(rx[l4p].dport, dports[nbr])
-                self.assertEqual(rx[IP46].src, client_addr)
-
-            # remove remote host from exclude list
-            self.vapi.cnat_snat_policy_add_del_exclude_pfx(
-                prefix=exclude_prefix, is_add=0)
-            self.vapi.cnat_session_purge()
-
-            rxs = self.send_and_expect(
-                self.pg0,
-                p1 * N_PKTS,
-                self.pg1)
-
-            for rx in rxs:
-                self.assert_packet_checksums_valid(rx)
-                self.assertEqual(rx[IP46].dst, remote_addr)
-                self.assertEqual(rx[l4p].dport, dports[nbr])
-                self.assertEqual(rx[IP46].src, src_nat_addr)
-
-            self.vapi.cnat_session_purge()
-
-
-class TestCNatDHCP(VppTestCase):
+        self.sourcenat_test_icmp_echo_conf()
+
+    def sourcenat_test_icmp_echo_conf(self, is_v6=False):
+        ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
+        # 8 is ICMP type echo (v4 only)
+        ctx.cnat_send(self.pg0, 0, 0xfeed, self.pg1, 0, 8)
+        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
+        ctx.cnat_send_return().cnat_expect_return()
+
+    def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
+        ctx = CnatTestContext(self, L4PROTO, is_v6)
+        # we should source NAT
+        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
+        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
+        ctx.cnat_send_return().cnat_expect_return()
+
+        # exclude dst address of pg1.1 from snat
+        if is_v6:
+            exclude_prefix = ip_network(
+                "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False)
+        else:
+            exclude_prefix = ip_network(
+                "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False)
+
+        # add remote host to exclude list
+        self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+            prefix=exclude_prefix, is_add=1)
+
+        # We should not source NAT the id=1
+        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+        ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
+        ctx.cnat_send_return().cnat_expect_return()
+
+        # But we should source NAT the id=0
+        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
+        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
+        ctx.cnat_send_return().cnat_expect_return()
+
+        # remove remote host from exclude list
+        self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+            prefix=exclude_prefix, is_add=0)
+        self.vapi.cnat_session_purge()
+
+        # We should source NAT again
+        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
+        ctx.cnat_send_return().cnat_expect_return()
+
+        # test return ICMP error nating
+        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
+        ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+
+        self.vapi.cnat_session_purge()
+
+
+class TestCNatDHCP(CnatCommonTestCase):
     """ CNat Translation """
-    extra_vpp_punt_config = ["cnat", "{",
-                             "session-db-buckets", "64",
-                             "session-cleanup-timeout", "0.1",
-                             "session-max-age", "1",
-                             "tcp-max-age", "1",
-                             "scanner", "off", "}"]
 
     @classmethod
     def setUpClass(cls):
@@ -874,100 +698,101 @@ class TestCNatDHCP(VppTestCase):
             i.admin_down()
         super(TestCNatDHCP, self).tearDown()
 
-    def create_translation(self, vip_pg, *args, is_v6=False):
-        vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
-        paths = []
-        for (src_pg, dst_pg) in args:
-            paths.append(EpTuple(
-                Ep.from_pg(src_pg, is_v6=is_v6),
-                Ep.from_pg(dst_pg, is_v6=is_v6)
-            ))
-        t1 = VppCNatTranslation(self, TCP, vip, paths)
-        t1.add_vpp_config()
-        return t1
-
-    def make_addr(self, sw_if_index, i, is_v6):
+    def make_addr(self, sw_if_index, addr_id, is_v6):
         if is_v6:
-            return "fd01:%x::%u" % (sw_if_index, i + 1)
-        else:
-            return "172.16.%u.%u" % (sw_if_index, i)
+            return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
+        return "172.16.%u.%u" % (sw_if_index, addr_id)
 
-    def make_prefix(self, sw_if_index, i, is_v6):
+    def make_prefix(self, sw_if_index, addr_id, is_v6):
         if is_v6:
-            return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
-        else:
-            return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
-
-    def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
-        qt1 = tr.query_vpp_config()
-        self.assertEqual(str(qt1.vip.addr), self.make_addr(
-            vip_pg.sw_if_index, i, is_v6))
-        for (src_pg, dst_pg), path in zip(args, qt1.paths):
-            if src_pg:
-                self.assertEqual(str(path.src_ep.addr), self.make_addr(
-                    src_pg.sw_if_index, i, is_v6))
-            if dst_pg:
-                self.assertEqual(str(path.dst_ep.addr), self.make_addr(
-                    dst_pg.sw_if_index, i, is_v6))
-
-    def config_ips(self, rng, is_add=1, is_v6=False):
-        for pg, i in product(self.pg_interfaces, rng):
-            self.vapi.sw_interface_add_del_address(
-                sw_if_index=pg.sw_if_index,
-                prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
-                is_add=is_add)
-
-    def test_dhcp_v4(self):
-        self.create_pg_interfaces(range(5))
+            return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
+        return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
+
+    def check_resolved(self, tr, addr_id, is_v6=False):
+        qt = tr.query_vpp_config()
+        self.assertEqual(str(qt.vip.addr), self.make_addr(
+            tr.vip.sw_if_index, addr_id, is_v6))
+        self.assertEqual(len(qt.paths), len(tr.paths))
+        for path_tr, path_qt in zip(tr.paths, qt.paths):
+            src_qt = path_qt.src_ep
+            dst_qt = path_qt.dst_ep
+            src_tr, dst_tr = path_tr
+            self.assertEqual(str(src_qt.addr), self.make_addr(
+                src_tr.sw_if_index, addr_id, is_v6))
+            self.assertEqual(str(dst_qt.addr), self.make_addr(
+                dst_tr.sw_if_index, addr_id, is_v6))
+
+    def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
+        self.vapi.sw_interface_add_del_address(
+            sw_if_index=pg.sw_if_index,
+            prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
+            is_add=1 if is_add else 0)
+
+    def _test_dhcp_v46(self, is_v6):
+        self.create_pg_interfaces(range(4))
         for i in self.pg_interfaces:
             i.admin_up()
-        pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
-        t1 = self.create_translation(*pglist)
-        self.config_ips([0])
-        self.check_resolved(t1, *pglist)
-        self.config_ips([1])
-        self.config_ips([0], is_add=0)
-        self.check_resolved(t1, *pglist, i=1)
-        self.config_ips([1], is_add=0)
-        t1.remove_vpp_config()
+        paths = [
+            (Endpoint(pg=self.pg1, is_v6=is_v6),
+             Endpoint(pg=self.pg2, is_v6=is_v6)),
+            (Endpoint(pg=self.pg1, is_v6=is_v6),
+             Endpoint(pg=self.pg3, is_v6=is_v6))
+        ]
+        ep = Endpoint(pg=self.pg0, is_v6=is_v6)
+        t = Translation(self, TCP, ep, paths).add_vpp_config()
+        # Add an address on every interface
+        # and check it is reflected in the cnat config
+        for pg in self.pg_interfaces:
+            self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
+        self.check_resolved(t, addr_id=0, is_v6=is_v6)
+        # Add a new address on every interface, remove the old one
+        # and check it is reflected in the cnat config
+        for pg in self.pg_interfaces:
+            self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
+            self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
+        self.check_resolved(t, addr_id=1, is_v6=is_v6)
+        # remove the configuration
+        for pg in self.pg_interfaces:
+            self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
+        t.remove_vpp_config()
+
+    def test_dhcp_v4(self):
+        self._test_dhcp_v46(False)
 
     def test_dhcp_v6(self):
-        self.create_pg_interfaces(range(5))
-        for i in self.pg_interfaces:
-            i.admin_up()
-        pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
-        t1 = self.create_translation(*pglist, is_v6=True)
-        self.config_ips([0], is_v6=True)
-        self.check_resolved(t1, *pglist, is_v6=True)
-        self.config_ips([1], is_v6=True)
-        self.config_ips([0], is_add=0, is_v6=True)
-        self.check_resolved(t1, *pglist, i=1, is_v6=True)
-        self.config_ips([1], is_add=0, is_v6=True)
-        t1.remove_vpp_config()
+        self._test_dhcp_v46(True)
 
     def test_dhcp_snat(self):
         self.create_pg_interfaces(range(1))
         for i in self.pg_interfaces:
             i.admin_up()
         self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
-        self.config_ips([0], is_v6=False)
-        self.config_ips([0], is_v6=True)
+        # Add an address on every interface
+        # and check it is reflected in the cnat config
+        for pg in self.pg_interfaces:
+            self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
+            self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
         r = self.vapi.cnat_get_snat_addresses()
         self.assertEqual(str(r.snat_ip4), self.make_addr(
-            self.pg0.sw_if_index, 0, False))
+            self.pg0.sw_if_index, addr_id=0, is_v6=False))
         self.assertEqual(str(r.snat_ip6), self.make_addr(
-            self.pg0.sw_if_index, 0, True))
-        self.config_ips([1], is_v6=False)
-        self.config_ips([1], is_v6=True)
-        self.config_ips([0], is_add=0, is_v6=False)
-        self.config_ips([0], is_add=0, is_v6=True)
+            self.pg0.sw_if_index, addr_id=0, is_v6=True))
+        # Add a new address on every interface, remove the old one
+        # and check it is reflected in the cnat config
+        for pg in self.pg_interfaces:
+            self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
+            self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
+            self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
+            self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
         r = self.vapi.cnat_get_snat_addresses()
         self.assertEqual(str(r.snat_ip4), self.make_addr(
-            self.pg0.sw_if_index, 1, False))
+            self.pg0.sw_if_index, addr_id=1, is_v6=False))
         self.assertEqual(str(r.snat_ip6), self.make_addr(
-            self.pg0.sw_if_index, 1, True))
-        self.config_ips([1], is_add=0, is_v6=False)
-        self.config_ips([1], is_add=0, is_v6=True)
+            self.pg0.sw_if_index, addr_id=1, is_v6=True))
+        # remove the configuration
+        for pg in self.pg_interfaces:
+            self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
+            self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
         self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)