bool recycle;
 };
 
+/** \brief IP neighbour replace begin
+
+    The use-case is that, for some unspecified reason, the control plane
+    has a different set of neighbours it than VPP
+    currently has. The CP would thus like to 'replace' VPP's set
+    only by specifying what the new set shall be, i.e. it is not
+    going to delete anything that already eixts, rather, is wants any
+    unspecified neighbors deleted implicitly.
+    The CP declares the start of this procedure with this replace_begin
+    API Call, and when it has populated all neighbours it wants, it calls
+    the below replace_end API. From this point on it is of course free
+    to add and delete neighbours as usual.
+    The underlying mechanism by which VPP implements this replace is
+    intentionally left unspecified.
+
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+*/
+autoreply define ip_neighbor_replace_begin
+{
+  u32 client_index;
+  u32 context;
+};
+
+/** \brief IP neighbour replace end
+
+    see ip_neighbor_replace_begin description.
+
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+*/
+autoreply define ip_neighbor_replace_end
+{
+  u32 client_index;
+  u32 context;
+};
+
 /** \brief Register for IP4 ARP resolution event on receing ARP reply or
            MAC/IP info from ARP requests in L2 BDs
     @param client_index - opaque cookie to identify the sender
 
   return (ipn - ip_neighbor_pool);
 }
 
+static void
+ip_neighbor_touch (ip_neighbor_t * ipn)
+{
+  ipn->ipn_flags &= ~IP_NEIGHBOR_FLAG_STALE;
+}
+
 static bool
 ip_neighbor_is_dynamic (const ip_neighbor_t * ipn)
 {
    * list is time sorted, newest first */
   ip_neighbor_elt_t *elt, *head;
 
+  ip_neighbor_touch (ipn);
   ipn->ipn_time_last_updated = vlib_time_now (vlib_get_main ());
   ipn->ipn_n_probes = 0;
 
                       format_ip_neighbor_flags, flags, format_mac_address_t,
                       mac);
 
+      ip_neighbor_touch (ipn);
+
       /* Refuse to over-write static neighbor entry. */
       if (!(flags & IP_NEIGHBOR_FLAG_STATIC) &&
          (ipn->ipn_flags & IP_NEIGHBOR_FLAG_STATIC))
   vec_free (ipnis);
 }
 
+static walk_rc_t
+ip_neighbor_mark_one (index_t ipni, void *ctx)
+{
+  ip_neighbor_t *ipn;
+
+  ipn = ip_neighbor_get (ipni);
+
+  ipn->ipn_flags |= IP_NEIGHBOR_FLAG_STALE;
+
+  return (WALK_CONTINUE);
+}
+
+void
+ip_neighbor_mark (ip46_type_t type)
+{
+  ip_neighbor_walk (type, ~0, ip_neighbor_mark_one, NULL);
+}
+
+typedef struct ip_neighbor_sweep_ctx_t_
+{
+  index_t *ipnsc_stale;
+} ip_neighbor_sweep_ctx_t;
+
+static walk_rc_t
+ip_neighbor_sweep_one (index_t ipni, void *arg)
+{
+  ip_neighbor_sweep_ctx_t *ctx = arg;
+  ip_neighbor_t *ipn;
+
+  ipn = ip_neighbor_get (ipni);
+
+  if (ipn->ipn_flags & IP_NEIGHBOR_FLAG_STALE)
+    {
+      vec_add1 (ctx->ipnsc_stale, ipni);
+    }
+
+  return (WALK_CONTINUE);
+}
+
+void
+ip_neighbor_sweep (ip46_type_t type)
+{
+  ip_neighbor_sweep_ctx_t ctx = { };
+  index_t *ipni;
+
+  ip_neighbor_walk (type, ~0, ip_neighbor_sweep_one, &ctx);
+
+  vec_foreach (ipni, ctx.ipnsc_stale)
+  {
+    ip_neighbor_free (ip_neighbor_get (*ipni));
+  }
+  vec_free (ctx.ipnsc_stale);
+}
+
 /*
  * Remove any arp entries associated with the specified interface
  */
 
 extern void ip_neighbor_probe_dst (const ip_adjacency_t * adj,
                                   const ip46_address_t * ip);
 
