1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Module for numerical integration, tightly coupled to PLRsearch algorithm.
16 See log_plus for an explanation why None acts as a special case "float" number.
18 TODO: Separate optimizations specific to PLRsearch and distribute the rest
19 as a standalone package so other projects may reuse.
26 from numpy import random
28 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
29 # then switch to absolute imports within PLRsearch package.
30 # Current usage of relative imports is just a short term workaround.
31 from . import stat_trackers
34 def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
35 """Call estimate_nd but catch any exception and send traceback.
37 This function does not return anything, computation result
38 is sent via the communication pipe instead.
40 TODO: Move scale_coeff to a field of data class
41 with constructor/factory hiding the default value,
42 and receive its instance via pipe, instead of argument.
44 :param communication_pipe: Endpoint for communication with parent process.
45 :param scale_coeff: Float number to tweak convergence speed with.
46 :param trace_enabled: Whether to emit trace level debugs.
47 Keeping trace disabled improves speed and saves memory.
48 Enable trace only when debugging the computation itself.
49 :type communication_pipe: multiprocessing.Connection
50 :type scale_coeff: float
51 :type trace_enabled: bool
52 :raises BaseException: Anything raised by interpreter or estimate_nd.
55 estimate_nd(communication_pipe, scale_coeff, trace_enabled)
57 # Any subclass could have caused estimate_nd to stop before sending,
58 # so we have to catch them all.
59 traceback_string = traceback.format_exc()
60 communication_pipe.send(traceback_string)
61 # After sendig, re-raise, so usages other than "one process per call"
62 # keep behaving correctly.
66 def generate_sample(averages, covariance_matrix, dimension, scale_coeff):
67 """Generate next sample for estimate_nd.
69 Arguments control the multivariate normal "focus".
70 Keep generating until the sample point fits into unit area.
72 :param averages: Coordinates of the focus center.
73 :param covariance_matrix: Matrix controlling the spread around the average.
74 :param dimension: If N is dimension, average is N vector and matrix is NxN.
75 :param scale_coeff: Coefficient to conformally multiply the spread.
76 :type averages: Indexable of N floats
77 :type covariance_matrix: Indexable of N indexables of N floats
79 :type scale_coeff: float
80 :returns: The generated sample point.
81 :rtype: N-tuple of float
83 covariance_matrix = copy.deepcopy(covariance_matrix)
84 for first in range(dimension):
85 for second in range(dimension):
86 covariance_matrix[first][second] *= scale_coeff
88 sample_point = random.multivariate_normal(
89 averages, covariance_matrix, 1)[0].tolist()
90 # Multivariate Gauss can fall outside (-1, 1) interval
91 for first in range(dimension):
92 sample_coordinate = sample_point[first]
93 if sample_coordinate <= -1.0 or sample_coordinate >= 1.0:
99 def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
100 """Use Bayesian inference from control queue, put result to result queue.
102 TODO: Use a logging framework that works in a user friendly way.
103 (Note that multiprocessing_logging does not work well with robot
104 and robotbackgroundlogger only works for threads, not processes.
105 Or, wait for https://github.com/robotframework/robotframework/pull/2182
106 Anyway, the current implementation with trace_enabled looks ugly.)
108 The result is average and standard deviation for posterior distribution
109 of a single dependent (scalar, float) value.
110 The prior is assumed to be uniform on (-1, 1) for every parameter.
111 Number of parameters and the function for computing
112 the dependent value and likelihood both come from input.
114 The likelihood is assumed to be extremely uneven (but never zero),
115 so the function should return the logarithm of the likelihood.
116 The integration method is basically a Monte Carlo
117 (TODO: Add links to notions used here.),
118 but importance sampling is used in order to focus
119 on the part of parameter space with (relatively) non-negligible likelihood.
121 Multivariate Gauss distribution is used for focusing,
122 so only unimodal posterior distributions are handled correctly.
123 Initial samples are mostly used for shaping (and shifting)
124 the Gaussian distribution, later samples will probably dominate.
125 Thus, initially the algorithm behavior resembles more "find the maximum",
126 as opposed to "reliably integrate". As for later iterations of PLRsearch,
127 it is assumed that the distribution position does not change rapidly;
128 thus integration algorithm returns also the distribution data,
129 to be used as initial focus in next iteration.
131 There are workarounds in place that allow old or default focus tracker
132 to be updated reasonably, even when initial samples
133 of new iteration have way smaller (or larger) weights.
135 During the "find the maximum" phase, the focus tracker frequently takes
136 a wrong shape (compared to observed samples in equilibrium).
137 Therefore scale_coeff argument is left for humans to tweak,
138 so the convergence is reliable and quick.
140 Until the distribution locates itself roughly around
141 the maximum likeligood point, the integration results are probably wrong.
142 That means some minimal time is needed for the result to become reliable.
144 TODO: The folowing is not currently implemented.
145 The reported standard distribution attempts to signal inconsistence
146 (when one sample has dominating weight compared to the rest of samples),
147 but some human supervision is strongly encouraged.
149 To facilitate running in worker processes, arguments and results
150 are communicated via a pipe. The computation does not start
151 until arguments appear in the pipe, the computation stops
152 when another item (stop object) is detected in the pipe
153 (and result is put to pipe).
155 TODO: Create classes for arguments and results,
156 so their fields are documented (and code perhaps more readable).
158 Input/argument object (received from pipe)
159 is a 4-tuple of the following fields:
160 - dimension: Integer, number of parameters to consider.
161 - dilled_function: Function (serialized using dill), which:
162 - - Takes the dimension number of float parameters from (-1, 1).
163 - - Returns float 2-tuple of dependent value and parameter log-likelihood.
164 - param_focus_tracker: VectorStatTracker to use for initial focus.
165 - max_samples: None or a limit for samples to use.
167 Output/result object (sent to pipe queue)
168 is a 5-tuple of the following fields:
169 - value_tracker: ScalarDualStatTracker estimate of value posterior.
170 - param_focus_tracker: VectorStatTracker to use for initial focus next.
171 - debug_list: List of debug strings to log at main process.
172 - trace_list: List of trace strings to pass to main process if enabled.
173 - samples: Number of samples used in computation (to make it reproducible).
174 Trace strings are very verbose, it is not recommended to enable them.
175 In they are not enabled, trace_list will be empty.
176 It is recommended to edit some lines manually to debug_list if needed.
178 :param communication_pipe: Endpoint for communication with parent process.
179 :param scale_coeff: Float number to tweak convergence speed with.
180 :param trace_enabled: Whether trace list should be populated at all.
181 :type communication_pipe: multiprocessing.Connection
182 :type scale_coeff: float
183 :type trace_enabled: bool
184 :raises OverflowError: If one sample dominates the rest too much.
185 Or if value_logweight_function does not handle
186 some part of parameter space carefully enough.
187 :raises numpy.linalg.LinAlgError: If the focus shape gets singular
188 (due to rounding errors). Try changing scale_coeff.
193 # Block until input object appears.
194 dimension, dilled_function, param_focus_tracker, max_samples = (
195 communication_pipe.recv())
196 debug_list.append("Called with param_focus_tracker {tracker!r}"
197 .format(tracker=param_focus_tracker))
199 def trace(name, value):
201 Add a variable (name and value) to trace list (if enabled).
203 This is a closure (not a pure function),
204 as it accesses trace_list and trace_enabled
205 (without any of them being an explicit argument).
207 :param name: Any string identifying the value.
208 :param value: Any object to log repr of.
213 trace_list.append(name + " " + repr(value))
215 value_logweight_function = dill.loads(dilled_function)
217 # Importance sampling produces samples of higher weight (important)
218 # more frequently, and corrects that by adding weight bonus
219 # for the less frequently (unimportant) samples.
220 # But "corrected_weight" is too close to "weight" to be readable,
221 # so "importance" is used instead, even if it runs contrary to what
222 # important region is.
223 value_tracker = stat_trackers.ScalarDualStatTracker()
224 param_sampled_tracker = stat_trackers.VectorStatTracker(dimension).reset()
225 if not param_focus_tracker:
226 # First call has None instead of a real (even empty) tracker.
227 param_focus_tracker = stat_trackers.VectorStatTracker(dimension)
228 param_focus_tracker.unit_reset()
230 # Focus tracker has probably too high weight.
231 param_focus_tracker.log_sum_weight = None
233 while not communication_pipe.poll():
234 if max_samples and samples >= max_samples:
236 sample_point = generate_sample(
237 param_focus_tracker.averages, param_focus_tracker.covariance_matrix,
238 dimension, scale_coeff)
239 trace("sample_point", sample_point)
241 trace("samples", samples)
242 value, log_weight = value_logweight_function(trace, *sample_point)
243 trace("value", value)
244 trace("log_weight", log_weight)
245 trace("focus tracker before adding", param_focus_tracker)
246 # Update focus related statistics.
247 param_distance = param_focus_tracker.add_without_dominance_get_distance(
248 sample_point, log_weight)
249 # The code above looked at weight (not importance).
250 # The code below looks at importance (not weight).
251 log_rarity = param_distance / 2.0
252 trace("log_rarity", log_rarity)
253 log_importance = log_weight + log_rarity
254 trace("log_importance", log_importance)
255 value_tracker.add(value, log_importance)
256 # Update sampled statistics.
257 param_sampled_tracker.add_get_shift(sample_point, log_importance)
258 debug_list.append("integrator used " + str(samples) + " samples")
259 debug_list.append(" ".join([
260 "value_avg", str(value_tracker.average),
261 "param_sampled_avg", repr(param_sampled_tracker.averages),
262 "param_sampled_cov", repr(param_sampled_tracker.covariance_matrix),
263 "value_log_variance", str(value_tracker.log_variance),
264 "value_log_secondary_variance",
265 str(value_tracker.secondary.log_variance)]))
266 communication_pipe.send(
267 (value_tracker, param_focus_tracker, debug_list, trace_list, samples))