1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Module for tracking average and variance for data of weighted samples.
16 See log_plus for an explanation why None acts as a special case "float" number.
18 TODO: Current implementation sets zero averages on empty tracker.
19 Should we set None instead?
21 TODO: Implement __str__ and __repr__ for easier debugging.
29 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
30 # then switch to absolute imports within PLRsearch package.
31 # Current usage of relative imports is just a short term workaround.
32 from .log_plus import log_plus, safe_exp
35 class ScalarStatTracker:
36 """Class for tracking one-dimensional samples.
38 Variance of one-dimensional data cannot be negative,
39 so this class avoids possible underflows by tracking logarithm
40 of the variance instead.
42 Similarly, sum of weights is also tracked as a logarithm.
45 def __init__(self, log_sum_weight=None, average=0.0, log_variance=None):
46 """Initialize new tracker instance, empty by default.
48 :param log_sum_weight: Natural logarithm of sum of weights
49 of samples so far. Default: None (as log of 0.0).
50 :param average: Weighted average of the samples. Default: 0.0
51 :param log_variance: Natural logarithm of variance.
52 Default: None (as log of 0.0).
53 :type log_sum_weight: float or None
55 :type log_variance: float or None
57 self.log_sum_weight = log_sum_weight
58 self.average = average
59 self.log_variance = log_variance
62 """Return string, which interpreted constructs state of self.
64 :returns: Expression constructing an equivalent instance.
67 return f"ScalarStatTracker(log_sum_weight={self.log_sum_weight!r}," \
68 f"average={self.average!r},log_variance={self.log_variance!r})"
71 """Return new ScalarStatTracker instance with the same state as self.
73 The point of this method is the return type, instances of subclasses
74 can get rid of their additional data this way.
76 :returns: New instance with the same core state.
77 :rtype: ScalarStatTracker
79 return ScalarStatTracker(
80 self.log_sum_weight, self.average, self.log_variance
83 def add(self, scalar_value, log_weight=0.0):
84 """Return updated stats corresponding to addition of another sample.
86 :param scalar_value: The scalar value of the sample.
87 :param log_weight: Natural logarithm of weight of the sample.
88 Default: 0.0 (as log of 1.0).
89 :type scalar_value: float
90 :type log_weight: float
91 :returns: Updated self.
92 :rtype: ScalarStatTracker
94 old_log_sum_weight = self.log_sum_weight
95 if old_log_sum_weight is None:
97 self.log_sum_weight = log_weight
98 self.average = scalar_value
100 old_average = self.average
101 log_variance = self.log_variance
102 new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
103 log_sample_ratio = log_weight - new_log_sum_weight
104 sample_ratio = math.exp(log_sample_ratio)
105 shift = scalar_value - old_average
106 new_average = old_average + shift * sample_ratio
107 # The log_variance is updated in-place (instead of new_ vs old_)
108 # because of if-s detecting None-s where the value does not change.
109 absolute_shift = abs(shift)
110 if absolute_shift > 0.0:
111 log_square_shift = 2 * math.log(absolute_shift)
112 log_variance = log_plus(
113 log_variance, log_square_shift + log_sample_ratio)
114 if log_variance is not None:
115 log_variance += old_log_sum_weight - new_log_sum_weight
116 self.log_sum_weight = new_log_sum_weight
117 self.average = new_average
118 self.log_variance = log_variance
122 class ScalarDualStatTracker(ScalarStatTracker):
123 """Class for tracking one-dimensional samples, offering dual stats.
125 It means that instead of just primary stats (identical to what
126 ScalarStatTracker offers, that is why this is its subclass),
127 this tracker allso offers secondary stats.
128 Secondary stats are scalar stats that track the same data,
129 except that the most weighty (or later if tied) sample is not considered.
131 Users can analyze how do secondary stats differ from the primary ones,
132 and based on that decide whether they processed "enough" data.
133 One typical use is for Monte Carlo integrator to decide whether
134 the partial sums so far are reliable enough.
137 self, log_sum_weight=None, average=0.0, log_variance=None,
138 log_sum_secondary_weight=None, secondary_average=0.0,
139 log_secondary_variance=None, max_log_weight=None):
140 """Initialize new tracker instance, empty by default.
142 :param log_sum_weight: Natural logarithm of sum of weights
143 of samples so far. Default: None (as log of 0.0).
144 :param average: Weighted average of the samples. Default: 0.0
145 :param log_variance: Natural logarithm of variance.
146 Default: None (as log of 0.0).
147 :param log_sum_secondary_weight: Natural logarithm of sum of weights
148 of samples so far except weightest one.
149 Default: None (as log of 0.0).
150 :param secondary_average: Weighted average of the samples
151 except the weightest one. Default: 0.0
152 :param log_variance: Natural logarithm of variance od samples
153 except the weightest one. Default: None (as log of 0.0).
154 :param max_log_weight: Natural logarithm of weight of sample
155 counted in primary but not secondary stats.
156 Default: None (as log of 0.0).
157 :type log_sum_weight: float or None
159 :type log_variance: float or None
160 :type log_sum_secondary_weight: float or None
161 :type secondary_average: float
162 :type log_secondary_variance: float or None
163 :type max_log_weight: float or None
165 # Not using super() as the constructor signature is different,
166 # so in case of diamond inheritance mismatch would be probable.
167 ScalarStatTracker.__init__(self, log_sum_weight, average, log_variance)
168 self.secondary = ScalarStatTracker(
169 log_sum_secondary_weight, secondary_average, log_secondary_variance
171 self.max_log_weight = max_log_weight
174 """Return string, which interpreted constructs state of self.
176 :returns: Expression contructing an equivalent instance.
180 return f"ScalarDualStatTracker(log_sum_weight={self.log_sum_weight!r},"\
181 f"average={self.average!r},log_variance={self.log_variance!r}," \
182 f"log_sum_secondary_weight={sec.log_sum_weight!r}," \
183 f"secondary_average={sec.average!r}," \
184 f"log_secondary_variance={sec.log_variance!r}," \
185 f"max_log_weight={self.max_log_weight!r})"
187 def add(self, scalar_value, log_weight=0.0):
188 """Return updated both stats after addition of another sample.
190 :param scalar_value: The scalar value of the sample.
191 :param log_weight: Natural logarithm of weight of the sample.
192 Default: 0.0 (as log of 1.0).
193 :type scalar_value: float
194 :type log_weight: float
195 :returns: Updated self.
196 :rtype: ScalarDualStatTracker
198 # Using super() as copy() and add() are not expected to change
199 # signature, so this way diamond inheritance will be supported.
200 primary = super(ScalarDualStatTracker, self)
201 if self.max_log_weight is None or log_weight >= self.max_log_weight:
202 self.max_log_weight = log_weight
203 self.secondary = primary.copy()
205 self.secondary.add(scalar_value, log_weight)
206 primary.add(scalar_value, log_weight)
209 def get_pessimistic_variance(self):
210 """Return estimate of variance reflecting weight effects.
212 Typical scenario is the primary tracker dominated by a single sample.
213 In worse case, secondary tracker is also dominated by
214 a single (but different) sample.
216 Current implementation simply returns variance of average
217 of the two trackers, as if they were independent.
219 :returns: Pessimistic estimate of variance (not stdev, no log).
222 var_primary = safe_exp(self.log_variance)
223 var_secondary = safe_exp(self.secondary.log_variance)
224 var_combined = (var_primary + var_secondary) / 2
225 avg_half_diff = (self.average - self.secondary.average) / 2
226 var_combined += avg_half_diff * avg_half_diff
230 class VectorStatTracker:
231 """Class for tracking multi-dimensional samples.
233 Contrary to one-dimensional data, multi-dimensional covariance matrix
234 contains off-diagonal elements which can be negative.
236 But, sum of weights is never negative, so it is tracked as a logarithm.
238 The code assumes every method gets arguments with correct dimensionality
239 as set in constructor. No checks are performed (for performance reasons).
241 TODO: Should we provide a subclass with the dimensionality checks?
245 self, dimension=2, log_sum_weight=None, averages=None,
246 covariance_matrix=None):
247 """Initialize new tracker instance, two-dimensional empty by default.
249 If any of latter two arguments is None, it means
250 the tracker state is invalid. Use reset method
251 to create empty tracker of constructed dimensionality.
253 :param dimension: Number of scalar components of samples.
254 :param log_sum_weight: Natural logarithm of sum of weights
255 of samples so far. Default: None (as log of 0.0).
256 :param averages: Weighted average of the samples.
258 :param covariance_matrix: Variance matrix elements.
260 :type log_sum_weight: float or None
261 :type averages: None or tuple of float
262 :type covariance_matrix: None or tuple of tuple of float
264 self.dimension = dimension
265 self.log_sum_weight = log_sum_weight
266 self.averages = averages
267 self.covariance_matrix = covariance_matrix
270 """Return string, which interpreted constructs state of self.
272 :returns: Expression constructing an equivalent instance.
275 return f"VectorStatTracker(dimension={self.dimension!r}," \
276 f"log_sum_weight={self.log_sum_weight!r}," \
277 f"averages={self.averages!r}," \
278 f"covariance_matrix={self.covariance_matrix!r})"
281 """Return new instance with the same state as self.
283 The main usage is to simplify debugging. This method allows
284 to store the old state, while self continues to track towards new state.
286 :returns: Created tracker instance.
287 :rtype: VectorStatTracker
289 return VectorStatTracker(
290 self.dimension, self.log_sum_weight, self.averages[:],
291 copy.deepcopy(self.covariance_matrix)
295 """Return state set to empty data of proper dimensionality.
297 :returns: Updated self.
298 :rtype: VectorStatTracker
300 self.averages = [0.0 for _ in range(self.dimension)]
301 # TODO: Examine whether we can gain speed by tracking triangle only.
302 self.covariance_matrix = [
303 [0.0 for _ in range(self.dimension)] for _ in range(self.dimension)
305 # TODO: In Python3, list comprehensions are generators,
306 # so they are not indexable. Put list() when converting.
309 def unit_reset(self):
310 """Reset state but use unit matric as covariance.
312 :returns: Updated self.
313 :rtype: VectorStatTracker
316 for index in range(self.dimension):
317 self.covariance_matrix[index][index] = 1.0
320 def add_get_shift(self, vector_value, log_weight=0.0):
321 """Return shift and update state to addition of another sample.
323 Shift is the vector from old average to new sample.
324 For most callers, returning shift is more useful than returning self.
326 :param vector_value: The value of the sample.
327 :param log_weight: Natural logarithm of weight of the sample.
328 Default: 0.0 (as log of 1.0).
329 :type vector_value: iterable of float
330 :type log_weight: float
331 :returns: Shift vector
332 :rtype: list of float
334 dimension = self.dimension
335 old_log_sum_weight = self.log_sum_weight
336 old_averages = self.averages
338 shift = [0.0 for _ in range(dimension)]
341 vector_value[index] - old_averages[index]
342 for index in range(dimension)
344 if old_log_sum_weight is None:
346 self.log_sum_weight = log_weight
347 self.averages = [vector_value[index] for index in range(dimension)]
348 # Not touching covariance matrix.
350 covariance_matrix = self.covariance_matrix
351 new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
352 data_ratio = math.exp(old_log_sum_weight - new_log_sum_weight)
353 sample_ratio = math.exp(log_weight - new_log_sum_weight)
355 old_averages[index] + shift[index] * sample_ratio
356 for index in range(dimension)
358 # It is easier to update covariance matrix in-place.
359 for second in range(dimension):
360 for first in range(dimension):
361 element = covariance_matrix[first][second]
362 element += shift[first] * shift[second] * sample_ratio
363 element *= data_ratio
364 covariance_matrix[first][second] = element
365 self.log_sum_weight = new_log_sum_weight
366 self.averages = new_averages
367 # self.covariance_matrix still points to the object we updated in-place.
370 # TODO: There are some uses for such a vector tracker,
371 # that does not track average, but weightest (latest if tied) value,
372 # and computes covariance matrix centered around that.
373 # But perhaps the following method would work similarly well?
374 def add_without_dominance_get_distance(self, vector_value, log_weight=0.0):
375 """Update stats, avoid having the sample dominate, return old distance.
377 If the weight of the incoming sample is far bigger
378 than the weight of all the previous data together,
379 covariance matrix would suffer from underflow.
380 To avoid that, this method manipulates both weights
381 before calling add().
383 The old covariance matrix (before update) can define a metric.
384 Shift is a vector, so the metric can be used to compute a distance
385 between old average and new sample.
387 :param vector_value: The value of the sample.
388 :param log_weight: Natural logarithm of weight of the sample.
389 Default: 0.0 (as log of 1.0).
390 :type vector_value: iterable of float
391 :type log_weight: float
392 :returns: Updated self.
393 :rtype: VectorStatTracker
395 lsw = self.log_sum_weight
396 if lsw is not None and lsw < log_weight - 1.0:
397 lsw = (lsw + log_weight) / 2.0
399 self.log_sum_weight = lsw
400 old_metric = copy.deepcopy(self.covariance_matrix)
401 shift = self.add_get_shift(vector_value, log_weight)
402 gradient = numpy.linalg.solve(old_metric, shift)
403 distance = numpy.vdot(shift, gradient)