+extern void ip_neighbor_mark (ip46_type_t type);
+extern void ip_neighbor_sweep (ip46_type_t type);
+
 /**
  * From the watcher to the API to publish a new neighbor
  */
 
   REPLY_MACRO (VL_API_IP_NEIGHBOR_CONFIG_REPLY);
 }
 
+static void
+vl_api_ip_neighbor_replace_begin_t_handler (vl_api_ip_neighbor_replace_begin_t
+                                           * mp)
+{
+  vl_api_ip_neighbor_replace_begin_reply_t *rmp;
+  int rv = 0;
+
+  ip_neighbor_mark (IP46_TYPE_IP4);
+  ip_neighbor_mark (IP46_TYPE_IP6);
+
+  REPLY_MACRO (VL_API_IP_NEIGHBOR_REPLACE_BEGIN_REPLY);
+}
+
+static void
+vl_api_ip_neighbor_replace_end_t_handler (vl_api_ip_neighbor_replace_end_t *
+                                         mp)
+{
+  vl_api_ip_neighbor_replace_end_reply_t *rmp;
+  int rv = 0;
+
+  ip_neighbor_sweep (IP46_TYPE_IP4);
+  ip_neighbor_sweep (IP46_TYPE_IP6);
+
+  REPLY_MACRO (VL_API_IP_NEIGHBOR_REPLACE_END_REPLY);
+}
+
 #define vl_msg_name_crc_list
 #include <vnet/ip-neighbor/ip_neighbor.api.h>
 #undef vl_msg_name_crc_list
 
 {
   ip_neighbor_flags_t flags = va_arg (*args, int);
 
-  if (flags & IP_NEIGHBOR_FLAG_STATIC)
-    s = format (s, "S");
-
-  if (flags & IP_NEIGHBOR_FLAG_DYNAMIC)
-    s = format (s, "D");
-
-  if (flags & IP_NEIGHBOR_FLAG_NO_FIB_ENTRY)
-    s = format (s, "N");
-
-  return s;
+#define _(a,b,c,d)                              \
+  if (flags & IP_NEIGHBOR_FLAG_##a)             \
+    s = format (s, "%s", d);
+  foreach_ip_neighbor_flag
+#undef _
+    return s;
 }
 
-
 u8 *
 format_ip_neighbor_key (u8 * s, va_list * va)
 {
 
   u8 stale_threshold;          /* Threshold in minutes to delete nei entry */
 } ip_neighbor_scan_arg_t;
 
+#define foreach_ip_neighbor_flag                 \
+  _(STATIC, 1 << 0, "static", "S")               \
+  _(DYNAMIC, 1 << 1, "dynamic", "D")             \
+  _(NO_FIB_ENTRY, 1 << 2, "no-fib-entry", "N")   \
+  _(PENDING, 1 << 3, "pending", "P")             \
+  _(STALE, 1 << 4, "stale", "A")                 \
+
 typedef enum ip_neighbor_flags_t_
 {
   IP_NEIGHBOR_FLAG_NONE = 0,
-  IP_NEIGHBOR_FLAG_STATIC = (1 << 0),
-  IP_NEIGHBOR_FLAG_DYNAMIC = (1 << 1),
-  IP_NEIGHBOR_FLAG_NO_FIB_ENTRY = (1 << 2),
-  IP_NEIGHBOR_FLAG_PENDING = (1 << 3),
+#define _(a,b,c,d) IP_NEIGHBOR_FLAG_##a = b,
+  foreach_ip_neighbor_flag
+#undef _
 } __attribute__ ((packed)) ip_neighbor_flags_t;
 
 typedef struct ip_neighbor_watcher_t_
 
                                  self.pg0.remote_hosts[0].ip4))
 
 
