Framework: Refactor complex functions in PLRSearch
[csit.git] / resources / libraries / python / PLRsearch / Integrator.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module for numerical integration, tightly coupled to PLRsearch algorithm.
15
16 See log_plus for an explanation why None acts as a special case "float" number.
17
18 TODO: Separate optimizations specific to PLRsearch and distribute the rest
19       as a standalone package so other projects may reuse.
20 """
21
22 import copy
23 import traceback
24
25 import dill
26 from numpy import random
27
28 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
29 # then switch to absolute imports within PLRsearch package.
30 # Current usage of relative imports is just a short term workaround.
31 import stat_trackers  # pylint: disable=relative-import
32
33
34 def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
35     """Call estimate_nd but catch any exception and send traceback."""
36     try:
37         return estimate_nd(communication_pipe, scale_coeff, trace_enabled)
38     except BaseException:
39         # Any subclass could have caused estimate_nd to stop before sending,
40         # so we have to catch them all.
41         traceback_string = traceback.format_exc()
42         communication_pipe.send(traceback_string)
43         # After sendig, re-raise, so usages other than "one process per call"
44         # keep behaving correctly.
45         raise
46
47
48 def generate_sample(averages, covariance_matrix, dimension, scale_coeff):
49     """Generate next sample for estimate_nd"""
50     covariance_matrix = copy.deepcopy(covariance_matrix)
51     for first in range(dimension):
52         for second in range(dimension):
53             covariance_matrix[first][second] *= scale_coeff
54     while 1:
55         sample_point = random.multivariate_normal(
56             averages, covariance_matrix, 1)[0].tolist()
57         # Multivariate Gauss can fall outside (-1, 1) interval
58         for first in range(dimension):
59             sample_coordinate = sample_point[first]
60             if sample_coordinate <= -1.0 or sample_coordinate >= 1.0:
61                 break
62         else:
63             return sample_point
64
65
66 def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
67     """Use Bayesian inference from control queue, put result to result queue.
68
69     TODO: Use a logging framework that works in a user friendly way.
70     (Note that multiprocessing_logging does not work well with robot
71     and robotbackgroundlogger only works for threads, not processes.
72     Or, wait for https://github.com/robotframework/robotframework/pull/2182
73     Anyway, the current implementation with trace_enabled looks ugly.)
74
75     The result is average and standard deviation for posterior distribution
76     of a single dependent (scalar, float) value.
77     The prior is assumed to be uniform on (-1, 1) for every parameter.
78     Number of parameters and the function for computing
79     the dependent value and likelihood both come from input.
80
81     The likelihood is assumed to be extremely uneven (but never zero),
82     so the function should return the logarithm of the likelihood.
83     The integration method is basically a Monte Carlo
84     (TODO: Add links to notions used here.),
85     but importance sampling is used in order to focus
86     on the part of parameter space with (relatively) non-negligible likelihood.
87
88     Multivariate Gauss distribution is used for focusing,
89     so only unimodal posterior distributions are handled correctly.
90     Initial samples are mostly used for shaping (and shifting)
91     the Gaussian distribution, later samples will probably dominate.
92     Thus, initially the algorithm behavior resembles more "find the maximum",
93     as opposed to "reliably integrate". As for later iterations of PLRsearch,
94     it is assumed that the distribution position does not change rapidly;
95     thus integration algorithm returns also the distribution data,
96     to be used as initial focus in next iteration.
97
98     There are workarounds in place that allow old or default focus tracker
99     to be updated reasonably, even when initial samples
100     of new iteration have way smaller (or larger) weights.
101
102     During the "find the maximum" phase, the focus tracker frequently takes
103     a wrong shape (compared to observed samples in equilibrium).
104     Therefore scale_coeff argument is left for humans to tweak,
105     so the convergence is reliable and quick.
106
107     Until the distribution locates itself roughly around
108     the maximum likeligood point, the integration results are probably wrong.
109     That means some minimal time is needed for the result to become reliable.
110
111     TODO: The folowing is not currently implemented.
112     The reported standard distribution attempts to signal inconsistence
113     (when one sample has dominating weight compared to the rest of samples),
114     but some human supervision is strongly encouraged.
115
116     To facilitate running in worker processes, arguments and results
117     are communicated via a pipe. The computation does not start
118     until arguments appear in the pipe, the computation stops
119     when another item (stop object) is detected in the pipe
120     (and result is put to pipe).
121
122     TODO: Create classes for arguments and results,
123           so their fields are documented (and code perhaps more readable).
124
125     Input/argument object (received from pipe)
126     is a 4-tuple of the following fields:
127     - dimension: Integer, number of parameters to consider.
128     - dilled_function: Function (serialized using dill), which:
129     - - Takes the dimension number of float parameters from (-1, 1).
130     - - Returns float 2-tuple of dependent value and parameter log-likelihood.
131     - param_focus_tracker: VectorStatTracker to use for initial focus.
132     - max_samples: None or a limit for samples to use.
133
134     Output/result object (sent to pipe queue)
135     is a 5-tuple of the following fields:
136     - value_tracker: ScalarDualStatTracker estimate of value posterior.
137     - param_focus_tracker: VectorStatTracker to use for initial focus next.
138     - debug_list: List of debug strings to log at main process.
139     - trace_list: List of trace strings to pass to main process if enabled.
140     - samples: Number of samples used in computation (to make it reproducible).
141     Trace strings are very verbose, it is not recommended to enable them.
142     In they are not enabled, trace_list will be empty.
143     It is recommended to edit some lines manually to debug_list if needed.
144
145     :param communication_pipe: Pipe to comunicate with boss process.
146     :param scale_coeff: Float number to tweak convergence speed with.
147     :param trace_enabled: Whether trace list should be populated at all.
148         Default: False
149     :type communication_pipe: multiprocessing.Connection (or compatible)
150     :type scale_coeff: float
151     :type trace_enabled: boolean
152     :raises OverflowError: If one sample dominates the rest too much.
153         Or if value_logweight_function does not handle
154         some part of parameter space carefully enough.
155     :raises numpy.linalg.LinAlgError: If the focus shape gets singular
156         (due to rounding errors). Try changing scale_coeff.
157     """
158
159     debug_list = list()
160     trace_list = list()
161     # Block until input object appears.
162     dimension, dilled_function, param_focus_tracker, max_samples = (
163         communication_pipe.recv())
164     debug_list.append("Called with param_focus_tracker {tracker!r}"
165                       .format(tracker=param_focus_tracker))
166
167     def trace(name, value):
168         """
169         Add a variable (name and value) to trace list (if enabled).
170
171         This is a closure (not a pure function),
172         as it accesses trace_list and trace_enabled
173         (without any of them being an explicit argument).
174
175         :param name: Any string identifying the value.
176         :param value: Any object to log repr of.
177         :type name: str
178         :type value: object
179         """
180         if trace_enabled:
181             trace_list.append(name + " " + repr(value))
182
183     value_logweight_function = dill.loads(dilled_function)
184     samples = 0
185     # Importance sampling produces samples of higher weight (important)
186     # more frequently, and corrects that by adding weight bonus
187     # for the less frequently (unimportant) samples.
188     # But "corrected_weight" is too close to "weight" to be readable,
189     # so "importance" is used instead, even if it runs contrary to what
190     # important region is.
191     value_tracker = stat_trackers.ScalarDualStatTracker()
192     param_sampled_tracker = stat_trackers.VectorStatTracker(dimension).reset()
193     if not param_focus_tracker:
194         # First call has None instead of a real (even empty) tracker.
195         param_focus_tracker = stat_trackers.VectorStatTracker(dimension)
196         param_focus_tracker.unit_reset()
197     else:
198         # Focus tracker has probably too high weight.
199         param_focus_tracker.log_sum_weight = None
200     random.seed(0)
201     while not communication_pipe.poll():
202         if max_samples and samples >= max_samples:
203             break
204         sample_point = generate_sample(param_focus_tracker.averages,
205                                        param_focus_tracker.covariance_matrix,
206                                        dimension,
207                                        scale_coeff)
208         trace("sample_point", sample_point)
209         samples += 1
210         trace("samples", samples)
211         value, log_weight = value_logweight_function(trace, *sample_point)
212         trace("value", value)
213         trace("log_weight", log_weight)
214         trace("focus tracker before adding", param_focus_tracker)
215         # Update focus related statistics.
216         param_distance = param_focus_tracker.add_without_dominance_get_distance(
217             sample_point, log_weight)
218         # The code above looked at weight (not importance).
219         # The code below looks at importance (not weight).
220         log_rarity = param_distance / 2.0
221         trace("log_rarity", log_rarity)
222         log_importance = log_weight + log_rarity
223         trace("log_importance", log_importance)
224         value_tracker.add(value, log_importance)
225         # Update sampled statistics.
226         param_sampled_tracker.add_get_shift(sample_point, log_importance)
227     debug_list.append("integrator used " + str(samples) + " samples")
228     debug_list.append(" ".join([
229         "value_avg", str(value_tracker.average),
230         "param_sampled_avg", repr(param_sampled_tracker.averages),
231         "param_sampled_cov", repr(param_sampled_tracker.covariance_matrix),
232         "value_log_variance", str(value_tracker.log_variance),
233         "value_log_secondary_variance",
234         str(value_tracker.secondary.log_variance)]))
235     communication_pipe.send(
236         (value_tracker, param_focus_tracker, debug_list, trace_list, samples))