feat(jumpavg): speed up, use Python 3.8 features
[csit.git] / resources / libraries / python / jumpavg / classify.py
1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module holding the classify function
15
16 Classification os one of primary purposes of this package.
17
18 Minimal message length principle is used
19 for grouping results into the list of groups,
20 assuming each group is a population of different Gaussian distribution.
21 """
22
23 import typing
24
25 from .AvgStdevStats import AvgStdevStats
26 from .BitCountingGroupList import BitCountingGroupList
27
28
29 def classify(
30     values: typing.Iterable[typing.Union[float, typing.Iterable[float]]]
31 ) -> BitCountingGroupList:
32     """Return the values in groups of optimal bit count.
33
34     Here, a value is either a float, or an iterable of floats.
35     Such iterables represent an undivisible sequence of floats.
36     Int is accepted anywhere instead of float.
37
38     Internally, such sequence is replaced by AvgStdevStats
39     after maximal value is found.
40
41     :param values: Sequence of runs to classify.
42     :type values: Iterable[Union[float, Iterable[float]]]
43     :returns: Classified group list.
44     :rtype: BitCountingGroupList
45     """
46     processed_values = list()
47     max_value = 0.0
48     for value in values:
49         if isinstance(value, (float, int)):
50             if value > max_value:
51                 max_value = value
52             processed_values.append(value)
53         else:
54             for subvalue in value:
55                 if subvalue > max_value:
56                     max_value = subvalue
57             processed_values.append(AvgStdevStats.for_runs(value))
58     # Glist means group list (BitCountingGroupList).
59     open_glists = list()
60     record_glist = BitCountingGroupList(max_value=max_value)
61     for value in processed_values:
62         new_open_glist = record_glist.copy_fast().append_group_of_runs([value])
63         record_glist = new_open_glist
64         for old_open_glist in open_glists:
65             old_open_glist.append_run_to_to_last_group(value)
66             if old_open_glist.bits < record_glist.bits:
67                 record_glist = old_open_glist
68         open_glists.append(new_open_glist)
69     previous_average = record_glist[0].stats.avg
70     for group in record_glist:
71         if group.stats.avg == previous_average:
72             group.comment = "normal"
73         elif group.stats.avg < previous_average:
74             group.comment = "regression"
75         elif group.stats.avg > previous_average:
76             group.comment = "progression"
77         previous_average = group.stats.avg
78     return record_glist