feat(jumpavg): support small values via unit param
[csit.git] / resources / libraries / python / jumpavg / bit_counting_stats.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022 Cisco and/or its affiliates.
+# Copyright (c) 2023 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -17,7 +17,7 @@ import dataclasses
 import math
 import typing
 
 import math
 import typing
 
-from .AvgStdevStats import AvgStdevStats
+from .avg_stdev_stats import AvgStdevStats
 
 
 @dataclasses.dataclass
 
 
 @dataclasses.dataclass
@@ -40,6 +40,8 @@ class BitCountingStats(AvgStdevStats):
     """Maximal sample value (real or estimated).
     Default value is there just for argument ordering reasons,
     leaving None leads to exceptions."""
     """Maximal sample value (real or estimated).
     Default value is there just for argument ordering reasons,
     leaving None leads to exceptions."""
+    unit: float = 1.0
+    """Typical resolution of the values."""
     prev_avg: typing.Optional[float] = None
     """Population average of the previous group (if any)."""
     bits: float = None
     prev_avg: typing.Optional[float] = None
     """Population average of the previous group (if any)."""
     bits: float = None
@@ -74,6 +76,8 @@ class BitCountingStats(AvgStdevStats):
             return
         if self.max_value <= 0.0:
             raise ValueError(f"Invalid max value: {self!r}")
             return
         if self.max_value <= 0.0:
             raise ValueError(f"Invalid max value: {self!r}")
+        max_value = self.max_value / self.unit
+        avg = self.avg / self.unit
         # Length of the sequence must be also counted in bits,
         # otherwise the message would not be decodable.
         # Model: probability of k samples is 1/k - 1/(k+1) == 1/k/(k+1)
         # Length of the sequence must be also counted in bits,
         # otherwise the message would not be decodable.
         # Model: probability of k samples is 1/k - 1/(k+1) == 1/k/(k+1)
@@ -82,36 +86,37 @@ class BitCountingStats(AvgStdevStats):
         if self.prev_avg is None:
             # Avg is considered to be uniformly distributed
             # from zero to max_value.
         if self.prev_avg is None:
             # Avg is considered to be uniformly distributed
             # from zero to max_value.
-            self.bits += math.log(self.max_value + 1.0, 2)
+            self.bits += math.log(max_value + 1, 2)
         else:
             # Opposite triangle distribution with minimum.
         else:
             # Opposite triangle distribution with minimum.
-            self.bits += math.log(
-                (self.max_value * (self.max_value + 1))
-                / (abs(self.avg - self.prev_avg) + 1),
-                2,
-            )
+            prev_avg = self.prev_avg / self.unit
+            norm = prev_avg * prev_avg
+            norm -= (prev_avg - 1) * max_value
+            norm += max_value * max_value / 2
+            self.bits -= math.log((abs(avg - prev_avg) + 1) / norm, 2)
         if self.size < 2:
             return
         if self.size < 2:
             return
+        stdev = self.stdev / self.unit
         # Stdev is considered to be uniformly distributed
         # from zero to max_value. That is quite a bad expectation,
         # but resilient to negative samples etc.
         # Stdev is considered to be uniformly distributed
         # from zero to max_value. That is quite a bad expectation,
         # but resilient to negative samples etc.
-        self.bits += math.log(self.max_value + 1.0, 2)
+        self.bits += math.log(max_value + 1, 2)
         # Now we know the samples lie on sphere in size-1 dimensions.
         # So it is (size-2)-sphere, with radius^2 == stdev^2 * size.
         # https://en.wikipedia.org/wiki/N-sphere
         sphere_area_ln = math.log(2)
         # Now we know the samples lie on sphere in size-1 dimensions.
         # So it is (size-2)-sphere, with radius^2 == stdev^2 * size.
         # https://en.wikipedia.org/wiki/N-sphere
         sphere_area_ln = math.log(2)
-        sphere_area_ln += math.log(math.pi) * ((self.size - 1) / 2.0)
-        sphere_area_ln -= math.lgamma((self.size - 1) / 2.0)
-        sphere_area_ln += math.log(self.stdev + 1.0) * (self.size - 2)
-        sphere_area_ln += math.log(self.size) * ((self.size - 2) / 2.0)
+        sphere_area_ln += math.log(math.pi) * ((self.size - 1) / 2)
+        sphere_area_ln -= math.lgamma((self.size - 1) / 2)
+        sphere_area_ln += math.log(stdev + 1) * (self.size - 2)
+        sphere_area_ln += math.log(self.size) * ((self.size - 2) / 2)
         self.bits += sphere_area_ln / math.log(2)
 
         self.bits += sphere_area_ln / math.log(2)
 
-    # TODO: Rename, so pylint stops complaining about signature change.
     @classmethod
     @classmethod
-    def for_runs(
+    def for_runs_and_params(
         cls,
         runs: typing.Iterable[typing.Union[float, AvgStdevStats]],
         max_value: float,
         cls,
         runs: typing.Iterable[typing.Union[float, AvgStdevStats]],
         max_value: float,
+        unit: float = 1.0,
         prev_avg: typing.Optional[float] = None,
     ):
         """Return new stats instance describing the sequence of runs.
         prev_avg: typing.Optional[float] = None,
     ):
         """Return new stats instance describing the sequence of runs.
@@ -131,9 +136,11 @@ class BitCountingStats(AvgStdevStats):
 
         :param runs: Sequence of data to describe by the new metadata.
         :param max_value: Maximal expected value.
 
         :param runs: Sequence of data to describe by the new metadata.
         :param max_value: Maximal expected value.
+        :param unit: Typical resolution of the values.
         :param prev_avg: Population average of the previous group, if any.
         :type runs: Iterable[Union[float, AvgStdevStats]]
         :type max_value: Union[float, NoneType]
         :param prev_avg: Population average of the previous group, if any.
         :type runs: Iterable[Union[float, AvgStdevStats]]
         :type max_value: Union[float, NoneType]
+        :type unit: float
         :type prev_avg: Union[float, NoneType]
         :returns: The new stats instance.
         :rtype: cls
         :type prev_avg: Union[float, NoneType]
         :returns: The new stats instance.
         :rtype: cls
@@ -144,6 +151,7 @@ class BitCountingStats(AvgStdevStats):
             avg=asd.avg,
             stdev=asd.stdev,
             max_value=max_value,
             avg=asd.avg,
             stdev=asd.stdev,
             max_value=max_value,
+            unit=unit,
             prev_avg=prev_avg,
         )
         return ret_obj
             prev_avg=prev_avg,
         )
         return ret_obj