+class NeighborReplaceTestCase(VppTestCase):
+    """ ARP/ND Replacement """
+
+    @classmethod
+    def setUpClass(cls):
+        super(NeighborReplaceTestCase, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(NeighborReplaceTestCase, cls).tearDownClass()
+
+    def setUp(self):
+        super(NeighborReplaceTestCase, self).setUp()
+
+        self.create_pg_interfaces(range(4))
+
+        # pg0 configured with ip4 and 6 addresses used for input
+        # pg1 configured with ip4 and 6 addresses used for output
+        # pg2 is unnumbered to pg0
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.config_ip6()
+            i.resolve_arp()
+            i.resolve_ndp()
+
+    def tearDown(self):
+        super(NeighborReplaceTestCase, self).tearDown()
+
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.unconfig_ip6()
+            i.admin_down()
+
+    def test_replace(self):
+        """ replace """
+
+        N_HOSTS = 16
+
+        for i in self.pg_interfaces:
+            i.generate_remote_hosts(N_HOSTS)
+            i.configure_ipv4_neighbors()
+            i.configure_ipv6_neighbors()
+
+        # replace them all
+        self.vapi.ip_neighbor_replace_begin()
+        self.vapi.ip_neighbor_replace_end()
+
+        for i in self.pg_interfaces:
+            for h in range(N_HOSTS):
+                self.assertFalse(find_nbr(self,
+                                          self.pg0.sw_if_index,
+                                          self.pg0.remote_hosts[h].ip4))
+                self.assertFalse(find_nbr(self,
+                                          self.pg0.sw_if_index,
+                                          self.pg0.remote_hosts[h].ip6))
+
+        #
+        # and them all back via the API
+        #
+        for i in self.pg_interfaces:
+            for h in range(N_HOSTS):
+                VppNeighbor(self,
+                            i.sw_if_index,
+                            i.remote_hosts[h].mac,
+                            i.remote_hosts[h].ip4).add_vpp_config()
+                VppNeighbor(self,
+                            i.sw_if_index,
+                            i.remote_hosts[h].mac,
+                            i.remote_hosts[h].ip6).add_vpp_config()
+
+        #
+        # begin the replacement again, this time touch some
+        # the neighbours on pg1 so they are not deleted
+        #
+        self.vapi.ip_neighbor_replace_begin()
+
+        # update from the API all neighbours on pg1
+        for h in range(N_HOSTS):
+            VppNeighbor(self,
+                        self.pg1.sw_if_index,
+                        self.pg1.remote_hosts[h].mac,
+                        self.pg1.remote_hosts[h].ip4).add_vpp_config()
+            VppNeighbor(self,
+                        self.pg1.sw_if_index,
+                        self.pg1.remote_hosts[h].mac,
+                        self.pg1.remote_hosts[h].ip6).add_vpp_config()
+
+        # update from the data-plane all neighbours on pg3
+        self.pg3.configure_ipv4_neighbors()
+        self.pg3.configure_ipv6_neighbors()
+
+        # complete the replacement
+        self.logger.info(self.vapi.cli("sh ip neighbors"))
+        self.vapi.ip_neighbor_replace_end()
+
+        for i in self.pg_interfaces:
+            if i == self.pg1 or i == self.pg3:
+                # neighbours on pg1 and pg3 are still present
+                for h in range(N_HOSTS):
+                    self.assertTrue(find_nbr(self,
+                                             i.sw_if_index,
+                                             i.remote_hosts[h].ip4))
+                    self.assertTrue(find_nbr(self,
+                                             i.sw_if_index,
+                                             i.remote_hosts[h].ip6))
+            else:
+                # all other neighbours are toast
+                for h in range(N_HOSTS):
+                    self.assertFalse(find_nbr(self,
+                                              i.sw_if_index,
+                                              i.remote_hosts[h].ip4))
+                    self.assertFalse(find_nbr(self,
+                                              i.sw_if_index,
+                                              i.remote_hosts[h].ip6))
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)