1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """General purpose utilities.
17 import multiprocessing
23 from os import walk, makedirs, environ
24 from os.path import join, isdir
25 from shutil import move, Error
28 from errors import PresentationError
def mean(items):
    """Calculate mean value from the items.

    :param items: Mean value is calculated from these items.
    :type items: list
    :returns: Mean value.
    :rtype: float
    """
    total = sum(items)
    return float(total) / len(items)
def stdev(items):
    """Calculate stdev from the items.

    :param items: Stdev is calculated from these items.
    :type items: list
    :returns: Stdev.
    :rtype: float
    """
    # Population standard deviation: sqrt of the mean squared deviation.
    avg = float(sum(items)) / len(items)
    variance = [(x - avg) ** 2 for x in items]
    return sqrt(float(sum(variance)) / len(variance))
def relative_change(nr1, nr2):
    """Compute relative change of two values.

    :param nr1: The first number.
    :param nr2: The second number.
    :type nr1: float
    :type nr2: float
    :returns: Relative change of nr1, in percent.
    :rtype: float
    """
    # NOTE(review): raises ZeroDivisionError when nr1 == 0 — presumably
    # callers guarantee a non-zero baseline; confirm before hardening.
    delta = nr2 - nr1
    return float((delta / nr1) * 100)
def remove_outliers(input_list, outlier_const=1.5, window=14):
    """Return list with outliers removed.

    Outliers are detected with the IQR method computed over the WHOLE list:
    values outside [Q1 - c*IQR, Q3 + c*IQR] are dropped.

    :param input_list: Data from which the outliers will be removed.
    :param outlier_const: Outlier constant (multiplier of the IQR).
    :param window: Unused; kept only for backward compatibility of the
        signature (despite the old docstring, no moving window is applied
        here — see split_outliers for the windowed variant).
    :type input_list: list of floats
    :type outlier_const: float
    :type window: int
    :returns: The input list without outliers.
    :rtype: list of floats
    """
    data = np.array(input_list)
    upper_quartile = np.percentile(data, 75)
    lower_quartile = np.percentile(data, 25)
    iqr = (upper_quartile - lower_quartile) * outlier_const
    low_bound = lower_quartile - iqr
    high_bound = upper_quartile + iqr
    # Keep only the values inside the acceptance interval.
    return [y for y in data.tolist() if low_bound <= y <= high_bound]
def split_outliers(input_series, outlier_const=1.5, window=14):
    """Go through the input data and generate two pandas series:
    - input data with outliers replaced by NAN
    - outliers.
    The function uses IQR to detect outliers.

    :param input_series: Data to be examined for outliers.
    :param outlier_const: Outlier constant (multiplier of the IQR).
    :param window: How many preceding values to take into account.
    :type input_series: pandas.Series
    :type outlier_const: float
    :type window: int
    :returns: Input data with NAN outliers and Outliers.
    :rtype: (pandas.Series, pandas.Series)
    """
    list_data = list(input_series.items())
    head_size = min(window, len(list_data))
    head_list = list_data[:head_size]
    # pandas.Series.append() was removed in pandas 2.0; pd.concat() is the
    # supported equivalent. Explicit float dtype avoids the object-dtype
    # default of an empty Series.
    trimmed_data = pd.Series(dtype=float)
    outliers = pd.Series(dtype=float)
    # The first head_size samples have no full preceding window: accept them.
    for item_x, item_y in head_list:
        item_pd = pd.Series([item_y, ], index=[item_x, ])
        trimmed_data = pd.concat([trimmed_data, item_pd])
    for index, (item_x, item_y) in list(enumerate(list_data))[head_size:]:
        # IQR is computed only from the head_size values preceding this one.
        y_rolling_list = [y for (x, y) in list_data[index - head_size:index]]
        y_rolling_array = np.array(y_rolling_list)
        q1 = np.percentile(y_rolling_array, 25)
        q3 = np.percentile(y_rolling_array, 75)
        iqr = (q3 - q1) * outlier_const
        item_pd = pd.Series([item_y, ], index=[item_x, ])
        if q1 - iqr <= item_y <= q3 + iqr:
            trimmed_data = pd.concat([trimmed_data, item_pd])
        else:
            # Record the outlier and replace it by NAN in the trimmed data.
            outliers = pd.concat([outliers, item_pd])
            nan_pd = pd.Series([np.nan, ], index=[item_x, ])
            trimmed_data = pd.concat([trimmed_data, nan_pd])

    return trimmed_data, outliers
def get_files(path, extension=None, full_path=True):
    """Generates the list of files to process.

    :param path: Path to files.
    :param extension: Extension of files to process. If it is the empty string
        or None, all files will be processed.
    :param full_path: If True, the files with full path are generated.
    :type path: str
    :type extension: str
    :type full_path: bool
    :returns: List of files to process.
    :rtype: list
    """
    file_list = list()
    for root, _, files in walk(path):
        for filename in files:
            # Skip files not matching the requested extension, if any.
            if extension and not filename.endswith(extension):
                continue
            # Bug fix: full_path is now honoured also when no extension is
            # given; previously the no-extension branch always joined the
            # root, contradicting the documented behavior of full_path.
            if full_path:
                file_list.append(join(root, filename))
            else:
                file_list.append(filename)

    return file_list
def get_rst_title_char(level):
    """Return character used for the given title level in rst files.

    :param level: Level of the title.
    :type level: int
    :returns: Character used for the given title level in rst files.
    :rtype: str
    """
    chars = ('=', '-', '`', "'", '.', '~', '*', '+', '^')
    # Levels beyond the known set fall back to the last character.
    return chars[level] if level < len(chars) else chars[-1]
def execute_command(cmd):
    """Execute the command in a subprocess and log the stdout and stderr.

    :param cmd: Command to execute.
    :type cmd: str
    :returns: Return code, stdout and stderr of the executed command.
    :rtype: tuple(int, bytes, bytes)
    """
    env = environ.copy()
    # SECURITY NOTE(review): shell=True executes the string through the
    # shell; cmd must never contain untrusted input.
    proc = subprocess.Popen(
        [cmd],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True,
        env=env)

    stdout, stderr = proc.communicate()

    logging.info(stdout)
    logging.info(stderr)

    if proc.returncode != 0:
        logging.error(" Command execution failed.")
    # Fixed docstring: the function returns the full tuple, not only the
    # return code as previously documented.
    return proc.returncode, stdout, stderr
def get_last_successful_build_number(jenkins_url, job_name):
    """Get the number of the last successful build of the given job.

    :param jenkins_url: Jenkins URL.
    :param job_name: Job name.
    :type jenkins_url: str
    :type job_name: str
    :returns: The build number as a string.
    :rtype: str
    """
    build_url = "{}/{}/lastSuccessfulBuild/buildNumber".format(
        jenkins_url, job_name)
    # Delegate the download (and its logging) to execute_command.
    return execute_command("wget -qO- {url}".format(url=build_url))
def get_last_completed_build_number(jenkins_url, job_name):
    """Get the number of the last completed build of the given job.

    :param jenkins_url: Jenkins URL.
    :param job_name: Job name.
    :type jenkins_url: str
    :type job_name: str
    :returns: The build number as a string.
    :rtype: str
    """
    build_url = "{}/{}/lastCompletedBuild/buildNumber".format(
        jenkins_url, job_name)
    # Delegate the download (and its logging) to execute_command.
    return execute_command("wget -qO- {url}".format(url=build_url))
def archive_input_data(spec):
    """Archive the report.

    Moves all data files of the configured file format from the working data
    directory into the static archive directory.

    :param spec: Specification read from the specification file.
    :type spec: Specification
    :raises PresentationError: If it is not possible to archive the input data.
    """
    logging.info("  Archiving the input data files ...")

    extension = spec.input["file-format"]
    data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
                           extension=extension)
    dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
    logging.info("  Destination: {0}".format(dst))

    try:
        # Create the destination directory on first use.
        if not isdir(dst):
            makedirs(dst)
        for data_file in data_files:
            logging.info("  Moving the file: {0} ...".format(data_file))
            move(data_file, dst)
    except (Error, OSError) as err:
        # shutil.Error covers cross-filesystem move failures.
        raise PresentationError("Not possible to archive the input data.",
                                str(err))
    logging.info("  Done.")
def classify_anomalies(data, window):
    """Evaluates if the sample value is an outlier, regression, normal or
    progression compared to the previous data within the window.
    We use the intervals defined as:
    - regress: less than trimmed moving median - 3 * stdev
    - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev
    - progress: more than trimmed moving median + 3 * stdev
    where stdev is trimmed moving standard deviation.

    :param data: Full data set with the outliers replaced by nan.
    :param window: Window size used to calculate moving average and moving
        stdev.
    :type data: pandas.Series
    :type window: int
    :returns: Evaluated results.
    :rtype: list
    """
    win_size = data.size if data.size < window else window
    # min_periods=2: need at least two preceding-window samples for a stdev.
    tmm = data.rolling(window=win_size, min_periods=2).median()
    tmstd = data.rolling(window=win_size, min_periods=2).std()

    # The first sample has nothing to compare against; pre-classify "normal".
    classification = ["normal", ]
    first = True
    # Series.iteritems() was removed in pandas 2.0; items() is the drop-in
    # equivalent and behaves identically here.
    for build, value in data.items():
        if first:
            first = False
            continue
        if np.isnan(value) or np.isnan(tmm[build]) or np.isnan(tmstd[build]):
            classification.append("outlier")
        elif value < (tmm[build] - 3 * tmstd[build]):
            classification.append("regression")
        elif value > (tmm[build] + 3 * tmstd[build]):
            classification.append("progression")
        else:
            classification.append("normal")
    return classification
class Worker(multiprocessing.Process):
    """Worker class used to process tasks in separate parallel processes.
    """

    def __init__(self, work_queue, data_queue, func):
        """Initialization.

        :param work_queue: Queue with items to process.
        :param data_queue: Shared memory between processes. Queue which keeps
            the result data. This data is then read by the main process and used
            in further processing.
        :param func: Function which is executed by the worker.
        :type work_queue: multiprocessing.JoinableQueue
        :type data_queue: multiprocessing.Manager().Queue()
        :type func: Callable object
        """
        super(Worker, self).__init__()
        self._work_queue = work_queue
        self._data_queue = data_queue
        self._func = func

    def run(self):
        """Method representing the process's activity.

        Takes items from the work queue until the process is terminated,
        marking each item done so JoinableQueue.join() can unblock.
        """
        while True:
            try:
                self.process(self._work_queue.get())
            finally:
                # Always acknowledge the item, even if processing raised.
                self._work_queue.task_done()

    def process(self, item_to_process):
        """Method executed by the runner.

        :param item_to_process: Data to be processed by the function.
        :type item_to_process: tuple
        """
        self._func(self.pid, self._data_queue, *item_to_process)