# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 """Module for numerical integration, tightly coupled to PLRsearch algorithm.
16 See log_plus for an explanation why None acts as a special case "float" number.
18 TODO: Separate optimizations specific to PLRsearch and distribute the rest
19 as a standalone package so other projects may reuse.
26 from numpy import random

# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
# then switch to absolute imports within PLRsearch package.
# Current usage of relative imports is just a short term workaround.
import stat_trackers  # pylint: disable=relative-import


def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
    """Call estimate_nd, but catch any exception and send the traceback."""
    try:
        return estimate_nd(communication_pipe, scale_coeff, trace_enabled)
    except BaseException:
        # Any exception subclass could have caused estimate_nd to stop
        # before sending, so we have to catch them all.
        traceback_string = traceback.format_exc()
        communication_pipe.send(traceback_string)
        # After sending, re-raise, so usages other than "one process per call"
        # keep behaving correctly.
        raise
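
# Note: on failure, the boss process receives the traceback string
# (sent above) instead of the usual 5-tuple result, so a recv() caller
# can tell the two apart by type, e.g. isinstance(result, str).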


def generate_sample(averages, covariance_matrix, dimension, scale_coeff):
    """Generate next sample for estimate_nd."""
    covariance_matrix = copy.deepcopy(covariance_matrix)
    for first in range(dimension):
        for second in range(dimension):
            covariance_matrix[first][second] *= scale_coeff
    while 1:
        sample_point = random.multivariate_normal(
            averages, covariance_matrix, 1)[0].tolist()
        # Multivariate Gauss can fall outside the (-1, 1) interval
        # where the prior is zero, so such points are rejected and redrawn.
        for first in range(dimension):
            sample_coordinate = sample_point[first]
            if sample_coordinate <= -1.0 or sample_coordinate >= 1.0:
                break
        else:
            return sample_point


def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
    """Use Bayesian inference from the control pipe, put the result back.

    TODO: Use a logging framework that works in a user-friendly way.
    (Note that multiprocessing_logging does not work well with robot
    and robotbackgroundlogger only works for threads, not processes.
    Or, wait for https://github.com/robotframework/robotframework/pull/2182
    Anyway, the current implementation with trace_enabled looks ugly.)

    The result is the average and standard deviation of the posterior
    distribution of a single dependent (scalar, float) value.
    The prior is assumed to be uniform on (-1, 1) for every parameter.
    The number of parameters, and the function computing
    the dependent value and its likelihood, both come from the input.

    The likelihood is assumed to be extremely uneven (but never zero),
    so the function should return the logarithm of the likelihood.
    The integration method is basically Monte Carlo
    (TODO: Add links to the notions used here.),
    but importance sampling is used in order to focus
    on the part of parameter space with (relatively) non-negligible likelihood.
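
    As a minimal standalone sketch of that idea (a toy, not this module's
    code; f, q_sample and q_logpdf are hypothetical callables), importance
    sampling estimates an integral by drawing from a convenient
    distribution q and dividing out its density:

        import math

        def toy_importance_estimate(f, q_sample, q_logpdf, samples=10000):
            # Estimate the integral of f by sampling from distribution q.
            total = 0.0
            for _ in range(samples):
                x = q_sample()
                # A rarely drawn sample gets a weight bonus of 1/q(x),
                # exactly compensating for how seldom it is drawn.
                total += f(x) * math.exp(-q_logpdf(x))
            return total / samples

    The logarithm of such a 1/q(x) bonus is what the code below calls
    log_rarity (up to an additive constant shared by all samples,
    which cancels out in the weighted averages).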

    Multivariate Gauss distribution is used for focusing,
    so only unimodal posterior distributions are handled correctly.
    Initial samples are mostly used for shaping (and shifting)
    the Gaussian distribution; later samples will probably dominate.
    Thus, initially the algorithm behavior resembles "find the maximum"
    more than "reliably integrate". As for later iterations of PLRsearch,
    it is assumed that the distribution position does not change rapidly;
    thus the integration algorithm also returns the distribution data,
    to be used as the initial focus in the next iteration.

    There are workarounds in place that allow an old or default focus tracker
    to be updated reasonably, even when the initial samples
    of the new iteration have way smaller (or larger) weights.

    During the "find the maximum" phase, the focus tracker frequently takes
    a wrong shape (compared to the samples observed in equilibrium).
    Therefore the scale_coeff argument is left for humans to tweak,
    so the convergence is reliable and quick.
    (The focus tracker's covariance matrix is multiplied by scale_coeff
    before sampling, so larger values explore a wider neighborhood.)

    Until the distribution locates itself roughly around
    the maximum likelihood point, the integration results are probably wrong.
    That means some minimal time is needed for the result to become reliable.

    TODO: The following is not currently implemented.
    The reported standard deviation attempts to signal inconsistency
    (when one sample has a dominating weight compared to the rest of samples),
    but some human supervision is strongly encouraged.

    To facilitate running in worker processes, arguments and results
    are communicated via a pipe. The computation does not start
    until the arguments appear in the pipe, and the computation stops
    when another item (the stop object) is detected in the pipe
    (at which point the result is put to the pipe).

    TODO: Create classes for arguments and results,
    so their fields are documented (and the code is perhaps more readable).

    The input/argument object (received from the pipe)
    is a 4-tuple of the following fields:
    - dimension: Integer, the number of parameters to consider.
    - dilled_function: Function (serialized using dill), which:
    - - Takes the dimension number of float parameters from (-1, 1).
    - - Returns a float 2-tuple: dependent value and parameter log-likelihood.
    - param_focus_tracker: VectorStatTracker to use for the initial focus.
    - max_samples: None, or a limit on the number of samples to use.

    The output/result object (sent to the pipe)
    is a 5-tuple of the following fields:
    - value_tracker: ScalarDualStatTracker estimate of the value posterior.
    - param_focus_tracker: VectorStatTracker to use for the next initial focus.
    - debug_list: List of debug strings to log at the main process.
    - trace_list: List of trace strings to pass to the main process if enabled.
    - samples: Number of samples used in computation (to make it reproducible).
    Trace strings are very verbose; it is not recommended to enable them.
    If they are not enabled, trace_list will be empty.
    It is recommended to manually copy selected lines to debug_list if needed.
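
    A minimal sketch of the boss side of this protocol (a standalone toy,
    not CSIT code; one parameter, a made-up log-likelihood function,
    and max_samples used instead of an explicit stop object):

        import multiprocessing

        import dill

        def toy_function(trace, first_param):
            # Value is the parameter itself; likelihood peaks at 0.25.
            return first_param, -100.0 * (first_param - 0.25) ** 2

        boss_end, worker_end = multiprocessing.Pipe()
        worker = multiprocessing.Process(
            target=try_estimate_nd, args=(worker_end,))
        worker.start()
        boss_end.send((1, dill.dumps(toy_function), None, 10000))
        value_tracker, focus_tracker, debugs, traces, used = boss_end.recv()
        worker.join()

    When max_samples is None, the boss instead sends a second (stop) item
    to make the worker report its current result.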

    :param communication_pipe: Pipe to communicate with the boss process.
    :param scale_coeff: Float number to tweak convergence speed with.
    :param trace_enabled: Whether the trace list should be populated at all.
    :type communication_pipe: multiprocessing.Connection (or compatible)
    :type scale_coeff: float
    :type trace_enabled: boolean
    :raises OverflowError: If one sample dominates the rest too much.
        Or if value_logweight_function does not handle
        some part of parameter space carefully enough.
    :raises numpy.linalg.LinAlgError: If the focus shape gets singular
        (due to rounding errors). Try changing scale_coeff.
    """
    debug_list = list()
    trace_list = list()
    # Block until the input object appears.
    dimension, dilled_function, param_focus_tracker, max_samples = (
        communication_pipe.recv())
    debug_list.append("Called with param_focus_tracker {tracker!r}"
                      .format(tracker=param_focus_tracker))

    def trace(name, value):
        """
        Add a variable (name and value) to trace list (if enabled).

        This is a closure (not a pure function),
        as it accesses trace_list and trace_enabled
        (without any of them being an explicit argument).

        :param name: Any string identifying the value.
        :param value: Any object to log repr of.
        :type name: str
        :type value: object
        """
        if trace_enabled:
            trace_list.append(name + " " + repr(value))

    value_logweight_function = dill.loads(dilled_function)
    samples = 0
    # Importance sampling produces samples of higher weight (important)
    # more frequently, and corrects for that by adding a weight bonus
    # to the less frequently sampled (unimportant) points.
    # But "corrected_weight" is too close to "weight" to be readable,
    # so "importance" is used instead, even though the name runs contrary
    # to the notion of the important region.
    value_tracker = stat_trackers.ScalarDualStatTracker()
    param_sampled_tracker = stat_trackers.VectorStatTracker(dimension).reset()
    if not param_focus_tracker:
        # The first call has None instead of a real (even empty) tracker.
        param_focus_tracker = stat_trackers.VectorStatTracker(dimension)
        param_focus_tracker.unit_reset()
    else:
        # The focus tracker probably has too high a weight.
        param_focus_tracker.log_sum_weight = None
    # Fix the seed so the computed sample sequence is reproducible.
    random.seed(0)
    while not communication_pipe.poll():
        if max_samples and samples >= max_samples:
            break
        sample_point = generate_sample(param_focus_tracker.averages,
                                       param_focus_tracker.covariance_matrix,
                                       dimension, scale_coeff)
        trace("sample_point", sample_point)
        samples += 1
        trace("samples", samples)
        value, log_weight = value_logweight_function(trace, *sample_point)
        trace("value", value)
        trace("log_weight", log_weight)
        trace("focus tracker before adding", param_focus_tracker)
        # Update focus related statistics.
        param_distance = param_focus_tracker.add_without_dominance_get_distance(
            sample_point, log_weight)
        # The code above looked at weight (not importance).
        # The code below looks at importance (not weight).
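        # (Assuming param_distance is the squared Mahalanobis distance
        # from the focus center: the Gaussian sampling density there is
        # proportional to exp(-param_distance / 2.0), so param_distance / 2.0
        # is the log of the reciprocal density, i.e. the rarity bonus.
        # The dropped normalization constant is shared by all samples,
        # so it cancels out in the trackers' weighted averages.)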
        log_rarity = param_distance / 2.0
        trace("log_rarity", log_rarity)
        log_importance = log_weight + log_rarity
        trace("log_importance", log_importance)
        value_tracker.add(value, log_importance)
        # Update sampled statistics.
        param_sampled_tracker.add_get_shift(sample_point, log_importance)
    debug_list.append("integrator used " + str(samples) + " samples")
    debug_list.append(" ".join([
        "value_avg", str(value_tracker.average),
        "param_sampled_avg", repr(param_sampled_tracker.averages),
        "param_sampled_cov", repr(param_sampled_tracker.covariance_matrix),
        "value_log_variance", str(value_tracker.log_variance),
        "value_log_secondary_variance",
        str(value_tracker.secondary.log_variance)]))
    communication_pipe.send(
        (value_tracker, param_focus_tracker, debug_list, trace_list, samples))