Revert "fix(jobspec): Delete ipsec nfv density tests"
[csit.git] / resources / libraries / python / PLRsearch / stat_trackers.py
1 # Copyright (c) 2024 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module for tracking average and variance for data of weighted samples.
15
16 See log_plus for an explanation why None acts as a special case "float" number.
17
18 TODO: Current implementation sets zero averages on empty tracker.
19 Should we set None instead?
20
21 TODO: Implement __str__ and __repr__ for easier debugging.
22 """
23
24 import copy
25 import math
26
27 import numpy
28
29 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
30 # then switch to absolute imports within PLRsearch package.
31 # Current usage of relative imports is just a short term workaround.
32 from .log_plus import log_plus, safe_exp
33
34
35 class ScalarStatTracker:
36     """Class for tracking one-dimensional samples.
37
38     Variance of one-dimensional data cannot be negative,
39     so this class avoids possible underflows by tracking logarithm
40     of the variance instead.
41
42     Similarly, sum of weights is also tracked as a logarithm.
43     """
44
45     def __init__(self, log_sum_weight=None, average=0.0, log_variance=None):
46         """Initialize new tracker instance, empty by default.
47
48         :param log_sum_weight: Natural logarithm of sum of weights
49             of samples so far. Default: None (as log of 0.0).
50         :param average: Weighted average of the samples. Default: 0.0
51         :param log_variance: Natural logarithm of variance.
52             Default: None (as log of 0.0).
53         :type log_sum_weight: float or None
54         :type average: float
55         :type log_variance: float or None
56         """
57         self.log_sum_weight = log_sum_weight
58         self.average = average
59         self.log_variance = log_variance
60
61     def __repr__(self):
62         """Return string, which interpreted constructs state of self.
63
64         :returns: Expression constructing an equivalent instance.
65         :rtype: str
66         """
67         return (
68             f"ScalarStatTracker(log_sum_weight={self.log_sum_weight!r},"
69             f"average={self.average!r},log_variance={self.log_variance!r})"
70         )
71
72     def copy(self):
73         """Return new ScalarStatTracker instance with the same state as self.
74
75         The point of this method is the return type, instances of subclasses
76         can get rid of their additional data this way.
77
78         :returns: New instance with the same core state.
79         :rtype: ScalarStatTracker
80         """
81         return ScalarStatTracker(
82             self.log_sum_weight, self.average, self.log_variance
83         )
84
85     def add(self, scalar_value, log_weight=0.0):
86         """Return updated stats corresponding to addition of another sample.
87
88         :param scalar_value: The scalar value of the sample.
89         :param log_weight: Natural logarithm of weight of the sample.
90             Default: 0.0 (as log of 1.0).
91         :type scalar_value: float
92         :type log_weight: float
93         :returns: Updated self.
94         :rtype: ScalarStatTracker
95         """
96         old_log_sum_weight = self.log_sum_weight
97         if old_log_sum_weight is None:
98             # First sample.
99             self.log_sum_weight = log_weight
100             self.average = scalar_value
101             return self
102         old_average = self.average
103         log_variance = self.log_variance
104         new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
105         log_sample_ratio = log_weight - new_log_sum_weight
106         sample_ratio = math.exp(log_sample_ratio)
107         shift = scalar_value - old_average
108         new_average = old_average + shift * sample_ratio
109         # The log_variance is updated in-place (instead of new_ vs old_)
110         # because of if-s detecting None-s where the value does not change.
111         absolute_shift = abs(shift)
112         if absolute_shift > 0.0:
113             log_square_shift = 2 * math.log(absolute_shift)
114             log_variance = log_plus(
115                 log_variance, log_square_shift + log_sample_ratio
116             )
117         if log_variance is not None:
118             log_variance += old_log_sum_weight - new_log_sum_weight
119         self.log_sum_weight = new_log_sum_weight
120         self.average = new_average
121         self.log_variance = log_variance
122         return self
123
124
125 class ScalarDualStatTracker(ScalarStatTracker):
126     """Class for tracking one-dimensional samples, offering dual stats.
127
128     It means that instead of just primary stats (identical to what
129     ScalarStatTracker offers, that is why this is its subclass),
130     this tracker allso offers secondary stats.
131     Secondary stats are scalar stats that track the same data,
132     except that the most weighty (or later if tied) sample is not considered.
133
134     Users can analyze how do secondary stats differ from the primary ones,
135     and based on that decide whether they processed "enough" data.
136     One typical use is for Monte Carlo integrator to decide whether
137     the partial sums so far are reliable enough.
138     """
139
140     def __init__(
141         self,
142         log_sum_weight=None,
143         average=0.0,
144         log_variance=None,
145         log_sum_secondary_weight=None,
146         secondary_average=0.0,
147         log_secondary_variance=None,
148         max_log_weight=None,
149     ):
150         """Initialize new tracker instance, empty by default.
151
152         :param log_sum_weight: Natural logarithm of sum of weights
153             of samples so far. Default: None (as log of 0.0).
154         :param average: Weighted average of the samples. Default: 0.0
155         :param log_variance: Natural logarithm of variance.
156             Default: None (as log of 0.0).
157         :param log_sum_secondary_weight: Natural logarithm of sum of weights
158             of samples so far except weightest one.
159             Default: None (as log of 0.0).
160         :param secondary_average: Weighted average of the samples
161             except the weightest one. Default: 0.0
162         :param log_variance: Natural logarithm of variance od samples
163             except the weightest one. Default: None (as log of 0.0).
164         :param max_log_weight: Natural logarithm of weight of sample
165             counted in primary but not secondary stats.
166             Default: None (as log of 0.0).
167         :type log_sum_weight: float or None
168         :type average: float
169         :type log_variance: float or None
170         :type log_sum_secondary_weight: float or None
171         :type secondary_average: float
172         :type log_secondary_variance: float or None
173         :type max_log_weight: float or None
174         """
175         # Not using super() as the constructor signature is different,
176         # so in case of diamond inheritance mismatch would be probable.
177         ScalarStatTracker.__init__(self, log_sum_weight, average, log_variance)
178         self.secondary = ScalarStatTracker(
179             log_sum_secondary_weight, secondary_average, log_secondary_variance
180         )
181         self.max_log_weight = max_log_weight
182
183     def __repr__(self):
184         """Return string, which interpreted constructs state of self.
185
186         :returns: Expression contructing an equivalent instance.
187         :rtype: str
188         """
189         sec = self.secondary
190         return (
191             f"ScalarDualStatTracker(log_sum_weight={self.log_sum_weight!r},"
192             f"average={self.average!r},log_variance={self.log_variance!r},"
193             f"log_sum_secondary_weight={sec.log_sum_weight!r},"
194             f"secondary_average={sec.average!r},"
195             f"log_secondary_variance={sec.log_variance!r},"
196             f"max_log_weight={self.max_log_weight!r})"
197         )
198
199     def add(self, scalar_value, log_weight=0.0):
200         """Return updated both stats after addition of another sample.
201
202         :param scalar_value: The scalar value of the sample.
203         :param log_weight: Natural logarithm of weight of the sample.
204             Default: 0.0 (as log of 1.0).
205         :type scalar_value: float
206         :type log_weight: float
207         :returns: Updated self.
208         :rtype: ScalarDualStatTracker
209         """
210         # Using super() as copy() and add() are not expected to change
211         # signature, so this way diamond inheritance will be supported.
212         primary = super()
213         if self.max_log_weight is None or log_weight >= self.max_log_weight:
214             self.max_log_weight = log_weight
215             self.secondary = primary.copy()
216         else:
217             self.secondary.add(scalar_value, log_weight)
218         primary.add(scalar_value, log_weight)
219         return self
220
221     def get_pessimistic_variance(self):
222         """Return estimate of variance reflecting weight effects.
223
224         Typical scenario is the primary tracker dominated by a single sample.
225         In worse case, secondary tracker is also dominated by
226         a single (but different) sample.
227
228         Current implementation simply returns variance of average
229         of the two trackers, as if they were independent.
230
231         :returns: Pessimistic estimate of variance (not stdev, no log).
232         :rtype: float
233         """
234         var_primary = safe_exp(self.log_variance)
235         var_secondary = safe_exp(self.secondary.log_variance)
236         var_combined = (var_primary + var_secondary) / 2
237         avg_half_diff = (self.average - self.secondary.average) / 2
238         var_combined += avg_half_diff * avg_half_diff
239         return var_combined
240
241
242 class VectorStatTracker:
243     """Class for tracking multi-dimensional samples.
244
245     Contrary to one-dimensional data, multi-dimensional covariance matrix
246     contains off-diagonal elements which can be negative.
247
248     But, sum of weights is never negative, so it is tracked as a logarithm.
249
250     The code assumes every method gets arguments with correct dimensionality
251     as set in constructor. No checks are performed (for performance reasons).
252
253     TODO: Should we provide a subclass with the dimensionality checks?
254     """
255
256     def __init__(
257         self,
258         dimension=2,
259         log_sum_weight=None,
260         averages=None,
261         covariance_matrix=None,
262     ):
263         """Initialize new tracker instance, two-dimensional empty by default.
264
265         If any of latter two arguments is None, it means
266         the tracker state is invalid. Use reset method
267         to create empty tracker of constructed dimensionality.
268
269         :param dimension: Number of scalar components of samples.
270         :param log_sum_weight: Natural logarithm of sum of weights
271             of samples so far. Default: None (as log of 0.0).
272         :param averages: Weighted average of the samples.
273             Default: None
274         :param covariance_matrix: Variance matrix elements.
275             Default: None
276         :type log_sum_weight: float or None
277         :type averages: None or tuple of float
278         :type covariance_matrix: None or tuple of tuple of float
279         """
280         self.dimension = dimension
281         self.log_sum_weight = log_sum_weight
282         self.averages = averages
283         self.covariance_matrix = covariance_matrix
284
285     def __repr__(self):
286         """Return string, which interpreted constructs state of self.
287
288         :returns: Expression constructing an equivalent instance.
289         :rtype: str
290         """
291         return (
292             f"VectorStatTracker(dimension={self.dimension!r},"
293             f"log_sum_weight={self.log_sum_weight!r},"
294             f"averages={self.averages!r},"
295             f"covariance_matrix={self.covariance_matrix!r})"
296         )
297
298     def copy(self):
299         """Return new instance with the same state as self.
300
301         The main usage is to simplify debugging. This method allows
302         to store the old state, while self continues to track towards new state.
303
304         :returns: Created tracker instance.
305         :rtype: VectorStatTracker
306         """
307         return VectorStatTracker(
308             self.dimension,
309             self.log_sum_weight,
310             self.averages[:],
311             copy.deepcopy(self.covariance_matrix),
312         )
313
314     def reset(self):
315         """Return state set to empty data of proper dimensionality.
316
317         :returns: Updated self.
318         :rtype: VectorStatTracker
319         """
320         self.averages = [0.0 for _ in range(self.dimension)]
321         # TODO: Examine whether we can gain speed by tracking triangle only.
322         self.covariance_matrix = [
323             [0.0 for _ in range(self.dimension)] for _ in range(self.dimension)
324         ]
325         # TODO: In Python3, list comprehensions are generators,
326         # so they are not indexable. Put list() when converting.
327         return self
328
329     def unit_reset(self):
330         """Reset state but use unit matric as covariance.
331
332         :returns: Updated self.
333         :rtype: VectorStatTracker
334         """
335         self.reset()
336         for index in range(self.dimension):
337             self.covariance_matrix[index][index] = 1.0
338         return self
339
340     def add_get_shift(self, vector_value, log_weight=0.0):
341         """Return shift and update state to addition of another sample.
342
343         Shift is the vector from old average to new sample.
344         For most callers, returning shift is more useful than returning self.
345
346         :param vector_value: The value of the sample.
347         :param log_weight: Natural logarithm of weight of the sample.
348             Default: 0.0 (as log of 1.0).
349         :type vector_value: iterable of float
350         :type log_weight: float
351         :returns: Shift vector
352         :rtype: list of float
353         """
354         dimension = self.dimension
355         old_log_sum_weight = self.log_sum_weight
356         old_averages = self.averages
357         if not old_averages:
358             shift = [0.0 for _ in range(dimension)]
359         else:
360             shift = [
361                 vector_value[index] - old_averages[index]
362                 for index in range(dimension)
363             ]
364         if old_log_sum_weight is None:
365             # First sample.
366             self.log_sum_weight = log_weight
367             self.averages = [vector_value[index] for index in range(dimension)]
368             # Not touching covariance matrix.
369             return shift
370         covariance_matrix = self.covariance_matrix
371         new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
372         data_ratio = math.exp(old_log_sum_weight - new_log_sum_weight)
373         sample_ratio = math.exp(log_weight - new_log_sum_weight)
374         new_averages = [
375             old_averages[index] + shift[index] * sample_ratio
376             for index in range(dimension)
377         ]
378         # It is easier to update covariance matrix in-place.
379         for second in range(dimension):
380             for first in range(dimension):
381                 element = covariance_matrix[first][second]
382                 element += shift[first] * shift[second] * sample_ratio
383                 element *= data_ratio
384                 covariance_matrix[first][second] = element
385         self.log_sum_weight = new_log_sum_weight
386         self.averages = new_averages
387         # self.covariance_matrix still points to the object we updated in-place.
388         return shift
389
390     # TODO: There are some uses for such a vector tracker,
391     # that does not track average, but weightest (latest if tied) value,
392     # and computes covariance matrix centered around that.
393     # But perhaps the following method would work similarly well?
394     def add_without_dominance_get_distance(self, vector_value, log_weight=0.0):
395         """Update stats, avoid having the sample dominate, return old distance.
396
397         If the weight of the incoming sample is far bigger
398         than the weight of all the previous data together,
399         covariance matrix would suffer from underflow.
400         To avoid that, this method manipulates both weights
401         before calling add().
402
403         The old covariance matrix (before update) can define a metric.
404         Shift is a vector, so the metric can be used to compute a distance
405         between old average and new sample.
406
407         :param vector_value: The value of the sample.
408         :param log_weight: Natural logarithm of weight of the sample.
409             Default: 0.0 (as log of 1.0).
410         :type vector_value: iterable of float
411         :type log_weight: float
412         :returns: Updated self.
413         :rtype: VectorStatTracker
414         """
415         lsw = self.log_sum_weight
416         if lsw is not None and lsw < log_weight - 1.0:
417             lsw = (lsw + log_weight) / 2.0
418             log_weight = lsw
419             self.log_sum_weight = lsw
420         old_metric = copy.deepcopy(self.covariance_matrix)
421         shift = self.add_get_shift(vector_value, log_weight)
422         gradient = numpy.linalg.solve(old_metric, shift)
423         distance = numpy.vdot(shift, gradient)
424         return distance