ip: Replace Sematics for Interface IP addresses
[vpp.git] / test / test_ip6.py
index a8a730c..990b53e 100644 (file)
@@ -1,6 +1,7 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 import socket
+from socket import inet_pton, inet_ntop
 import unittest
 
 from parameterized import parameterized
@@ -10,25 +11,26 @@ from scapy.contrib.mpls import MPLS
 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
-    ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
+    ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
+    IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.packet import Raw
-from scapy.utils import inet_pton, inet_ntop
 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
     in6_mactoifaceid
 from six import moves
 
 from framework import VppTestCase, VppTestRunner
 from util import ppp, ip6_normalize, mk_ll_addr
-from vpp_ip import DpoProto, VppIpAddress
+from vpp_ip import DpoProto
 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
-    VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, \
-    VppIpInterfaceAddress
+    VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
+    VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump
 from vpp_neighbor import find_nbr, VppNeighbor
 from vpp_pg_interface import is_ipv6_misc
 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
-from ipaddress import IPv6Network, IPv4Network, IPv6Address
+from vpp_policer import VppPolicer
+from ipaddress import IPv6Network, IPv6Address
 
 AF_INET6 = socket.AF_INET6
 
@@ -138,6 +140,7 @@ class TestIPv6ND(VppTestCase):
 
     def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                            filter_out_fn=is_ipv6_misc):
+        self.vapi.cli("clear trace")
         tx_intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -222,7 +225,6 @@ class TestIPv6(TestIPv6ND):
         """Run standard test teardown and log ``show ip6 neighbors``."""
         for i in self.interfaces:
             i.unconfig_ip6()
-            i.ip6_disable()
             i.admin_down()
         for i in self.sub_interfaces:
             i.remove_vpp_config()
@@ -578,9 +580,12 @@ class TestIPv6(TestIPv6ND):
                        self.pg0.remote_ip6,
                        self.pg1.remote_hosts[1].ip6)
 
-    def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
+    def validate_ra(self, intf, rx, dst_ip=None, src_ip=None,
+                    mtu=9000, pi_opt=None):
         if not dst_ip:
             dst_ip = intf.remote_ip6
+        if not src_ip:
+            src_ip = mk_ll_addr(intf.local_mac)
 
         # unicasted packets must come to the unicast mac
         self.assertEqual(rx[Ether].dst, intf.remote_mac)
@@ -595,8 +600,7 @@ class TestIPv6(TestIPv6ND):
 
         # and come from the router's link local
         self.assertTrue(in6_islladdr(rx[IPv6].src))
-        self.assertEqual(in6_ptop(rx[IPv6].src),
-                         in6_ptop(mk_ll_addr(intf.local_mac)))
+        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
 
         # it should contain the links MTU
         ra = rx[ICMPv6ND_RA]
@@ -635,7 +639,9 @@ class TestIPv6(TestIPv6ND):
 
     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                            filter_out_fn=is_ipv6_misc,
-                           opt=None):
+                           opt=None,
+                           src_ip=None):
+        self.vapi.cli("clear trace")
         intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -643,7 +649,7 @@ class TestIPv6(TestIPv6ND):
 
         self.assertEqual(len(rx), 1)
         rx = rx[0]
