feat(jumpavg): support small values via unit param
[csit.git] / resources / libraries / python / jumpavg / bit_counting_stats.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022 Cisco and/or its affiliates.
+# Copyright (c) 2023 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -17,7 +17,7 @@ import dataclasses
 import math
 import typing
 
-from .AvgStdevStats import AvgStdevStats
+from .avg_stdev_stats import AvgStdevStats
 
 
 @dataclasses.dataclass
@@ -40,6 +40,8 @@ class BitCountingStats(AvgStdevStats):
     """Maximal sample value (real or estimated).
     Default value is there just for argument ordering reasons,
     leaving None leads to exceptions."""
+    unit: float = 1.0
+    """Typical resolution of the values."""
     prev_avg: typing.Optional[float] = None
     """Population average of the previous group (if any)."""
     bits: float = None
@@ -74,6 +76,8 @@ class BitCountingStats(AvgStdevStats):
             return
         if self.max_value <= 0.0:
             raise ValueError(f"Invalid max value: {self!r}")
+        max_value = self.max_value / self.unit
+        avg = self.avg / self.unit
         # Length of the sequence must be also counted in bits,
         # otherwise the message would not be decodable.
         # Model: probability of k samples is 1/k - 1/(k+1) == 1/k/(k+1)
@@ -82,36 +86,37 @@ class BitCountingStats(AvgStdevStats):
         if self.prev_avg is None:
             # Avg is considered to be uniformly distributed
             # from zero to max_value.
-            self.bits += math.log(self.max_value + 1.0, 2)
+            self.bits += math.log(max_value + 1, 2)
         else:
             # Opposite triangle distribution with minimum.
-            self.bits += math.log(
-                (self.max_value * (self.max_value + 1))
-                / (abs(self.avg - self.prev_avg) + 1),
-                2,
-            )
+            prev_avg = self.prev_avg / self.unit
+            norm = prev_avg * prev_avg
+            norm -= (prev_avg - 1) * max_value
+            norm += max_value * max_value / 2
+            self.bits -= math.log((abs(avg - prev_avg) + 1) / norm, 2)
         if self.size < 2:
             return
+        stdev = self.stdev / self.unit
         # Stdev is considered to be uniformly distributed
         # from zero to max_value. That is quite a bad expectation,
         # but resilient to negative samples etc.
-        self.bits += math.log(self.max_value + 1.0, 2)
+        self.bits += math.log(max_value + 1, 2)
         # Now we know the samples lie on sphere in size-1 dimensions.
         # So it is (size-2)-sphere, with radius^2 == stdev^2 * size.
         # https://en.wikipedia.org/wiki/N-sphere
         sphere_area_ln = math.log(2)
-        sphere_area_ln += math.log(math.pi) * ((self.size - 1) / 2.0)
-        sphere_area_ln -= math.lgamma((self.size - 1) / 2.0)
-        sphere_area_ln += math.log(self.stdev + 1.0) * (self.size - 2)
-        sphere_area_ln += math.log(self.size) * ((self.size - 2) / 2.0)
+        sphere_area_ln += math.log(math.pi) * ((self.size - 1) / 2)
+        sphere_area_ln -= math.lgamma((self.size - 1) / 2)
+        sphere_area_ln += math.log(stdev + 1) * (self.size - 2)
+        sphere_area_ln += math.log(self.size) * ((self.size - 2) / 2)
         self.bits += sphere_area_ln / math.log(2)
 
-    # TODO: Rename, so pylint stops complaining about signature change.
     @classmethod
-    def for_runs(
+    def for_runs_and_params(
         cls,
         runs: typing.Iterable[typing.Union[float, AvgStdevStats]],
         max_value: float,
+        unit: float = 1.0,
         prev_avg: typing.Optional[float] = None,
     ):
         """Return new stats instance describing the sequence of runs.
@@ -131,9 +136,11 @@ class BitCountingStats(AvgStdevStats):
 
         :param runs: Sequence of data to describe by the new metadata.
         :param max_value: Maximal expected value.
+        :param unit: Typical resolution of the values.
         :param prev_avg: Population average of the previous group, if any.
         :type runs: Iterable[Union[float, AvgStdevStats]]
         :type max_value: Union[float, NoneType]
+        :type unit: float
         :type prev_avg: Union[float, NoneType]
         :returns: The new stats instance.
         :rtype: cls
@@ -144,6 +151,7 @@ class BitCountingStats(AvgStdevStats):
             avg=asd.avg,
             stdev=asd.stdev,
             max_value=max_value,
+            unit=unit,
             prev_avg=prev_avg,
         )
         return ret_obj