PLRsearch: Initial implementation and suites
diff --git a/resources/libraries/python/PLRsearch/Integrator.py b/resources/libraries/python/PLRsearch/Integrator.py
new file mode 100644 (file)
index 0000000..f8846fd
--- /dev/null
@@ -0,0 +1,378 @@
+# Copyright (c) 2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module for numerical integration, tightly coupled to PLRsearch algorithm.
+
+See log_plus for an explanation of why None acts as a special case "float" number.
+
+TODO: Separate optimizations specific to PLRsearch
+      and distribute as a standalone package so other projects can reuse it.
+"""
+
+import math
+import traceback
+
+import dill
+import numpy
+
+# TODO: The preferred way to consume this code is via a pip package.
+# If your project copies code instead, make sure your pylint check does not
+# require these imports to be absolute and descending from your superpackage.
+from log_plus import log_plus
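+# The imported log_plus is assumed to compute log(exp(a) + exp(b)) in a
+# numerically stable way, with None standing in for log(0.0), i.e. zero
+# likelihood; e.g. (sketch) log_plus(None, x) == x
+# and log_plus(0.0, 0.0) == math.log(2.0).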
+
+
+def try_estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
+    """Call estimate_nd but catch any exception and send traceback."""
+    try:
+        return estimate_nd(communication_pipe, scale_coeff, trace_enabled)
+    except BaseException:
+        # Any subclass could have caused estimate_nd to stop before sending,
+        # so we have to catch them all.
+        traceback_string = traceback.format_exc()
+        communication_pipe.send(traceback_string)
+        # After sending, re-raise, so that usages other than "one process per call"
+        # keep behaving correctly.
+        raise
+
+
+# FIXME: Pylint reports multiple complexity violations.
+# Refactor the code, using fewer (but structured) variables
+# and function calls for (relatively) loosely coupled code blocks.
+def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
+    """Use Bayesian inference on data from the pipe, send the result back.
+
+    TODO: Use a logging framework that works in a user-friendly way.
+    (Note that multiprocessing_logging does not work well with robot
+    and robotbackgroundlogger only works for threads, not processes.
+    Or, wait for https://github.com/robotframework/robotframework/pull/2182
+    Anyway, the current implementation with trace_enabled looks ugly.)
+
+    The result is the average and standard deviation of the posterior
+    distribution of a single dependent (positive scalar) value.
+    The prior is assumed to be uniform on (-1, 1) for every parameter.
+    The number of parameters, and the function for computing
+    the dependent value and its likelihood, both come from the input.
+
+    The likelihood is assumed to be extremely uneven (but never zero),
+    so the function should return the logarithm of the likelihood.
+    The integration method is basically Monte Carlo
+    (TODO: Add links to notions used here.),
+    but importance sampling is used in order to focus
+    on the part of the parameter space with (relatively) non-negligible
+    likelihood.
+
+    A multivariate Gauss distribution is used for focusing,
+    so only unimodal posterior distributions are handled correctly.
+    Initial samples are mostly used for shaping (and shifting)
+    the Gaussian distribution; later samples will probably dominate.
+    Thus, the initial algorithm behavior resembles "find the maximum"
+    more than "reliably integrate". For later iterations of PLRsearch,
+    it is assumed that the distribution position does not change rapidly;
+    thus the integration algorithm also returns the distribution data,
+    to be used as the initial focus in the next iteration.
+
+    After some number of samples (depending on dimension),
+    the algorithm starts tracking a few of the most likely samples to base
+    the Gaussian distribution around, mixing them with estimates
+    from observed samples.
+    The idea is that even when (usually) one of the samples dominates,
+    the first few are treated as if equally likely, to get a reasonable focus.
+    During the "find the maximum" phase, this set of best samples
+    frequently takes a wrong shape (compared to observed samples
+    in equilibrium). Therefore the scale_coeff argument is left for humans
+    to tweak, so that convergence is reliable and quick.
+    Any data (other than correctly weighted samples) used to keep
+    the distribution shape reasonable is called "bias", regardless of
+    whether it comes from the input hint or from tracking top samples.
+
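+    Concretely (as implemented in the main loop below), once enough top
+    samples are collected, the focus Gaussian is built as
+    focus_avg = sampled_ratio * sampled_avg + top_ratio * top_avg and
+    focus_cov = scale_coeff * (sampled_ratio * sampled_cov
+    + top_ratio * top_cov), where top_ratio reflects how much the single
+    best sample's weight dominates the total sampled weight,
+    and sampled_ratio = 1 - top_ratio.
+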
+    Until the distribution locates itself roughly around
+    the maximum likelihood point, the integration results are probably wrong.
+    That means some minimal time is needed for the result to become reliable.
+    The reported standard deviation attempts to signal inconsistency
+    (when one sample has a dominating weight compared to the rest of samples),
+    but some human supervision is strongly encouraged.
+
+    To facilitate running in worker processes, arguments and results
+    are communicated via a pipe. The computation does not start
+    until the arguments appear in the pipe; the computation stops
+    when another item (the stop object) is detected in the pipe
+    (and the result is put into the pipe), as sketched in the example below.
+
+    TODO: Create classes for arguments and results,
+          so their fields are documented (and code perhaps more readable).
+
+    Input/argument object (received from pipe)
+    is a 4-tuple of the following fields:
+    - dimension: Integer, number of parameters to consider.
+    - dilled_function: Function (serialized using dill), which:
+    - - Takes "dimension" float parameters from (-1, 1).
+    - - Returns float 2-tuple of dependent value and parameter log-likelihood.
+    - param_hint_avg: Dimension-tuple of floats to start searching around.
+    - param_hint_cov: Covariance matrix defining initial focus shape.
+
+    Output/result object (sent to the pipe)
+    is a 6-tuple of the following fields:
+    - value_avg: Float estimate of posterior average dependent value.
+    - value_stdev: Float estimate of posterior standard deviation of the value.
+    - param_importance_avg: Float tuple, center of Gaussian to use next.
+    - param_importance_cov: Float covariance matrix of the Gaussian to use next.
+    - debug_list: List of debug strings to log at main process.
+    - trace_list: List of trace strings to pass to main process if enabled.
+    Trace strings are very verbose; it is not recommended to enable them.
+    If they are not enabled, trace_list will be empty.
+    It is recommended to manually promote selected trace lines
+    to debug_list (by editing the code) if needed.
+
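+    An illustrative (untested) sketch of a boss process driving this
+    function, where value_logweight stands for a hypothetical user function
+    taking "dimension" floats and returning a (value, log_weight) 2-tuple::
+
+        import multiprocessing
+        import time
+        import dill
+
+        boss_end, worker_end = multiprocessing.Pipe()
+        worker = multiprocessing.Process(
+            target=try_estimate_nd, args=(worker_end,))
+        worker.start()
+        boss_end.send((2, dill.dumps(value_logweight), None, None))
+        time.sleep(10.0)  # Let the worker sample for a while.
+        boss_end.send(None)  # Any item acts as the stop object.
+        result_or_traceback = boss_end.recv()
+        worker.join()
+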
+    :param communication_pipe: Pipe to communicate with the boss process.
+    :param scale_coeff: Float number to tweak convergence speed with.
+    :param trace_enabled: Whether trace list should be populated at all.
+        Default: False
+    :type communication_pipe: multiprocessing.Connection (or compatible)
+    :type scale_coeff: float
+    :type trace_enabled: boolean
+    :raises OverflowError: If one sample dominates the rest too much.
+        Or if value_logweight_function does not handle
+        some part of parameter space carefully enough.
+    :raises numpy.linalg.LinAlgError: If the focus shape gets singular
+        (due to rounding errors). Try changing scale_coeff.
+    """
+
+    # Block until input object appears.
+    dimension, dilled_function, param_hint_avg, param_hint_cov = (
+        communication_pipe.recv())
+    debug_list = list()
+    trace_list = list()
+    def trace(name, value):
+        """Add a variable (name and value) to trace list (if enabled).
+
+        This is a closure (not a pure function),
+        as it accesses trace_list and trace_enabled
+        (without any of them being an explicit argument).
+
+        :param name: Any string identifying the value.
+        :param value: Any object to log repr of.
+        :type name: str
+        :type value: object
+        """
+        if trace_enabled:
+            trace_list.append(name + " " + repr(value))
+    value_logweight_function = dill.loads(dilled_function)
+    len_top = (dimension + 2) * (dimension + 1) // 2
+    top_weight_param = list()
+    samples = 0
+    log_sum_weight = None
+    # Importance sampling produces samples of higher weight (important)
+    # more frequently, and corrects for that by adding a weight bonus
+    # to the less frequently sampled (unimportant) ones.
+    # But "corrected_weight" is too close to "weight" to be readable,
+    # so "importance" is used instead, even though it runs contrary
+    # to what an important region is.
+    log_sum_importance = None
+    log_importance_best = None
+    value_avg = 0.0
+    # A 1x1 covariance matrix is just the variance.
+    # As variance is never negative, we can track its logarithm.
+    value_log_variance = None
+    # Here "secondary" means "excluding the highest-weight sample".
+    log_secondary_sum_importance = None
+    value_secondary_avg = 0.0
+    value_log_secondary_variance = None
+    param_sampled_avg = [0.0 for first in range(dimension)]
+    # TODO: Examine whether we can gain speed by tracking the triangle only.
+    # The covariance matrix can contain negative elements (off-diagonal),
+    # so no logarithm here. This can lead to zeroes on the diagonal,
+    # but we have biasing to make sure it does not hurt much.
+    param_sampled_cov = [[0.0 for first in range(dimension)]
+                         for second in range(dimension)]
+    # The next two variables do NOT need to be initialized here,
+    # but pylint is not enough of a mathematician to understand that.
+    param_top_avg = [0.0 for first in range(dimension)]
+    param_top_cov = [[0.0 for first in range(dimension)]
+                     for second in range(dimension)]
+    if not (param_hint_avg and param_hint_cov):
+        # First call has Nones instead of useful hints.
+        param_hint_avg = [0.0 for first in range(dimension)]
+        param_hint_cov = [
+            [1.0 if first == second else 0.0 for first in range(dimension)]
+            for second in range(dimension)]
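+        # With no hints, the initial iterations sample from the standard
+        # normal N(0, I), restricted to (-1, 1) per coordinate
+        # by the rejection loop below.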
+    while not communication_pipe.poll():
+        # Compute focus data.
+        if len(top_weight_param) < len_top:
+            # Not enough samples for reasonable top, use hint bias.
+            param_focus_avg = param_hint_avg
+            param_focus_cov = param_hint_cov
+        else:
+            # We have both top samples and overall samples.
+            # Mix them according to how much the highest-weight sample
+            # dominates.
+            log_top_weight = top_weight_param[0][0]
+            log_weight_norm = log_plus(log_sum_weight, log_top_weight)
+            top_ratio = math.exp(log_top_weight - log_weight_norm)
+            sampled_ratio = math.exp(log_sum_weight - log_weight_norm)
+            trace("log_top_weight", log_top_weight)
+            trace("log_sum_weight", log_sum_weight)
+            trace("top_ratio", top_ratio)
+            trace("sampled_ratio", sampled_ratio)
+            param_focus_avg = [
+                sampled_ratio * param_sampled_avg[first]
+                + top_ratio * param_top_avg[first]
+                for first in range(dimension)]
+            param_focus_cov = [[
+                scale_coeff * (
+                    sampled_ratio * param_sampled_cov[first][second]
+                    + top_ratio * param_top_cov[first][second])
+                for first in range(dimension)] for second in range(dimension)]
+        trace("param_focus_avg", param_focus_avg)
+        trace("param_focus_cov", param_focus_cov)
+        # Generate next sample.
+        while 1:
+            # TODO: Inform pylint that correct version of numpy is available.
+            sample_point = numpy.random.multivariate_normal(
+                param_focus_avg, param_focus_cov, 1)[0]
+            # A multivariate Gauss sample can fall outside the (-1, 1) interval.
+            for first in range(dimension):
+                sample_coordinate = sample_point[first]
+                if sample_coordinate <= -1.0 or sample_coordinate >= 1.0:
+                    break
+            else:  # These two breaks implement "level two continue".
+                break
+        trace("sample_point", sample_point)
+        samples += 1
+        value, log_weight = value_logweight_function(*sample_point)
+        trace("value", value)
+        trace("log_weight", log_weight)
+        # Update bias related statistics.
+        log_sum_weight = log_plus(log_sum_weight, log_weight)
+        if len(top_weight_param) < len_top:
+            top_weight_param.append((log_weight, sample_point))
+        # Hack: top_weight_param[-1] is either the smallest-weight item,
+        # or the one just appended to a previously len_top-1 item list.
+        if (len(top_weight_param) >= len_top
+                and log_weight >= top_weight_param[-1][0]):
+            top_weight_param = top_weight_param[:-1]
+            top_weight_param.append((log_weight, sample_point))
+            top_weight_param.sort(key=lambda item: -item[0])
+            trace("top_weight_param", top_weight_param)
+            # top_weight_param has changed, recompute biases.
+            param_top_avg = top_weight_param[0][1]
+            param_top_cov = [[0.0 for first in range(dimension)]
+                             for second in range(dimension)]
+            top_item_count = 1
+            for _, near_top_param in top_weight_param[1:]:
+                top_item_count += 1
+                next_item_ratio = 1.0 / top_item_count
+                previous_items_ratio = 1.0 - next_item_ratio
+                param_shift = [
+                    near_top_param[first] - param_top_avg[first]
+                    for first in range(dimension)]
+                # Do not move the center away from the highest-weight sample.
+                for second in range(dimension):
+                    for first in range(dimension):
+                        param_top_cov[first][second] += (
+                            param_shift[first] * param_shift[second]
+                            * next_item_ratio)
+                        param_top_cov[first][second] *= previous_items_ratio
+            trace("param_top_avg", param_top_avg)
+            trace("param_top_cov", param_top_cov)
+        # The code above looked at weight (not importance).
+        # The code below looks at importance (not weight).
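+        # The importance is (roughly) the weight divided by the proposal
+        # density. For the Gaussian focus, log(1/density) is half of the
+        # squared Mahalanobis distance from the focus center (the proposal's
+        # normalization constant is not included), which is what the next
+        # few lines compute.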
+        param_shift = [sample_point[first] - param_focus_avg[first]
+                       for first in range(dimension)]
+        rarity_gradient = numpy.linalg.solve(param_focus_cov, param_shift)
+        rarity_step = numpy.vdot(param_shift, rarity_gradient)
+        log_rarity = rarity_step / 2.0
+        trace("log_rarity", log_rarity)
+        trace("samples", samples)
+        log_importance = log_weight + log_rarity
+        trace("log_importance", log_importance)
+        # Update sampled statistics.
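+        # The running averages use an incremental weighted update:
+        # new_avg = old_avg + (sample - old_avg) * new_sample_ratio,
+        # where new_sample_ratio is this sample's share of the total
+        # importance so far; covariances are updated and rescaled similarly.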
+        old_log_sum_importance = log_sum_importance
+        log_sum_importance = log_plus(old_log_sum_importance, log_importance)
+        trace("new log_sum_weight", log_sum_weight)
+        trace("log_sum_importance", log_sum_importance)
+        if old_log_sum_importance is None:
+            param_sampled_avg = list(sample_point)
+            value_avg = value
+            # Other value related quantities stay None.
+            continue
+        previous_samples_ratio = math.exp(
+            old_log_sum_importance - log_sum_importance)
+        new_sample_ratio = math.exp(log_importance - log_sum_importance)
+        param_shift = [sample_point[first] - param_sampled_avg[first]
+                       for first in range(dimension)]
+        value_shift = value - value_avg
+        for first in range(dimension):
+            param_sampled_avg[first] += param_shift[first] * new_sample_ratio
+        old_value_avg = value_avg
+        value_avg += value_shift * new_sample_ratio
+        value_absolute_shift = abs(value_shift)
+        for second in range(dimension):
+            for first in range(dimension):
+                param_sampled_cov[first][second] += (
+                    param_shift[first] * param_shift[second] * new_sample_ratio)
+                param_sampled_cov[first][second] *= previous_samples_ratio
+        trace("param_sampled_avg", param_sampled_avg)
+        trace("param_sampled_cov", param_sampled_cov)
+        update_secondary_stats = True
+        if log_importance_best is None or log_importance > log_importance_best:
+            log_importance_best = log_importance
+            log_secondary_sum_importance = old_log_sum_importance
+            value_secondary_avg = old_value_avg
+            value_log_secondary_variance = value_log_variance
+            update_secondary_stats = False
+            # TODO: Update all primary quantities before secondary ones.
+            # (As opposed to current hybrid code.)
+        if value_absolute_shift > 0.0:
+            value_log_variance = log_plus(
+                value_log_variance, 2 * math.log(value_absolute_shift)
+                + log_importance - log_sum_importance)
+        if value_log_variance is not None:
+            value_log_variance -= log_sum_importance - old_log_sum_importance
+        if not update_secondary_stats:
+            continue
+        # TODO: Pylint says the following variable name is bad.
+        # Make sure the refactor uses shorter names.
+        old_log_secondary_sum_importance = log_secondary_sum_importance
+        log_secondary_sum_importance = log_plus(
+            old_log_secondary_sum_importance, log_importance)
+        if old_log_secondary_sum_importance is None:
+            value_secondary_avg = value
+            continue
+        new_sample_secondary_ratio = math.exp(
+            log_importance - log_secondary_sum_importance)
+        # TODO: No variable named old_value_secondary_avg appears
+        # in the subsequent computations. That probably means there is a bug.
+        value_secondary_shift = value - value_secondary_avg
+        value_secondary_absolute_shift = abs(value_secondary_shift)
+        value_secondary_avg += (
+            value_secondary_shift * new_sample_secondary_ratio)
+        if value_secondary_absolute_shift > 0.0:
+            value_log_secondary_variance = log_plus(
+                value_log_secondary_variance, (
+                    2 * math.log(value_secondary_absolute_shift)
+                    + log_importance - log_secondary_sum_importance))
+        if value_log_secondary_variance is not None:
+            value_log_secondary_variance -= (
+                log_secondary_sum_importance - old_log_secondary_sum_importance)
+    debug_list.append("integrator used " + str(samples) + " samples")
+    debug_list.append(
+        "value_avg " + str(value_avg)
+        + " param_sampled_avg " + repr(param_sampled_avg)
+        + " param_sampled_cov " + repr(param_sampled_cov)
+        + " value_log_variance " + str(value_log_variance)
+        + " value_log_secondary_variance " + str(value_log_secondary_variance))
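+    # The reported stdev is sqrt(variance) inflated by a factor of
+    # sqrt(variance / secondary_variance), so it grows when the most
+    # important single sample dominates the estimate:
+    # exp((2 * log_var - log_sec_var) / 2.0) == var / sqrt(sec_var).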
+    value_stdev = math.exp(
+        (2 * value_log_variance - value_log_secondary_variance) / 2.0)
+    debug_list.append("top_weight_param[0] " + repr(top_weight_param[0]))
+    # Intentionally returning param_focus_avg and param_focus_cov,
+    # instead of the possibly hyper-focused bias or sampled statistics.
+    communication_pipe.send(
+        (value_avg, value_stdev, param_focus_avg, param_focus_cov, debug_list,
+         trace_list))