-        self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
+        self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
 
     def test_rs(self):
         """ IPv6 Router Solicitation Exceptions
@@ -666,8 +672,7 @@ class TestIPv6(TestIPv6ND):
         #  - expect an RA in return
         #
         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
-             IPv6(
-                 dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
+             IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
              ICMPv6ND_RS())
         pkts = [p]
         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
@@ -925,16 +930,53 @@ class TestIPv6(TestIPv6ND):
                                self.pg1.local_ip6_prefix_len),
                                is_no=1)
 
+        #
+        # change the link's link local, so we know that works too.
+        #
+        self.vapi.sw_interface_ip6_set_link_local_address(
+            sw_if_index=self.pg0.sw_if_index,
+            ip="fe80::88")
+
         self.pg0.ip6_ra_config(send_unicast=1)
         self.send_and_expect_ra(self.pg0, p,
                                 "RA with Prefix reverted to defaults",
-                                dst_ip=ll)
+                                dst_ip=ll,
+                                src_ip="fe80::88")
 
         #
         # Reset the periodic advertisements back to default values
         #
         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
 
+    def test_mld(self):
+        """ MLD Report """
+        #
+        # test one MLD is sent after applying an IPv6 Address on an interface
+        #
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        subitf = VppDot1QSubint(self, self.pg1, 99)
+
+        subitf.admin_up()
+        subitf.config_ip6()
+
+        rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
+
+        #
+        # hunt for the MLD on vlan 99
+        #
+        for rx in rxs:
+            # make sure ipv6 packets with hop by hop options have
+            # correct checksums
+            self.assert_packet_checksums_valid(rx)
+            if rx.haslayer(IPv6ExtHdrHopByHop) and \
+               rx.haslayer(Dot1Q) and \
+               rx[Dot1Q].vlan == 99:
+                mld = rx[ICMPv6MLReport2]
+
+        self.assertEqual(mld.records_number, 4)
+
 
 class TestIPv6IfAddrRoute(VppTestCase):
     """ IPv6 Interface Addr Route Test Case """
@@ -982,30 +1024,29 @@ class TestIPv6IfAddrRoute(VppTestCase):
         addr1 = "2001:10::10"
         addr2 = "2001:10::20"
 
-        if_addr1 = VppIpInterfaceAddress(self, self.pg0,
-                                         VppIpAddress(addr1), 64)
-        if_addr2 = VppIpInterfaceAddress(self, self.pg0,
-                                         VppIpAddress(addr2), 64)
-        self.assertFalse(if_addr1.query_vpp_config())  # 2001:10::/64
+        if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
+        if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
+        self.assertFalse(if_addr1.query_vpp_config())
         self.assertFalse(find_route(self, addr1, 128))
         self.assertFalse(find_route(self, addr2, 128))
 
         # configure first address, verify route present
         if_addr1.add_vpp_config()
-        self.assertTrue(if_addr1.query_vpp_config())  # 2001:10::/64
+        self.assertTrue(if_addr1.query_vpp_config())
         self.assertTrue(find_route(self, addr1, 128))
         self.assertFalse(find_route(self, addr2, 128))
 
         # configure second address, delete first, verify route not removed
         if_addr2.add_vpp_config()
         if_addr1.remove_vpp_config()
-        self.assertTrue(if_addr1.query_vpp_config())  # 2001:10::/64
+        self.assertFalse(if_addr1.query_vpp_config())
+        self.assertTrue(if_addr2.query_vpp_config())
         self.assertFalse(find_route(self, addr1, 128))
         self.assertTrue(find_route(self, addr2, 128))
 
         # delete second address, verify route removed
         if_addr2.remove_vpp_config()
-        self.assertFalse(if_addr1.query_vpp_config())  # 2001:10::/64
+        self.assertFalse(if_addr1.query_vpp_config())
         self.assertFalse(find_route(self, addr1, 128))
         self.assertFalse(find_route(self, addr2, 128))
 
@@ -1036,7 +1077,6 @@ class TestICMPv6Echo(VppTestCase):
         super(TestICMPv6Echo, self).tearDown()
         for i in self.pg_interfaces:
             i.unconfig_ip6()
-            i.ip6_disable()
             i.admin_down()
 
     def test_icmpv6_echo(self):
@@ -1160,7 +1200,7 @@ class TestIPv6RD(TestIPv6ND):
     def test_rd_receive_router_advertisement(self):
         """ Verify events triggered by received RA packets """
 
-        self.vapi.want_ip6_ra_events()
+        self.vapi.want_ip6_ra_events(enable=1)
 
         prefix_info_1 = ICMPv6NDOptPrefixInfo(
             prefix="1::2",
@@ -1266,6 +1306,17 @@ class TestIPv6RDControlPlane(TestIPv6ND):
                     list.append(str(entry.route.prefix.network_address))
         return list
 
+    def wait_for_no_default_route(self, n_tries=50, s_time=1):
+        while (n_tries):
+            fib = self.vapi.ip_route_dump(0, True)
+            default_routes = self.get_default_routes(fib)
+            if 0 == len(default_routes):
+                return True
+            n_tries = n_tries - 1
+            self.sleep(s_time)
+
+        return False
+
     def test_all(self):
         """ Test handling of SLAAC addresses and default routes """
 
@@ -1365,9 +1416,7 @@ class TestIPv6RDControlPlane(TestIPv6ND):
         self.sleep_on_vpp_time(1)
 
         # check that default route is deleted
-        fib = self.vapi.ip_route_dump(0, True)
-        default_routes = self.get_default_routes(fib)
-        self.assertEqual(len(default_routes), 0)
+        self.assertTrue(self.wait_for_no_default_route())
 
         # check FIB still contains the SLAAC address
         addresses = set(self.get_interface_addresses(fib, self.pg0))
@@ -1444,7 +1493,7 @@ class IPv6NDProxyTest(TestIPv6ND):
         # Add proxy support for the host
         #
         self.vapi.ip6nd_proxy_add_del(
-            ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
+            is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
             sw_if_index=self.pg1.sw_if_index)
 
         #
@@ -1469,7 +1518,7 @@ class IPv6NDProxyTest(TestIPv6ND):
              IPv6(dst=self.pg0._remote_hosts[2].ip6,
                   src=self.pg0.remote_ip6) /
              inet6.UDP(sport=10000, dport=20000) /
-             Raw('\xa5' * 100))
+             Raw(b'\xa5' * 100))
 
         self.pg0.add_stream(t)
         self.pg_enable_capture(self.pg_interfaces)
@@ -1511,7 +1560,7 @@ class IPv6NDProxyTest(TestIPv6ND):
                       lladdr=self.pg0._remote_hosts[2].mac))
 
         self.vapi.ip6nd_proxy_add_del(
-            ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
+            is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
             sw_if_index=self.pg2.sw_if_index)
 
         self.send_and_expect_na(self.pg2, ns_pg2,
@@ -1531,7 +1580,7 @@ class IPv6NDProxyTest(TestIPv6ND):
               IPv6(dst=self.pg0._remote_hosts[2].ip6,
                    src=self.pg0._remote_hosts[3].ip6) /
               inet6.UDP(sport=10000, dport=20000) /
-              Raw('\xa5' * 100))
+              Raw(b'\xa5' * 100))
 
         self.pg2.add_stream(t2)
         self.pg_enable_capture(self.pg_interfaces)
@@ -1552,10 +1601,10 @@ class IPv6NDProxyTest(TestIPv6ND):
         #
         self.vapi.ip6nd_proxy_add_del(
             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
-            sw_if_index=self.pg1.sw_if_index, is_del=1)
+            sw_if_index=self.pg1.sw_if_index, is_add=0)
         self.vapi.ip6nd_proxy_add_del(
             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
-            sw_if_index=self.pg2.sw_if_index, is_del=1)
+            sw_if_index=self.pg2.sw_if_index, is_add=0)
 
         self.assertFalse(find_nbr(self,
                                   self.pg2.sw_if_index,
@@ -1619,7 +1668,7 @@ class TestIPNull(VppTestCase):
                    dst=self.pg0.local_mac) /
              IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
              inet6.UDP(sport=1234, dport=1234) /
-             Raw('\xa5' * 100))
+             Raw(b'\xa5' * 100))
 
         #
         # A route via IP NULL that will reply with ICMP unreachables
@@ -1718,12 +1767,12 @@ class TestIPDisabled(VppTestCase):
                     dst=self.pg1.local_mac) /
               IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
               inet6.UDP(sport=1234, dport=1234) /
-              Raw('\xa5' * 100))
+              Raw(b'\xa5' * 100))
         pm = (Ether(src=self.pg1.remote_mac,
                     dst=self.pg1.local_mac) /
               IPv6(src="2001::1", dst="ffef::1") /
               inet6.UDP(sport=1234, dport=1234) /
-              Raw('\xa5' * 100))
+              Raw(b'\xa5' * 100))
 
         #
         # PG1 does not forward IP traffic
@@ -1834,7 +1883,7 @@ class TestIP6LoadBalance(VppTestCase):
             port_ip_hdr = (
                 IPv6(dst="3000::1", src="3000:1::1") /
                 inet6.UDP(sport=1234, dport=1234 + ii) /
-                Raw('\xa5' * 100))
+                Raw(b'\xa5' * 100))
             port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
                                        dst=self.pg0.local_mac) /
                                  port_ip_hdr))
@@ -1856,7 +1905,7 @@ class TestIP6LoadBalance(VppTestCase):
             src_ip_hdr = (
                 IPv6(dst="3000::1", src="3000:1::%d" % ii) /
                 inet6.UDP(sport=1234, dport=1234) /
-                Raw('\xa5' * 100))
+                Raw(b'\xa5' * 100))
             src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
                                       dst=self.pg0.local_mac) /
                                 src_ip_hdr))
@@ -1954,13 +2003,13 @@ class TestIP6LoadBalance(VppTestCase):
                                    src="4000:1::1") /
                               inet6.UDP(sport=1234,
                                         dport=1234 + ii) /
-                              Raw('\xa5' * 100)))
+                              Raw(b'\xa5' * 100)))
             src_pkts.append((Ether(src=self.pg0.remote_mac,
                                    dst=self.pg0.local_mac) /
                              IPv6(dst="4000::1",
                                   src="4000:1::%d" % ii) /
                              inet6.UDP(sport=1234, dport=1234) /
-                             Raw('\xa5' * 100)))
+                             Raw(b'\xa5' * 100)))
 
         route_3000_2 = VppIpRoute(self, "3000::2", 128,
                                   [VppRoutePath(self.pg3.remote_ip6,
@@ -2000,7 +2049,7 @@ class TestIP6LoadBalance(VppTestCase):
                                    src="6000:1::1") /
                               inet6.UDP(sport=1234,
                                         dport=1234 + ii) /
-                              Raw('\xa5' * 100)))
+                              Raw(b'\xa5' * 100)))
 
         route_5000_2 = VppIpRoute(self, "5000::2", 128,
                                   [VppRoutePath(self.pg3.remote_ip6,
@@ -2054,7 +2103,7 @@ class TestIP6Punt(VppTestCase):
              IPv6(src=self.pg0.remote_ip6,
                   dst=self.pg0.local_ip6) /
              inet6.TCP(sport=1234, dport=1234) /
-             Raw('\xa5' * 100))
+             Raw(b'\xa5' * 100))
 
         pkts = p * 1025
 
@@ -2071,8 +2120,8 @@ class TestIP6Punt(VppTestCase):
         #
         # add a policer
         #
-        policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
-                                            rate_type=1)
+        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
+        policer.add_vpp_config()
         self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
 
         self.vapi.cli("clear trace")
@@ -2092,8 +2141,7 @@ class TestIP6Punt(VppTestCase):
         # remove the policer. back to full rx
         #
         self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
-        self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
-                                  rate_type=1, is_add=0)
+        policer.remove_vpp_config()
         self.send_and_expect(self.pg0, pkts, self.pg1)
 
         #
@@ -2221,12 +2269,12 @@ class TestIPDeag(VppTestCase):
                        dst=self.pg0.local_mac) /
                  IPv6(src="5::5", dst="1::1") /
                  inet6.TCP(sport=1234, dport=1234) /
-                 Raw('\xa5' * 100))
+                 Raw(b'\xa5' * 100))
         p_src = (Ether(src=self.pg0.remote_mac,
                        dst=self.pg0.local_mac) /
                  IPv6(src="2::2", dst="1::2") /
                  inet6.TCP(sport=1234, dport=1234) /
-                 Raw('\xa5' * 100))
+                 Raw(b'\xa5' * 100))
         pkts_dst = p_dst * 257
         pkts_src = p_src * 257
 
@@ -2268,7 +2316,7 @@ class TestIPDeag(VppTestCase):
                      dst=self.pg0.local_mac) /
                IPv6(src="3::4", dst="3::3") /
                inet6.TCP(sport=1234, dport=1234) /
-               Raw('\xa5' * 100))
+               Raw(b'\xa5' * 100))
 
         self.send_and_assert_no_replies(self.pg0, p_l * 257,
                                         "IP lookup loop")
@@ -2312,7 +2360,7 @@ class TestIP6Input(VppTestCase):
                           dst=self.pg1.remote_ip6,
                           hlim=1) /
                      inet6.UDP(sport=1234, dport=1234) /
-                     Raw('\xa5' * 100))
+                     Raw(b'\xa5' * 100))
 
         rx = self.send_and_expect(self.pg0, p_version * NUM_PKTS, self.pg0)
         rx = rx[0]
@@ -2351,7 +2399,7 @@ class TestIP6Input(VppTestCase):
                           dst=dst or self.pg1.remote_ip6,
                           version=3) /
                      l4 /
-                     Raw('\xa5' * 100))
+                     Raw(b'\xa5' * 100))
 
         self.send_and_assert_no_replies(self.pg0, p_version * NUM_PKTS,
                                         remark=msg or "",
@@ -2365,11 +2413,313 @@ class TestIP6Input(VppTestCase):
              IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
              IPv6ExtHdrHopByHop() /
              inet6.UDP(sport=1234, dport=1234) /
-             Raw('\xa5' * 100))
+             Raw(b'\xa5' * 100))
 
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
+
+class TestIPReplace(VppTestCase):
+    """ IPv6 Table Replace """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestIPReplace, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestIPReplace, cls).tearDownClass()
+
+    def setUp(self):
+        super(TestIPReplace, self).setUp()
+
+        self.create_pg_interfaces(range(4))
+
+        table_id = 1
+        self.tables = []
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip6()
+            i.resolve_arp()
+            i.generate_remote_hosts(2)
+            self.tables.append(VppIpTable(self, table_id,
+                                          True).add_vpp_config())
+            table_id += 1
+
+    def tearDown(self):
+        super(TestIPReplace, self).tearDown()
+        for i in self.pg_interfaces:
+            i.admin_down()
+            i.unconfig_ip4()
+
+    def test_replace(self):
+        """ IP Table Replace """
+
+        N_ROUTES = 20
+        links = [self.pg0, self.pg1, self.pg2, self.pg3]
+        routes = [[], [], [], []]
+
+        # the sizes of 'empty' tables
+        for t in self.tables:
+            self.assertEqual(len(t.dump()), 2)
+            self.assertEqual(len(t.mdump()), 5)
+
+        # load up the tables with some routes
+        for ii, t in enumerate(self.tables):
+            for jj in range(1, N_ROUTES):
+                uni = VppIpRoute(
+                    self, "2001::%d" % jj if jj != 0 else "2001::", 128,
+                    [VppRoutePath(links[ii].remote_hosts[0].ip6,
+                                  links[ii].sw_if_index),
+                     VppRoutePath(links[ii].remote_hosts[1].ip6,
+                                  links[ii].sw_if_index)],
+                    table_id=t.table_id).add_vpp_config()
+                multi = VppIpMRoute(
+                    self, "::",
+                    "ff:2001::%d" % jj, 128,
+                    MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+                    [VppMRoutePath(self.pg0.sw_if_index,
+                                   MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT,
+                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+                     VppMRoutePath(self.pg1.sw_if_index,
+                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+                     VppMRoutePath(self.pg2.sw_if_index,
+                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+                     VppMRoutePath(self.pg3.sw_if_index,
+                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)],
+                    table_id=t.table_id).add_vpp_config()
+                routes[ii].append({'uni': uni,
+                                   'multi': multi})
+
+        #
+        # replace the tables a few times
+        #
+        for kk in range(3):
+            # replace each table
+            for t in self.tables:
+                t.replace_begin()
+
+            # all the routes are still there
+            for ii, t in enumerate(self.tables):
+                dump = t.dump()
+                mdump = t.mdump()
+                for r in routes[ii]:
+                    self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+                    self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+            # redownload the even numbered routes
+            for ii, t in enumerate(self.tables):
+                for jj in range(0, N_ROUTES, 2):
+                    routes[ii][jj]['uni'].add_vpp_config()
+                    routes[ii][jj]['multi'].add_vpp_config()
+
+            # signal each table converged
+            for t in self.tables:
+                t.replace_end()
+
+            # we should find the even routes, but not the odd
+            for ii, t in enumerate(self.tables):
+                dump = t.dump()
+                mdump = t.mdump()
+                for jj in range(0, N_ROUTES, 2):
+                    self.assertTrue(find_route_in_dump(
+                        dump, routes[ii][jj]['uni'], t))
+                    self.assertTrue(find_mroute_in_dump(
+                        mdump, routes[ii][jj]['multi'], t))
+                for jj in range(1, N_ROUTES - 1, 2):
+                    self.assertFalse(find_route_in_dump(
+                        dump, routes[ii][jj]['uni'], t))
+                    self.assertFalse(find_mroute_in_dump(
+                        mdump, routes[ii][jj]['multi'], t))
+
+            # reload all the routes
+            for ii, t in enumerate(self.tables):
+                for r in routes[ii]:
+                    r['uni'].add_vpp_config()
+                    r['multi'].add_vpp_config()
+
+            # all the routes are still there
+            for ii, t in enumerate(self.tables):
+                dump = t.dump()
+                mdump = t.mdump()
+                for r in routes[ii]:
+                    self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+                    self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+        #
+        # finally flush the tables for good measure
+        #
+        for t in self.tables:
+            t.flush()
+            self.assertEqual(len(t.dump()), 2)
+            self.assertEqual(len(t.mdump()), 5)
+
+
+class TestIP6Replace(VppTestCase):
+    """ IPv4 Interface Address Replace """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestIP6Replace, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestIP6Replace, cls).tearDownClass()
+
+    def setUp(self):
+        super(TestIP6Replace, self).setUp()
+
+        self.create_pg_interfaces(range(4))
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+
+    def tearDown(self):
+        super(TestIP6Replace, self).tearDown()
+        for i in self.pg_interfaces:
+            i.admin_down()
+
+    def get_n_pfxs(self, intf):
+        return len(self.vapi.ip_address_dump(intf.sw_if_index, True))
+
+    def test_replace(self):
+        """ IP interface address replace """
+
+        intf_pfxs = [[], [], [], []]
+
+        # add prefixes to each of the interfaces
+        for i in range(len(self.pg_interfaces)):
+            intf = self.pg_interfaces[i]
+
+            # 2001:16:x::1/64
+            addr = "2001:16:%d::1" % intf.sw_if_index
+            a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
+            intf_pfxs[i].append(a)
+
+            # 2001:16:x::2/64 - a different address in the same subnet as above
+            addr = "2001:16:%d::2" % intf.sw_if_index
+            a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
+            intf_pfxs[i].append(a)
+
+            # 2001:15:x::2/64 - a different address and subnet
+            addr = "2001:15:%d::2" % intf.sw_if_index
+            a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
+            intf_pfxs[i].append(a)
+
+        # a dump should n_address in it
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 3)
+
+        #
+        # remove all the address thru a replace
+        #
+        self.vapi.sw_interface_address_replace_begin()
+        self.vapi.sw_interface_address_replace_end()
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 0)
+
+        #
+        # add all the interface addresses back
+        #
+        for p in intf_pfxs:
+            for v in p:
+                v.add_vpp_config()
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 3)
+
+        #
+        # replace again, but this time update/re-add the address on the first
+        # two interfaces
+        #
+        self.vapi.sw_interface_address_replace_begin()
+
+        for p in intf_pfxs[:2]:
+            for v in p:
+                v.add_vpp_config()
+
+        self.vapi.sw_interface_address_replace_end()
+
+        # on the first two the address still exist,
+        # on the other two they do not
+        for intf in self.pg_interfaces[:2]:
+            self.assertEqual(self.get_n_pfxs(intf), 3)
+        for p in intf_pfxs[:2]:
+            for v in p:
+                self.assertTrue(v.query_vpp_config())
+        for intf in self.pg_interfaces[2:]:
+            self.assertEqual(self.get_n_pfxs(intf), 0)
+
+        #
+        # add all the interface addresses back on the last two
+        #
+        for p in intf_pfxs[2:]:
+            for v in p:
+                v.add_vpp_config()
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 3)
+
+        #
+        # replace again, this time add different prefixes on all the interfaces
+        #
+        self.vapi.sw_interface_address_replace_begin()
+
+        pfxs = []
+        for intf in self.pg_interfaces:
+            # 2001:18:x::1/64
+            addr = "2001:18:%d::1" % intf.sw_if_index
+            pfxs.append(VppIpInterfaceAddress(self, intf, addr,
+                                              64).add_vpp_config())
+
+        self.vapi.sw_interface_address_replace_end()
+
+        # only .18 should exist on each interface
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 1)
+        for pfx in pfxs:
+            self.assertTrue(pfx.query_vpp_config())
+
+        #
+        # remove everything
+        #
+        self.vapi.sw_interface_address_replace_begin()
+        self.vapi.sw_interface_address_replace_end()
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 0)
+
+        #
+        # add prefixes to each interface. post-begin add the prefix from
+        # interface X onto interface Y. this would normally be an error
+        # since it would generate a 'duplicate address' warning. but in
+        # this case, since what is newly downloaded is sane, it's ok
+        #
+        for intf in self.pg_interfaces:
+            # 2001:18:x::1/64
+            addr = "2001:18:%d::1" % intf.sw_if_index
+            VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
+
+        self.vapi.sw_interface_address_replace_begin()
+
+        pfxs = []
+        for intf in self.pg_interfaces:
+            # 2001:18:x::1/64
+            addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
+            pfxs.append(VppIpInterfaceAddress(self, intf,
+                                              addr, 64).add_vpp_config())
+
+        self.vapi.sw_interface_address_replace_end()
+
+        self.logger.info(self.vapi.cli("sh int addr"))
+
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 1)
+        for pfx in pfxs:
+            self.assertTrue(pfx.query_vpp_config())
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)