2a7a05cae63412b87075b9fc9c6b8e3577cebbf3
[csit.git] / resources / libraries / python / PLRsearch / stat_trackers.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module for tracking average and variance for data of weighted samples.
15
16 See log_plus for an explanation why None acts as a special case "float" number.
17
18 TODO: Current implementation sets zero averages on empty tracker.
19 Should we set None instead?
20
21 TODO: Implement __str__ and __repr__ for easier debugging.
22 """
23
24 import copy
25 import math
26
27 import numpy
28
29 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
30 # then switch to absolute imports within PLRsearch package.
31 # Current usage of relative imports is just a short term workaround.
32 from .log_plus import log_plus, safe_exp
33
34
35 class ScalarStatTracker:
36     """Class for tracking one-dimensional samples.
37
38     Variance of one-dimensional data cannot be negative,
39     so this class avoids possible underflows by tracking logarithm
40     of the variance instead.
41
42     Similarly, sum of weights is also tracked as a logarithm.
43     """
44
45     def __init__(self, log_sum_weight=None, average=0.0, log_variance=None):
46         """Initialize new tracker instance, empty by default.
47
48         :param log_sum_weight: Natural logarithm of sum of weights
49             of samples so far. Default: None (as log of 0.0).
50         :param average: Weighted average of the samples. Default: 0.0
51         :param log_variance: Natural logarithm of variance.
52             Default: None (as log of 0.0).
53         :type log_sum_weight: float or None
54         :type average: float
55         :type log_variance: float or None
56         """
57         self.log_sum_weight = log_sum_weight
58         self.average = average
59         self.log_variance = log_variance
60
61     def __repr__(self):
62         """Return string, which interpreted constructs state of self.
63
64         :returns: Expression constructing an equivalent instance.
65         :rtype: str
66         """
67         return f"ScalarStatTracker(log_sum_weight={self.log_sum_weight!r}," \
68             f"average={self.average!r},log_variance={self.log_variance!r})"
69
70     def copy(self):
71         """Return new ScalarStatTracker instance with the same state as self.
72
73         The point of this method is the return type, instances of subclasses
74         can get rid of their additional data this way.
75
76         :returns: New instance with the same core state.
77         :rtype: ScalarStatTracker
78         """
79         return ScalarStatTracker(
80             self.log_sum_weight, self.average, self.log_variance
81         )
82
83     def add(self, scalar_value, log_weight=0.0):
84         """Return updated stats corresponding to addition of another sample.
85
86         :param scalar_value: The scalar value of the sample.
87         :param log_weight: Natural logarithm of weight of the sample.
88             Default: 0.0 (as log of 1.0).
89         :type scalar_value: float
90         :type log_weight: float
91         :returns: Updated self.
92         :rtype: ScalarStatTracker
93         """
94         old_log_sum_weight = self.log_sum_weight
95         if old_log_sum_weight is None:
96             # First sample.
97             self.log_sum_weight = log_weight
98             self.average = scalar_value
99             return self
100         old_average = self.average
101         log_variance = self.log_variance
102         new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
103         log_sample_ratio = log_weight - new_log_sum_weight
104         sample_ratio = math.exp(log_sample_ratio)
105         shift = scalar_value - old_average
106         new_average = old_average + shift * sample_ratio
107         # The log_variance is updated in-place (instead of new_ vs old_)
108         # because of if-s detecting None-s where the value does not change.
109         absolute_shift = abs(shift)
110         if absolute_shift > 0.0:
111             log_square_shift = 2 * math.log(absolute_shift)
112             log_variance = log_plus(
113                 log_variance, log_square_shift + log_sample_ratio)
114         if log_variance is not None:
115             log_variance += old_log_sum_weight - new_log_sum_weight
116         self.log_sum_weight = new_log_sum_weight
117         self.average = new_average
118         self.log_variance = log_variance
119         return self
120
121
122 class ScalarDualStatTracker(ScalarStatTracker):
123     """Class for tracking one-dimensional samples, offering dual stats.
124
125     It means that instead of just primary stats (identical to what
126     ScalarStatTracker offers, that is why this is its subclass),
127     this tracker allso offers secondary stats.
128     Secondary stats are scalar stats that track the same data,
129     except that the most weighty (or later if tied) sample is not considered.
130
131     Users can analyze how do secondary stats differ from the primary ones,
132     and based on that decide whether they processed "enough" data.
133     One typical use is for Monte Carlo integrator to decide whether
134     the partial sums so far are reliable enough.
135     """
136     def __init__(
137             self, log_sum_weight=None, average=0.0, log_variance=None,
138             log_sum_secondary_weight=None, secondary_average=0.0,
139             log_secondary_variance=None, max_log_weight=None):
140         """Initialize new tracker instance, empty by default.
141
142         :param log_sum_weight: Natural logarithm of sum of weights
143             of samples so far. Default: None (as log of 0.0).
144         :param average: Weighted average of the samples. Default: 0.0
145         :param log_variance: Natural logarithm of variance.
146             Default: None (as log of 0.0).
147         :param log_sum_secondary_weight: Natural logarithm of sum of weights
148             of samples so far except weightest one.
149             Default: None (as log of 0.0).
150         :param secondary_average: Weighted average of the samples
151             except the weightest one. Default: 0.0
152         :param log_variance: Natural logarithm of variance od samples
153             except the weightest one. Default: None (as log of 0.0).
154         :param max_log_weight: Natural logarithm of weight of sample
155             counted in primary but not secondary stats.
156             Default: None (as log of 0.0).
157         :type log_sum_weight: float or None
158         :type average: float
159         :type log_variance: float or None
160         :type log_sum_secondary_weight: float or None
161         :type secondary_average: float
162         :type log_secondary_variance: float or None
163         :type max_log_weight: float or None
164         """
165         # Not using super() as the constructor signature is different,
166         # so in case of diamond inheritance mismatch would be probable.
167         ScalarStatTracker.__init__(self, log_sum_weight, average, log_variance)
168         self.secondary = ScalarStatTracker(
169             log_sum_secondary_weight, secondary_average, log_secondary_variance
170         )
171         self.max_log_weight = max_log_weight
172
173     def __repr__(self):
174         """Return string, which interpreted constructs state of self.
175
176         :returns: Expression contructing an equivalent instance.
177         :rtype: str
178         """
179         sec = self.secondary
180         return f"ScalarDualStatTracker(log_sum_weight={self.log_sum_weight!r},"\
181             f"average={self.average!r},log_variance={self.log_variance!r}," \
182             f"log_sum_secondary_weight={sec.log_sum_weight!r}," \
183             f"secondary_average={sec.average!r}," \
184             f"log_secondary_variance={sec.log_variance!r}," \
185             f"max_log_weight={self.max_log_weight!r})"
186
187     def add(self, scalar_value, log_weight=0.0):
188         """Return updated both stats after addition of another sample.
189
190         :param scalar_value: The scalar value of the sample.
191         :param log_weight: Natural logarithm of weight of the sample.
192             Default: 0.0 (as log of 1.0).
193         :type scalar_value: float
194         :type log_weight: float
195         :returns: Updated self.
196         :rtype: ScalarDualStatTracker
197         """
198         # Using super() as copy() and add() are not expected to change
199         # signature, so this way diamond inheritance will be supported.
200         primary = super(ScalarDualStatTracker, self)
201         if self.max_log_weight is None or log_weight >= self.max_log_weight:
202             self.max_log_weight = log_weight
203             self.secondary = primary.copy()
204         else:
205             self.secondary.add(scalar_value, log_weight)
206         primary.add(scalar_value, log_weight)
207         return self
208
209     def get_pessimistic_variance(self):
210         """Return estimate of variance reflecting weight effects.
211
212         Typical scenario is the primary tracker dominated by a single sample.
213         In worse case, secondary tracker is also dominated by
214         a single (but different) sample.
215
216         Current implementation simply returns variance of average
217         of the two trackers, as if they were independent.
218
219         :returns: Pessimistic estimate of variance (not stdev, no log).
220         :rtype: float
221         """
222         var_primary = safe_exp(self.log_variance)
223         var_secondary = safe_exp(self.secondary.log_variance)
224         var_combined = (var_primary + var_secondary) / 2
225         avg_half_diff = (self.average - self.secondary.average) / 2
226         var_combined += avg_half_diff * avg_half_diff
227         return var_combined
228
229
230 class VectorStatTracker:
231     """Class for tracking multi-dimensional samples.
232
233     Contrary to one-dimensional data, multi-dimensional covariance matrix
234     contains off-diagonal elements which can be negative.
235
236     But, sum of weights is never negative, so it is tracked as a logarithm.
237
238     The code assumes every method gets arguments with correct dimensionality
239     as set in constructor. No checks are performed (for performance reasons).
240
241     TODO: Should we provide a subclass with the dimensionality checks?
242     """
243
244     def __init__(
245             self, dimension=2, log_sum_weight=None, averages=None,
246             covariance_matrix=None):
247         """Initialize new tracker instance, two-dimensional empty by default.
248
249         If any of latter two arguments is None, it means
250         the tracker state is invalid. Use reset method
251         to create empty tracker of constructed dimensionality.
252
253         :param dimension: Number of scalar components of samples.
254         :param log_sum_weight: Natural logarithm of sum of weights
255             of samples so far. Default: None (as log of 0.0).
256         :param averages: Weighted average of the samples.
257             Default: None
258         :param covariance_matrix: Variance matrix elements.
259             Default: None
260         :type log_sum_weight: float or None
261         :type averages: None or tuple of float
262         :type covariance_matrix: None or tuple of tuple of float
263         """
264         self.dimension = dimension
265         self.log_sum_weight = log_sum_weight
266         self.averages = averages
267         self.covariance_matrix = covariance_matrix
268
269     def __repr__(self):
270         """Return string, which interpreted constructs state of self.
271
272         :returns: Expression constructing an equivalent instance.
273         :rtype: str
274         """
275         return f"VectorStatTracker(dimension={self.dimension!r}," \
276             f"log_sum_weight={self.log_sum_weight!r}," \
277             f"averages={self.averages!r}," \
278             f"covariance_matrix={self.covariance_matrix!r})"
279
280     def copy(self):
281         """Return new instance with the same state as self.
282
283         The main usage is to simplify debugging. This method allows
284         to store the old state, while self continues to track towards new state.
285
286         :returns: Created tracker instance.
287         :rtype: VectorStatTracker
288         """
289         return VectorStatTracker(
290             self.dimension, self.log_sum_weight, self.averages[:],
291             copy.deepcopy(self.covariance_matrix)
292         )
293
294     def reset(self):
295         """Return state set to empty data of proper dimensionality.
296
297         :returns: Updated self.
298         :rtype: VectorStatTracker
299         """
300         self.averages = [0.0 for _ in range(self.dimension)]
301         # TODO: Examine whether we can gain speed by tracking triangle only.
302         self.covariance_matrix = [
303             [0.0 for _ in range(self.dimension)] for _ in range(self.dimension)
304         ]
305         # TODO: In Python3, list comprehensions are generators,
306         # so they are not indexable. Put list() when converting.
307         return self
308
309     def unit_reset(self):
310         """Reset state but use unit matric as covariance.
311
312         :returns: Updated self.
313         :rtype: VectorStatTracker
314         """
315         self.reset()
316         for index in range(self.dimension):
317             self.covariance_matrix[index][index] = 1.0
318         return self
319
320     def add_get_shift(self, vector_value, log_weight=0.0):
321         """Return shift and update state to addition of another sample.
322
323         Shift is the vector from old average to new sample.
324         For most callers, returning shift is more useful than returning self.
325
326         :param vector_value: The value of the sample.
327         :param log_weight: Natural logarithm of weight of the sample.
328             Default: 0.0 (as log of 1.0).
329         :type vector_value: iterable of float
330         :type log_weight: float
331         :returns: Shift vector
332         :rtype: list of float
333         """
334         dimension = self.dimension
335         old_log_sum_weight = self.log_sum_weight
336         old_averages = self.averages
337         if not old_averages:
338             shift = [0.0 for _ in range(dimension)]
339         else:
340             shift = [
341                 vector_value[index] - old_averages[index]
342                 for index in range(dimension)
343             ]
344         if old_log_sum_weight is None:
345             # First sample.
346             self.log_sum_weight = log_weight
347             self.averages = [vector_value[index] for index in range(dimension)]
348             # Not touching covariance matrix.
349             return shift
350         covariance_matrix = self.covariance_matrix
351         new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
352         data_ratio = math.exp(old_log_sum_weight - new_log_sum_weight)
353         sample_ratio = math.exp(log_weight - new_log_sum_weight)
354         new_averages = [
355             old_averages[index] + shift[index] * sample_ratio
356             for index in range(dimension)
357         ]
358         # It is easier to update covariance matrix in-place.
359         for second in range(dimension):
360             for first in range(dimension):
361                 element = covariance_matrix[first][second]
362                 element += shift[first] * shift[second] * sample_ratio
363                 element *= data_ratio
364                 covariance_matrix[first][second] = element
365         self.log_sum_weight = new_log_sum_weight
366         self.averages = new_averages
367         # self.covariance_matrix still points to the object we updated in-place.
368         return shift
369
370     # TODO: There are some uses for such a vector tracker,
371     # that does not track average, but weightest (latest if tied) value,
372     # and computes covariance matrix centered around that.
373     # But perhaps the following method would work similarly well?
374     def add_without_dominance_get_distance(self, vector_value, log_weight=0.0):
375         """Update stats, avoid having the sample dominate, return old distance.
376
377         If the weight of the incoming sample is far bigger
378         than the weight of all the previous data together,
379         covariance matrix would suffer from underflow.
380         To avoid that, this method manipulates both weights
381         before calling add().
382
383         The old covariance matrix (before update) can define a metric.
384         Shift is a vector, so the metric can be used to compute a distance
385         between old average and new sample.
386
387         :param vector_value: The value of the sample.
388         :param log_weight: Natural logarithm of weight of the sample.
389             Default: 0.0 (as log of 1.0).
390         :type vector_value: iterable of float
391         :type log_weight: float
392         :returns: Updated self.
393         :rtype: VectorStatTracker
394         """
395         lsw = self.log_sum_weight
396         if lsw is not None and lsw < log_weight - 1.0:
397             lsw = (lsw + log_weight) / 2.0
398             log_weight = lsw
399             self.log_sum_weight = lsw
400         old_metric = copy.deepcopy(self.covariance_matrix)
401         shift = self.add_get_shift(vector_value, log_weight)
402         gradient = numpy.linalg.solve(old_metric, shift)
403         distance = numpy.vdot(shift, gradient)
404         return distance