X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FPLRsearch%2FPLRsearch.py;h=b7c93443913ae2756f07c96536bccbb3d48083b9;hp=4205818d91769e7edf83099bbafdb3d2fd126dbb;hb=3d1e8a1fdfa0ec734adf7cc4370e955508882fdc;hpb=51511689eb9f93134878c314ba0349f28ef2ec4f diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py index 4205818d91..b7c9344391 100644 --- a/resources/libraries/python/PLRsearch/PLRsearch.py +++ b/resources/libraries/python/PLRsearch/PLRsearch.py @@ -43,6 +43,7 @@ class PLRsearch(object): Method othed than search (and than __init__) are just internal code structure. + TODO: Those method names should start with underscore then. """ @@ -162,7 +163,7 @@ class PLRsearch(object): :type min_rate: float :type max_rate: float :returns: Average and stdev of critical load estimate. - :rtype: 2-tuple of floats + :rtype: 2-tuple of float """ stop_time = time.time() + self.timeout min_rate = float(min_rate) @@ -174,7 +175,7 @@ class PLRsearch(object): focus_trackers = (None, None) transmit_rate = (min_rate + max_rate) / 2.0 lossy_loads = [max_rate] - zeros = [0, 0] # Cosecutive zero loss, separately for stretch and erf. + zeros = 0 # How many cosecutive zero loss results are happening. while 1: trial_number += 1 logging.info("Trial %(number)r", {"number": trial_number}) @@ -182,15 +183,14 @@ class PLRsearch(object): self.trial_duration_per_trial * trial_number, transmit_rate, trial_result_list, min_rate, max_rate, focus_trackers) measurement, average, stdev, avg1, avg2, focus_trackers = results - index = trial_number % 2 - zeros[index] += 1 + zeros += 1 # TODO: Ratio of fill rate to drain rate seems to have # exponential impact. Make it configurable, or is 4:3 good enough? if measurement.loss_fraction >= self.packet_loss_ratio_target: - for _ in range(4 * zeros[index]): + for _ in range(4 * zeros): lossy_loads.append(measurement.target_tr) if measurement.loss_count > 0: - zeros[index] = 0 + zeros = 0 lossy_loads.sort() if stop_time <= time.time(): return average, stdev @@ -201,20 +201,19 @@ class PLRsearch(object): next_load = (measurement.receive_rate / ( 1.0 - self.packet_loss_ratio_target)) else: - index = (trial_number + 1) % 2 - next_load = (avg1, avg2)[index] - if zeros[index] > 0: + next_load = (avg1 + avg2) / 2.0 + if zeros > 0: if lossy_loads[0] > next_load: - diminisher = math.pow(2.0, 1 - zeros[index]) + diminisher = math.pow(2.0, 1 - zeros) next_load = lossy_loads[0] + diminisher * next_load next_load /= (1.0 + diminisher) # On zero measurement, we need to drain obsoleted low losses # even if we did not use them to increase next_load, - # in order to get to usable loses with higher load. + # in order to get to usable loses at higher loads. if len(lossy_loads) > 3: lossy_loads = lossy_loads[3:] logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r", - {"z": zeros, "o": (avg1, avg2)[index], + {"z": zeros, "o": (avg1 + avg2) / 2.0, "n": next_load, "s": lossy_loads}) transmit_rate = min(max_rate, max(min_rate, next_load)) @@ -510,12 +509,12 @@ class PLRsearch(object): :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker :type max_samples: None or int :returns: Measurement and computation results. - :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers. + :rtype: _ComputeResult """ logging.debug( "measure_and_compute started with self %(self)r, trial_duration " - + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, " - + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r", + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, " + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r", {"self": self, "dur": trial_duration, "tr": transmit_rate, "trl": trial_result_list, "mr": max_rate, "track": focus_trackers, "ms": max_samples}) @@ -620,7 +619,7 @@ class PLRsearch(object): :type pipe: multiprocessing.Connection :returns: Computed value tracker, actual focus tracker, and number of samples used for this iteration. - :rtype: 3-tuple of tracker, tracker and int + :rtype: _PartialResult """ pipe.send(None) if not pipe.poll(10.0): @@ -660,23 +659,65 @@ class PLRsearch(object): @staticmethod def _get_result(measurement, stretch_result, erf_result): - """Collate results from measure_and_compute""" + """Process and collate results from measure_and_compute. + + Turn logarithm based values to exponential ones, + combine averages and stdevs of two fitting functions into a whole. + + :param measurement: The trial measurement obtained during computation. + :param stretch_result: Computation output for stretch fitting function. + :param erf_result: Computation output for erf fitting function. + :type measurement: ReceiveRateMeasurement + :type stretch_result: _PartialResult + :type erf_result: _PartialResult + :returns: Combined results. + :rtype: _ComputeResult + """ stretch_avg = stretch_result.value_tracker.average erf_avg = erf_result.value_tracker.average - # TODO: Take into account secondary stats. - stretch_stdev = math.exp(stretch_result.value_tracker.log_variance / 2) - erf_stdev = math.exp(erf_result.value_tracker.log_variance / 2) - avg = math.exp((stretch_avg + erf_avg) / 2.0) - var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0 - var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0 - stdev = avg * math.sqrt(var) + stretch_var = stretch_result.value_tracker.get_pessimistic_variance() + erf_var = erf_result.value_tracker.get_pessimistic_variance() + avg_log = (stretch_avg + erf_avg) / 2.0 + var_log = (stretch_var + erf_var) / 2.0 + var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0 + stdev_log = math.sqrt(var_log) + low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log) + avg = (low + upp) / 2 + stdev = avg - low trackers = (stretch_result.focus_tracker, erf_result.focus_tracker) sea = math.exp(stretch_avg) eea = math.exp(erf_avg) return _ComputeResult(measurement, avg, stdev, sea, eea, trackers) -_PartialResult = namedtuple('_PartialResult', - 'value_tracker focus_tracker samples') -_ComputeResult = namedtuple('_ComputeResult', 'measurement avg stdev ' + - 'stretch_exp_avg erf_exp_avg trackers') +# Named tuples, for multiple local variables to be passed as return value. +_PartialResult = namedtuple( + "_PartialResult", "value_tracker focus_tracker samples") +"""Two stat trackers and sample counter. + +:param value_tracker: Tracker for the value (critical load) being integrated. +:param focus_tracker: Tracker for focusing integration inputs (sample points). +:param samples: How many samples were used for the computation. +:type value_tracker: stat_trackers.ScalarDualStatTracker +:type focus_tracker: stat_trackers.VectorStatTracker +:type samples: int +""" + +_ComputeResult = namedtuple( + "_ComputeResult", + "measurement avg stdev stretch_exp_avg erf_exp_avg trackers") +"""Measurement, 4 computation result values, pair of trackers. + +:param measurement: The trial measurement result obtained during computation. +:param avg: Overall average of critical rate estimate. +:param stdev: Overall standard deviation of critical rate estimate. +:param stretch_exp_avg: Stretch fitting function estimate average exponentiated. +:param erf_exp_avg: Erf fitting function estimate average, exponentiated. +:param trackers: Pair of focus trackers to start next iteration with. +:type measurement: ReceiveRateMeasurement +:type avg: float +:type stdev: float +:type stretch_exp_avg: float +:type erf_exp_avg: float +:type trackers: 2-tuple of stat_trackers.VectorStatTracker +"""