Improve PLRsearch yet again
[csit.git] / resources / libraries / python / PLRsearch / stat_trackers.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module for tracking average and variance for data of weighted samples.
15
16 See log_plus for an explanation why None acts as a special case "float" number.
17
18 TODO: Current implementation sets zero averages on empty tracker.
19 Should we set None instead?
20
21 TODO: Implement __str__ and __repr__ for easier debugging.
22 """
23
24 import copy
25 import math
26
27 import numpy
28
29 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
30 # then switch to absolute imports within PLRsearch package.
31 # Current usage of relative imports is just a short term workaround.
32 from .log_plus import log_plus, safe_exp
33
34
35 class ScalarStatTracker(object):
36     """Class for tracking one-dimensional samples.
37
38     Variance of one-dimensional data cannot be negative,
39     so this class avoids possible underflows by tracking logarithm
40     of the variance instead.
41
42     Similarly, sum of weights is also tracked as a logarithm.
43     """
44
45     def __init__(self, log_sum_weight=None, average=0.0, log_variance=None):
46         """Initialize new tracker instance, empty by default.
47
48         :param log_sum_weight: Natural logarithm of sum of weights
49             of samples so far. Default: None (as log of 0.0).
50         :param average: Weighted average of the samples. Default: 0.0
51         :param log_variance: Natural logarithm of variance.
52             Default: None (as log of 0.0).
53         :type log_sum_weight: float or None
54         :type average: float
55         :type log_variance: float or None
56         """
57         self.log_sum_weight = log_sum_weight
58         self.average = average
59         self.log_variance = log_variance
60
61     def __repr__(self):
62         """Return string, which interpreted constructs state of self.
63
64         :returns: Expression contructing an equivalent instance.
65         :rtype: str
66         """
67         return ("ScalarStatTracker(log_sum_weight={lsw!r},average={a!r},"
68                 "log_variance={lv!r})".format(
69                     lsw=self.log_sum_weight, a=self.average,
70                     lv=self.log_variance))
71
72     def copy(self):
73         """Return new ScalarStatTracker instance with the same state as self.
74
75         The point of this method is the return type, instances of subclasses
76         can get rid of their additional data this way.
77
78         :returns: New instance with the same core state.
79         :rtype: ScalarStatTracker
80         """
81         return ScalarStatTracker(
82             self.log_sum_weight, self.average, self.log_variance)
83
84     def add(self, scalar_value, log_weight=0.0):
85         """Return updated stats corresponding to addition of another sample.
86
87         :param scalar_value: The scalar value of the sample.
88         :param log_weight: Natural logarithm of weight of the sample.
89             Default: 0.0 (as log of 1.0).
90         :type scalar_value: float
91         :type log_weight: float
92         :returns: Updated self.
93         :rtype: ScalarStatTracker
94         """
95         old_log_sum_weight = self.log_sum_weight
96         if old_log_sum_weight is None:
97             # First sample.
98             self.log_sum_weight = log_weight
99             self.average = scalar_value
100             return self
101         old_average = self.average
102         log_variance = self.log_variance
103         new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
104         log_sample_ratio = log_weight - new_log_sum_weight
105         sample_ratio = math.exp(log_sample_ratio)
106         shift = scalar_value - old_average
107         new_average = old_average + shift * sample_ratio
108         # The log_variance is updated in-place (instead of new_ vs old_)
109         # because of if-s detecting None-s where the value does not change.
110         absolute_shift = abs(shift)
111         if absolute_shift > 0.0:
112             log_square_shift = 2 * math.log(absolute_shift)
113             log_variance = log_plus(
114                 log_variance, log_square_shift + log_sample_ratio)
115         if log_variance is not None:
116             log_variance += old_log_sum_weight - new_log_sum_weight
117         self.log_sum_weight = new_log_sum_weight
118         self.average = new_average
119         self.log_variance = log_variance
120         return self
121
122
123 class ScalarDualStatTracker(ScalarStatTracker):
124     """Class for tracking one-dimensional samples, offering dual stats.
125
126     It means that instead of just primary stats (identical to what
127     ScalarStatTracker offers, that is why this is its subclass),
128     this tracker allso offers secondary stats.
129     Secondary stats are scalar stats that track the same data,
130     except that the most weighty (or later if tied) sample is not considered.
131
132     Users can analyze how do secondary stats differ from the primary ones,
133     and based on that decide whether they processed "enough" data.
134     One typical use is for Monte Carlo integrator to decide whether
135     the partial sums so far are reliable enough.
136     """
137
138     def __init__(
139             self, log_sum_weight=None, average=0.0, log_variance=None,
140             log_sum_secondary_weight=None, secondary_average=0.0,
141             log_secondary_variance=None, max_log_weight=None):
142         """Initialize new tracker instance, empty by default.
143
144         :param log_sum_weight: Natural logarithm of sum of weights
145             of samples so far. Default: None (as log of 0.0).
146         :param average: Weighted average of the samples. Default: 0.0
147         :param log_variance: Natural logarithm of variance.
148             Default: None (as log of 0.0).
149         :param log_sum_secondary_weight: Natural logarithm of sum of weights
150             of samples so far except weightest one.
151             Default: None (as log of 0.0).
152         :param secondary_average: Weighted average of the samples
153             except the weightest one. Default: 0.0
154         :param log_variance: Natural logarithm of variance od samples
155             except the weightest one. Default: None (as log of 0.0).
156         :param max_log_weight: Natural logarithm of weight of sample
157             counted in primary but not secondary stats.
158             Default: None (as log of 0.0).
159         :type log_sum_weight: float or None
160         :type average: float
161         :type log_variance: float or None
162         :type log_sum_secondary_weight: float or None
163         :type secondary_average: float
164         :type log_secondary_variance: float or None
165         :type max_log_weight: float or None
166         """
167         # Not using super() as the constructor signature is different,
168         # so in case of diamond inheritance mismatch would be probable.
169         ScalarStatTracker.__init__(self, log_sum_weight, average, log_variance)
170         self.secondary = ScalarStatTracker(
171             log_sum_secondary_weight, secondary_average, log_secondary_variance)
172         self.max_log_weight = max_log_weight
173
174     def __repr__(self):
175         """Return string, which interpreted constructs state of self.
176
177         :returns: Expression contructing an equivalent instance.
178         :rtype: str
179         """
180         sec = self.secondary
181         return (
182             "ScalarDualStatTracker(log_sum_weight={lsw!r},average={a!r},"
183             "log_variance={lv!r},log_sum_secondary_weight={lssw!r},"
184             "secondary_average={sa!r},log_secondary_variance={lsv!r},"
185             "max_log_weight={mlw!r})".format(
186                 lsw=self.log_sum_weight, a=self.average, lv=self.log_variance,
187                 lssw=sec.log_sum_weight, sa=sec.average, lsv=sec.log_variance,
188                 mlw=self.max_log_weight))
189
190     def add(self, scalar_value, log_weight=0.0):
191         """Return updated both stats after addition of another sample.
192
193         :param scalar_value: The scalar value of the sample.
194         :param log_weight: Natural logarithm of weight of the sample.
195             Default: 0.0 (as log of 1.0).
196         :type scalar_value: float
197         :type log_weight: float
198         :returns: Updated self.
199         :rtype: ScalarDualStatTracker
200         """
201         # Using super() as copy() and add() are not expected to change
202         # signature, so this way diamond inheritance will be supported.
203         primary = super(ScalarDualStatTracker, self)
204         if self.max_log_weight is None or log_weight >= self.max_log_weight:
205             self.max_log_weight = log_weight
206             self.secondary = primary.copy()
207         else:
208             self.secondary.add(scalar_value, log_weight)
209         primary.add(scalar_value, log_weight)
210         return self
211
212
213     def get_pessimistic_variance(self):
214         """Return estimate of variance reflecting weight effects.
215
216         Typical scenario is the primary tracker dominated by a single sample.
217         In worse case, secondary tracker is also dominated by
218         a single (but different) sample.
219
220         Current implementation simply returns variance of average
221         of the two trackers, as if they were independent.
222
223         :returns: Pessimistic estimate of variance (not stdev, no log).
224         :rtype: float
225         """
226         var_primary = safe_exp(self.log_variance)
227         var_secondary = safe_exp(self.secondary.log_variance)
228         var_combined = (var_primary + var_secondary) / 2
229         avg_half_diff = (self.average - self.secondary.average) / 2
230         var_combined += avg_half_diff * avg_half_diff
231         return var_combined
232
233
234 class VectorStatTracker(object):
235     """Class for tracking multi-dimensional samples.
236
237     Contrary to one-dimensional data, multi-dimensional covariance matrix
238     contains off-diagonal elements which can be negative.
239
240     But, sum of weights is never negative, so it is tracked as a logarithm.
241
242     The code assumes every method gets arguments with correct dimensionality
243     as set in constructor. No checks are performed (for performance reasons).
244
245     TODO: Should we provide a subclass with the dimensionality checks?
246     """
247
248     def __init__(
249             self, dimension=2, log_sum_weight=None, averages=None,
250             covariance_matrix=None):
251         """Initialize new tracker instance, two-dimenstional empty by default.
252
253         If any of latter two arguments is None, it means
254         the tracker state is invalid. Use reset method
255         to create empty tracker of constructed dimentionality.
256
257         :param dimension: Number of scalar components of samples.
258         :param log_sum_weight: Natural logarithm of sum of weights
259             of samples so far. Default: None (as log of 0.0).
260         :param averages: Weighted average of the samples.
261             Default: None
262         :param covariance_matrix: Variance matrix elements.
263             Default: None
264         :type log_sum_weight: float or None
265         :type averages: None or tuple of float
266         :type covariance_matrix: None or tuple of tuple of float
267         """
268         self.dimension = dimension
269         self.log_sum_weight = log_sum_weight
270         self.averages = averages
271         self.covariance_matrix = covariance_matrix
272
273     def __repr__(self):
274         """Return string, which interpreted constructs state of self.
275
276         :returns: Expression contructing an equivalent instance.
277         :rtype: str
278         """
279         return (
280             "VectorStatTracker(dimension={d!r},log_sum_weight={lsw!r},"
281             "averages={a!r},covariance_matrix={cm!r})".format(
282                 d=self.dimension, lsw=self.log_sum_weight, a=self.averages,
283                 cm=self.covariance_matrix))
284
285     def copy(self):
286         """Return new instance with the same state as self.
287
288         The main usage is to simplify debugging. This method allows
289         to store the old state, while self continues to track towards new state.
290
291         :returns: Created tracker instance.
292         :rtype: VectorStatTracker
293         """
294         return VectorStatTracker(
295             self.dimension, self.log_sum_weight, self.averages[:],
296             copy.deepcopy(self.covariance_matrix))
297
298     def reset(self):
299         """Return state set to empty data of proper dimensionality.
300
301         :returns: Updated self.
302         :rtype: VectorStatTracker
303         """
304         self.averages = [0.0 for _ in range(self.dimension)]
305         # TODO: Examine whether we can gain speed by tracking triangle only.
306         self.covariance_matrix = [[0.0 for _ in range(self.dimension)]
307                                   for _ in range(self.dimension)]
308         # TODO: In Python3, list comprehensions are generators,
309         # so they are not indexable. Put list() when converting.
310         return self
311
312     def unit_reset(self):
313         """Reset state but use unit matric as covariance.
314
315         :returns: Updated self.
316         :rtype: VectorStatTracker
317         """
318         self.reset()
319         for index in range(self.dimension):
320             self.covariance_matrix[index][index] = 1.0
321         return self
322
323     def add_get_shift(self, vector_value, log_weight=0.0):
324         """Return shift and update state to addition of another sample.
325
326         Shift is the vector from old average to new sample.
327         For most callers, returning shift is more useful than returning self.
328
329         :param vector_value: The value of the sample.
330         :param log_weight: Natural logarithm of weight of the sample.
331             Default: 0.0 (as log of 1.0).
332         :type vector_value: iterable of float
333         :type log_weight: float
334         :returns: Shift vector
335         :rtype: list of float
336         """
337         dimension = self.dimension
338         old_log_sum_weight = self.log_sum_weight
339         old_averages = self.averages
340         if not old_averages:
341             shift = [0.0 for index in range(dimension)]
342         else:
343             shift = [vector_value[index] - old_averages[index]
344                      for index in range(dimension)]
345         if old_log_sum_weight is None:
346             # First sample.
347             self.log_sum_weight = log_weight
348             self.averages = [vector_value[index] for index in range(dimension)]
349             # Not touching covariance matrix.
350             return shift
351         covariance_matrix = self.covariance_matrix
352         new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
353         data_ratio = math.exp(old_log_sum_weight - new_log_sum_weight)
354         sample_ratio = math.exp(log_weight - new_log_sum_weight)
355         new_averages = [old_averages[index] + shift[index] * sample_ratio
356                         for index in range(dimension)]
357         # It is easier to update covariance matrix in-place.
358         for second in range(dimension):
359             for first in range(dimension):
360                 element = covariance_matrix[first][second]
361                 element += shift[first] * shift[second] * sample_ratio
362                 element *= data_ratio
363                 covariance_matrix[first][second] = element
364         self.log_sum_weight = new_log_sum_weight
365         self.averages = new_averages
366         # self.covariance_matrix still points to the object we updated in-place.
367         return shift
368
369     # TODO: There are some uses for such a vector tracker,
370     # that does not track average, but weightest (latest if tied) value,
371     # and computes covariance matrix centered around that.
372     # But perhaps the following method would work similarly well?
373     def add_without_dominance_get_distance(self, vector_value, log_weight=0.0):
374         """Update stats, avoid having the sample dominate, return old distance.
375
376         If the weight of the incoming sample is far bigger
377         than the weight of all the previous data together,
378         convariance matrix would suffer from underflows.
379         To avoid that, this method manipulates both weights
380         before calling add().
381
382         The old covariance matrix (before update) can define a metric.
383         Shift is a vector, so the metric can be used to compute a distance
384         between old average and new sample.
385
386         :param vector_value: The value of the sample.
387         :param log_weight: Natural logarithm of weight of the sample.
388             Default: 0.0 (as log of 1.0).
389         :type vector_value: iterable of float
390         :type log_weight: float
391         :returns: Updated self.
392         :rtype: VectorStatTracker
393         """
394         lsw = self.log_sum_weight
395         if lsw is not None and lsw < log_weight - 1.0:
396             lsw = (lsw + log_weight) / 2.0
397             log_weight = lsw
398             self.log_sum_weight = lsw
399         old_metric = copy.deepcopy(self.covariance_matrix)
400         shift = self.add_get_shift(vector_value, log_weight)
401         gradient = numpy.linalg.solve(old_metric, shift)
402         distance = numpy.vdot(shift, gradient)
403         return distance